简单实现Linux C下的线程池.

系统 3319 0

What I write, what I lose.

 

之前有点时间, 重新熟悉Linux的进程间通讯的东西.

于是想起之前在项目中自己写了个很简单的线程池.

这次想重新写下.

主要目的是用进程间或者线程间通信的阻塞/取消阻塞方法实现对线程池线程的等待作业和开始作业.

算是对这些代码的一种实践.

以上.

===================================================================

我对一个简单线程池的一些理解.

1.创建大量的线程.

2.工作线程的执行体功能为:

  while(1)

  {

    //按照一定条件(A)阻塞.

 

    //按照任务的参数设置开始执行任务. 

  }

3.控制线程的功能为.

  {

    //接受新任务的参数, 一般为回调函数+参数. (为保持兼容, 我设置的格式为 void *(*thread_task)(void *) + void * , 与 pthread_create 的线程函数形式保持兼容.)

    //按照一定规则查找空闲的线程.

    //将接受的新任务参数赋给这条线程数据体.

    //解除这条线程的阻塞条件.

  }

 

===================================================================

common-thread-pool.c     线程池主要实现+一个简单的测试代码.

                接口没有拿出来.

thread-control.h         提供线程池线程的等待作业和开始作业接口.

thread-control-xxxxx.c     thread-control.h的接口实现. 可以使用多种方式.

 

 

common-thread-pool.c 

View Code
        #include <stdio.h>
        
#include <stdlib.h>
#include <unistd.h>
#include < string .h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>

/* Debug and error logging both map directly onto printf. */
#define DBGPRINTF_DEBUG printf
#define DBGPRINTF_ERROR printf

/* ASSERT is an alias for the standard assert() macro. */
#define ASSERT assert



#include " thread-control.h "

/*
 * Entry point of a task. The shape (void * in, void * out) is the same as
 * the start routine accepted by pthread_create().
 */
typedef void *(*thread_task_func)(void *arg);

/* Data describing the task a pool thread has been asked to execute. */
typedef struct _thread_task_t
{
    int taskid;                 /* Task id. */
    thread_task_func task_func; /* Function to run for this task... */
    void *task_arg;             /* ...and the argument handed to it. */
} thread_task_t;

/*
 * Thread status.
 * Only `idle` and `running` are assigned anywhere in this file besides the
 * initial `unknown`; the remaining values are declared but not used here.
 */
typedef enum
{
    ethread_status_unknown   = 0, /* Slot initialised, state not yet set. */
    ethread_status_idle      = 1, /* Thread may be given a task. */
    ethread_status_running   = 2, /* Thread has been handed a task. */
    ethread_status_terminel  = 3,
    ethread_status_cannotuse = 4
} thread_status_e;

/* Per-thread data: one instance exists for every thread in the pool. */
typedef struct _thread_data_t
{
    int thread_id;                 /* Index of this thread inside the pool. */
    pthread_t pid;                 /* Thread handle filled in by pthread_create(). */
    thread_status_e status;        /* Current status; changed under the pool lock. */

    thread_task_t thread_task;     /* Task currently assigned to this thread. */
    THREAD_CONTROL thread_control; /* Handle used to block / wake this thread. */
} thread_data_t;

/* Thread pool data. */
typedef struct _thread_pool_t
{
    thread_data_t *thread_data_set;   /* Array holding one entry per pool thread. */
    int num_thread;                   /* Number of entries in thread_data_set. */
    int taskid_base;                  /* Last task id handed out; incremented per task. */

    pthread_mutex_t thread_pool_lock; /* Guards thread status and taskid_base. */
} thread_pool_t;

/* The single, process-wide pool instance used by every function below. */
thread_pool_t g_thread_pool;

/*
 * Set the status of one pool thread.
 *
 * The write is performed while holding the global pool lock.
 * Always returns 0.
 */
int thread_pool_setthreadstatus(thread_data_t* thread_data, thread_status_e status)
{
    pthread_mutex_t *pool_lock = &g_thread_pool.thread_pool_lock;

    pthread_mutex_lock(pool_lock);
    thread_data->status = status;
    pthread_mutex_unlock(pool_lock);

    return 0;
}


/*
 * Body of every pool thread.
 *
 * arg is the thread's own thread_data_t. The thread loops forever: it blocks
 * in thread_control_wait() until it is started, runs the task stored in
 * thread_data->thread_task, then marks itself idle again.
 *
 * Returns NULL (only for signature compatibility; the loop never exits).
 */
void * thread_pool_func( void * arg)
{
    thread_data_t *thread_data = (thread_data_t *)arg;

    /*
     * Print pthread_self() instead of thread_data->pid: pthread_create()
     * stores the id into ->pid from the creating thread, which may happen
     * after this thread has already started. Reading our own id removes that
     * race and the sleep(1) that used to hide it.
     */
    DBGPRINTF_DEBUG( " Thread start run. Thread_id = %d, pid = 0x%x . \n " ,
                     thread_data->thread_id, (unsigned int)pthread_self());

    /*
     * Keep waiting for a task, then run it using the task_func and task_arg
     * that were stored before the thread was started.
     */
    while (1)
    {
        thread_control_wait(thread_data->thread_control);

        if (thread_data->thread_task.task_func == NULL)
        {
            /* Woken without a task: calling NULL would crash, so go back to idle. */
            thread_pool_setthreadstatus(thread_data, ethread_status_idle);
            continue;
        }

        /* Status changes go through the pool lock. */
        thread_pool_setthreadstatus(thread_data, ethread_status_running);
        DBGPRINTF_DEBUG( " Task start. taskid = %d .\n " , thread_data->thread_task.taskid);

        thread_data->thread_task.task_func(thread_data->thread_task.task_arg);

        DBGPRINTF_DEBUG( " Task end. taskid = %d .\n " , thread_data->thread_task.taskid);
        thread_pool_setthreadstatus(thread_data, ethread_status_idle);
    }

    /* Not reached with the loop above; kept so the function is well-formed. */
    DBGPRINTF_DEBUG( " Thread end run. Thread_id = %d, pid = 0x%x . \n " ,
                     thread_data->thread_id, (unsigned int)pthread_self());

    return NULL;
}


/*
 * Reset a task descriptor to its "no task" state: id -1, no function and
 * no argument. Always returns 0.
 */
int thread_task_init(thread_task_t* thread_task)
{
    thread_task->task_arg  = NULL;
    thread_task->task_func = NULL;
    thread_task->taskid    = -1;

    return 0;
}


/*
 * Initialise one per-thread slot: no id, no thread handle, status unknown,
 * empty task, and a freshly created thread_control handle.
 *
 * Returns the result of thread_control_init() (0 on success).
 */
int thread_data_init(thread_data_t* thread_data)
{
    thread_data->thread_id = -1;
    thread_data->pid = 0x0;
    /* Terminated with ';' -- the original ended this line with ',' which
       silently chained it to the next call through the comma operator. */
    thread_data->status = ethread_status_unknown;

    thread_task_init(&(thread_data->thread_task));

    /* Propagate the control-handle result instead of discarding it. */
    return thread_control_init(&(thread_data->thread_control));
}


int thread_pool_create( int num_thread)
{
ASSERT(num_thread > 0 && num_thread <= 10 * 1024 );
thread_pool_t* thread_pool = &g_thread_pool;

int i = 0 ;
thread_pool->thread_data_set = (thread_data_t*)malloc( sizeof (thread_data_t) * num_thread);
ASSERT(thread_pool->thread_data_set != NULL);
thread_pool->num_thread = num_thread;
thread_pool->taskid_base = - 1 ;
pthread_mutex_init(&(thread_pool->thread_pool_lock), NULL);

for (i= 0 ; i<num_thread; i++)
{
thread_data_t* thread_data = thread_pool->thread_data_set+i;
thread_data_init(thread_data);
thread_data->thread_id = i;
thread_data->status = ethread_status_idle;

/* pthread_create set to detached. */
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int ret = pthread_create(&(thread_data->pid), &attr, thread_pool_func, thread_data);
if (ret != 0 )
{
DBGPRINTF_DEBUG( " pthread_create error[%d].\n " , i);
break ;
}
}

sleep( 2 );

return 0 ;
}




/*
 * Sample task used by main(): sleeps for the number of seconds encoded in
 * arg (an integer smuggled through the void * parameter) and returns NULL.
 */
void * test_func( void * arg)
{
    /* Go through long first: casting a pointer straight to int is a
       narrowing pointer-to-integer conversion on 64-bit Linux, where long
       (unlike int) is pointer-sized. */
    int t_sleep = (int)(long)arg;

    DBGPRINTF_DEBUG( " Test func. Sleep %d .\n " , t_sleep);

    sleep(t_sleep);

    DBGPRINTF_DEBUG( " Test func finished. \n " );

    return NULL;
}


