`

windows下默认的线程池设计(1)

阅读更多

先搞个基于windows的线程池设计

 

基本设计思路:

执行线程

while(true)

{

if(queue.hasJob()){//queue是一个同步队列,因此如果队列空,就持续等待

fetch job;

//选择合适线程---------------à这一步,在windows下可以直接使用QueueUserWorkItemXP以后),//这里是线程池的关键

Thead.dojob();

 

}

}

  • QueueUserWorkItemmsdn上叙述如下:

This topic describes the original thread pool API. The thread pool API introduced in Windows Vista is simpler, more reliable, has better performance, and provides more flexibility for developers. For information on the current thread pool API, see Thread Pools.

QueueUserWorkItem属于原始的线程池APIvista之后,有更加简单的,可靠的,更高性能,且对开发者提供易于扩展的线程池。

The thread pool is created the first time you call QueueUserWorkItem or BindIoCompletionCallback, or when a timer-queue timer or registered wait operation queues a callback function. By default, the number of threads that can be created in the thread pool is about 500. Each thread uses the default stack size and runs at the default priority.

    这里说明,线程池是在第一次QueueUserWorkItem或者BindIoCompletionCallback或者timer-queue timer or registered wait operation调度一个回调函数创建的。缺省情况下,线程池大小为500,每个线程池缺省栈大小和优先级。

  1. 实现一个等待同步队列

上面的queue是一个同步队列,采用循环队列实现

 

 

//具体某个作业

struct CJob{
 INT64 id;
 void* m_data;
 PTHREAD_PROC m_pFunc;
public:
 CJob()
 {
 }
 CJob(PTHREAD_PROC pFunc,void *data){

  this->m_pFunc =pFunc;
  this->m_data = data;
 }
 virtual void run(){
  m_pFunc(m_data);
 }

};

 

//基于临界区的读写锁,vista后,msdn推荐用SRWLock

class RWLock{
 CRITICAL_SECTION m_mutex;
public:
 RWLock()
 {
  InitializeCriticalSectionAndSpinCount(&m_mutex, 1000);//自旋锁临界区,多核
 }
 ~RWLock()
 {
  DeleteCriticalSection(&m_mutex);
 }
 BOOL tryAndLock(int count=0){//尝试多少次获得锁
  int i=0;
  BOOL bRet = FALSE;
  do{
   bRet = TryEnterCriticalSection(&m_mutex);
   if(bRet == TRUE)
    return TRUE;
   ::Sleep(0);
   i++;
  }while(i<count);
  return FALSE;
 }
 void tryAndRelease(){
  LeaveCriticalSection(&m_mutex);
 }
 void lock(){
  ::EnterCriticalSection(&m_mutex);
 }
 void unLock(){
  ::LeaveCriticalSection(&m_mutex);
 }
};

//封装的信号量
class Semaphore{
 HANDLE m_sem;
public:
 explicit Semaphore(int initCount,int maxCount){
  m_sem = ::CreateSemaphore(NULL,initCount,maxCount,NULL);
 }
 ~Semaphore(){

  ::CloseHandle(m_sem);
 }
 void V(){
  ReleaseSemaphore(m_sem,1,NULL);
 }
 void P()
 {
  DWORD dwRet = ::WaitForSingleObject(m_sem,INFINITE);
  switch(dwRet)
  {
   case WAIT_ABANDONED:
    break;
   case WAIT_OBJECT_0 :
    break;
   case WAIT_TIMEOUT:
    break;
   case WAIT_FAILED :
    break;
  }
 }
};

//存储请求需要的同步队列,现有的都是阻塞模型,尚未添加非阻塞模型

class CBlockQueue{
 Semaphore emptySemaphore;
 Semaphore fullSemaphore;
 RWLock lock;
 volatile int front;
 volatile int rear;
 CJob** jobArr;
 int capacity ;

public:
 CBlockQueue(int maxSize=1023):front(0),rear(0),emptySemaphore(maxSize,maxSize),fullSemaphore(0,maxSize),capacity(maxSize+1)
 {
  if(maxSize<=0){
   //
   throw "cannot be negative.";
  }
  jobArr = new CJob*[maxSize+1];
  memset(jobArr,0,sizeof(CJob*)*(1+maxSize));
 }
  ~CBlockQueue(){
   delete[]jobArr;

 }

 //如果添加失败,将会阻塞,如果实现不好阻塞的可能是主线程
 void Put(CJob *pJob)
 {
  if(pJob != NULL){
   emptySemaphore.P();
   lock.lock();
   bool bRet = this->enQueue(pJob);
   lock.unLock();
   fullSemaphore.V();
   
  }
 }
 //获取任务并从队列头部移走;失败(队列空)会阻塞直到有对象可获得;返回对象
 CJob* Take()
 {
  fullSemaphore.P();
  CJob *pJob = DeQueue();
  emptySemaphore.V();
  assert(pJob!=NULL);
  return pJob;
 }
 //不移走队列;不阻塞
 CJob* peek()
 {
  BOOL bRet = lock.tryAndLock();
  if(bRet)
  {
   int tail = (rear-1+capacity)%capacity;
   lock.tryAndRelease();
   return jobArr[tail];
  }
  return NULL;  
 }
private:
 bool enQueue(CJob *pJob){
  //if(isFull())
  // return false;
  jobArr[rear]=pJob;
  rear = (rear+1)%capacity;
  return true;
 }
 CJob * DeQueue(){
  //if(isEmpty())return NULL;
  CJob* pJob=jobArr[front];
  front = (front+1)%capacity;
  return pJob;

 }
 bool isFull(){
  return ((front+1+capacity-rear)%capacity==0);
 }
 bool isEmpty(){
  return front == rear;
 }

};

  

----------------------------------------------------------------

  1. //简单封装的线程池调用,主要为Linux下提供接口

 

class CThreadsPool
{
 UINT64 m_lKeepAliveTime;//计时函数,单位ms,表示如果请求等待时间超出这个时间,那么移除这个请求,并标记请求失败
 int m_nCurThreadNum;
 int m_nMinThreadNum;
 int m_nMaxThreadNum;
 bool m_bAutoBalance;
 //priorityqueue pq;最小队列记录总的空闲线程
 CBlockQueue *m_pBlockingQueue;
 



public:
 CThreadsPool(CBlockQueue *pBlockingQueue,int minThreadNum=128,int maxThreadNum=1024,bool autoBalance=false,UINT64 keepAliveTime=0)
 {
  m_nCurThreadNum= minThreadNum;
  m_nMinThreadNum = minThreadNum;
  m_bAutoBalance = autoBalance;
  m_lKeepAliveTime = keepAliveTime;
  m_pBlockingQueue = pBlockingQueue;
 }
 void InitPool(){
 //create threads ,and suspend to wait job.
 }
 ~CThreadsPool(void)
 {
 
 }
 
 unsigned int DoJob()
 {
  //select an available thread
  //(1)if failed,and current thread size is less than max size,then create a bulk of threads, otherwise ,failed with OVERSIZE
  //(2)success,go on
  if(m_pBlockingQueue!= NULL){
   do{
   CJob* pJob = m_pBlockingQueue->Take();
   if(pJob == NULL)
   {
    //error
   }
   else
   {
    //select a thread;
    BOOL bRet = QueueUserWorkItem((LPTHREAD_START_ROUTINE)(pJob->m_pFunc),pJob->m_data,WT_EXECUTELONGFUNCTION);
    if(bRet ==FALSE){
     DWORD dwError = ::GetLastError();
    }
    else
    {

     //这就执行完了?
    }
   }
   }while(TRUE);
  }
    return 0;
 }
 };

 

---------------------------基本上就可以调用了,继续测试下看看有木有bug :

CBlockQueue queue;
DWORD producer_work(void *pParam)
{
	printf("producer id = %d\n",*(int*)pParam);
	return 0L;
}
DWORD Producer(void *pParam)
{

	static  int id =1;
	for(int i=0;i<102343;i++){
		Sleep(0);//Sleep是为了更好的看到效果
	CJob *pJob = new CJob;

	pJob->id = id++;
	pJob->m_pFunc = producer_work;
	pJob->m_data = &pJob->id;
	printf("put job id = %d\n",pJob->id);
	queue.Put(pJob);
	}
	return 0L;
}


DWORD Consumer(void *pParam)
{
	CThreadsPool pool(&queue);
	pool.InitPool();
	pool.DoJob();
	return 0L;
}
int main(int argc,char*argv[])
{
	DWORD threadID;
	HANDLE hThread1 = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Producer,NULL,0,&threadID);

	DWORD threadID2;
	HANDLE hThread2  = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Consumer,NULL,0,&threadID2);
	::WaitForSingleObject(hThread2,INFINITE);
}

 

后台输出结果:

“ThreadPool.exe”: 已加载“C:\Windows\winsxs\x86_microsoft.vc90.debugcrt_1fc8b3b9a1e18e3b_9.0.30729.1_none_bb1f6aa1308c35eb\msvcr90d.dll”,已加载符号。
线程 'Win32 线程' (0x16bc) 已退出,返回值为 0 (0x0)。
线程 'Win32 线程' (0x9c0) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x14e4) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1520) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xa10) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1680) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x17bc) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x3bc) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x11a4) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1770) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1588) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x3a0) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x12e4) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1498) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xcd0) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1314) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x310) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1634) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xc04) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1164) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x908) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x34c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x175c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x13bc) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x258) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1514) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x680) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x4a8) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x9b4) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xc2c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x2ec) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xd40) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1198) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1774) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x12f0) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x10f8) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x65c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x1768) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x152c) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x784) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x16dc) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0xaa8) 已退出,返回值为 -1073741510 (0xc000013a)。
线程 'Win32 线程' (0x12d0) 已退出,返回值为 -1073741510 (0xc000013a)。
程序“[1848] ThreadPool.exe: 本机”已退出,返回值为 -1073741510 (0xc000013a)。

 

可见线程确实已经启动了。

输出结果:

 上面结果,可以看出,put和take过程确实是异步多线程处理的。

 

 

上面实例是生产者——消费者模型

一个生产者,多个消费者,多个消费者通过线程池控制,具体个数依赖于QueueUserWorkItem;线程池有一个单独的线程控制。

因此实际上一共有  :

             主线程+生产者线程+控制线程+N个消费者线程。

实际上也可以改成  多生产者——多消费者模型,这类模型通常适合于web服务器处理。

我们在实际过程中,采用的是如下模型:

多生产者(web请求)→单消费者(同时又是单生产者,web请求队列排队,创建合适线程个数)→多消费者(多线程获取web请求队列,处理该请求)。

还不了解web服务器采用的是那种模型。

 

  • 大小: 8.5 KB
分享到:
评论

相关推荐

    vc++ 开发实例源码包

    内含各种例子(vc下各种控件的使用方法、标题栏与菜单栏、工具栏与状态栏、图标与光标、程序窗口、程序控制、进程与线程、字符串、文件读写操作、文件与文件夹属性操作、文件与文件夹系统操作、系统控制操作、程序...

    vc++ 应用源码包_1

    MYICQ 0.8 alpha1 (仿腾讯QQ整套聊天系统) 老版qq系统的实现。 MyIE3.0浏览器源代码 如题。完整的代码,重载控件实现,非常适合初学者。 MyPhpServer(原创,有实现的主要代码) microcai-ibus-t9-输入法源码 如题,...

    vc++ 应用源码包_2

    MYICQ 0.8 alpha1 (仿腾讯QQ整套聊天系统) 老版qq系统的实现。 MyIE3.0浏览器源代码 如题。完整的代码,重载控件实现,非常适合初学者。 MyPhpServer(原创,有实现的主要代码) microcai-ibus-t9-输入法源码 如题,...

    vc++ 应用源码包_3

    MYICQ 0.8 alpha1 (仿腾讯QQ整套聊天系统) 老版qq系统的实现。 MyIE3.0浏览器源代码 如题。完整的代码,重载控件实现,非常适合初学者。 MyPhpServer(原创,有实现的主要代码) microcai-ibus-t9-输入法源码 如题,...

    vc++ 应用源码包_6

    MYICQ 0.8 alpha1 (仿腾讯QQ整套聊天系统) 老版qq系统的实现。 MyIE3.0浏览器源代码 如题。完整的代码,重载控件实现,非常适合初学者。 MyPhpServer(原创,有实现的主要代码) microcai-ibus-t9-输入法源码 如题,...

    vc++ 应用源码包_5

    MYICQ 0.8 alpha1 (仿腾讯QQ整套聊天系统) 老版qq系统的实现。 MyIE3.0浏览器源代码 如题。完整的代码,重载控件实现,非常适合初学者。 MyPhpServer(原创,有实现的主要代码) microcai-ibus-t9-输入法源码 如题,...

    Java开发实战1200例(第1卷).(清华出版.李钟尉.陈丹丹).part3

    实例016 设计Windows系统的运行对话框 界面 23 实例017 设计计算器程序界面 26 实例018 设计关于进销存管理系统的界面 27 第2章 Java基础应用 29 2.1 基本语法 30 实例019 输出错误信息与调试信息 30 实例020 从...

    C#5.0本质论第四版(因文件较大传的是百度网盘地址)

    C#5.0本质论第四版,高清扫描的,对C#5.0技术讲的比较详细,第1章 C#概述 1 1.1 Hello,World 1 1.2 C#语法基础 3 1.2.1 C#关键字 3 1.2.2 标识符 4 1.2.3 类型定义 5 1.2.4 Main 6 1.2.5...

Global site tag (gtag.js) - Google Analytics