条件变量 6个月前

编程语言
441
条件变量

1. 条件变量函数

严格意义上来说,条件变量的主要作用不是处理线程同步, 而是进行线程的阻塞。 多线程程序中只使用条件变量无法实现线程的同步, 必须要配合互斥锁来使用。 虽然条件变量和互斥锁都能阻塞线程,但是二者的效果是不一样的,二者的区别如下:

  • 假设有A-Z 26个线程,这26个线程共同访问同一把互斥锁,如果线程A加锁成功,那么其余B-Z线程访问互斥锁都阻塞,所有的线程只能顺序访问临界区
  • 条件变量只有在满足指定条件下才会阻塞线程,如果条件不满足,多个线程可以同时进入临界区,同时读写临界资源,这种情况下会出现共享资源中数据的混乱。

一般情况下条件变量用于处理生产者和消费者模型,并且和互斥锁配合使用。条件变量类型对应的类型为pthread_cond_t,这样就可以定义一个条件变量类型的变量了:

pthread_cond_t cond;

被条件变量阻塞的线程的线程信息会被记录到这个变量中,以便在解除阻塞的时候使用。

条件变量操作函数函数原型如下:

#include <pthread.h>
pthread_cond_t cond;
// 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
                          const pthread_condattr_t *restrict attr);
// 销毁释放资源        
int pthread_cond_destroy(pthread_cond_t *cond);
  • 参数: cond: 条件变量的地址 attr: 条件变量属性, 一般使用默认属性, 指定为NULL

// 线程阻塞函数, 哪个线程调用这个函数, 哪个线程就会被阻塞
int pthread_cond_wait(pthread_cond_t *restrict cond, 
                        pthread_mutex_t *restrict mutex);

该函数在阻塞线程的时候,需要一个互斥锁参数,这个互斥锁主要功能是进行线程同步,让线程顺序进入临界区,避免出现数共享资源的数据混乱。 该函数会对这个互斥锁做以下几件事情:

  1. 在阻塞线程时候,如果线程已经对互斥锁mutex上锁,那么会将这把锁打开,这样做是为了避免死锁
  2. 当线程解除阻塞的时候,函数内部会帮助这个线程再次将这个mutex互斥锁锁上,继续向下访问临界区

// 表示的时间是从1970.1.1到某个时间点的时间, 总长度使用秒/纳秒表示
struct timespec {
    time_t tv_sec;      // Seconds 
    long   tv_nsec;     // Nanoseconds [0 .. 999999999]
};
// 将线程阻塞一定的时间长度, 时间到达之后, 线程就解除阻塞了
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
                               pthread_mutex_t *restrict mutex, 
                               const struct timespec *restrict abstime);

这个函数的前两个参数和pthread_cond_wait函数是一样的 第三个参数表示线程阻塞的时长 但是需要额外注意一点:struct timespec这个结构体中记录的时间是从1970.1.1到某个时间点的时间,总长度使用秒/纳秒表示。因此赋值方式相对要麻烦一点:

time_t mytime = time(NULL);    // 1970.1.1 0:0:0 到当前的总秒数
struct timespec tmsp;
tmsp.tv_nsec = 0;
tmsp.tv_sec = mytime + 100;    // 线程阻塞100s

// 唤醒阻塞在条件变量上的线程, 至少有一个被解除阻塞
int pthread_cond_signal(pthread_cond_t *cond);

// 唤醒阻塞在条件变量上的线程, 被阻塞的线程全部解除阻塞
int pthread_cond_broadcast(pthread_cond_t *cond);

调用上面两个函数中的任一,都可以唤醒被pthread_cond_wait或者pthread_cond_timedwait阻塞的线程 区别在于pthread_cond_signal是唤醒至少一个被阻塞的线程(总个数不定),pthread_cond_broadcast是唤醒所有被阻塞的线程

2. 生产者消费者

生产者和消费者模型的组成:

  1. 生产者线程 -> 若干个
  • 生产商品或者任务放入到任务队列中
  • 任务队列满了就阻塞, 不满的时候就工作
  • 通过一个生产者的条件变量控制生产者线程阻塞和非阻塞
  1. 消费者线程 -> 若干个
  • 读任务队列, 将任务或者数据取出
  • 任务队列中有数据就消费,没有数据就阻塞
  • 通过一个消费者的条件变量控制消费者线程阻塞和非阻塞
  1. 队列 -> 存储任务/数据,对应一块内存,为了读写访问可以通过一个数据结构维护这块内存
  • 可以是数组、链表,也可以使用stl容器:queue / stack / list / vector