/*
 * Look up a thread that can accept a task.
 *
 * Scans the pool in index order while holding the pool lock and stores the
 * first thread whose status is idle into *thread_data_found, or NULL when
 * none is idle. The thread's status is not modified. Always returns 0.
 */
int thread_pool_queryfree(thread_data_t** thread_data_found)
{
    thread_pool_t *pool = &g_thread_pool;
    int idx = 0;

    *thread_data_found = NULL;

    pthread_mutex_lock(&pool->thread_pool_lock);

    while (idx < pool->num_thread)
    {
        thread_data_t *slot = &pool->thread_data_set[idx];

        if (slot->status == ethread_status_idle)
        {
            *thread_data_found = slot;
            break;
        }
        idx++;
    }

    pthread_mutex_unlock(&pool->thread_pool_lock);

    return 0;
}

/* 分配taskid. */
int thread_pool_gettaskid( int * taskid)
{
thread_pool_t* thread_pool = &g_thread_pool;
pthread_mutex_lock(&(thread_pool->thread_pool_lock));

thread_pool->taskid_base ++;
*taskid = thread_pool->taskid_base;

pthread_mutex_unlock(&(thread_pool->thread_pool_lock));

return 0 ;
}

/*
 * Add a task to the thread pool.
 *
 * Picks an idle thread, stores task_func / arg and a fresh task id in its
 * slot, marks it running and wakes it through thread_control_start().
 *
 * Returns 0 when the task was handed to a thread, -1 when task_func is NULL
 * or no idle thread is available (the task is dropped, not queued).
 */
int thread_pool_addtask(thread_task_func task_func, void * arg)
{
    thread_data_t *thread_data_found = NULL;

    if (task_func == NULL)
    {
        /* A worker would end up calling through a NULL pointer. */
        DBGPRINTF_ERROR( " Thread pool full. Task not added.\n " );
        return -1;
    }

    /* Find a free thread. */
    thread_pool_queryfree(&thread_data_found);

    if (thread_data_found == NULL)
    {
        DBGPRINTF_ERROR( " Thread pool full. Task not added.\n " );
        /* Report the drop instead of claiming success. */
        return -1;
    }

    DBGPRINTF_DEBUG( " Thread [%d] perferm this task.\n " , thread_data_found->thread_id);

    /* Set task data. */
    thread_data_found->thread_task.task_func = task_func;
    thread_data_found->thread_task.task_arg = arg;
    thread_pool_gettaskid(&(thread_data_found->thread_task.taskid));

    /* Mark the thread busy before waking it so it cannot be picked twice. */
    thread_pool_setthreadstatus(thread_data_found, ethread_status_running);
    thread_control_start(thread_data_found->thread_control);
    DBGPRINTF_DEBUG( " Thread [%d] Add task[%d] finished.\n " ,
                     thread_data_found->thread_id, thread_data_found->thread_task.taskid);

    return 0;
}




/* Submit eight test_func tasks whose arguments are 1, 2, 4, ..., 128. */
static void submit_test_batch(void)
{
    int shift;

    for (shift = 0; shift <= 7; shift++)
    {
        thread_pool_addtask(test_func, (void *)(1 << shift));
    }
}

/*
 * Simple test driver: creates a pool of 10 threads, submits one batch of
 * eight tasks, waits 6 seconds, submits a second identical batch, then
 * sleeps for a long time so the detached workers can keep running.
 */
int main()
{
    thread_pool_create(10);

    submit_test_batch();

    sleep(6);
    submit_test_batch();

    sleep(100000);

    return 0;
}


thread-control.h

View Code
        
          #define
        
         THREAD_CONTROL  void*
        





int thread_control_init(THREAD_CONTROL* thread_control);
int thread_control_deinit(THREAD_CONTROL* thread_control);
int thread_control_wait(THREAD_CONTROL thread_control);
int thread_control_start(THREAD_CONTROL thread_control);

 

thread-control.h的接口实现. 可以使用多种方式.

只要进程间通信/线程间通信中存在阻塞等待/解除阻塞等待的都可以拿来作实验.

比如:条件变量.
thread-control-condition.c   

View Code
        #include <stdio.h>
        
#include <stdlib.h>
#include <unistd.h>
#include < string .h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>

/* Debug and error logging both map directly onto printf. */
#define DBGPRINTF_DEBUG printf
#define DBGPRINTF_ERROR printf

