- 浏览: 231104 次
- 性别:
- 来自: 南京
最新评论
-
baby8117628:
vc下mp3 IDv1和IDV2的读取 -
gezexu:
你好,我按照你的步骤一步步进行但是安装libvorbis的时候 ...
linux如何搭建强大的FFMPEG环境 -
ini_always:
帅哥,转载也把格式做好点,另外出处也要注明一下吧。。。
MP3文件格式解析
/*-------------------------------------------------------------------------
* tpool.h – 线程池定义
* -------------------------------------------------------------------------
*/
#ifndef _TPOOL_H_
#define _TPOOL_H_
#include <stdio.h>
#include <pthread.h>
/*工作线程链表*/
typedef struct tpool_work{
void (*handler_routine)(); /*任务函数指针*/
void *arg; /*任务函数参数*/
struct tpool_work *next; /*下一个任务链表*/
} tpool_work_t;
/*线程池结构体*/
typedef struct tpool{
int num_threads; /*最大线程数*/
int max_queue_size; /*最大任务链表数*/
int do_not_block_when_full; /*当链表满时是否阻塞*/
pthread_t *threads; /*线程指针*/
int cur_queue_size;
tpool_work_t *queue_head; /*链表头*/
tpool_work_t *queue_tail; /*链表尾*/
pthread_mutex_t queue_lock; /*链表互斥量*/
pthread_cond_t queue_not_full; /*链表条件量-未满*/
pthread_cond_t queue_not_empty; /*链表条件量-非空*/
pthread_cond_t queue_empty; /*链表条件量-空*/
int queue_closed;
int shutdown;
} tpool_t;
/* 初始化连接池 */
extern tpool_t *tpool_init(int num_worker_threads,\
int max_queue_size, int do_not_block_when_full);
/* 添加一个工作线程 */
extern int tpool_add_work(tpool_t *pool, void (*routine)(), void *arg);
/* 清除线程池*/
extern int tpool_destroy(tpool_t *pool, int finish);
#endif /* _TPOOL_H_ */
/* -------------------------------------------------------------------------
* tpool.c – 线程池的实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h"
#include "log.h"
/* 工作线程 */
void *tpool_thread(void *tpool);
/***************线程池初始化*****************************/
tpool_t *tpool_init(int num_worker_threads,\ /*线程池线程个数*/
int max_queue_size, \ /*最大任务数*/
int do_not_block_when_full) /*是否阻塞任务满的时候*/
{
int i, rtn;
tpool_t *pool;
lprintf(log, INFO, "init pool begin ...\n");
/* 创建线程池结构体 */
if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL) {
lprintf(log, FATAL, "Unable to malloc() thread pool!\n");
return NULL;
}
/* 设置线程池架构体成员 */
pool->num_threads = num_worker_threads; /*工作线程个数*/
pool->max_queue_size = max_queue_size; /*任务链表最大长度*/
pool->do_not_block_when_full = do_not_block_when_full; /*任务链表满时是否等待*/
/* 生成线程池缓存 */
if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL) {
lprintf(log, FATAL,"Unable to malloc() thread info array\n");/*虽然malloc()并不是返回数组指针,但仍然可以这样用,表示数组。calloc返回数组指针*/
return NULL;
}
/* 初始化任务链表 */
pool->cur_queue_size = 0;
pool->queue_head = NULL;
pool->queue_tail = NULL;
pool->queue_closed = 0;
pool->shutdown = 0;
/* 初始化互斥变量,条件变量 用于线程之间的同步 */
if((rtn = pthread_mutex_init(&(pool->queue_lock),NULL)) != 0) {
lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_full),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
/* 创建所有的线程 */ /*这里预创建的线程并不能运行,需要客户调度一个存在的函数才能工作。这里创建完线程以后,就放在那睡眠,直到唤醒*/
for(i = 0; i != num_worker_threads; i++) {
if( (rtn=pthread_create(&(pool->threads[i]),NULL,tpool_thread,(void*)pool)) != 0) { /*注意这里的pthread_create(),第一个参数是函数名,这里用malloc产生的指针(数组)来表示,第二个是函数实体tpool_thread,现在的tpool_thread执行部分是空的,就是说没有执行函数体*/
lprintf(log,FATAL,"pthread_create %s\n",strerror(rtn));
return NULL;
}
lprintf(log, INFO, "init pthread %d!\n",i);
}
lprintf(log, INFO, "init pool end!\n");
return pool;
}
/*下面这个函数负责调度。把一个真实的函数调度到线程池,然后排队或者运行*/
int tpool_add_work(tpool_t *pool, \ /*线程池指针*/
void (*routine)(void *),\ /*工作线程函数指针*/
void *arg) /*工作线程函数参数*/
{
int rtn;
tpool_work_t *workp; /*当前工作线程*/
if((rtn = pthread_mutex_lock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 采取独占的形式访问任务链表 */
if((pool->cur_queue_size == pool->max_queue_size) && \
(pool->do_not_block_when_full)) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 等待任务链表为新线程释放空间 */
while((pool->cur_queue_size == pool->max_queue_size) &&
(!(pool->shutdown || pool->queue_closed))) {
if((rtn = pthread_cond_wait(&(pool->queue_not_full),
&(pool->queue_lock)) ) != 0) {
lprintf(log,FATAL,"pthread cond wait failure\n");
return -1;
}
}
if(pool->shutdown || pool->queue_closed) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 分配工作线程结构体 */
if((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL) {
lprintf(log,FATAL,"unable to create work struct\n");
return -1;
}
workp->handler_routine = routine;
workp->arg = arg;
workp->next = NULL;
if(pool->cur_queue_size == 0) {
pool->queue_tail = pool->queue_head = workp;
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0){
lprintf(log,FATAL,"pthread broadcast error\n");
return -1;
}
}
else {
pool->queue_tail->next = workp;
pool->queue_tail = workp;
}
pool->cur_queue_size++;
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
int tpool_destroy(tpool_t *pool, int finish)
{
int i, rtn;
tpool_work_t *cur; /*当前工作线程*/
lprintf(log, INFO, "destroy pool begin!\n");
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_lock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 第一步,设置线程退出标记 */
lprintf(log, INFO, "destroy pool begin 1!\n");
if(pool->queue_closed || pool->shutdown) {
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
/* 第二步,禁止新任务加入任务链表 */
lprintf(log, INFO, "destroy pool begin 2!\n");
pool->queue_closed = 1;
if(finish) {
while(pool->cur_queue_size != 0) {
if((rtn = pthread_cond_wait(&(pool->queue_empty),&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread_cond_wait %d\n",rtn);
return -1;
}
}
}
/* 第三步,设置线程池销毁标记 */
lprintf(log, INFO, "destroy pool begin 3!\n");
pool->shutdown = 1;
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex unlock failure\n");
return -1;
}
/* 第四步,等待所有已建立的线程退出 */
lprintf(log, INFO, "destroy pool begin 4!\n");
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
if((rtn = pthread_cond_broadcast(&(pool->queue_not_full))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
for(i = 0; i < pool->num_threads; i++) {
if((rtn = pthread_join(pool->threads[i],NULL)) != 0)
{
lprintf(log,FATAL,"pthread_join %d\n",rtn);
return -1;
}
}
/* 第五步,释放线程池所占的内存空间 */
free(pool->threads);
while(pool->queue_head != NULL) {
cur = pool->queue_head->next;
pool->queue_head = pool->queue_head->next;
free(cur);
}
free(pool);
lprintf(log, INFO, "destroy pool end!\n");
return 0;
}
void *tpool_thread(void *tpool)
{
tpool_work_t *my_work;
tpool_t *pool = (struct tpool *)tpool;
for(;;){/* 线程内循环 */
pthread_mutex_lock(&(pool->queue_lock));
/* 如果任务列表为0,并且线程池没有关闭,则一直等待,直到任务到来为止 */
while((pool->cur_queue_size == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->queue_not_empty), &(pool->queue_lock));
}
/* 线程池是否已经关闭,如果线程池关闭则线程自己主动关闭 */
if(pool->shutdown){
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL); /*线程退出状态为空,主线程不捕获各副线程状态*/
}
my_work = pool->queue_head;
pool->cur_queue_size--;
/*将任务链表头部去掉,改任务正在处理中*/
if(pool->cur_queue_size == 0)
pool->queue_head = pool->queue_tail = NULL;
else
pool->queue_head = my_work->next;
/* 任务链表还没有满 */
if((!pool->do_not_block_when_full) &&\
(pool->cur_queue_size == (pool->max_queue_size - 1))) {
pthread_cond_broadcast(&(pool->queue_not_full));
}
/*任务链表为空*/
if(pool->cur_queue_size == 0) {
pthread_cond_signal(&(pool->queue_empty));
}
pthread_mutex_unlock(&(pool->queue_lock));
/*启动线程业务处理逻辑*/
(*(my_work->handler_routine))(my_work->arg);
free(my_work); /*这里释放的是链表头部的线程结构体*/
}
return(NULL);
}
/* -------------------------------------------------------------------------
* log.h 记录函数定义
* -------------------------------------------------------------------------
*/
#ifndef __LOG_H
#define __LOG_H
#include <stdio.h>
#include <semaphore.h>
/*记录的最大长度*/
#define LOGLINE_MAX 1024
/*记录的等级*/
#define DEBUG 1
#define INFO 2
#define WARN 3
#define ERROR 4
#define FATAL 5
/*记录的类型*/
#define LOG_TRUNC 1<<0
#define LOG_NODATE 1<<1
#define LOG_NOLF 1<<2
#define LOG_NOLVL 1<<3
#define LOG_DEBUG 1<<4
#define LOG_STDERR 1<<5
#define LOG_NOTID 1<<6
typedef struct {
int fd;
sem_t sem;
int flags;
} log_t;
/*
* 功能描述: 记录打印函数,将记录打印至记录文件logfile。
* 参数: log_t - log_open()函数的返回值
* level - 可以是: DEBUG, INFO, WARN, ERROR, FATAL
* fmt - 记录的内容,格式同printf()函数
* 返回值: 成功返回0,失败返回-1
*/
int lprintf( log_t *log, unsigned int level, char *fmt, ... );
/*
* 功能描述: 初始化记录文件the logfile
*参数: fname - 记录文件logfile的文件名
* flags - 记录格式的选项
* LOG_TRUNC - 截断打开的记录文件
* LOG_NODATE - 忽略记录中的每一行
* LOG_NOLF - 自动为每条记录新开一行.
* LOG_NOLVL - 不记录消息的等级
* LOG_STDERR - 将消息同时送到STDERR
*返回值:成功返回log_t(>0),失败返回NULL
*/
log_t *log_open( char *fname, int flags );
/*
* 功能描述:关闭记录文件
* 参数: * log - 记录文件的指针
*/
void log_close( log_t *log );
#endif
/* -------------------------------------------------------------------------
* log.c – 记录函数实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "log.h"
int lprintf( log_t *log, unsigned int level, char *fmt, ... ) {
int fd;
int rc;
va_list ap;
time_t now;
char date[50];
static char line[LOGLINE_MAX];
static char threadnum[10];
int cnt;
static char *levels[6] = { "[(bad)] ", "[debug] ", "[info ] ", "[warn ] ", "[error] ", "[fatal] " };
if(!log) return -1;
if( !(log->flags&LOG_DEBUG) && level == DEBUG ) return 0;
fd=log->fd;
/*日期*/
if( !(log->flags&LOG_NODATE) ) {
now=time(NULL);
strcpy(date,ctime(&now));
date[strlen(date)-6]=' ';
date[strlen(date)-5]='\0';
}
/*线程号*/
if( !(log->flags&LOG_NOTID) ) {
sprintf(threadnum, "(TID:%lu) ", pthread_self());
}
cnt = snprintf(line, sizeof(line), "%s%s%s",
log->flags&LOG_NODATE ? "" : date,
log->flags&LOG_NOLVL ? "" :
(level > FATAL ? levels[0] : levels[level]),
log->flags&LOG_NOTID ? "" : threadnum);
va_start(ap, fmt);
vsnprintf(line+cnt, sizeof(line)-cnt, fmt, ap); /*如果输入的日志过长会自动截取*/
va_end(ap);
line[sizeof(line)-1] = '\0';
if( !(log->flags&LOG_NOLF) ) {
/*chomp(line);*/
/*strcpy(line+strlen(line), "\n");*/
}
sem_wait(&log->sem); /*用信号实现同步*/
rc = write(fd, line, strlen(line));
sem_post(&log->sem);
if( !rc ) errno = 0;
return rc;
}
log_t *log_open( char *fname, int flags ) {
log_t *log = malloc(sizeof(log_t));
if(!log) {
fprintf(stderr, "log_open: Unable to malloc()");
goto log_open_a;
}
log->flags=flags;
if( !strcmp(fname,"-") ) {
log->fd = 2;
} else {
log->fd = open(fname, O_WRONLY|O_CREAT|O_NOCTTY |
(flags&LOG_TRUNC ? O_TRUNC : O_APPEND) , 0666);
}
if( log->fd == -1 ) {
fprintf(stderr, "log_open: Opening logfile %s: %s", fname, strerror(errno));
goto log_open_b;
}
if( sem_init(&log->sem, 0, 1) == -1 ) {
fprintf(stderr, "log_open: Could not initialize log semaphore.");
goto log_open_c;
}
return log;
log_open_c:
close(log->fd);
log_open_b:
free(log);
log_open_a:
return NULL;
}
void log_close( log_t *log ) {
sem_wait(&log->sem);
sem_destroy(&log->sem);
close(log->fd);
free(log);
return;
}
/* -------------------------------------------------------------------------
* testpool.c – 线程池测试程序
* -------------------------------------------------------------------------
*/
#include <pthread.h>
#include "log.h"
#include "tpool.h"
log_t *log; /*进程全局日志文件句柄*
/*任务*/
void thread(void *arg)
{
char * ptr=(char *)arg;
sleep(1);
printf("hello world! %s\n",ptr);
}
int main(int argc, char *argv[])
{
tpool_t *pool; /*线程池指针*/
/* 开启记录文件 */
log=log_open("test.log", 0);
/* 创建一个有100个工作线程,最大200个任务队列的线程池 */
pool=tpool_init(100,200,1);
int i;
/* 开启记录文件 */
* 添加100个任务*/
for (i = 0; i<100;i++) /*这里往线程池里一次性添加100个线程,参数都一样。在实际情况下,参数是可变的*/
tpool_add_work(pool,thread,"test!");
sleep(10);
/*终止线程池*/
tpool_destroy(pool,1);
/* 关闭记录文件 */
log_close(log);
pthread_exit(NULL);
}
* tpool.h – 线程池定义
* -------------------------------------------------------------------------
*/
#ifndef _TPOOL_H_
#define _TPOOL_H_
#include <stdio.h>
#include <pthread.h>
/*工作线程链表*/
typedef struct tpool_work{
void (*handler_routine)(); /*任务函数指针*/
void *arg; /*任务函数参数*/
struct tpool_work *next; /*下一个任务链表*/
} tpool_work_t;
/*线程池结构体*/
typedef struct tpool{
int num_threads; /*最大线程数*/
int max_queue_size; /*最大任务链表数*/
int do_not_block_when_full; /*当链表满时是否阻塞*/
pthread_t *threads; /*线程指针*/
int cur_queue_size;
tpool_work_t *queue_head; /*链表头*/
tpool_work_t *queue_tail; /*链表尾*/
pthread_mutex_t queue_lock; /*链表互斥量*/
pthread_cond_t queue_not_full; /*链表条件量-未满*/
pthread_cond_t queue_not_empty; /*链表条件量-非空*/
pthread_cond_t queue_empty; /*链表条件量-空*/
int queue_closed;
int shutdown;
} tpool_t;
/* 初始化连接池 */
extern tpool_t *tpool_init(int num_worker_threads,\
int max_queue_size, int do_not_block_when_full);
/* 添加一个工作线程 */
extern int tpool_add_work(tpool_t *pool, void (*routine)(), void *arg);
/* 清除线程池*/
extern int tpool_destroy(tpool_t *pool, int finish);
#endif /* _TPOOL_H_ */
/* -------------------------------------------------------------------------
* tpool.c – 线程池的实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h"
#include "log.h"
/* 工作线程 */
void *tpool_thread(void *tpool);
/***************线程池初始化*****************************/
tpool_t *tpool_init(int num_worker_threads,\ /*线程池线程个数*/
int max_queue_size, \ /*最大任务数*/
int do_not_block_when_full) /*是否阻塞任务满的时候*/
{
int i, rtn;
tpool_t *pool;
lprintf(log, INFO, "init pool begin ...\n");
/* 创建线程池结构体 */
if((pool = (struct tpool *)malloc(sizeof(struct tpool))) == NULL) {
lprintf(log, FATAL, "Unable to malloc() thread pool!\n");
return NULL;
}
/* 设置线程池架构体成员 */
pool->num_threads = num_worker_threads; /*工作线程个数*/
pool->max_queue_size = max_queue_size; /*任务链表最大长度*/
pool->do_not_block_when_full = do_not_block_when_full; /*任务链表满时是否等待*/
/* 生成线程池缓存 */
if((pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) == NULL) {
lprintf(log, FATAL,"Unable to malloc() thread info array\n");/*虽然malloc()并不是返回数组指针,但仍然可以这样用,表示数组。calloc返回数组指针*/
return NULL;
}
/* 初始化任务链表 */
pool->cur_queue_size = 0;
pool->queue_head = NULL;
pool->queue_tail = NULL;
pool->queue_closed = 0;
pool->shutdown = 0;
/* 初始化互斥变量,条件变量 用于线程之间的同步 */
if((rtn = pthread_mutex_init(&(pool->queue_lock),NULL)) != 0) {
lprintf(log,FATAL,"pthread_mutex_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_not_full),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
if((rtn = pthread_cond_init(&(pool->queue_empty),NULL)) != 0) {
lprintf(log,FATAL,"pthread_cond_init %s",strerror(rtn));
return NULL;
}
/* 创建所有的线程 */ /*这里预创建的线程并不能运行,需要客户调度一个存在的函数才能工作。这里创建完线程以后,就放在那睡眠,直到唤醒*/
for(i = 0; i != num_worker_threads; i++) {
if( (rtn=pthread_create(&(pool->threads[i]),NULL,tpool_thread,(void*)pool)) != 0) { /*注意这里的pthread_create(),第一个参数是函数名,这里用malloc产生的指针(数组)来表示,第二个是函数实体tpool_thread,现在的tpool_thread执行部分是空的,就是说没有执行函数体*/
lprintf(log,FATAL,"pthread_create %s\n",strerror(rtn));
return NULL;
}
lprintf(log, INFO, "init pthread %d!\n",i);
}
lprintf(log, INFO, "init pool end!\n");
return pool;
}
/*下面这个函数负责调度。把一个真实的函数调度到线程池,然后排队或者运行*/
int tpool_add_work(tpool_t *pool, \ /*线程池指针*/
void (*routine)(void *),\ /*工作线程函数指针*/
void *arg) /*工作线程函数参数*/
{
int rtn;
tpool_work_t *workp; /*当前工作线程*/
if((rtn = pthread_mutex_lock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 采取独占的形式访问任务链表 */
if((pool->cur_queue_size == pool->max_queue_size) && \
(pool->do_not_block_when_full)) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0){
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 等待任务链表为新线程释放空间 */
while((pool->cur_queue_size == pool->max_queue_size) &&
(!(pool->shutdown || pool->queue_closed))) {
if((rtn = pthread_cond_wait(&(pool->queue_not_full),
&(pool->queue_lock)) ) != 0) {
lprintf(log,FATAL,"pthread cond wait failure\n");
return -1;
}
}
if(pool->shutdown || pool->queue_closed) {
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return -1;
}
/* 分配工作线程结构体 */
if((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL) {
lprintf(log,FATAL,"unable to create work struct\n");
return -1;
}
workp->handler_routine = routine;
workp->arg = arg;
workp->next = NULL;
if(pool->cur_queue_size == 0) {
pool->queue_tail = pool->queue_head = workp;
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0){
lprintf(log,FATAL,"pthread broadcast error\n");
return -1;
}
}
else {
pool->queue_tail->next = workp;
pool->queue_tail = workp;
}
pool->cur_queue_size++;
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_unlock(&pool->queue_lock)) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
int tpool_destroy(tpool_t *pool, int finish)
{
int i, rtn;
tpool_work_t *cur; /*当前工作线程*/
lprintf(log, INFO, "destroy pool begin!\n");
/* 释放对任务链表的独占 */
if((rtn = pthread_mutex_lock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
/* 第一步,设置线程退出标记 */
lprintf(log, INFO, "destroy pool begin 1!\n");
if(pool->queue_closed || pool->shutdown) {
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex lock failure\n");
return -1;
}
return 0;
}
/* 第二步,禁止新任务加入任务链表 */
lprintf(log, INFO, "destroy pool begin 2!\n");
pool->queue_closed = 1;
if(finish) {
while(pool->cur_queue_size != 0) {
if((rtn = pthread_cond_wait(&(pool->queue_empty),&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread_cond_wait %d\n",rtn);
return -1;
}
}
}
/* 第三步,设置线程池销毁标记 */
lprintf(log, INFO, "destroy pool begin 3!\n");
pool->shutdown = 1;
if((rtn = pthread_mutex_unlock(&(pool->queue_lock))) != 0) {
lprintf(log,FATAL,"pthread mutex unlock failure\n");
return -1;
}
/* 第四步,等待所有已建立的线程退出 */
lprintf(log, INFO, "destroy pool begin 4!\n");
if((rtn = pthread_cond_broadcast(&(pool->queue_not_empty))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
if((rtn = pthread_cond_broadcast(&(pool->queue_not_full))) != 0) {
lprintf(log,FATAL,"pthread_cond_boradcast %d\n",rtn);
return -1;
}
for(i = 0; i < pool->num_threads; i++) {
if((rtn = pthread_join(pool->threads[i],NULL)) != 0)
{
lprintf(log,FATAL,"pthread_join %d\n",rtn);
return -1;
}
}
/* 第五步,释放线程池所占的内存空间 */
free(pool->threads);
while(pool->queue_head != NULL) {
cur = pool->queue_head->next;
pool->queue_head = pool->queue_head->next;
free(cur);
}
free(pool);
lprintf(log, INFO, "destroy pool end!\n");
return 0;
}
void *tpool_thread(void *tpool)
{
tpool_work_t *my_work;
tpool_t *pool = (struct tpool *)tpool;
for(;;){/* 线程内循环 */
pthread_mutex_lock(&(pool->queue_lock));
/* 如果任务列表为0,并且线程池没有关闭,则一直等待,直到任务到来为止 */
while((pool->cur_queue_size == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->queue_not_empty), &(pool->queue_lock));
}
/* 线程池是否已经关闭,如果线程池关闭则线程自己主动关闭 */
if(pool->shutdown){
pthread_mutex_unlock(&(pool->queue_lock));
pthread_exit(NULL); /*线程退出状态为空,主线程不捕获各副线程状态*/
}
my_work = pool->queue_head;
pool->cur_queue_size--;
/*将任务链表头部去掉,改任务正在处理中*/
if(pool->cur_queue_size == 0)
pool->queue_head = pool->queue_tail = NULL;
else
pool->queue_head = my_work->next;
/* 任务链表还没有满 */
if((!pool->do_not_block_when_full) &&\
(pool->cur_queue_size == (pool->max_queue_size - 1))) {
pthread_cond_broadcast(&(pool->queue_not_full));
}
/*任务链表为空*/
if(pool->cur_queue_size == 0) {
pthread_cond_signal(&(pool->queue_empty));
}
pthread_mutex_unlock(&(pool->queue_lock));
/*启动线程业务处理逻辑*/
(*(my_work->handler_routine))(my_work->arg);
free(my_work); /*这里释放的是链表头部的线程结构体*/
}
return(NULL);
}
/* -------------------------------------------------------------------------
* log.h 记录函数定义
* -------------------------------------------------------------------------
*/
#ifndef __LOG_H
#define __LOG_H
#include <stdio.h>
#include <semaphore.h>
/*记录的最大长度*/
#define LOGLINE_MAX 1024
/*记录的等级*/
#define DEBUG 1
#define INFO 2
#define WARN 3
#define ERROR 4
#define FATAL 5
/*记录的类型*/
#define LOG_TRUNC 1<<0
#define LOG_NODATE 1<<1
#define LOG_NOLF 1<<2
#define LOG_NOLVL 1<<3
#define LOG_DEBUG 1<<4
#define LOG_STDERR 1<<5
#define LOG_NOTID 1<<6
typedef struct {
int fd;
sem_t sem;
int flags;
} log_t;
/*
* 功能描述: 记录打印函数,将记录打印至记录文件logfile。
* 参数: log_t - log_open()函数的返回值
* level - 可以是: DEBUG, INFO, WARN, ERROR, FATAL
* fmt - 记录的内容,格式同printf()函数
* 返回值: 成功返回0,失败返回-1
*/
int lprintf( log_t *log, unsigned int level, char *fmt, ... );
/*
* 功能描述: 初始化记录文件the logfile
*参数: fname - 记录文件logfile的文件名
* flags - 记录格式的选项
* LOG_TRUNC - 截断打开的记录文件
* LOG_NODATE - 忽略记录中的每一行
* LOG_NOLF - 自动为每条记录新开一行.
* LOG_NOLVL - 不记录消息的等级
* LOG_STDERR - 将消息同时送到STDERR
*返回值:成功返回log_t(>0),失败返回NULL
*/
log_t *log_open( char *fname, int flags );
/*
* 功能描述:关闭记录文件
* 参数: * log - 记录文件的指针
*/
void log_close( log_t *log );
#endif
/* -------------------------------------------------------------------------
* log.c – 记录函数实现
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "log.h"
int lprintf( log_t *log, unsigned int level, char *fmt, ... ) {
int fd;
int rc;
va_list ap;
time_t now;
char date[50];
static char line[LOGLINE_MAX];
static char threadnum[10];
int cnt;
static char *levels[6] = { "[(bad)] ", "[debug] ", "[info ] ", "[warn ] ", "[error] ", "[fatal] " };
if(!log) return -1;
if( !(log->flags&LOG_DEBUG) && level == DEBUG ) return 0;
fd=log->fd;
/*日期*/
if( !(log->flags&LOG_NODATE) ) {
now=time(NULL);
strcpy(date,ctime(&now));
date[strlen(date)-6]=' ';
date[strlen(date)-5]='\0';
}
/*线程号*/
if( !(log->flags&LOG_NOTID) ) {
sprintf(threadnum, "(TID:%lu) ", pthread_self());
}
cnt = snprintf(line, sizeof(line), "%s%s%s",
log->flags&LOG_NODATE ? "" : date,
log->flags&LOG_NOLVL ? "" :
(level > FATAL ? levels[0] : levels[level]),
log->flags&LOG_NOTID ? "" : threadnum);
va_start(ap, fmt);
vsnprintf(line+cnt, sizeof(line)-cnt, fmt, ap); /*如果输入的日志过长会自动截取*/
va_end(ap);
line[sizeof(line)-1] = '\0';
if( !(log->flags&LOG_NOLF) ) {
/*chomp(line);*/
/*strcpy(line+strlen(line), "\n");*/
}
sem_wait(&log->sem); /*用信号实现同步*/
rc = write(fd, line, strlen(line));
sem_post(&log->sem);
if( !rc ) errno = 0;
return rc;
}
log_t *log_open( char *fname, int flags ) {
log_t *log = malloc(sizeof(log_t));
if(!log) {
fprintf(stderr, "log_open: Unable to malloc()");
goto log_open_a;
}
log->flags=flags;
if( !strcmp(fname,"-") ) {
log->fd = 2;
} else {
log->fd = open(fname, O_WRONLY|O_CREAT|O_NOCTTY |
(flags&LOG_TRUNC ? O_TRUNC : O_APPEND) , 0666);
}
if( log->fd == -1 ) {
fprintf(stderr, "log_open: Opening logfile %s: %s", fname, strerror(errno));
goto log_open_b;
}
if( sem_init(&log->sem, 0, 1) == -1 ) {
fprintf(stderr, "log_open: Could not initialize log semaphore.");
goto log_open_c;
}
return log;
log_open_c:
close(log->fd);
log_open_b:
free(log);
log_open_a:
return NULL;
}
void log_close( log_t *log ) {
sem_wait(&log->sem);
sem_destroy(&log->sem);
close(log->fd);
free(log);
return;
}
/* -------------------------------------------------------------------------
* testpool.c – 线程池测试程序
* -------------------------------------------------------------------------
*/
#include <pthread.h>
#include "log.h"
#include "tpool.h"
log_t *log; /*进程全局日志文件句柄*
/*任务*/
void thread(void *arg)
{
char * ptr=(char *)arg;
sleep(1);
printf("hello world! %s\n",ptr);
}
int main(int argc, char *argv[])
{
tpool_t *pool; /*线程池指针*/
/* 开启记录文件 */
log=log_open("test.log", 0);
/* 创建一个有100个工作线程,最大200个任务队列的线程池 */
pool=tpool_init(100,200,1);
int i;
/* 开启记录文件 */
* 添加100个任务*/
for (i = 0; i<100;i++) /*这里往线程池里一次性添加100个线程,参数都一样。在实际情况下,参数是可变的*/
tpool_add_work(pool,thread,"test!");
sleep(10);
/*终止线程池*/
tpool_destroy(pool,1);
/* 关闭记录文件 */
log_close(log);
pthread_exit(NULL);
}
发表评论
-
内存屏障
2010-02-26 11:03 1464处理器的乱序和并发执行 目前的高级处理器,为了提高内部逻辑元 ... -
函数调用堆栈分析
2010-02-26 10:53 1363理解调用栈最重要的两 ... -
mtrace检测内存泄露
2010-02-25 09:50 1055[url] http://math.acadiau.ca/AC ... -
c语言编程之字符串操作
2010-02-25 09:41 8161. //在s串中查找与s1相匹配的字符串,找到后用 ... -
linux C 链接库 so制作及调用[转]
2010-02-24 16:26 2549文章分类:C++编程 [文章作者:陈毓端 若转载请标注原文链 ... -
mtrace的使用
2010-02-24 16:02 1258对于内存溢出之类的麻烦可能大家在编写指针比较多的复杂的程序的时 ... -
单片机的C语言中位操作用法(转
2010-02-24 14:27 2185单片机的C语言中位操作用法 作者:郭天祥 在对单处机进 ... -
Linux下的itoa函数
2010-02-21 17:55 1700上篇文章说到linux需要it ... -
va_list、va_start、va_arg、va_end的原理与使用
2010-02-05 10:34 28611. 概述 由于在C语言中没有函数重载,解 ... -
快速排序(quickSort)
2010-02-04 10:50 8271. #include <stdio.h> ... -
C问题---itoa函数
2010-02-04 10:36 997------------------------------ ... -
itoa函数及atoi函数
2010-02-04 10:35 1270C语言提供了几个标准库函数,可以将任意类型(整型、长整型、浮点 ... -
结构体零长度数组的作用
2010-02-04 10:21 1339在一些 C 语言编写的代码中,有时可以看到如下定义的结构: ... -
优化C代码常用的几招
2010-02-04 10:14 734性能优化方面永远注意8 ... -
我经常去的网站
2010-02-03 17:53 1585MFC相关网站 www.codeproject.com ht ... -
可重入函数与不可重入函数
2010-02-03 16:35 890原文地址:http://blog.chin ... -
哈夫曼编码
2010-02-03 16:26 1264本文描述在网上能够找到的最简单,最快速的哈夫曼编码。本方法不使 ... -
优化变成了忧患:String.split引发的“内存泄露”
2010-02-01 17:39 1061一直赞叹Sun对待技术的 ... -
锁无关的(Lock-Free)数据结构——在避免死锁的同时确保线程
2010-01-26 14:47 874http://hi.baidu.com/%5F%E2%64%5 ... -
使用 GNU profiler 来提高代码运行速度
2010-01-26 13:46 758进应用程序的性能是一 ...
相关推荐
linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现
linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池linux 线程池
linux 线程池:线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,这就避免了在处理短时间任务时创建与销毁线程的代价
linux线程池,c语言实现,只是文件后缀名用的是cpp方便在qt里面测试,两种版本,都是参考网上的资料做了一些处理之后的
linux线程池代码(c++实现) 分两部: 1、创建线程池 2、创建任务、加入线程池 可以参考使用
非常详细的线程池函数接口分析,可以帮助初学者加深对线程池的理解,更好的去把线程池运用到实例中去,线程池就是多个线程组合在一起的集合,就像一家公司一样,由多个员工组成的一个集合,当有任务时, 这些线程就...
经典线程池示例程序 已经过调试的好程序
通常我们使用多线程的方式是,需要时创建一个新的线程,在这个新的线程里执行特定的任务,然后在任务完成后退出。这在一般的应用里已经能够满足我们应用的需要,毕竟我们并不是什么时候都需要创建大量的线程,并在...
实现了Linux线程池目录拷贝的功能,包含了线程池,目录检索,文件IO操作,供学习使用。
在Linux下运用Boost来实现的一个线程池,方便而且高效
linux线程池 条件变量 互斥,讲解线程池的使用条件
Linux线程池使用.docx
在Linux下用C写的一个简易线程池。系统是RedHat 9,gcc版本"gcc version 4.1.2 20071124 (Red Hat 4.1.2-42)"。文件夹里的源码是按工程组织好的,在文件夹下的test目录下面有一个小的测试程序和Makefile,编译后即可...
可实现线程池的基本功能,用多线程实现对文件的读取,可读取大文件,经实测代码没有任何bug
linux 实现一个简单的线程池及工作 本实例演示了线程池的创建使用
Linux 线程池 +连接池 ,备份学习。。。。。。。。。。。。。。。
什么是线程池?简单点说,线程池就是有一堆已经创建好了的线程,初始它们都处于空闲等待状态,当有新的任务需要处理的时候,就从这个池子里面取一个空闲等待的线程来处理该任务,当处理完成了就再次把该线程放回池中...
linux线程池使用(动态调整).rar
linux线程池[参考].pdf
Linux环境下的通用线程池设计 资源来自网络资源 版权归原作者