我需要的 pthread 线程集结点功能,使用同一集结点的线程将通过 rend_wait 函数等待,当集结点到达指定数量的线程后同时激发继续执行。使用 pthread 的 mutex 和 cond 超轻量实现。下面 rend.h 是集结点实现,rendezvous.c 是测试应用。
/*
* rend.h
*
* Created on: 2009-11-14
* Author: liuzy (lzy.dev@gmail.com)
*/
#ifndef REND_H_
#define REND_H_
#include <pthread.h>
#include <assert.h>
struct rend_t {
volatile int count;
pthread_mutex_t count_lock;
pthread_cond_t ready;
};
#define DECLARE_REND(name, count) \
struct rend_t name = {(count), PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER}
int rend_init(struct rend_t* prend, int count) {
int ret = 0;
assert(prend);
prend->count = count;
if ((ret = pthread_mutex_init(&prend->count_lock, NULL)))
return ret;
if ((ret = pthread_cond_init(&prend->ready, NULL)))
return ret;
return EXIT_SUCCESS;
}
int rend_wait(struct rend_t* prend) {
int ret = 0;
assert(prend);
if ((ret = pthread_mutex_lock(&prend->count_lock)))
return ret;
/* check count value is ready to weak up block code */
if (prend->count == 1) {
if ((ret = pthread_cond_broadcast(&prend->ready)))
return ret;
if ((ret = pthread_mutex_unlock(&prend->count_lock)))
return ret;
} else {
prend->count--;
ret = pthread_cond_wait(&prend->ready, &prend->count_lock);
prend->count++;
if (ret) {
pthread_mutex_unlock(&prend->count_lock);
return ret;
}
if ((ret = pthread_mutex_unlock(&prend->count_lock)))
return ret;
}
return EXIT_SUCCESS;
}
int rend_free(struct rend_t* prend) {
int ret = 0;
assert(prend);
prend->count = 0;
if ((ret = pthread_mutex_destroy(&prend->count_lock)))
return ret;
if ((ret = pthread_cond_destroy(&prend->ready)))
return ret;
return EXIT_SUCCESS;
}
#endif /* REND_H_ */
rend 使用更简单:
- 定义/初始化 rend_t 集结点对象。DECLARE_REND 宏用于静态定义,rend_init 函数可以对动态创建的集结点结构初始化;
- pthread 线程通过调用 rend_wait 函数 P/V 集结状态。集结关系的线程要 P/V 在同一个 rend_t 集结对象上;
- 释放集结对象,rend_free 函数。
以上函数都是成功返回 0,出错返回 errno 值(非 0)。
/*
==============================
Name : rendezvous.c
Author : liuzy (lzy.dev@gmail.com)
Version : 0.1
==============================
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h> /* va_list */
#include <unistd.h>
#include <string.h>
#include <errno.h> /* errno */
#include <syslog.h> /* for syslog(2) and level */
#include <pthread.h>
#include "rend.h"
static int daemon_proc = 0; /* for syslog in err_doit */
#define MAXLINE 4096 /* max text line length */
void err_doit(int errnoflag, int level, const char* fmt, va_list ap) {
char buf[MAXLINE + 1] = { 0 };
int errno_save = errno, n = 0;
#ifdef HAVE_VSNPRINTF
vsnprintf(buf, MAXLINE, fmt, ap);
#else
vsprintf(buf, fmt, ap);
#endif /* HAVE_VSNPRINTF */
n = strlen(buf);
if (errnoflag)
snprintf(buf + n, MAXLINE - n, ": %s", strerror(errno_save));
strcat(buf, "\n");
if (daemon_proc) {
syslog(level, "%s", buf);
} else {
fflush(stdout);
fputs(buf, stderr);
fflush(stderr);
}
return;
}
void err_msg(const char* fmt, ...) {
va_list ap;
va_start(ap, fmt);
err_doit(0, LOG_INFO, fmt, ap);
va_end(ap);
return;
}
void err_sys(const char* fmt, ...) {
va_list ap;
va_start(ap, fmt);
err_doit(1, LOG_ERR, fmt, ap);
va_end(ap);
exit(EXIT_FAILURE);
}
#define THREAD_COUNT 100 /* rendezvous test thread workers */
struct worker_arg {
int worker_id;
struct rend_t* prend;
};
static void* pthread_worker(void* arg) {
struct worker_arg* parg = (struct worker_arg*) arg;
err_msg("worker #%d running.", (int) parg->worker_id);
srand(parg->worker_id * 2);
sleep(rand() % 5);
rend_wait(parg->prend); /* workers rendezvous */
err_msg("worker #%d exiting.", (int) parg->worker_id);
return EXIT_SUCCESS;
}
int main(void) {
int idx = 0;
void* exitcode = NULL;
pthread_t thds[THREAD_COUNT];
struct worker_arg arg[THREAD_COUNT];
DECLARE_REND(rend, THREAD_COUNT);
err_msg("workers creating.");
for (idx = 0; idx < THREAD_COUNT; idx++) {
arg[idx].prend = &rend;
arg[idx].worker_id = idx;
if (pthread_create(thds + idx, NULL, pthread_worker, (void*) &arg[idx]))
err_sys("worker #%d create error.", idx);
}
puts("workers exiting.");
for (idx = 0; idx < THREAD_COUNT; idx++)
if (pthread_join(thds[idx], &exitcode) || (exitcode != EXIT_SUCCESS))
err_msg("worker #%d exit error.", idx);
err_msg("all done. exit 0.");
rend_free(&rend);
return EXIT_SUCCESS;
}
看了下 semaphore os syscall 及其 infrastructure,也许以后还需要进程间(非 pthread)集结时用得上。kernel 实现的超强啊,呵呵~
// 2009.11.17 14:34 添加 ////
快速用户空间互斥锁(Futex)
快速用户空间互斥锁(fast userspace mutex,Futex)是快速的用户空间的锁,是对传统的System V同步方式的一种替代,传统同步方式如:信号量、文件锁和消息队列,在每次锁访问时需要进行系统调用。而futex仅在有竞争的操作时才用系统调用访问内核,这样,在竞争出现较少的情况下,可以大幅度地减少工作负载
futex在非竞争情况下可从用户空间获取和释放,不需要进入内核。与信号量类似,它有一个可以原子增减的计数器,进程可以等待计数器值变为正数。用户进程通过系统调用对资源的竞争作一个公断。
futex 是一个用户空间的整数值,被多个线程或进程共享。Futex的系统调用对该整数值时进行操作,仲裁竞争的访问。
glibc中的NPTL库封装了futex 系统调用,对futex接口进行了抽象。用户通过NPTL库像传统编程一样地使用线程同步API函数,而不会感觉到futex的存在。
futex 的实现机制是:如果当前进程访问临界区时,该临界区正被另一个进程使用,当前进程将锁用一个值标识,表示“有一个等待者正挂起”,并且调用 sys_futex(FUTEX_WAIT)等待其他进程释放它。内核在内部创建futex队列,以便以后与唤醒者匹配等待者。当临界区拥有者线程释放了 futex,它通过变量值发出通知表示还有多个等待者在挂起,并调用系统调用sys_futex(FUTEX_WAKE)唤醒它们。一旦所有等待者已获取资源并释放锁时,futex回到非竞争状态,并没有内核状态与它相关。
robust futex是为了解决futex锁崩溃而对futex进行了增强。例如:当一个进程在持有pthread_mutex_t锁正与其他进程发生竞争时,进程因某种意外原因而提前退出,如:进程发生段错误,或者被用户用shell命令kill -9-ed”强行退出,此时,需要有一种机制告诉等待者“锁的最一个持有者已经非正常地退出”。“
为了解决此类问题,NPTL创建了robust mutex用户空间API pthread_mutex_lock(),如果锁的拥有者进程提前退出,pthread_mutex_lock()返回一个错误值,新的拥有者进程可以决定是否可以安全恢复被锁保护的数据。
有几点不还不理解:
- “futex 如果说是一个用户空间的整数值,那怎么被多个进程共享?Futex 系统调用在 kernel 态怎么操作该值并仲裁竞争?这是那种直接映射到 userspace 的 kernel 地址么。这个需要程序间通过 mmap 在共享段中访问,与 futex 没什么关系。
- 这个“robust futex”机制指的应该就是 SVRx 传统 sem IPC 里的 SEM_UNDO flag 吧?
一篇不错的文章,引发对 glibc nptl 实现源码的探索:
关于信号量与线程互斥锁的区别与实现
分享到:
相关推荐
东南大学 操作系统实验2 用pthread实现矩阵相乘 含代码及报告
http://blog.csdn.net/infoworld/article/details/49798215 mingw 编译,Win32部分可以直接用vs编译.
pthread, window pthread
使用c++实现了基于pthread的并行矩阵乘法,同时为了提高程序运行效率,首先将矩阵进行转置
操作系统实验,用Pthread实现生产者和消费者问题
PThread header file
pthread.lib文件下载,本人用于crf++安装时使用。。 pthread.lib文件下载,本人用于crf++安装时使用。。 pthread.lib文件下载,本人用于crf++安装时使用。。 pthread.lib文件下载,本人用于crf++安装时使用。。
pthread_testcancel pthread_kill pthread_cancel 的使用例子
高斯消元法的并行实现,vs2015,pthread。同时结合了SSE和AVX,AVX X64下运行。
并行计算pthread多线程求pi的三种方法,编译后在命令行后输入线程数以及计算规模n运行
这是一个在Linux下实现的生产者-消费者进程同步经典问题。编译是使用“gcc -pthread synchro.c -o main"。运行时使用./main后 输入两个参数,第一个是生产者数目,第二个是消费者数目。程序运行30秒结束。
glibc中实现pthread的源码,pthread在linux系统上的实现主要在libc中,libc的实现nptl。
pthread使用手册
pthread affinity pthread affinity
提供c/c++多线程开发的API,兼容windows linux平台
主要是pthread API组成的一个简单的例子,更关键的是能够熟悉pthread_join,他能够一直阻塞整个进程直到他等待的那个线程结束才能继续执行他后面的代码。 编译使用gcc pthread.c -o pthread -pthread
利用Pthread多线程工具 实现桶排序的并行化,并在linux下调试通过。
里面包含pthread.h pthread.lib pthread.dll
Pthread Primer! Pthread线程经典之作!
详见: https://www.sourceware.org/pthreads-win32/