/* ASSERT is an alias for the standard assert() macro. */
#define ASSERT assert



#include " thread-control.h "

/*
 * Condition-variable backend for thread-control.h.
 *
 * `pending` counts start requests that have not yet been consumed by a
 * waiter. It is the predicate of the condition variable: without it a
 * start issued before the worker reaches pthread_cond_wait() would be lost
 * (the worker would then sleep forever), and a spurious wakeup would be
 * mistaken for a start.
 */
struct _thread_control_cond_t
{
    pthread_mutex_t lock;     /* Protects `pending`. */
    pthread_cond_t condition; /* Signalled whenever `pending` is incremented. */
    int pending;              /* Start requests not yet consumed by wait. */
};
typedef struct _thread_control_cond_t thread_control_cond_t;


/*
 * Allocate and initialise a handle and store it in *thread_control.
 * Returns 0 on success; on failure *thread_control is left NULL and -1 is
 * returned (allocation failure additionally trips the assert).
 */
int thread_control_init(THREAD_CONTROL* thread_control)
{
    *thread_control = NULL;

    thread_control_cond_t *cond = (thread_control_cond_t *)malloc(sizeof(*cond));
    assert(cond != NULL);
    if (cond == NULL)
    {
        return -1;
    }

    if (pthread_mutex_init(&(cond->lock), NULL) != 0)
    {
        free(cond);
        return -1;
    }
    if (pthread_cond_init(&(cond->condition), NULL) != 0)
    {
        pthread_mutex_destroy(&(cond->lock));
        free(cond);
        return -1;
    }
    cond->pending = 0;

    *thread_control = cond;

    return 0;
}


/*
 * Destroy the handle stored in *thread_control and set it to NULL.
 * A NULL pointer or an already-NULL handle is accepted and ignored.
 * Always returns 0.
 */
int thread_control_deinit(THREAD_CONTROL* thread_control)
{
    if (thread_control == NULL || *thread_control == NULL)
    {
        return 0;
    }

    thread_control_cond_t *cond = (thread_control_cond_t *)(*thread_control);

    pthread_mutex_destroy(&(cond->lock));
    pthread_cond_destroy(&(cond->condition));

    free(cond);
    *thread_control = NULL;

    return 0;
}


/*
 * Block until a start request is available, then consume it.
 * Returns immediately if thread_control_start() was already called.
 * Always returns 0.
 */
int thread_control_wait(THREAD_CONTROL thread_control)
{
    thread_control_cond_t *cond = (thread_control_cond_t *)(thread_control);

    pthread_mutex_lock(&(cond->lock));
    /* Loop on the predicate: pthread_cond_wait() may wake spuriously. */
    while (cond->pending == 0)
    {
        pthread_cond_wait(&(cond->condition), &(cond->lock));
    }
    cond->pending--;
    pthread_mutex_unlock(&(cond->lock));

    return 0;
}


/*
 * Post one start request and wake the waiter, if any.
 * The request is remembered when nobody is waiting yet.
 * Always returns 0.
 */
int thread_control_start(THREAD_CONTROL thread_control)
{
    thread_control_cond_t *cond = (thread_control_cond_t *)(thread_control);

    pthread_mutex_lock(&(cond->lock));
    cond->pending++;
    pthread_cond_signal(&(cond->condition));
    pthread_mutex_unlock(&(cond->lock));

    return 0;
}


比如:有名管道.

thread-control-fifopipe.c

View Code
        #include <stdio.h>
        
#include <stdlib.h>
#include <unistd.h>
#include < string .h>
#include <pthread.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>

/* Debug and error logging both map directly onto printf. */
#define DBGPRINTF_DEBUG printf
#define DBGPRINTF_ERROR printf

/* ASSERT is an alias for the standard assert() macro. */
#define ASSERT assert



#include " thread-control.h "

/* Counter used to give every handle its own FIFO path (./xxx1, ./xxx2, ...). */
static int path_index = 0;

/* Large enough for "./xxx" followed by any int value and the terminator;
   the previous size of 10 truncated the name from index 100 onwards. */
#define LEN_CMD_PATH 32

/* Named-FIFO backend state: just the path of this handle's FIFO. */
struct _fifopipe_control_t
{
    char fifopipe_cmd_path[LEN_CMD_PATH];
};
typedef struct _fifopipe_control_t fifopipe_control_t;


