0%

Linux系统编程-多线程同步条件变量


专注于用户空间的系统级编程–即内核之上的所有内容。

与互斥锁不同,条件变量是用来等待而不是用来上锁的。
条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。

条件变量使我们可以睡眠等待某种条件出现。
条件变量是利用线程间共享的全局变量进行同步的一种机制,
主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使”条件成立”(给出条件成立信号)。

条件的检测是在互斥锁的保护下进行的。
如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。

如果另一个线程改变了条件,它发信号给关联的条件变量,
唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。
如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。

使用条件变量之前要先进行初始化。
可以在单个语句中生成和初始化一个条件变量如:pthread_cond_t my_condition=PTHREAD_COND_INITIALIZER;(用于进程间线程的通信)。
可以利用函数pthread_cond_init动态初始化。

条件变量分为两部分: 条件和变量。
条件本身是由互斥量保护的。
线程在改变条件状态前先要锁住互斥量。
它利用线程间共享的全局变量进行同步的一种机制。

API

int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t *cond_attr);     
int pthread_cond_destroy(pthread_cond_t *cond);  
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);  //解除所有线程的阻塞

条件变量采用的数据类型是pthread_cond_t, 在使用之前必须要进行初始化, 这包括两种方式:

  • 静态: 可以把常量PTHREAD_COND_INITIALIZER给静态分配的条件变量
  • 动态: pthread_cond_init函数, 释放动态条件变量的内存空间之前, 要用pthread_cond_destroy对其进行清理

pthread_cond_wait()函数用于等待条件被触发。
该函数传入两个参数,一个条件变量一个互斥量,函数将条件变量和互斥量进行关联,
互斥量对该条件进行保护,传入的互斥量必须是已经锁住的。
调用pthread_cond_wait()函数后,会原子的执行以下两个动作:

  • 将调用线程放到等待条件的线程列表上,即进入睡眠
  • 对互斥量进行解锁

由于这两个操作时原子操作,
这样就关闭了条件检查和线程进入睡眠等待条件改变这两个操作之间的时间通道,
这样就不会错过任何条件的变化。
当pthread_cond_wait()返回后,互斥量会再次被锁住。

pthread_cond_timedwait()函数和pthread_cond_wait()的工作方式相似,只是多了一个等待时间。
等待时间的结构为struct timespec,

struct timespec{  
    time_t  tv_sec    //Seconds.  
    long    tv_nsec   //Nanoseconds.  
};

函数要求传入的时间值是一个绝对值,不是相对值,
例如,想要等待3分钟,必须先获得当前时间,然后加上3分钟。
要想获得当前系统时间的timespec值,没有直接可调用的函数,
需要通过调用gettimeofday函数获取timeval结构,然后转换成timespec结构,转换公式就是:

timeSpec.tv_sec = timeVal.tv_sec;  
timeSpec.tv_nsec = timeVal.tv_usec * 1000; 

所以要等待3分钟,timespec时间结构的获得应该如下所示:

struct timeval now;  
struct timespec until;  
gettimeofday(&now);//获得系统当前时间  

//把时间从timeval结构转换成timespec结构  
until.tv_sec = now.tv_sec;  
until.tv_nsec = now.tv_usec * 1000;  

//增加min  
until.tv_sec += 3 * 60;  

如果时间到后,条件还没有发生,那么会返回ETIMEDOUT错误。

从pthread_cond_wait()和pthread_cond_timewait()成功返回时,
线程需要重新计算条件,因为其他线程可能在运行过程中已经改变条件。

pthread_cond_signal() 和 pthread_cond_broadcast()
这两个函数都是用于向等待条件的线程发送唤醒信号,
pthread_cond_signal()函数只会唤醒等待该条件的某个线程,
pthread_cond_broadcast()会广播条件状态的改变,以唤醒等待该条件的所有线程。
例如多个线程只读共享资源,这是可以将它们都唤醒。
这里要注意的是:一定要在改变条件状态后,再给线程发送信号。
考虑条件变量信号单播发送和广播发送的一种候选方式是坚持使用广播发送。
只有在等待者代码编写确切,只有一个等待者需要唤醒,
且唤醒哪个线程无所谓,那么此时为这种情况使用单播,
所以其他情况下都必须使用广播发送。

在实际编程中,把共享数据和它们的同步变量集合到一个结构中,这往往是一个较好的编程技巧:

struct{  
    pthread_mutex_t mutex;  
    pthread_cond_t cond;  
    int data;  
}Data;  

Source Code

#include <pthread.h>  
#include <stdio.h>  
#include <stdlib.h>  
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;/* 初始化互斥锁*/  
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;/* 初始化条件变量*/  
void *thread1(void *);  
void *thread2(void *);  
int i=1;  
int main(void)  
{  
    pthread_t t_a;  
    pthread_t t_b;  

    pthread_create(&t_a,NULL,thread1,(void *)NULL);/* 创建进程t_a*/  
    sleep(1);
    pthread_create(&t_b,NULL,thread2,(void *)NULL); /* 创建进程t_b*/  

    pthread_join(t_a, NULL);/* 等待进程t_a结束*/  
    pthread_join(t_b, NULL);/* 等待进程t_b结束*/  

    pthread_mutex_destroy(&mutex);  
    pthread_cond_destroy(&cond);  

    exit(0);  
}  
void *thread1(void *junk)  
{  
    for(i=1;i<=6;i++)  
    {  
        pthread_mutex_lock(&mutex);/* 锁住互斥量*/  
        printf("thread1: lock %d\t i %d\n", __LINE__, i);  

        if(i%3==0){  
            printf("thread1:signal 1  %d\n", __LINE__);  
            pthread_cond_signal(&cond);/* 条件改变,发送信号,通知t_b进程*/  
            printf("thread1:signal 2  %d\n", __LINE__);  
            sleep(1);  
        }  

        pthread_mutex_unlock(&mutex);/* 解锁互斥量*/  
        printf("thread1: unlock %d\t i %d\n\n", __LINE__, i);  
        sleep(1);  
    }  
}  
void *thread2(void *junk)  
{  
    while(i<6)  
    {  
        pthread_mutex_lock(&mutex);  
        printf("thread2: lock %d\t i %d\n", __LINE__, i);  

        if(i%3!=0){  
            printf("thread2: wait 1  %d\n", __LINE__);  
            pthread_cond_wait(&cond,&mutex);/* 解锁mutex,并等待cond改变*/  
            printf("thread2: wait 2  %d\n", __LINE__);  
        }  

        pthread_mutex_unlock(&mutex);  
        printf("thread2: unlock %d\t i %d\n\n", __LINE__, i);  
        sleep(1);  
    }  
}

编译运行结果如下:

thread1: lock 31     i 1
thread1: unlock 41   i 1

thread2: lock 50     i 1
thread2: wait 1  53
thread1: lock 31     i 2
thread1: unlock 41   i 2

thread1: lock 31     i 3
thread1:signal 1  34
thread1:signal 2  36
thread1: unlock 41   i 3

thread2: wait 2  55
thread2: unlock 59   i 3

thread1: lock 31     i 4
thread1: unlock 41   i 4

thread2: lock 50     i 4
thread2: wait 1  53
thread1: lock 31     i 5
thread1: unlock 41   i 5

thread1: lock 31     i 6
thread1:signal 1  34
thread1:signal 2  36
thread1: unlock 41   i 6

thread2: wait 2  55
thread2: unlock 59   i 6

线程1先执行,获得mutex锁,打印,然后释放mutex锁,然后阻塞自己1秒。

线程2执行,阻塞在pthread_mutex_lock(&mutex)
这行语句中,直到线程1释放mutex锁。
然后线程2得已执行,获取metux锁,满足if条件,到pthread_cond_wait (&cond,&mutex)
这里线程2阻塞,不仅仅是等待cond变量发生改变,同时释放mutex锁。
mutex锁释放后,线程1终于获得了mutex锁,得已继续运行,当线程1的if(i%3==0)的条件满足后,
通过pthread_cond_signal发送信号,
告诉等待cond的变量的线程(线程2),cond条件变量已经发生了改变。

线程2并没有立即得到运行,因为线程2还在等待mutex锁的释放,
所以线程1继续往下走,直到线程1释放mutex锁,
线程2才能停止等待,打印语句,
然后往下走通过pthread_mutex_unlock(&mutex)释放mutex锁,进入下一个循环。

References