一、条件变量
条件变量是多线程的一种同步方式,它允许多个线程以无竞争的方式等待特定事件发生。无竞争的意思是,当条件满足时,条件满足这个讯号会发送给所有的监听者线程,但多个线程中只有一个能获取到特定事件。条件变量需要配合互斥量一起使用。
相关数据结构和函数:
#include <pthread.h>
// 销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
// 初始化条件变量
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
// 初始化条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
条件变量的数据结构为pthread_cond_t
,和其他同步的数据结构一样,也提供了两种方式来初始化。一种是通过赋值的方式,一种是通过函数的方式。在使用pthread_cond_init
对初始化条件变量的时候,attr
参数一般为NULL。
对条件变量加锁的方式:
#include <pthread.h>
// 等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
// 带有超时条件的等待
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
在对条件变量加锁的时候,需要传入一把已经处于加锁状态的互斥量,此时函数会把当前线程放到条件等待的列表上并对互斥量解锁。此时线程进入条件等待状态,当有信号到达时便会触发。
通知条件满足的函数:
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
pthread_cond_signal
函数会唤醒条件等待队列上的至少一个线程,pthread_cond_broadcast
会唤醒条件队列上的所有线程。
条件队列一般用于消息队列,用于统一、协调多个生产者和消费者之间的竞争关系。
二、使用示例
以下使用了三个线程作为消费者,分别从全局队列上获取消息消费:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include "log.h"
#define THREAD_COUNT 3
// 消息结构
struct msg_st {
unsigned int msg_id;
struct msg_st *next;
};
static struct msg_st *msg_queue = NULL;
static pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
// 消费线程函数
void *handle_msg(void *arg) {
struct msg_st *msg;
while (1) {
pthread_mutex_lock(&g_mutex);
while (msg_queue == NULL) {
pthread_cond_wait(&g_cond, &g_mutex);
}
// 提取消息
msg = msg_queue;
msg_queue = msg_queue->next;
pthread_mutex_unlock(&g_mutex);
// 退出
if (msg->msg_id == (unsigned int) -1) {
info("Thread 0x%x exit!", (unsigned int) pthread_self());
break;
}
info("Thread 0x%x: msg_id = %u", (unsigned int) pthread_self(), msg->msg_id);
}
return NULL;
}
// 生产消息函数
void create_msg(struct msg_st *msg) {
pthread_mutex_lock(&g_mutex);
msg->next = msg_queue;
msg_queue = msg;
pthread_mutex_unlock(&g_mutex);
pthread_cond_signal(&g_cond);
}
int main() {
int i;
struct msg_st msg[10];
pthread_t pid[THREAD_COUNT];
debug("start create threads");
// 创建3个线程作为消费者
for (i = 0; i < 3; i++) {
pthread_create(&pid[i], NULL, handle_msg, NULL);
}
debug("start create msgs");
// 生产10个消息
for (i = 0; i < 10; i++) {
msg[i].msg_id = i + 1;
msg[i].next = NULL;
create_msg(&msg[i]);
}
// 休眠1秒,确保所有消息都被消费者消费完成
sleep(1);
// 退出所有线程
debug("start create exit msgs");
for (i = 0; i < THREAD_COUNT; i++) {
msg[i].msg_id = (unsigned int) -1;
msg[i].next = NULL;
create_msg(&msg[i]);
}
// 回收所有线程
debug("start join threads");
for (i = 0; i < THREAD_COUNT; i++) {
pthread_join(pid[i], NULL);
}
debug("program end");
return 0;
}
运行结果,10个消息分别被3个线程打印出来了:
三、其他
3.1 “惊群”效应
条件变量是否存在惊群效应呢?
不会,线程执行wait操作的时候会被放到一个条件等待队列里面去。当条件满足的时候,系统会自动选择队列前面的线程来消费队列。
3.2 为什么wait前要加锁,这样不会死锁吗?
不会,执行wait操作时需要的是一把已经加锁的互斥量,这个锁在wait函数中会解开。
这样做的目的:如果在wait前没有加锁,生产者线程产生了一个消息并发送信号,再执行wait后信号就丢失了。
3.3 生产者的信号为什么放在unlock之后?
wait收到信号之后要对互斥量加锁,此时锁还被生产者持有,消费者依旧还要等待锁的释放,才能持有锁。对消费者而言,最理想的情况就是生产者产生消息后,我立马就能读取消息,此时的互斥量是用来和其他消费者线程同步的,而不是和生产者线程同步。
此处评论已关闭