/*
 * Create a handle backed by a freshly created named FIFO in the current
 * directory and store it in *thread_control.
 *
 * Returns 0 on success. Failures trip the asserts; when assertions are
 * compiled out, *thread_control is left NULL and -1 is returned.
 * path_index is not protected by a lock, so call this from one thread only.
 */
int thread_control_init(THREAD_CONTROL* thread_control)
{
    *thread_control = NULL;

    fifopipe_control_t *fifopipe_control = (fifopipe_control_t *)malloc(sizeof(*fifopipe_control));
    assert(fifopipe_control != NULL);
    if (fifopipe_control == NULL)
    {
        return -1;
    }

    path_index++;
    /* No spaces inside the path: " ./xxx1 " would name a file inside a
       directory called " .", which does not exist. */
    int len = snprintf(fifopipe_control->fifopipe_cmd_path, LEN_CMD_PATH, "./xxx%d", path_index);
    assert(len > 0 && len < LEN_CMD_PATH);
    if (len <= 0 || len >= LEN_CMD_PATH)
    {
        free(fifopipe_control);
        return -1;
    }

    /* Remove a stale FIFO left behind by a previous run; otherwise mkfifo()
       fails with EEXIST. A failure here (nothing to remove) is expected. */
    unlink(fifopipe_control->fifopipe_cmd_path);

    int ret = mkfifo(fifopipe_control->fifopipe_cmd_path, 0666);
    assert(ret == 0);
    if (ret != 0)
    {
        free(fifopipe_control);
        return -1;
    }

    *thread_control = fifopipe_control;

    return 0;
}


/*
 * Release the handle stored in *thread_control: remove its FIFO from the
 * file system, free the memory and set *thread_control to NULL.
 * A NULL pointer or an already-NULL handle is accepted and ignored.
 * Always returns 0.
 */
int thread_control_deinit(THREAD_CONTROL* thread_control)
{
    if (thread_control == NULL || *thread_control == NULL)
    {
        return 0;
    }

    fifopipe_control_t *fifopipe_control = (fifopipe_control_t *)(*thread_control);

    /* Best effort: the FIFO was created by thread_control_init() and would
       otherwise be left behind on disk. */
    unlink(fifopipe_control->fifopipe_cmd_path);

    free(fifopipe_control);
    *thread_control = NULL;

    return 0;
}


/*
 * Block until thread_control_start() is called on the same handle.
 *
 * Opens the FIFO for reading (which blocks until a writer opens it), reads
 * the one-byte wake-up token and closes the descriptor again.
 *
 * Returns 0 when the token was received, -1 if the FIFO could not be opened
 * (also asserted) or the read did not deliver one byte.
 */
int thread_control_wait(THREAD_CONTROL thread_control)
{
    fifopipe_control_t *fifopipe_control = (fifopipe_control_t *)(thread_control);

    int fd = open(fifopipe_control->fifopipe_cmd_path, O_RDONLY);
    /* 0 is a valid descriptor; only negative values signal failure. */
    assert(fd >= 0);
    if (fd < 0)
    {
        return -1;
    }

    char tmp = 0;
    ssize_t got = read(fd, &tmp, 1);

    /* Close on every call, otherwise each task leaks one descriptor. */
    close(fd);

    return (got == 1) ? 0 : -1;
}


/*
 * Wake the thread blocked in thread_control_wait() on this handle.
 *
 * Opens the FIFO for writing (which blocks until a reader opens it), writes
 * the one-byte wake-up token and closes the descriptor again.
 *
 * Returns 0 when the token was written, -1 if the FIFO could not be opened
 * (also asserted) or the write did not deliver one byte.
 */
int thread_control_start(THREAD_CONTROL thread_control)
{
    fifopipe_control_t *fifopipe_control = (fifopipe_control_t *)(thread_control);

    int fd = open(fifopipe_control->fifopipe_cmd_path, O_WRONLY);
    /* 0 is a valid descriptor; only negative values signal failure. */
    assert(fd >= 0);
    if (fd < 0)
    {
        return -1;
    }

    char tmp = 0;
    ssize_t put = write(fd, &tmp, 1);

    /* Close on every call, otherwise each task leaks one descriptor. */
    close(fd);

    return (put == 1) ? 0 : -1;
}

 

比如:管道, 消息队列, socket, while(condition?){sleep}等等. 

 

以上代码中, 注释的比较少. 

差不多.其实我都有点不知道自己在写什么.

简单实现Linux C下的线程池.


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论