`
M_ittrue
  • 浏览: 75223 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

线程间同步

阅读更多

    前面专门找了一些资料去了解函数的可重入性以及线程安全同步,我们知道在多线程并发访问共享数据时候往往会出现数据冲突,这跟之前的函数可重入性是一样的.比如两个线程要对一个全局变量加1.但这个过程在x86平台上并非原子操作,它可能会经过以下三步才能完成:

1:从内存中读取变量到寄存器中

2:寄存器加1

3:将寄存器的值写回内存

 

这一个非原子操作可想而知,如果有多线程并发执行,那么这三步有可能就在线程间交叉执行.如:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NLOOP 5000
int counter;
/* incremented by threads */
void *doit(void *);
int main(int argc, char **argv)
{
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, &doit, NULL);
pthread_create(&tidB, NULL, &doit, NULL);
/* wait for both threads to terminate */
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
return 0;
}
void *doit(void *vptr)
{
int
i, val;
/*
* Each thread fetches, prints, and increments the counter
NLOOP times.
* The value of the counter should increase monotonically.
*/
for (i = 0; i < NLOOP; i++) {
val = counter;
printf("%x: %d\n", (unsigned int)pthread_self(),
val + 1);
counter = val + 1;
}
return NULL;
}
 由#define NLOOP 5000
可知,我们创建的两个线程分别对counter进行了5000次加1操作,最终结果应该是10000才对,但我们看输出却是:
6cb6c700: 1
6cb6c700: 2
6cb6c700: 3
6cb6c700: 4
6cb6c700: 5
6cb6c700: 6
6cb6c700: 7
6cb6c700: 8
6cb6c700: 9
6cb6c700: 10
6cb6c700: 11
6cb6c700: 12
6cb6c700: 13
6cb6c700: 14
6cb6c700: 15
6cb6c700: 16
6cb6c700: 17
6cb6c700: 18
6cb6c700: 19
6cb6c700: 20
6cb6c700: 21
6d36d700: 1
6d36d700: 2
6d36d700: 3
6d36d700: 4
6d36d700: 5
6d36d700: 6
6d36d700: 7
6d36d700: 8
6d36d700: 9
6d36d700: 10
6d36d700: 11
6d36d700: 12
6d36d700: 13
6d36d700: 14
6cb6c700: 22
6cb6c700: 23
6cb6c700: 24
6cb6c700: 25
6cb6c700: 26
6cb6c700: 27
6cb6c700: 28
6cb6c700: 29
6cb6c700: 30
6cb6c700: 31
6cb6c700: 32
6cb6c700: 33
.......................
这个输出的不同会因不同的CPU调度,总线周期而定.

普遍情况下,我们为保证共享数据在多线程访问下的正确性,会引入互斥锁(Mutex Exclusive Lock,它与自旋锁的区别请百度).引入锁在目的在于使线程对数据的"读-修改-写"形成一组原子操作,要么执行完,要么不执行,不能被打断

使用锁的基本过程一般包括,1:锁初始化  2:取得锁 3:释放锁,对于互斥锁有如下函数声名
Mutex用pthread_mutex_t 类型的变量表示,可以这样初始化和销毁:
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
返回值:成功返回0,失败返回错误号。
pthread_mutex_init 函数对Mutex做初始化,参数attr 设定Mutex的属性,如果attr 为NULL 则表
示缺省属性,本章不详细介绍Mutex属性,感兴趣的读者可以参考[APUE2e]。
用pthread_mutex_init 函数初始化的Mutex可以用pthread_mutex_destroy销毁。如果Mutex变量
是静态分配的(全局变量或static变量),也可以用宏定义PTHREAD_MUTEX_INITIALIZER 来初始
化,相当于用pthread_mutex_init 初始化并且attr 参数为NULL 。Mutex的加锁和解锁操作可以用
下列函数:
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号。
一个线程可以调用pthread_mutex_lock获得Mutex,如果这时另一个线程已经调
用pthread_mutex_lock获得了该Mutex,则当前线程需要挂起等待,直到另一个线程调
用pthread_mutex_unlock释放Mutex,当前线程被唤醒,才能获得该Mutex并继续执行。
如果一个线程既想获得锁,又不想挂起等待,可以调用pthread_mutex_trylock,如果Mutex已经
被另一个线程获得,这个函数会失败返回EBUSY,而不会使线程挂起等待。
 
现在我们就用Mutex来改写一下刚才的程序:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NLOOP 5000
int counter;
/* incremented by threads */
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
void *doit(void *);
int main(int argc, char **argv)
{
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, doit, NULL);
pthread_create(&tidB, NULL, doit, NULL);
/* wait for both threads to terminate */
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
return 0;
}
void *doit(void *vptr)
{
int
i, val;
/*
* Each thread fetches, prints, and increments the counter
NLOOP times.
* The value of the counter should increase monotonically.
*/
for (i = 0; i < NLOOP; i++) {
pthread_mutex_lock(&counter_mutex);//如果了解了函数的可重入性与线程安全我们就知道,我们的锁应该是加在全局变量上
val = counter;//counter全局变量是导致非线程安全的所在
printf("%x: %d\n", (unsigned int)pthread_self(),
val + 1);
counter = val + 1;
pthread_mutex_unlock(&counter_mutex);//释放锁
}
return NULL;
}
 这样运行一下就可得到10000的正确结果.

在这里简单初充一下Mutex的简单实现原理,
当Mutex为1时表示锁空闲(未被取得),为0时表示锁已被取得.当线程进行lock() or trylock()操作时,如果Mutex为1,则被取得.
如果为0时,那么调用lock的线程则会被挂起,如果不想被挂起则调用trylock().


lock:
if(mutex > 0){
mutex = 0;
return 0;
} else
挂起等待;
goto lock;
unlock:
mutex = 1;
唤醒等待Mute
 而对于unlock()操作可以有多种实现,可以只唤醒一个等待线程,也可以让等待线程队列去竞争这个锁资料(可能按优先级,FIFO等调度算法去选择).

由以上可知,我们引进Mutex是为了实现一组原子操作,这个互斥过程在大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存
器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问
内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待
总线周期。现在我们把lock和unlock的伪代码改一下(以x86的xchg指令为例):
lock:
movb $0, %al
xchgb %al, mutex
if(al寄存器的内容 > 0){
return 0;
} else
挂起等待;
goto lock;
unlock:
movb $1, mutex
唤醒等待Mutex的线程;
return 0;
 
unlock中的释放锁操作同样只用一条指令实现,以保证它的原子性。

Condition Variable

线程间的同步还有这样一种情况:线程A需要等某个条件成立才能继续往下执行,现在这个条件
不成立,线程A就阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执
行。在pthread库中通过条件变量(Condition Variable)来阻塞等待一个条件,或者唤醒等待这
个条件的线程。Condition Variable用pthread_cond_t 类型的变量表示,可以这样初始化和销
毁:
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
 
返回值:成功返回0,失败返回错误号。
和Mutex的初始化和销毁类似,pthread_cond_init函数初始化一个Condition Variable,attr 参
数为NULL 则表示缺省属性,pthread_cond_destroy 函数销毁一个Condition Variable。如
果Condition Variable是静态分配的,也可以用宏定义PTHEAD_COND_INITIALIZER 初始化,相当于
用pthread_cond_init函数初始化并且attr 参数为NULL 。Condition Variable的操作可以用下列函
数:
#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
 
返回值:成功返回0,失败返回错误号。
可见,一个Condition Variable总是和一个Mutex搭配使用的。一个线程可以调
用pthread_cond_wait在一个Condition Variable上阻塞等待,这个函数做以下三步操作:
1. 释放Mutex
2. 阻塞等待
3. 当被唤醒时,重新获得Mutex并返回
pthread_cond_timedwait 函数还有一个额外的参数可以设定等待超时,如果到达了abstime 所指
定的时刻仍然没有别的线程来唤醒当前线程,就返回ETIMEDOUT 。一个线程可以调
用pthread_cond_signal 唤醒在某个Condition Variable上等待的另一个线程,也可以调
用pthread_cond_broadcast唤醒在这个Condition Variable上等待的所有线程。
下面的程序演示了一个生产者-消费者的例子,生产者生产一个结构体串在链表的表头上,消费
者从表头取走结构体。
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
struct msg {
struct msg *next;
int num;
};
struct msg *head;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void *consumer(void *p)
{
struct msg *mp;
for (;;) {
pthread_mutex_lock(&lock);
while (head == NULL)
pthread_cond_wait(&has_product, &lock);
mp = head;
head = mp->next;
pthread_mutex_unlock(&lock);
printf("Consume %d\n", mp->num);
free(mp);
sleep(rand() % 5);
}
}
void *producer(void *p)
{
struct msg *mp;
for (;;) {
mp = malloc(sizeof(struct msg));
mp->num = rand() % 1000 + 1;
printf("Produce %d\n", mp->num);
pthread_mutex_lock(&lock);
mp->next = head;
head = mp;
pthread_mutex_unlock(&lock);
pthread_cond_signal(&has_product);
sleep(rand() % 5);
}
}
int main(int argc, char *argv[])
{
pthread_t pid, cid;
srand(time(NULL));
pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);
pthread_join(pid, NULL);
pthread_join(cid, NULL);
return 0;
}
执行结果如下:
$ ./a.out
Produce 744
Consume 744
Produce 567
Produce 881
Consume 881
Produce 911
Consume 911
Consume 567
Produce 698
Consume 698
 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics