和多进程程序一样,多线程程序也必须考虑同步问题。pthread_join可以看作是一种简单的线程同步方式,不过很显然,它无法高效的实现复杂的同步需求。比如空值对共享资源的独占式访问,又抑或是在某个条件满足之后唤醒一个线程。下面我们来讨论一下3种专门用于线程同步的机制:POSIX信号量、互斥量和条件变量。
POSIX信号量
在Linux上,信号量API有两组。一组是多进程编程 - 信号量中的信号量,另外一组是我们现在要讨论的POSIX信号量。这两组接口很相似,但不能保证能互换。由于这两种信号量的语义完全相同,原理请参考多进程编程 - 信号量。
POSIX信号量函数的名字都以sem_开头,并不像大多数线程函数那样以pthread_开头。常用的POSIX信号量函数是下面5个:
#include <semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value);
int sem_destory(sem_t *sem);
int sem_wait(sem_t* sem);
int sem_trywait(sem_t* sem);
int sem_post(sem_t* sem);
这些函数的第一个参数sem指向被操作的信号量。
sem_init函数用于初始化一个未命名的信号量(POSIX信号量API支持命名信号量)。pshared参数指定信号量的类型。如果其值为0,就表示这个信号量是当前进程的局部信号量,否则该信号量就可以在多个进程之间共享。value参数指定信号量的初始值。此外,初始化一个已经被初始化的信号量将导致不可预期的结果。
sem_destory函数用于销毁信号量,以释放其占用的内核资源。如果销毁一个正被其他线程等待的信号量,则将导致不可预期的结果。
sem_wait函数以原子操作的方式将信号量的值减1。如果信号量的值为0,则sem_wait将被阻塞,知道这个信号量具有非0值。
sem_trywait与sem_wait相似,不过它始终立即返回,而不论被操作的信号量是否具有非0值,相当于sem_wait的非阻塞版本。当信号量的值非0时,sem_trywait对信号量执行减1操作。当信号量为0时,它将返回-1,并设置errno为EAGAIN。
sem_post函数以原子操作的方式将信号量的值加1。当信号量的值大于0时,其他正在调用sem_wait等待信号量的线程将被唤醒。
上述这些函数成功时返回0,失败则返回-1并设置errno。
互斥锁
互斥锁,也称互斥量,可以用于保护关键代码段,以确保其独占式的访问,这有点像一个二进制信号量。当进入关键代码段时,我们需要获得互斥锁并将其加锁,这等价于二进制信号量的P操作;当离开关键代码段时,我们需要对互斥锁解锁,以唤醒其他等待该互斥锁的线程,这等价于二进制信号量的V操作。
互斥锁基础API
POSIX互斥锁的相关函数主要有如下5个:
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* mutexattr);
int pthread_mutex_destory(pthread_mutex_t* mutex);
int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_trylock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);
这些函数的第一个参数mutex指向要操作的目标互斥锁,互斥锁的类型是pthread_mutex_t结构体。
pthread_mutex_init函数用于初始化互斥锁。mutexattr参数指定互斥锁的属性。如果将它设置为NULL,则表示使用默认属性。具体细节请看下文。
除了这个函数外,我们还可以使用如下方式来初始化一个互斥锁:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
宏PTHREAD_MUTEX_INITIALIZER实际上只是把互斥锁的各个字段都初始化为0.
pthread_mutex_destory函数用于销毁互斥锁,以释放其占用的内核资源。销毁一个已经加锁的互斥锁将导致不可预期的后果。
pthread_mutex_lock函数以原子操作的方式给一个互斥锁加锁。如果目标互斥锁已经被锁上,则pthread_mutex_lock调用将被阻塞,直到该互斥锁的占有者将其解锁。
pthread_mutex_trylock与pthread_mutex_lock函数类似,不过它始终立即返回,而不论被操作的互斥锁是否已经被加锁,相当于pthread_mutex_lock的非阻塞版本。当目标互斥锁未被加锁时,pthread_mutext_trylock对互斥锁执行加锁操作;当互斥锁已经被加锁时,pthread_mutex_trylock将返回错误码EBUSY。需要注意的是,这里讨论的pthread_mutex_lock和pthread_mutex_trylock的行为是针对普通锁而言的。后面你会看到,对于其他类型的锁而言,这两个加锁函数会有不同的行为。
pthread_mutex_unlock函数以原子操作的方式给一个互斥锁解锁。如果此时有其他线程正在等待这个互斥锁,则这些线程中的某一个将获得它。
上述这些函数成功时返回0,失败则返回错误码。
互斥锁属性
pthread_mutexattr_t结构体定义了一套完整的互斥锁属性。线程库提供了一系列函数来操作pthread_mutexattr_t类型的变量,以方便我们获取和设置互斥量属性。这里我们列出一些主要的函数:
#include <pthread.h>
/*初始化互斥锁属性对象*/
int pthread_mutexattr_init(pthread_mutexattr_t* attr);
/*销毁互斥锁属性对象*/
int pthread_mutexattr_destory(pthread_mutex_t * attr);
/*获取和设置互斥锁的pshared属性*/
int pthread_mutexattr_getshared(const pthread_mutexattr_t* attr, int* pshared);
int pthread_mutexattr_setshared(pthread_mutexattr_t* attr, int* pshared);
/*获取和设置互斥锁的type属性*/
int pthread_mutexattr_gettype(const pthread_mutexattr_t* attr, int* type);
int pthread_mutexattr_settype(pthread_mutexattr_t* attr, int* type);
互斥锁属性pshared指定是否允许跨进程共享互斥锁,其可选值有两个:
- PTHREAD_PROCESS_SHARED:互斥锁可以被跨进程共享
- PTHREAD_PROCESS_PRIVATE:互斥锁只能被和锁初始化线程隶属于同一个进程的线程共享。
互斥锁属性type指定互斥锁的类型。Linux支持如下4种类型的互斥锁:
- PTHREAD_MUTEX_NORMAL:普通锁。这是互斥锁默认的类型。当一个线程对一个普通锁加锁以后,其余请求该锁的线程将形成一个队列,并在该锁解锁后按优先级获得它。这种锁类型保证了资源分配的公平性。但这种锁也很容易引发问题:一个线程如果对一个已经加锁的普通锁再次加锁,将引发死锁;对一个已经被其他线程加锁的普通锁解锁,或者对一个已经解锁的普通锁再次解锁,将导致不可预期的后果。
- PTHREAD_MUTEX_ERRORCHECK:检错锁。一个线程如果对一个已经加锁的检错锁再次加锁,则加锁操作返回EDEADLK。对一个已经被其他线程加锁的检错锁解锁,或者对一个已经解锁的检错锁再次解锁,则解锁操作将返回EPERM。
- PTHREAD_MUTEX_RECURSIVE:嵌套锁。这种锁允许一个线程在释放锁之前多次对它加锁而不发生死锁。不过其他线程如果要获得这个锁,则当前锁的拥有者必须执行相应次数的解锁操作。对一个已经被其他线程加锁的嵌套锁解锁,或者对一个已经解锁的嵌套锁再次解锁,则解锁操作将返回EPERM。
- PTHREAD_MUTEX_DEFAULT:默认锁。一个线程如果对一个已经加锁的默认锁再次加锁,或者对一个已经被其他线程加锁的默认锁解锁,或者对一个已经解锁的默认锁再次解锁,将导致不可预期的后果。这种锁在实现的时候可能被映射为上面三种之一。
死锁举例
使用互斥锁的一个噩耗就是死锁。死锁使得一个或多个线程被挂起而无法继续执行,而且这种情况还不容易发现。上一节我们提到:在一个线程中对一个已经加锁的普通锁再次加锁,将导致死锁。这种情况可能出现在设计地不够仔细的递归函数中。另外,如果两个线程按照不同的顺序来申请两个互斥锁,也容易产生死锁,示例代码如下:
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
int a = 0;
int b = 0;
pthread_mutex_t mutex_a;
pthread_mutex_t mutex_b;
void *another(void *arg)
{
pthread_mutex_lock(&mutex_b);
printf("in child thread, got mutex b, waiting for mutex a\n");
sleep(5);
++b;
pthread_mutex_lock(&mutex_a);
b += a++;
pthread_mutex_unlock(&mutex_a);
pthread_mutex_unlock(&mutex_b);
pthread_exit(NULL);
}
int main(int argc, char const *argv[])
{
pthread_t id;
pthread_mutex_init(&mutex_a, NULL);
pthread_mutex_init(&mutex_b, NULL);
pthread_create(&id, NULL, another, NULL);
pthread_mutex_lock(&mutex_a);
printf("in parent thread, got mutex a, waiting for mutex b\n");
sleep(5);
++a;
pthread_mutex_lock(&mutex_b);
a += b++;
pthread_mutex_unlock(&mutex_a);
pthread_mutex_unlock(&mutex_b);
pthread_join(id, NULL);
pthread_mutex_destroy(&mutex_a);
pthread_mutex_destroy(&mutex_b);
return 0;
}
上述代码中,主线程试图先占有互斥锁mutex_a,然后操作被该锁保护的变量a,但操作完毕之后,主线程并没有立即释放互斥锁mutex_a,而是又申请互斥锁mutex_b,并在两个互斥锁的保护下,操作变量a和b,最后才一起释放这两个互斥锁;与此同时,子线程则按照相反的顺序来申请互斥锁mutex_a和mutex_b,并在两个锁的保护下操作变量a和b。我们用sleep函数来模拟连续两次调用pthread_mutex_lock之间的时间差。以确保代码中的两个线程各自先占有一个互斥锁(主线程占有mutex_a,子线程占有mutex_b),然后等待另外一个互斥锁(主线程等待mutex_b,子线程等待mutex_a)。这样,两个线程就僵持住了,设都不能继续往下运行。从而形成死锁。如果代码中不加入sleep函数,则这段代码或许总能呢个成功地运行,从而为程序留下了一个潜在的BUG。
条件变量
如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量则是用于在线程之间同步共享数据的值。条件变量提供了一种线程间的通知机制:当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程。
条件变量的相关函数主要有如下5个:
#include <pthread.h>
int pthread_cond_init(pthread_cond_t* cond, const pthread_condattr_t* cond_attr);
int pthread_cond_destory(pthread_cond_t* cond);
int pthread_cond_boardcast(pthread_cond_t* cond);
int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
这些函数的第一个参数cond指向要操作的目标条件变量,条件变量的类型是pthread_cond_t结构体。
pthread_cond_init函数用于初始化条件变量。cond_attr参数指定条件变量的属性。如果将它设置为NULL。则表示使用默认属性。条件变量的属性不多,而且和互斥锁的属性类型相似。除了pthread_cond_init函数外,我们还可以使用如下方式来初始化一个条件变量:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
宏PTHREAD_COND_INITIALIZER实际上只是把条件变量的各个字段都初始化为0。
pthread_cond_destory函数用于销毁条件变量,以释放其占用的内核资源。销毁一个正在被等待的条件变量将失败并返回EBUSY。
pthread_cond_boardcast函数以广播的方式唤醒所有等待目标条件变量的线程。pthread_cond_signal函数用于唤醒一个等待目标条件变量的线程。至于哪个线程将被唤醒,则取决于线程的优先级和调度策略。有时候我们可能想唤醒一个指定的线程,但pthread没有对该需求提供解决方法。不过我们可以间接的实现该需求:
- 定义个能够唯一表示目标线程的全局变量,
- 在唤醒等待条件变量前先设置该变量为目标线程,
- 然后采用广播方式唤醒所有等待条件变量的线程,
- 这些线程被唤醒后都检查该变量以判断被唤醒的是否是自己,
- 如果是就开始执行后续代码,如果不是则返回继续等待
pthread_cond_wait函数用于等待目标条件变量。mutex参数用于保护条件变量的互斥锁,以确保pthread_cond_wait的原子性。在调用pthread_cond_wait前,必须确保互斥锁mutex已经加锁,否则将导致不可预期的结果。pthread_cond_wait函数执行时,首先把调用线程放入条件变量的等待队列中,然后将互斥锁mutex解锁。可见:
从pthread_cond_wait开始执行到其调用线程被放入条件变量的等待队列之间的这段时间内,pthread_cond_signal和pthread_cond_broadcast等函数不会修改条件变量。换言之,pthread_cond_wait函数不会错过目标条件变量的任何变化。当pthread_cond_wait函数成功返回时,互斥锁mutex将再被锁上。
上述这些函数成功时返回0,失败则返回错误码。
示例代码:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
int sequence = 1;
int eat = 1;
pthread_mutex_t productor_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t productor_mutex1 = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t my_cond1 = PTHREAD_COND_INITIALIZER;
void *productor1(void* arg)
{
int my_sequence = 1;
while(1){
pthread_mutex_lock(&productor_mutex);
while(sequence != my_sequence){
pthread_cond_wait(&my_cond, &productor_mutex);
}
pthread_mutex_lock(&productor_mutex1);
while(eat != 1){
pthread_cond_wait(&my_cond1, &productor_mutex1);
}
printf("A ");
eat = 0;
pthread_cond_broadcast(&my_cond1);
pthread_mutex_unlock(&productor_mutex1);
sequence = 2;
pthread_cond_broadcast(&my_cond);
pthread_mutex_unlock(&productor_mutex);
}
pthread_exit(NULL);
}
void *productor2(void* arg)
{
int my_sequence = 2;
while(1){
pthread_mutex_lock(&productor_mutex);
while(sequence != my_sequence){
pthread_cond_wait(&my_cond, &productor_mutex);
}
pthread_mutex_lock(&productor_mutex1);
while(eat != 1){
pthread_cond_wait(&my_cond1, &productor_mutex1);
}
printf("B ");
eat = 0;
pthread_cond_broadcast(&my_cond1);
pthread_mutex_unlock(&productor_mutex1);
sequence = 1;
pthread_cond_broadcast(&my_cond);
pthread_mutex_unlock(&productor_mutex);
}
pthread_exit(NULL);
}
void *consumer(void *arg)
{
long a = 10;
while (a--)
{
pthread_mutex_lock(&productor_mutex1);
while(eat!=0){
pthread_cond_wait(&my_cond1, &productor_mutex1);
}
printf("C ");
eat = 1;
pthread_cond_broadcast(&my_cond1);
pthread_mutex_unlock(&productor_mutex1);
}
pthread_exit(NULL);
}
int main(int argc, char const *argv[])
{
pthread_t pth1, pth2, pth3;
pthread_create(&pth1, NULL, productor1, NULL);
pthread_create(&pth2, NULL, productor2, NULL);
pthread_create(&pth3, NULL, consumer, NULL);
pthread_join(pth3, NULL);
pthread_cancel(pth1);
pthread_cancel(pth2);
return 0;
}
线程同步机制包装类
#ifndef LOCKER_H
#define LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h>
/*封装信号量类*/
class sem
{
public:
//创建并初始化信号类
sem(){
if(sem_init(&m_mem, 0, 0) != 0){
//构造函数没有返回值,可以通过抛出异常来报告错误
throw std::exception();
}
};
//销毁信号量
~sem(){
sem_destroy(&m_mem);
}
//等待信号量
bool wait(){
return sem_wait(&m_mem) == 0;
}
//增加信号量
bool post(){
return sem_post(&m_mem) == 0;
}
private:
sem_t m_mem;
};
/*封装互斥锁的类*/
class locker
{
public:
//创建并初始化互斥锁
locker(){
if(pthread_mutex_init(&m_mutex, NULL) != 0){
throw std::exception();
}
}
//销毁互斥锁
~locker(){
pthread_mutex_destroy(&m_mutex);
}
//获取互斥锁
bool lock(){
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock(){
return pthread_mutex_unlock(&m_mutex) == 0;
}
private:
pthread_mutex_t m_mutex;
};
/*封装条件变量的类*/
class cond
{
public:
//创建并初始化条件变量
cond(){
if(pthread_mutex_init(&m_mutex, NULL) != 0){
throw std::exception();
}
if(pthread_cond_init(&m_cond, NULL) != 0){
pthread_mutex_destroy(&m_mutex);
throw std::exception();
}
}
//销毁条件变量
~cond(){
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
//等待条件变量
bool wait(){
int ret = 0;
pthread_mutex_lock(&m_mutex);
ret = pthread_cond_wait(&m_cond, &m_mutex);
pthread_mutex_unlock(&m_mutex);
return ret;
}
//唤醒等待条件变量的线程
bool signal(){
return pthread_cond_signal(&m_cond) == 0;
}
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif