一、条件变量

条件变量是多线程的一种同步方式,它允许多个线程以无竞争的方式等待特定事件发生。无竞争的意思是,当条件满足时,条件满足这个讯号会发送给所有的监听者线程,但多个线程中只有一个能获取到特定事件。条件变量需要配合互斥量一起使用。

相关数据结构和函数:

#include <pthread.h>

// 销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
// 初始化条件变量
int pthread_cond_init(pthread_cond_t *restrict cond,
       const pthread_condattr_t *restrict attr);
// 初始化条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

条件变量的数据结构为pthread_cond_t,和其他同步的数据结构一样,也提供了两种方式来初始化。一种是通过赋值的方式,一种是通过函数的方式。在使用pthread_cond_init对初始化条件变量的时候,attr参数一般为NULL。

对条件变量加锁的方式:

#include <pthread.h>

// 等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex);
// 带有超时条件的等待
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex,
       const struct timespec *restrict abstime);

在对条件变量加锁的时候,需要传入一把已经处于加锁状态的互斥量,此时函数会把当前线程放到条件等待的列表上并对互斥量解锁。此时线程进入条件等待状态,当有信号到达时便会触发。

通知条件满足的函数:

#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

pthread_cond_signal函数会唤醒条件等待队列上的至少一个线程,pthread_cond_broadcast会唤醒条件队列上的所有线程。

条件队列一般用于消息队列,用于统一、协调多个生产者和消费者之间的竞争关系。

二、使用示例

以下使用了三个线程作为消费者,分别从全局队列上获取消息消费:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include "log.h"

#define THREAD_COUNT 3

// 消息结构
struct msg_st {
    unsigned int msg_id;
    struct msg_st *next;
};

static struct msg_st *msg_queue = NULL;
static pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;

// 消费线程函数
void *handle_msg(void *arg) {
    struct msg_st *msg;
    while (1) {
        pthread_mutex_lock(&g_mutex);
        while (msg_queue == NULL) {
            pthread_cond_wait(&g_cond, &g_mutex);
        }

        // 提取消息
        msg = msg_queue;
        msg_queue = msg_queue->next;
        pthread_mutex_unlock(&g_mutex);

        // 退出
        if (msg->msg_id == (unsigned int) -1) {
            info("Thread 0x%x exit!", (unsigned int) pthread_self());
            break;
        }

        info("Thread 0x%x: msg_id = %u", (unsigned int) pthread_self(), msg->msg_id);
    }
    return NULL;
}

// 生产消息函数
void create_msg(struct msg_st *msg) {
    pthread_mutex_lock(&g_mutex);
    msg->next = msg_queue;
    msg_queue = msg;
    pthread_mutex_unlock(&g_mutex);
    pthread_cond_signal(&g_cond);
}

int main() {
    int i;
    struct msg_st msg[10];
    pthread_t pid[THREAD_COUNT];

    debug("start create threads");
    // 创建3个线程作为消费者
    for (i = 0; i < 3; i++) {
        pthread_create(&pid[i], NULL, handle_msg, NULL);
    }

    debug("start create msgs");
    // 生产10个消息
    for (i = 0; i < 10; i++) {
        msg[i].msg_id = i + 1;
        msg[i].next = NULL;
        create_msg(&msg[i]);
    }

    // 休眠1秒,确保所有消息都被消费者消费完成
    sleep(1);

    // 退出所有线程
    debug("start create exit msgs");
    for (i = 0; i < THREAD_COUNT; i++) {
        msg[i].msg_id = (unsigned int) -1;
        msg[i].next = NULL;
        create_msg(&msg[i]);
    }

    // 回收所有线程
    debug("start join threads");
    for (i = 0; i < THREAD_COUNT; i++) {
        pthread_join(pid[i], NULL);
    }

    debug("program end");

    return 0;
}

运行结果,10个消息分别被3个线程打印出来了:

三、其他

3.1 “惊群”效应

条件变量是否存在惊群效应呢?

不会,线程执行wait操作的时候会被放到一个条件等待队列里面去。当条件满足的时候,系统会自动选择队列前面的线程来消费队列。

3.2 为什么wait前要加锁,这样不会死锁吗?

不会,执行wait操作时需要的是一把已经加锁的互斥量,这个锁在wait函数中会解开。

这样做的目的:如果在wait前没有加锁,生产者线程产生了一个消息并发送信号,再执行wait后信号就丢失了。

3.3 生产者的信号为什么放在unlock之后?

wait收到信号之后要对互斥量加锁,此时锁还被生产者持有,消费者依旧还要等待锁的释放,才能持有锁。对消费者而言,最理想的情况就是生产者产生消息后,我立马就能读取消息,此时的互斥量是用来和其他消费者线程同步的,而不是和生产者线程同步。

最后修改:2019 年 12 月 28 日
如果觉得我的文章对你有用,请随意赞赏