image

场景描述:使用条件变量实现生产者和消费者模型 生产者有5个,往链表头部添加节点,消费者也有5个,删除链表头部的节点。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

// 链表的节点
struct Node
{
    int number;
    struct Node* next;
};
// 指向头结点的指针
struct Node* head = NULL;

// 定义条件变量, 控制消费者线程
pthread_cond_t cond;
// 互斥锁变量
pthread_mutex_t mutex;


// 生产者的回调函数
void* producer(void* arg)
{
    // 一直生产
    while(1)
    {
        pthread_mutex_lock(&mutex);
        // 创建一个链表的新节点
        struct Node* pnew = (struct Node*)malloc(sizeof(struct Node));
        // 节点初始化
        pnew->number = rand() % 1000;
        // 节点的连接, 添加到链表的头部, 新节点就新的头结点
        pnew->next = head;
        // head指针前移
        head = pnew;
        printf("+++producer, number = %d, tid = %ld\n",pnew>number,
                                                        pthread_self());
        pthread_mutex_unlock(&mutex);

        // 生产了任务, 通知消费者消费
        pthread_cond_broadcast(&cond);

        // 生产慢一点
        sleep(rand() % 3);
    }
    return NULL;
}

// 消费者的回调函数
void* consumer(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&mutex);
        //if(head == NULL)   // 这样写有bug
        while(head == NULL)  //当没有东西消费时
        {
            // 消费者线程需要阻塞
            // 线程加互斥锁成功, 但是线程阻塞在这行代码上, 锁没解开
            // 函数自动将线程拥有的锁解开
            pthread_cond_wait(&cond, &mutex);
            // 当消费者线程解除阻塞之后, 会自动将这把锁锁上
            // 这时当前这个线程又重新拥有了这把互斥锁
        }
        // 取出链表的头结点, 将其删除
        struct Node* pnode = head;
        printf("--consumer: number: %d, tid = %ld\n", pnode>number,
                                                        pthread_self());
        head  = pnode->next;
        free(pnode);
        pthread_mutex_unlock(&mutex);        

        sleep(rand() % 3);
    }
    return NULL;
}

int main()
{
    // 初始化条件变量
    pthread_cond_init(&cond, NULL);
    pthread_mutex_init(&mutex, NULL);

    // 创建5个生产者, 5个消费者
    pthread_t ptid[5];
    pthread_t ctid[5];
    for(int i=0; i<5; ++i)
    {
        pthread_create(&ptid[i], NULL, producer, NULL);
    }

    for(int i=0; i<5; ++i)
    {
        pthread_create(&ctid[i], NULL, consumer, NULL);
    }

    // 释放资源
    for(int i=0; i<5; ++i)
    {
        // 阻塞等待子线程退出
        pthread_join(ptid[i], NULL);
    }

    for(int i=0; i<5; ++i)
    {
        pthread_join(ctid[i], NULL);
    }

    // 销毁条件变量
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);

    return 0;
}

为什么在第7行使用if 有bug:

  • 当链表为空, 一个消费者线程都会被函数 pthread_cond_wait(&cond, &mutex);阻塞
  • 阻塞时会打开锁,这时其他的消费者进程可能会趁机进入然后也阻塞在这里
  • 当生产者生产了1个节点, 调用 pthread_cond_broadcast(&cond); 唤醒了所有阻塞的线程
  • 有一个消费者线程通过 pthread_cond_wait()加锁, 其余没有加锁的线程继续阻塞
  • 加锁成功的线程向下运行, 并成功删除一个节点, 然后解锁
  • 没有加锁成功的线程解除阻塞继续抢这把锁, 另一个子线程加锁
  • 但是这个线程删除链表节点的时候链表已经为空了, 后边访问这个空节点时就会出现段错误
  • 需要循环对链表是否为空进行判断, 需要将if 该成 while
image
EchoEcho官方
无论前方如何,请不要后悔与我相遇。
1377
发布数
439
关注者
2244170
累计阅读

热门教程文档

Vue
25小节
Typescript
31小节
Djiango
17小节
Objective-C
29小节
MyBatis
19小节