`
sipgreen
  • 浏览: 25454 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
最近访客 更多访客>>
社区版块
存档分类
最新评论

BlockingQueue C++实现

    博客分类:
  • C++
 
阅读更多

 

// BlockingQueue.h: interface for the CBlockingQueue class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_)
#define AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include <windows.h>
#include <LIST>
#include <vector>
#include <STRING>
using namespace std;

class CBlockingQueue
{
public:
	CBlockingQueue(int size);
	virtual ~CBlockingQueue();
private:
	CBlockingQueue();
	HANDLE m_MssageNullEvent;
	HANDLE m_synSignal;
	int m_size;
	list<string> m_msgs;
public:
		string Dequeue();
		void Enqueue(string msg);
};

#endif // !defined(AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_)

 

 

 

 

// BlockingQueue.cpp: implementation of the CBlockingQueue class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "BlockingQueue.h"
#include <afxcom_.h>

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CBlockingQueue::CBlockingQueue()
{

}

CBlockingQueue::CBlockingQueue(int ssize)
{
	m_size=ssize;
	m_synSignal=CreateSemaphore(NULL,1,1,"m_synSignal");
	m_MssageNullEvent=CreateEvent(NULL,TRUE,TRUE,"m_MssageNullEvent");
}

CBlockingQueue::~CBlockingQueue()
{
	
}
string CBlockingQueue::Dequeue()
{
	string msg="";
	WaitForSingleObject(m_synSignal,INFINITE);
	while(0==m_msgs.size())
	{
		bool breset=ResetEvent(m_MssageNullEvent);
		ReleaseSemaphore(m_synSignal,1,NULL);
		DWORD nret=WaitForSingleObject(m_MssageNullEvent,INFINITE);
		//ASSERT(WAIT_TIMEOUT!=);	
	}
	string temp=*m_msgs.begin();
	m_msgs.pop_front();
	SetEvent(m_MssageNullEvent);
	ReleaseSemaphore(m_synSignal,1,NULL);
	
	return temp;
}

void CBlockingQueue::Enqueue(string msg)
{
	WaitForSingleObject(m_synSignal,INFINITE);
	int a=m_msgs.size();
	while(m_size==m_msgs.size())
	{
		bool bret=	ResetEvent(m_MssageNullEvent);
		ReleaseSemaphore(m_synSignal,1,NULL);
		DWORD nret=	WaitForSingleObject(m_MssageNullEvent,INFINITE);	
	}
	m_msgs.push_back(msg);
	ReleaseSemaphore(m_synSignal,1,NULL);
	if (1==m_msgs.size())
		SetEvent(m_MssageNullEvent);
}

 

 以上不具有普遍性,更合理的设计如下:

 

将  同步内核对象抽象出来 成为 CMonitor

 

// Monitor.h: interface for the CMonitor class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_MONITOR_H__26A4800D_6F3C_41BF_97AC_1D20860517AC__INCLUDED_)
#define AFX_MONITOR_H__26A4800D_6F3C_41BF_97AC_1D20860517AC__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include <windows.h>

class CMonitor 
{

	

public:
	CMonitor();
	virtual ~CMonitor();


private:
	HANDLE m_h[2];
	DWORD m_lastThreadId;
public:
	void wait(DWORD timeout);
	bool pulse();
	void Enter();
	void Exit();
	


};

#endif // !defined(AFX_MONITOR_H__26A4800D_6F3C_41BF_97AC_1D20860517AC__INCLUDED_)

 

// Monitor.cpp: implementation of the CMonitor class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "Monitor.h"
#include <exception>

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CMonitor::CMonitor()
{
	m_h[0]=CreateEvent(NULL,TRUE,TRUE,"h1");
	m_h[1]=CreateEvent(NULL,TRUE,TRUE,"h2");
	m_lastThreadId=0;
}

CMonitor::~CMonitor()
{

}

void CMonitor::Enter()
{
	m_lastThreadId=::GetCurrentThreadId();	
	WaitForSingleObject(m_h[0],INFINITE);//ResetEvent();
}

void CMonitor::Exit()
{
	m_lastThreadId=NULL;
	SetEvent(m_h[0]);
	SetEvent(m_h[1]);
}

bool CMonitor::pulse()
{
	if (!(m_lastThreadId!=NULL || m_lastThreadId!=::GetCurrentThreadId()))
	{
		throw exception("The wait could only be excuted by the monitor owner!");
	}
	SetEvent(m_h[1]);

	return true;
}

void CMonitor::wait( DWORD timeout )
{
	if (!(m_lastThreadId!=NULL || m_lastThreadId!=::GetCurrentThreadId()))
	{
		throw exception("The wait could only be excuted by the monitor owner!");
	}
	m_lastThreadId=NULL;
	ResetEvent(m_h[1]);
	WaitForSingleObject(m_h[1],timeout);
	SetEvent(m_h[0]);
}


 

 

那么新的 BlockingQueue代码如下:

 

// BlockingQueue.h: interface for the CBlockingQueue class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_)
#define AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include <windows.h>
#include <LIST>
#include <vector>
#include <STRING>
#include "Monitor.h"
using namespace std;

class CBlockingQueue
{
public:
	CBlockingQueue(int size);
	virtual ~CBlockingQueue();
private:
	CBlockingQueue();
	int m_size;
	list<string> m_msgs;

	CMonitor m_monitor;
public:
		string Dequeue();
		void Enqueue(string msg);
};

#endif // !defined(AFX_BLOCKINGQUEUE_H__E6C614E8_4A5D_4D18_A38D_845018DA75B6__INCLUDED_)

 

 

// BlockingQueue.cpp: implementation of the CBlockingQueue class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "BlockingQueue.h"
#include <afxcom_.h>

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CBlockingQueue::CBlockingQueue()
{

}

CBlockingQueue::CBlockingQueue(int ssize)
{
	m_size=ssize;
}

CBlockingQueue::~CBlockingQueue()
{
	
}
string CBlockingQueue::Dequeue()
{
	string msg="";

	m_monitor.Enter();
	{
		while(0==m_msgs.size())
		{
			m_monitor.wait(INFINITE);
		}
		msg=*m_msgs.begin();
		m_msgs.pop_front();
		if ((m_size-1)==m_msgs.size())
			m_monitor.pulse();
	}
	m_monitor.Exit();

	return msg;
}

void CBlockingQueue::Enqueue(string msg)
{
	m_monitor.Enter();
	{
		int a=m_msgs.size();
		while(m_size==m_msgs.size())
		{
			m_monitor.wait(INFINITE);
		}
		m_msgs.push_back(msg);
		if (1==m_msgs.size())
			m_monitor.pulse();
	}
	m_monitor.Exit();
}

 

1
8
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics