我需要的 pthread 线程集结点功能,使用同一集结点的线程将通过 rend_wait 函数等待,当集结点到达指定数量的线程后同时激发继续执行。使用 pthread 的 mutex 和 cond 超轻量实现。下面 rend.h 是集结点实现,rendezvous.c 是测试应用。
- /*
- *rend.h
- *
- *Createdon:2009-11-14
- *Author:liuzy(lzy.dev@gmail.com)
- */
- #ifndefREND_H_
- #defineREND_H_
- #include<pthread.h>
- #include<assert.h>
- struct rend_t{
- volatile int count;
- pthread_mutex_tcount_lock;
- pthread_cond_tready;
- };
- #defineDECLARE_REND(name,count)/
- struct rend_tname={(count),PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER}
- int rend_init( struct rend_t*prend, int count){
- int ret=0;
- assert(prend);
- prend->count=count;
- if ((ret=pthread_mutex_init(&prend->count_lock,NULL)))
- return ret;
- if ((ret=pthread_cond_init(&prend->ready,NULL)))
- return ret;
- return EXIT_SUCCESS;
- }
- int rend_wait( struct rend_t*prend){
- int ret=0;
- assert(prend);
- if ((ret=pthread_mutex_lock(&prend->count_lock)))
- return ret;
- /*checkcountvalueisreadytoweakupblockcode*/
- if (prend->count==1){
- if ((ret=pthread_cond_broadcast(&prend->ready)))
- return ret;
- if ((ret=pthread_mutex_unlock(&prend->count_lock)))
- return ret;
- } else {
- prend->count--;
- ret=pthread_cond_wait(&prend->ready,&prend->count_lock);
- prend->count++;
- if (ret){
- pthread_mutex_unlock(&prend->count_lock);
- return ret;
- }
- if ((ret=pthread_mutex_unlock(&prend->count_lock)))
- return ret;
- }
- return EXIT_SUCCESS;
- }
- int rend_free( struct rend_t*prend){
- int ret=0;
- assert(prend);
- prend->count=0;
- if ((ret=pthread_mutex_destroy(&prend->count_lock)))
- return ret;
- if ((ret=pthread_cond_destroy(&prend->ready)))
- return ret;
- return EXIT_SUCCESS;
- }
- #endif/*REND_H_*/
rend 使用更简单:
- 定义/初始化 rend_t 集结点对象。DECLARE_REND 宏用于静态定义,rend_init 函数可以对动态创建的集结点结构初始化;
- pthread 线程通过调用 rend_wait 函数 P/V 集结状态。集结关系的线程要 P/V 在同一个 rend_t 集结对象上;
- 释放集结对象,rend_free 函数。
以上函数都是成功返回 0,出错返回 errno 值(非 0)。
- /*
- ==============================
- Name:rendezvous.c
- Author:liuzy(lzy.dev@gmail.com)
- Version:0.1
- ==============================
- */
- #include<stdio.h>
- #include<stdlib.h>
- #include<stdarg.h>/*va_list*/
- #include<unistd.h>
- #include<string.h>
- #include<errno.h>/*errno*/
- #include<syslog.h>/*forsyslog(2)andlevel*/
- #include<pthread.h>
- #include"rend.h"
- static int daemon_proc=0; /*forsysloginerr_doit*/
- #defineMAXLINE4096/*maxtextlinelength*/
- void err_doit( int errnoflag, int level, const char *fmt, va_list ap){
- char buf[MAXLINE+1]={0};
- int errno_save=errno,n=0;
- #ifdefHAVE_VSNPRINTF
- vsnprintf(buf,MAXLINE,fmt,ap);
- #else
- vsprintf(buf,fmt,ap);
- #endif/*HAVE_VSNPRINTF*/
- n=strlen(buf);
- if (errnoflag)
- snprintf(buf+n,MAXLINE-n, ":%s" ,strerror(errno_save));
- strcat(buf, "/n" );
- if (daemon_proc){
- syslog(level, "%s" ,buf);
- } else {
- fflush(stdout);
- fputs(buf,stderr);
- fflush(stderr);
- }
- return ;
- }
- void err_msg( const char *fmt,...){
- va_list ap;
- va_start(ap,fmt);
- err_doit(0,LOG_INFO,fmt,ap);
- va_end(ap);
- return ;
- }
- void err_sys( const char *fmt,...){
- va_list ap;
- va_start(ap,fmt);
- err_doit(1,LOG_ERR,fmt,ap);
- va_end(ap);
- exit(EXIT_FAILURE);
- }
- #defineTHREAD_COUNT100/*rendezvoustestthreadworkers*/
- struct worker_arg{
- int worker_id;
- struct rend_t*prend;
- };
- static void *pthread_worker( void *arg){
- struct worker_arg*parg=( struct worker_arg*)arg;
- err_msg( "worker#%drunning." ,( int )parg->worker_id);
- srand(parg->worker_id*2);
- sleep(rand()%5);
- rend_wait(parg->prend); /*workersrendezvous*/
- err_msg( "worker#%dexiting." ,( int )parg->worker_id);
- return EXIT_SUCCESS;
- }
- int main( void ){
- int idx=0;
- void *exitcode=NULL;
- pthread_tthds[THREAD_COUNT];
- struct worker_argarg[THREAD_COUNT];
- DECLARE_REND(rend,THREAD_COUNT);
- err_msg( "workerscreating." );
- for (idx=0;idx<THREAD_COUNT;idx++){
- arg[idx].prend=&rend;
- arg[idx].worker_id=idx;
- if (pthread_create(thds+idx,NULL,pthread_worker,( void *)&arg[idx]))
- err_sys( "worker#%dcreateerror." ,idx);
- }
- puts( "workersexiting." );
- for (idx=0;idx<THREAD_COUNT;idx++)
- if (pthread_join(thds[idx],&exitcode)||(exitcode!=EXIT_SUCCESS))
- err_msg( "worker#%dexiterror." ,idx);
- err_msg( "alldone.exit0." );
- rend_free(&rend);
- return EXIT_SUCCESS;
- }
看了下 semaphore os syscall 及其 infrastructure,也许以后还需要进程间(非 pthread)集结时用得上。kernel 实现的超强啊,呵呵~
// 2009.11.17 14:34 添加 ////
快速用户空间互斥锁(Futex)
快速用户空间互斥锁(fast userspace mutex,Futex)是快速的用户空间的锁,是对传统的System V同步方式的一种替代,传统同步方式如:信号量、文件锁和消息队列,在每次锁访问时需要进行系统调用。而futex仅在有竞争的操作时才用系统调用访问内核,这样,在竞争出现较少的情况下,可以大幅度地减少工作负载
futex在非竞争情况下可从用户空间获取和释放,不需要进入内核。与信号量类似,它有一个可以原子增减的计数器,进程可以等待计数器值变为正数。用户进程通过系统调用对资源的竞争作一个公断。
futex 是一个用户空间的整数值,被多个线程或进程共享。Futex的系统调用对该整数值时进行操作,仲裁竞争的访问。 glibc中的NPTL库封装了futex 系统调用,对futex接口进行了抽象。用户通过NPTL库像传统编程一样地使用线程同步API函数,而不会感觉到futex的存在。
futex 的实现机制是:如果当前进程访问临界区时,该临界区正被另一个进程使用,当前进程将锁用一个值标识,表示“有一个等待者正挂起”,并且调用 sys_futex(FUTEX_WAIT)等待其他进程释放它。内核在内部创建futex队列,以便以后与唤醒者匹配等待者。当临界区拥有者线程释放了 futex,它通过变量值发出通知表示还有多个等待者在挂起,并调用系统调用sys_futex(FUTEX_WAKE)唤醒它们。一旦所有等待者已获取资源并释放锁时,futex回到非竞争状态,并没有内核状态与它相关。
robust futex是为了解决futex锁崩溃而对futex进行了增强。例如:当一个进程在持有pthread_mutex_t锁正与其他进程发生竞争时,进程因某种意外原因而提前退出,如:进程发生段错误,或者被用户用shell命令kill -9-ed”强行退出,此时,需要有一种机制告诉等待者“锁的最一个持有者已经非正常地退出”。“
为了解决此类问题,NPTL创建了robust mutex用户空间API pthread_mutex_lock(),如果锁的拥有者进程提前退出,pthread_mutex_lock()返回一个错误值,新的拥有者进程可以决定是否可以安全恢复被锁保护的数据。
futex在非竞争情况下可从用户空间获取和释放,不需要进入内核。与信号量类似,它有一个可以原子增减的计数器,进程可以等待计数器值变为正数。用户进程通过系统调用对资源的竞争作一个公断。
futex 是一个用户空间的整数值,被多个线程或进程共享。Futex的系统调用对该整数值时进行操作,仲裁竞争的访问。 glibc中的NPTL库封装了futex 系统调用,对futex接口进行了抽象。用户通过NPTL库像传统编程一样地使用线程同步API函数,而不会感觉到futex的存在。
futex 的实现机制是:如果当前进程访问临界区时,该临界区正被另一个进程使用,当前进程将锁用一个值标识,表示“有一个等待者正挂起”,并且调用 sys_futex(FUTEX_WAIT)等待其他进程释放它。内核在内部创建futex队列,以便以后与唤醒者匹配等待者。当临界区拥有者线程释放了 futex,它通过变量值发出通知表示还有多个等待者在挂起,并调用系统调用sys_futex(FUTEX_WAKE)唤醒它们。一旦所有等待者已获取资源并释放锁时,futex回到非竞争状态,并没有内核状态与它相关。
robust futex是为了解决futex锁崩溃而对futex进行了增强。例如:当一个进程在持有pthread_mutex_t锁正与其他进程发生竞争时,进程因某种意外原因而提前退出,如:进程发生段错误,或者被用户用shell命令kill -9-ed”强行退出,此时,需要有一种机制告诉等待者“锁的最一个持有者已经非正常地退出”。“
为了解决此类问题,NPTL创建了robust mutex用户空间API pthread_mutex_lock(),如果锁的拥有者进程提前退出,pthread_mutex_lock()返回一个错误值,新的拥有者进程可以决定是否可以安全恢复被锁保护的数据。
有几点不还不理解:
- “futex 如果说是一个用户空间的整数值,那怎么被多个进程共享?Futex 系统调用在 kernel 态怎么操作该值并仲裁竞争?这是那种直接映射到 userspace 的 kernel 地址么。 这个需要程序间通过 mmap 在共享段中访问,与 futex 没什么关系。
- 这个“robust futex”机制指的应该就是 SVRx 传统 sem IPC 里的 SEM_UNDO flag 吧?
一篇不错的文章,引发对 glibc nptl 实现源码的探索: