`
unsoundboy
  • 浏览: 60904 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

一个简单的linux线程池

    博客分类:
  • c++
阅读更多


线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。



在linux中,使用的是posix线程库,首先介绍几个常用的函数:

1 线程的创建和取消函数

pthread_create

创建线程

pthread_join

合并线程

pthread_cancel

取消线程

2 线程同步函数

pthread_mutex_lock

pthread_mutex_unlock

pthread_cond_signal

pthread_cond_wait



关于函数的详细说明,参考man手册



线程池的实现:

线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。

主要有两个类来实现,CTask,CThreadPool

/**
执行任务的类,设置任务数据并执行
**/
C代码

   1. class CTask 
   2. { 
   3. protected: 
   4.  string m_strTaskName;  //任务的名称 
   5.  void* m_ptrData;       //要执行的任务的具体数据 
   6. public: 
   7.  CTask(){} 
   8.  CTask(string taskName) 
   9.  { 
  10.   this->m_strTaskName = taskName; 
  11.   m_ptrData = NULL; 
  12.  } 
  13.  virtual int Run()= 0; 
  14.  void SetData(void* data);    //设置任务数据 
  15. }; 

class CTask
{
protected:
string m_strTaskName;  //任务的名称
void* m_ptrData;       //要执行的任务的具体数据
public:
CTask(){}
CTask(string taskName)
{
  this->m_strTaskName = taskName;
  m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data);    //设置任务数据
};



任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。



线程池类

/**
线程池
**/
Java代码

   1. class CThreadPool 
   2. { 
   3. private: 
   4.  vector<CTask*> m_vecTaskList;         //任务列表 
   5.  int m_iThreadNum;                            //线程池中启动的线程数            
   6.  static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合 
   7.  static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合 
   8.  static pthread_mutex_t m_pthreadMutex;    //线程同步锁 
   9.  static pthread_cond_t m_pthreadCond;    //线程同步的条件变量 
  10. protected: 
  11.  static void* ThreadFunc(void * threadData); //新线程的线程函数 
  12.  static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中 
  13.  static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去 
  14.  int Create();          //创建所有的线程 
  15. public: 
  16.  CThreadPool(int threadNum); 
  17.  int AddTask(CTask *task);      //把任务添加到线程池中 
  18.  int StopAll(); 
  19. }; 

class CThreadPool
{
private:
vector<CTask*> m_vecTaskList;         //任务列表
int m_iThreadNum;                            //线程池中启动的线程数          
static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
static pthread_mutex_t m_pthreadMutex;    //线程同步锁
static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
protected:
static void* ThreadFunc(void * threadData); //新线程的线程函数
static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
int Create();          //创建所有的线程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task);      //把任务添加到线程池中
int StopAll();
};



当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。

线程之间的同步用线程锁和条件变量。

这个类的对外接口有两个:

AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。

StopAll函数停止所有的线程


Cpp代码

   1. ************************************************ 
   2.  
   3. 代码: 
   4.  
   5. ××××××××××××××××××××CThread.h 
   6.  
   7.   
   8.  
   9. #ifndef __CTHREAD 
  10. #define __CTHREAD 
  11. #include <vector> 
  12. #include <string> 
  13. #include <pthread.h> 
  14.  
  15. using namespace std; 
  16.  
  17. /**
  18. 执行任务的类,设置任务数据并执行
  19. **/ 
  20. class CTask 
  21. { 
  22. protected: 
  23.  string m_strTaskName;  //任务的名称 
  24.  void* m_ptrData;       //要执行的任务的具体数据 
  25. public: 
  26.  CTask(){} 
  27.  CTask(string taskName) 
  28.  { 
  29.   this->m_strTaskName = taskName; 
  30.   m_ptrData = NULL; 
  31.  } 
  32.  virtual int Run()= 0; 
  33.  void SetData(void* data);    //设置任务数据 
  34. }; 
  35.  
  36. /**
  37. 线程池
  38. **/ 
  39. class CThreadPool 
  40. { 
  41. private: 
  42.  vector<CTask*> m_vecTaskList;         //任务列表 
  43.  int m_iThreadNum;                            //线程池中启动的线程数            
  44.  static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合 
  45.  static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合 
  46.  static pthread_mutex_t m_pthreadMutex;    //线程同步锁 
  47.  static pthread_cond_t m_pthreadCond;    //线程同步的条件变量 
  48. protected: 
  49.  static void* ThreadFunc(void * threadData); //新线程的线程函数 
  50.  static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中 
  51.  static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去 
  52.  int Create();          //创建所有的线程 
  53. public: 
  54.  CThreadPool(int threadNum); 
  55.  int AddTask(CTask *task);      //把任务添加到线程池中 
  56.  int StopAll(); 
  57. }; 
  58.  
  59. #endif 
  60.  
  61.   
  62.  
  63.   
  64.  
  65.   
  66.  
  67. 类的实现为: 
  68.  
  69. ××××××××××××××××××××CThread.cpp 
  70.  
  71.   
  72.  
  73. #include "CThread.h" 
  74. #include <string> 
  75. #include <iostream> 
  76.  
  77. using namespace std; 
  78.  
  79. void CTask::SetData(void * data) 
  80. { 
  81.  m_ptrData = data; 
  82. } 
  83.  
  84. vector<pthread_t> CThreadPool::m_vecBusyThread; 
  85. vector<pthread_t> CThreadPool::m_vecIdleThread; 
  86. pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; 
  87. pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; 
  88.  
  89. CThreadPool::CThreadPool(int threadNum) 
  90. { 
  91.  this->m_iThreadNum = threadNum; 
  92.  Create(); 
  93. } 
  94. int CThreadPool::MoveToIdle(pthread_t tid) 
  95. { 
  96.  vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin(); 
  97.  while(busyIter != m_vecBusyThread.end()) 
  98.  { 
  99.   if(tid == *busyIter) 
100.   { 
101.    break; 
102.   } 
103.   busyIter++; 
104.  } 
105.  m_vecBusyThread.erase(busyIter); 
106.  m_vecIdleThread.push_back(tid); 
107.  return 0; 
108. } 
109.  
110. int CThreadPool::MoveToBusy(pthread_t tid) 
111. { 
112.  vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin(); 
113.  while(idleIter != m_vecIdleThread.end()) 
114.  { 
115.   if(tid == *idleIter) 
116.   { 
117.    break; 
118.   } 
119.   idleIter++; 
120.  } 
121.  m_vecIdleThread.erase(idleIter); 
122.  m_vecBusyThread.push_back(tid); 
123.  return 0; 
124. } 
125. void* CThreadPool::ThreadFunc(void * threadData) 
126. { 
127.  pthread_t tid = pthread_self(); 
128.  while(1) 
129.  { 
130.   pthread_mutex_lock(&m_pthreadMutex); 
131.   pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex); 
132.   cout << "tid:" << tid << " run" << endl; 
133.   //get task 
134.   vector<CTask*>* taskList = (vector<CTask*>*)threadData; 
135.   vector<CTask*>::iterator iter = taskList->begin(); 
136.   while(iter != taskList->end()) 
137.   { 
138.     
139.    MoveToBusy(tid); 
140.    break; 
141.   } 
142.   CTask* task = *iter; 
143.   taskList->erase(iter); 
144.   pthread_mutex_unlock(&m_pthreadMutex); 
145.   cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl; 
146.   cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl; 
147.   //cout << "task to be run:" << taskList->size() << endl; 
148.   task->Run(); 
149.    
150.   //cout << "CThread::thread work" << endl; 
151.   cout << "tid:" << tid << " idle" << endl; 
152.    
153.  } 
154.  return (void*)0; 
155. } 
156.  
157. int CThreadPool::AddTask(CTask *task) 
158. { 
159.  this->m_vecTaskList.push_back(task); 
160.  pthread_cond_signal(&m_pthreadCond); 
161.  return 0; 
162. } 
163. int CThreadPool::Create() 
164. { 
165.  for(int i = 0; i < m_iThreadNum;i++) 
166.  { 
167.   pthread_t tid = 0; 
168.   pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList); 
169.   m_vecIdleThread.push_back(tid); 
170.  } 
171.  return 0; 
172. } 
173.  
174. int CThreadPool::StopAll() 
175. { 
176.  vector<pthread_t>::iterator iter = m_vecIdleThread.begin(); 
177.  while(iter != m_vecIdleThread.end()) 
178.  { 
179.   pthread_cancel(*iter); 
180.   pthread_join(*iter,NULL); 
181.   iter++; 
182.  } 
183.  
184.  iter = m_vecBusyThread.begin(); 
185.  while(iter != m_vecBusyThread.end()) 
186.  { 
187.   pthread_cancel(*iter); 
188.   pthread_join(*iter,NULL); 
189.   iter++; 
190.  } 
191.   
192.  return 0; 
193. } 
194.  
195. 简单示例: 
196.  
197. ××××××××test.cpp 
198.  
199. #include "CThread.h" 
200. #include <iostream> 
201.  
202. using namespace std; 
203.  
204. class CWorkTask: public CTask 
205. { 
206. public: 
207.  CWorkTask() 
208.  {} 
209.  int Run() 
210.  { 
211.   cout << (char*)this->m_ptrData << endl; 
212.   sleep(10); 
213.   return 0; 
214.  } 
215. }; 
216. int main() 
217. { 
218.  CWorkTask taskObj; 
219.  char szTmp[] = "this is the first thread running,haha success"; 
220.  taskObj.SetData((void*)szTmp); 
221.  CThreadPool threadPool(10); 
222.  for(int i = 0;i < 11;i++) 
223.  { 
224.   threadPool.AddTask(&taskObj); 
225.  } 
226.  while(1) 
227.  { 
228.   sleep(120); 
229.  } 
230.  return 0; 
231. } 

************************************************

代码:

××××××××××××××××××××CThread.h



#ifndef __CTHREAD
#define __CTHREAD
#include <vector>
#include <string>
#include <pthread.h>

using namespace std;

/**
执行任务的类,设置任务数据并执行
**/
class CTask
{
protected:
string m_strTaskName;  //任务的名称
void* m_ptrData;       //要执行的任务的具体数据
public:
CTask(){}
CTask(string taskName)
{
  this->m_strTaskName = taskName;
  m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data);    //设置任务数据
};

/**
线程池
**/
class CThreadPool
{
private:
vector<CTask*> m_vecTaskList;         //任务列表
int m_iThreadNum;                            //线程池中启动的线程数          
static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
static pthread_mutex_t m_pthreadMutex;    //线程同步锁
static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
protected:
static void* ThreadFunc(void * threadData); //新线程的线程函数
static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
int Create();          //创建所有的线程
public:
CThreadPool(int threadNum);
int AddTask(CTask *task);      //把任务添加到线程池中
int StopAll();
};

#endif







类的实现为:

××××××××××××××××××××CThread.cpp



#include "CThread.h"
#include <string>
#include <iostream>

using namespace std;

void CTask::SetData(void * data)
{
m_ptrData = data;
}

vector<pthread_t> CThreadPool::m_vecBusyThread;
vector<pthread_t> CThreadPool::m_vecIdleThread;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

CThreadPool::CThreadPool(int threadNum)
{
this->m_iThreadNum = threadNum;
Create();
}
int CThreadPool::MoveToIdle(pthread_t tid)
{
vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
while(busyIter != m_vecBusyThread.end())
{
  if(tid == *busyIter)
  {
   break;
  }
  busyIter++;
}
m_vecBusyThread.erase(busyIter);
m_vecIdleThread.push_back(tid);
return 0;
}

int CThreadPool::MoveToBusy(pthread_t tid)
{
vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
while(idleIter != m_vecIdleThread.end())
{
  if(tid == *idleIter)
  {
   break;
  }
  idleIter++;
}
m_vecIdleThread.erase(idleIter);
m_vecBusyThread.push_back(tid);
return 0;
}
void* CThreadPool::ThreadFunc(void * threadData)
{
pthread_t tid = pthread_self();
while(1)
{
  pthread_mutex_lock(&m_pthreadMutex);
  pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
  cout << "tid:" << tid << " run" << endl;
  //get task
  vector<CTask*>* taskList = (vector<CTask*>*)threadData;
  vector<CTask*>::iterator iter = taskList->begin();
  while(iter != taskList->end())
  {
  
   MoveToBusy(tid);
   break;
  }
  CTask* task = *iter;
  taskList->erase(iter);
  pthread_mutex_unlock(&m_pthreadMutex);
  cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
  cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
  //cout << "task to be run:" << taskList->size() << endl;
  task->Run();
 
  //cout << "CThread::thread work" << endl;
  cout << "tid:" << tid << " idle" << endl;
 
}
return (void*)0;
}

int CThreadPool::AddTask(CTask *task)
{
this->m_vecTaskList.push_back(task);
pthread_cond_signal(&m_pthreadCond);
return 0;
}
int CThreadPool::Create()
{
for(int i = 0; i < m_iThreadNum;i++)
{
  pthread_t tid = 0;
  pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);
  m_vecIdleThread.push_back(tid);
}
return 0;
}

int CThreadPool::StopAll()
{
vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
while(iter != m_vecIdleThread.end())
{
  pthread_cancel(*iter);
  pthread_join(*iter,NULL);
  iter++;
}

iter = m_vecBusyThread.begin();
while(iter != m_vecBusyThread.end())
{
  pthread_cancel(*iter);
  pthread_join(*iter,NULL);
  iter++;
}

return 0;
}

简单示例:

××××××××test.cpp

#include "CThread.h"
#include <iostream>

using namespace std;

class CWorkTask: public CTask
{
public:
CWorkTask()
{}
int Run()
{
  cout << (char*)this->m_ptrData << endl;
  sleep(10);
  return 0;
}
};
int main()
{
CWorkTask taskObj;
char szTmp[] = "this is the first thread running,haha success";
taskObj.SetData((void*)szTmp);
CThreadPool threadPool(10);
for(int i = 0;i < 11;i++)
{
  threadPool.AddTask(&taskObj);
}
while(1)
{
  sleep(120);
}
return 0;
}



分享到:
评论

相关推荐

    linux 实现一个简单的线程池及工作

    linux 实现一个简单的线程池及工作 本实例演示了线程池的创建使用

    linux下的一个线程池类

    在linux下实现的一个简单的线程池,可以实现多任务机制,在linux下实现的一个简单的线程池,可以实现多任务机制

    linux线程池的C语言实现

    通常我们使用多线程的方式是,需要时创建一个新的线程,在这个新的线程里执行特定的...这在一般的应用里已经能够满足我们应用的需要,毕竟我们并不是什么时候都需要创建大量的线程,并在它们执行一个简单的任务后销毁。

    Linux下C线程池实现

    在Linux下用C写的一个简易线程池。系统是RedHat 9,gcc版本"gcc version 4.1.2 20071124 (Red Hat 4.1.2-42)"。文件夹里的源码是按工程组织好的,在文件夹下的test目录下面有一个小的测试程序和Makefile,编译后即可...

    linux c线程池

    linux pthreadpool实现和线程池的用处 简单易懂 互斥和信号量使用

    简单linux C++线程池

    简单linux C++线程池实现 vector存储任务,封装类实现线程池功能。新手学习线程池的好机会,Makefile编译可直接运行。

    Linux下Epoll+线程池的简单Web服务器

    这是一个简单的web服务器利用Linux下的线程池技术和Epoll机制 简单粗略有不足的欢迎指导

    thd_pool_v1.02.tar.bz2.zip_linux_linux 线程池_whale1ce_线程池

    Linux中的一个简单线程池,基于POSIX线程API。 管理线程将检索不必要的空闲线程以保持低资源消耗。

    LINUX通用线程池的构建

    关于LINUX通用线程池的构建 本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行...文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单。

    Linux C线程池简单实现实例

    主要介绍了Linux C线程池简单实现实例的相关资料,需要的朋友可以参考下

    linux下c++线程池

    一个简单好用的线程池,包括如何使用的demo,未考虑动态伸缩。 之前上传的那些线程池资源都不及这个。 用qtcreator可以直接打开。

    VC简单的线程池使用实例

    1.线程池管理器(ThreadPoolManager):用于创建并管理...网上的c/c++线程池多是linux下的,这个是VC6.0的线程池。其涉及的主要文件有:ThreadPoolManage、IThreadExcute、TemporarilyThread、ThreadManage、RegKeyIni。

    linux C++ 实现线程池(避免线程创建的耗时)

    linux下c++写的线程池,可以了解pthread_cond_timewait和pthread_detach的用法,自定义最大使用的线程数量,线程退出线程池的超时时间,任务优先级处理。

    一个简单的c++ 线程池

    一个简单的c++ 线程池,非常实用,只有threadpool.h 和threadpool.cpp 和main 三个文件,希望能给大家提供方便。

    线程池Linux C语言简单版本

    本线程池采用C语言实现。包括以下内容 &gt; - thread_pool_create:创建...主要的核心点集中在thread_pool_post和thread_worker两个函数中,这两个函数也构成了生产者-消费者模型。本文采用队列+互斥锁+条件变量实现。

    线程池_c线程池_

    linux线程池C语言编程,通过简单的例子,可以很好理解到线程池的作用

    C语言实现简单线程池.zip

    由C语言实现简单的线程池,任务调配,合理创建销毁线程处理任务

    linux下的线程池实现

    简单的linux多线程实现,含互斥锁的使用

    线程池实现,通过C语言实现

    linux下通过200行C代码实现简单线程池,附源代码和文档说明: 200行C代码实现简单线程池.doc threadpool.c

    C++写的线程池,环境是ubuntu(Linux)

    比较简单的线程池,自己练手用,大家有兴趣可以参考参考,也可以批评指正

Global site tag (gtag.js) - Google Analytics