`
weiyuhu
  • 浏览: 231104 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

linux线程池及其测试

阅读更多
/*-------------------------------------------------------------------------
* tpool.h – 线程池定义
* -------------------------------------------------------------------------
*/
#ifndef _TPOOL_H_
#define _TPOOL_H_
#include  <stdio.h>
#include  <pthread.h>
/*工作线程链表*/
typedef struct tpool_work{
void (*handler_routine)();            /*任务函数指针*/
void *arg;                        /*任务函数参数*/
struct tpool_work *next;                /*下一个任务链表*/
} tpool_work_t;
/*线程池结构体*/
typedef struct tpool{
int num_threads;                    /*最大线程数*/
int max_queue_size;                /*最大任务链表数*/
int do_not_block_when_full;            /*当链表满时是否阻塞*/
pthread_t *threads;                /*线程指针*/
int cur_queue_size;
tpool_work_t *queue_head;            /*链表头*/
tpool_work_t *queue_tail;            /*链表尾*/
pthread_mutex_t queue_lock;        /*链表互斥量*/
pthread_cond_t queue_not_full;        /*链表条件量-未满*/
pthread_cond_t queue_not_empty;    /*链表条件量-非空*/
pthread_cond_t queue_empty;        /*链表条件量-空*/
int queue_closed;
int shutdown;
} tpool_t;
/* 初始化连接池 */
extern tpool_t *tpool_init(int num_worker_threads,\
int max_queue_size, int do_not_block_when_full);
/* 添加一个工作线程 */
extern int tpool_add_work(tpool_t *pool, void  (*routine)(), void *arg);
/* 清除线程池*/
extern int tpool_destroy(tpool_t *pool, int finish);
#endif /* _TPOOL_H_ */

/* -------------------------------------------------------------------------
* tpool.c – 线程池的实现
* -------------------------------------------------------------------------
*/
#include  <stdio.h>
#include  <stdlib.h>
#include  <string.h>
#include  <pthread.h>
#include "tpool.h"
#include "log.h"
/* 工作线程 */
void *tpool_thread(void *tpool);
/***************线程池初始化*****************************/
tpool_t *tpool_init(int num_worker_threads,\    /*线程池线程个数*/
int max_queue_size, \        /*最大任务数*/
int do_not_block_when_full)    /*是否阻塞任务满的时候*/
{
int i, rtn;
tpool_t *pool;
lprintf(log, INFO, "init pool begin ...\n");
/* 创建线程池结构体 */
if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL) {
lprintf(log, FATAL, "Unable to malloc() thread pool!\n");
return NULL;
}
/* 设置线程池架构体成员 */
pool->num_threads = num_worker_threads;                      /*工作线程个数*/
pool->max_queue_size = max_queue_size;                       /*任务链表最大长度*/
pool->do_not_block_when_full = do_not_block_when_full;       /*任务链表满时是否等待*/
/* 生成线程池缓存 */
if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL) {
lprintf(log, FATAL,"Unable to malloc() thread info array\n");/*虽然malloc()并不是返回数组指针,但仍然可以这样用,表示数组。calloc返回数组指针*/
return NULL;
}
/* 初始化任务链表 */
pool->cur_queue_size = 0;
pool->queue_head = NULL;
pool->queue_tail = NULL;
pool->queue_closed = 0;
pool->shutdown = 0;
/* 初始化互斥变量,条件变量 用于线程之间的同步 */
if((rtn = pthread_mutex_init(&(pool->queue_lock),NULL)) != 0) {
lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_full),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
/* 创建所有的线程 */ /*这里预创建的线程并不能运行,需要客户调度一个存在的函数才能工作。这里创建完线程以后,就放在那睡眠,直到唤醒*/
for(i = 0; i != num_worker_threads; i++) {
if( (rtn=pthread_create(&(pool->threads[i]),NULL,tpool_thread,(void*)pool)) != 0) { /*注意这里的pthread_create(),第一个参数是函数名,这里用malloc产生的指针(数组)来表示,第二个是函数实体tpool_thread,现在的tpool_thread执行部分是空的,就是说没有执行函数体*/
lprintf(log,FATAL,"pthread_create %s\n",strerror(rtn));
return NULL;
}
lprintf(log, INFO, "init pthread  %d!\n",i);
}
lprintf(log, INFO, "init pool end!\n");
return pool;
}
/*下面这个函数负责调度。把一个真实的函数调度到线程池,然后排队或者运行*/
int tpool_add_work(tpool_t *pool,   \           /*线程池指针*/
void (*routine)(void *),\   /*工作线程函数指针*/
void *arg)                  /*工作线程函数参数*/
{
int rtn;
tpool_work_t *workp; /*当前工作线程*/
if((rtn = pthread_mutex_lock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 采取独占的形式访问任务链表 */
if((pool->cur_queue_size == pool->max_queue_size) && \
(pool->do_not_block_when_full)) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 等待任务链表为新线程释放空间 */
while((pool->cur_queue_size == pool->max_queue_size) &&
(!(pool->shutdown || pool->queue_closed))) {
if((rtn = pthread_cond_wait(&(pool->queue_not_full),
&(pool->queue_lock)) ) != 0) {
lprintf(log,FATAL,"pthread cond wait failure\n");
return -1;
}
}
if(pool->shutdown || pool->queue_closed) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0)  {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 分配工作线程结构体 */
if((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL) {
lprintf(log,FATAL,"unable to create work struct\n");
return -1;
}
workp->handler_routine = routine;
workp->arg = arg;
workp->next = NULL;
if(pool->cur_queue_size == 0) {
pool->queue_tail = pool->queue_head = workp;
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0){
lprintf(log,FATAL,"pthread broadcast error\n");
return -1;
}
}
else {
pool->queue_tail->next = workp;
pool->queue_tail = workp;
}
pool->cur_queue_size++;
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
int tpool_destroy(tpool_t *pool, int finish)
{
int i, rtn;
tpool_work_t *cur;   /*当前工作线程*/
lprintf(log, INFO, "destroy pool begin!\n");
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_lock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 第一步,设置线程退出标记 */
lprintf(log, INFO, "destroy pool begin 1!\n");
if(pool->queue_closed || pool->shutdown) {
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
/* 第二步,禁止新任务加入任务链表 */
lprintf(log, INFO, "destroy pool begin 2!\n");
pool->queue_closed = 1;
if(finish) {
while(pool->cur_queue_size != 0) {
if((rtn = pthread_cond_wait(&(pool->queue_empty),&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread_cond_wait %d\n",rtn);
return -1;
}
}
}
/* 第三步,设置线程池销毁标记 */
lprintf(log, INFO, "destroy pool begin 3!\n");
pool->shutdown = 1;
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex unlock failure\n");
return -1;
}
/* 第四步,等待所有已建立的线程退出 */
lprintf(log, INFO, "destroy pool begin 4!\n");
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
if((rtn = pthread_cond_broadcast(&(pool->queue_not_full)))  != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
for(i = 0; i < pool->num_threads; i++) {
if((rtn = pthread_join(pool->threads[i],NULL)) != 0)
{
lprintf(log,FATAL,"pthread_join %d\n",rtn);
return -1;
}
}
/* 第五步,释放线程池所占的内存空间 */
free(pool->threads);
while(pool->queue_head != NULL) {
cur = pool->queue_head->next;
pool->queue_head = pool->queue_head->next;
free(cur);
}
free(pool);
lprintf(log, INFO, "destroy pool end!\n");
return 0;
}
void *tpool_thread(void *tpool)
{
tpool_work_t *my_work;
tpool_t *pool = (struct tpool *)tpool;

for(;;){/* 线程内循环 */
pthread_mutex_lock(&(pool->queue_lock)); 
/* 如果任务列表为0,并且线程池没有关闭,则一直等待,直到任务到来为止  */
while((pool->cur_queue_size == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->queue_not_empty), &(pool->queue_lock));
}
/* 线程池是否已经关闭,如果线程池关闭则线程自己主动关闭 */
if(pool->shutdown){
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL);      /*线程退出状态为空,主线程不捕获各副线程状态*/
}
my_work = pool->queue_head;
pool->cur_queue_size--;
/*将任务链表头部去掉,改任务正在处理中*/
if(pool->cur_queue_size == 0)  
pool->queue_head = pool->queue_tail = NULL;
else
pool->queue_head = my_work->next;
/* 任务链表还没有满 */
if((!pool->do_not_block_when_full) &&\
(pool->cur_queue_size == (pool->max_queue_size - 1))) {
pthread_cond_broadcast(&(pool->queue_not_full));
}
/*任务链表为空*/
if(pool->cur_queue_size == 0) {
pthread_cond_signal(&(pool->queue_empty));
}
pthread_mutex_unlock(&(pool->queue_lock));
/*启动线程业务处理逻辑*/
(*(my_work->handler_routine))(my_work->arg);
free(my_work);  /*这里释放的是链表头部的线程结构体*/
}
return(NULL);
}

/* -------------------------------------------------------------------------
* log.h 记录函数定义
* -------------------------------------------------------------------------
*/
#ifndef __LOG_H
#define __LOG_H

#include <stdio.h>
#include <semaphore.h>
/*记录的最大长度*/
#define LOGLINE_MAX 1024
/*记录的等级*/
#define DEBUG 1
#define INFO  2
#define WARN  3
#define ERROR 4
#define FATAL 5
/*记录的类型*/
#define LOG_TRUNC   1<<0
#define LOG_NODATE  1<<1
#define LOG_NOLF    1<<2
#define LOG_NOLVL   1<<3
#define LOG_DEBUG   1<<4
#define LOG_STDERR  1<<5
#define LOG_NOTID   1<<6
typedef struct {
int fd;
sem_t sem;
int flags;
} log_t;

/*
* 功能描述:    记录打印函数,将记录打印至记录文件logfile。
* 参数:        log_t - log_open()函数的返回值
*             level - 可以是: DEBUG, INFO, WARN, ERROR, FATAL
*            fmt  - 记录的内容,格式同printf()函数
* 返回值:    成功返回0,失败返回-1
*/
int lprintf( log_t *log, unsigned int level, char *fmt, ... );
/*
* 功能描述: 初始化记录文件the logfile
*参数:    fname - 记录文件logfile的文件名
*         flags     -  记录格式的选项
*          LOG_TRUNC  -    截断打开的记录文件
*          LOG_NODATE -    忽略记录中的每一行
*          LOG_NOLF    -    自动为每条记录新开一行.
*          LOG_NOLVL   - 不记录消息的等级
*          LOG_STDERR -    将消息同时送到STDERR
*返回值:成功返回log_t(>0),失败返回NULL
*/
log_t *log_open( char *fname, int flags );

/*
* 功能描述:关闭记录文件
* 参数:        * log  - 记录文件的指针
*/
void log_close( log_t *log );

#endif
/* -------------------------------------------------------------------------
* log.c – 记录函数实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "log.h"
int lprintf( log_t *log, unsigned int level, char *fmt, ... ) {
int fd;
int rc;
va_list ap;
time_t now;
char date[50];
static char line[LOGLINE_MAX];
static char threadnum[10];
int cnt;
static char *levels[6] = { "[(bad)] ", "[debug] ", "[info ] ", "[warn ] ",  "[error] ", "[fatal] " };
if(!log) return -1;
if( !(log->flags&LOG_DEBUG) && level == DEBUG ) return 0;
fd=log->fd;
/*日期*/
if( !(log->flags&LOG_NODATE) ) {
now=time(NULL);
strcpy(date,ctime(&now));
date[strlen(date)-6]=' ';
date[strlen(date)-5]='\0';
}
/*线程号*/
if( !(log->flags&LOG_NOTID) ) {
sprintf(threadnum, "(TID:%lu) ", pthread_self());
}
cnt = snprintf(line, sizeof(line), "%s%s%s",
log->flags&LOG_NODATE ? "" : date,
log->flags&LOG_NOLVL  ? "" :
(level > FATAL ? levels[0] : levels[level]),
log->flags&LOG_NOTID  ? "" : threadnum);
va_start(ap, fmt);
vsnprintf(line+cnt, sizeof(line)-cnt, fmt, ap);    /*如果输入的日志过长会自动截取*/
va_end(ap);
line[sizeof(line)-1] = '\0';
if( !(log->flags&LOG_NOLF) ) {
/*chomp(line);*/
/*strcpy(line+strlen(line), "\n");*/
}
sem_wait(&log->sem);         /*用信号实现同步*/
rc = write(fd, line, strlen(line));
sem_post(&log->sem);
if( !rc ) errno = 0;
return rc;
}

log_t *log_open( char *fname, int flags ) {
log_t *log = malloc(sizeof(log_t));
if(!log) {
fprintf(stderr, "log_open: Unable to malloc()");
goto log_open_a;
}
log->flags=flags;
if( !strcmp(fname,"-") ) {
log->fd = 2;
} else {
log->fd = open(fname, O_WRONLY|O_CREAT|O_NOCTTY |
(flags&LOG_TRUNC ? O_TRUNC : O_APPEND) , 0666);
}
if( log->fd == -1 ) {
fprintf(stderr, "log_open: Opening logfile %s: %s", fname, strerror(errno));
goto log_open_b;
}
if( sem_init(&log->sem, 0, 1) == -1 ) {
fprintf(stderr, "log_open: Could not initialize log semaphore.");
goto log_open_c;
}
return log;

log_open_c:
close(log->fd);
log_open_b:
free(log);
log_open_a:
return NULL;
}
void log_close( log_t *log ) {
sem_wait(&log->sem);
sem_destroy(&log->sem);
close(log->fd);
free(log);
return;
}
/* -------------------------------------------------------------------------
* testpool.c – 线程池测试程序
* -------------------------------------------------------------------------
*/

#include <pthread.h>
#include "log.h"
#include "tpool.h"

log_t *log;  /*进程全局日志文件句柄*
/*任务*/
void thread(void *arg)
{
char * ptr=(char *)arg;

sleep(1);
printf("hello world! %s\n",ptr);
}

int main(int argc, char *argv[])
{
tpool_t *pool;  /*线程池指针*/

/* 开启记录文件 */
log=log_open("test.log", 0);
/* 创建一个有100个工作线程,最大200个任务队列的线程池 */
pool=tpool_init(100,200,1);
int i;
/* 开启记录文件 */
* 添加100个任务*/
for (i = 0; i<100;i++)  /*这里往线程池里一次性添加100个线程,参数都一样。在实际情况下,参数是可变的*/
tpool_add_work(pool,thread,"test!");
sleep(10);
/*终止线程池*/
tpool_destroy(pool,1);
/* 关闭记录文件 */
log_close(log);
pthread_exit(NULL);
}
分享到:
评论

相关推荐

    linux线程池创建c实现

    linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现

    linux线程池c源码

    linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池

    linux 线程池实现和测试用例

    linux 线程池:线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,这就避免了在处理短时间任务时创建与销毁线程的代价

    linux线程池,c语言实现

    linux线程池,c语言实现,只是文件后缀名用的是cpp方便在qt里面测试,两种版本,都是参考网上的资料做了一些处理之后的

    linux线程池代码(c++实现)

    linux线程池代码(c++实现) 分两部: 1、创建线程池 2、创建任务、加入线程池 可以参考使用

    Linux 线程池源码分析

    非常详细的线程池函数接口分析,可以帮助初学者加深对线程池的理解,更好的去把线程池运用到实例中去,线程池就是多个线程组合在一起的集合,就像一家公司一样,由多个员工组成的一个集合,当有任务时, 这些线程就...

    linux线程池示例程序

    经典线程池示例程序 已经过调试的好程序

    linux线程池的C语言实现

    通常我们使用多线程的方式是,需要时创建一个新的线程,在这个新的线程里执行特定的任务,然后在任务完成后退出。这在一般的应用里已经能够满足我们应用的需要,毕竟我们并不是什么时候都需要创建大量的线程,并在...

    Linux线程池目录拷贝

    实现了Linux线程池目录拷贝的功能,包含了线程池,目录检索,文件IO操作,供学习使用。

    Linux线程池

    在Linux下运用Boost来实现的一个线程池,方便而且高效

    linux线程池 条件变量

    linux线程池 条件变量 互斥,讲解线程池的使用条件

    Linux线程池使用.docx

    Linux线程池使用.docx

    Linux下C线程池实现

    在Linux下用C写的一个简易线程池。系统是RedHat 9,gcc版本"gcc version 4.1.2 20071124 (Red Hat 4.1.2-42)"。文件夹里的源码是按工程组织好的,在文件夹下的test目录下面有一个小的测试程序和Makefile,编译后即可...

    Linux 线程池

    可实现线程池的基本功能,用多线程实现对文件的读取,可读取大文件,经实测代码没有任何bug

    linux 实现一个简单的线程池及工作

    linux 实现一个简单的线程池及工作 本实例演示了线程池的创建使用

    Linux 线程池+连接池

    Linux 线程池 +连接池 ,备份学习。。。。。。。。。。。。。。。

    Linux下通用线程池的构建

    什么是线程池?简单点说,线程池就是有一堆已经创建好了的线程,初始它们都处于空闲等待状态,当有新的任务需要处理的时候,就从这个池子里面取一个空闲等待的线程来处理该任务,当处理完成了就再次把该线程放回池中...

    linux线程池使用(动态调整).rar

    linux线程池使用(动态调整).rar

    linux线程池[参考].pdf

    linux线程池[参考].pdf

    Linux环境下的通用线程池设计

    Linux环境下的通用线程池设计 资源来自网络资源 版权归原作者

Global site tag (gtag.js) - Google Analytics