请阅读上篇文章《并发编程实战: POSIX 使用互斥量和条件变量实现生产者/消费者问题》。当然不阅读亦不影响本篇文章的阅读。
Boost的互斥量,条件变量做了很好的封装,因此比“原生的”POSIX mutex,condition variables好用。然后我们会通过分析boost相关源码看一下boost linux是如何对pthread_mutex_t和pthread_cond_t进行的封装。
首先看一下condition_variable_any的具体实现,代码路径:/boost/thread/pthread/condition_variable.hpp
class condition_variable_any
{
pthread_mutex_t internal_mutex;
pthread_cond_t cond;
condition_variable_any(condition_variable_any&);
condition_variable_any& operator=(condition_variable_any&);
public:
condition_variable_any()
{
int const res=pthread_mutex_init(&internal_mutex,NULL);
if(res)
{
boost::throw_exception(thread_resource_error());
}
int const res2=pthread_cond_init(&cond,NULL);
if(res2)
{
BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));
boost::throw_exception(thread_resource_error());
}
}
~condition_variable_any()
{
BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));
BOOST_VERIFY(!pthread_cond_destroy(&cond));
}
condition_variable_any的构造函数是对于内部使用的mutex和cond的初始化,对应的,析构函数则是这些资源的回收。
BOOST_VERIFY的实现:
#undef BOOST_VERIFY
#if defined(BOOST_DISABLE_ASSERTS) || ( !defined(BOOST_ENABLE_ASSERT_HANDLER) && defined(NDEBUG) )
// 在任何情况下,expr一定会被求值。
#define BOOST_VERIFY(expr) ((void)(expr))
#else
#define BOOST_VERIFY(expr) BOOST_ASSERT(expr)
#endif
因此不同于assert在Release版的被优化掉不同,我们可以放心的使用BOOST_VERITY,因此它的表达式肯定会被求值,而不用担心assert的side effect。
接下来看一下condition_variable_any的核心实现:wait
template<typename lock_type>
void wait(lock_type& m)
{
int res=0;
{
thread_cv_detail::lock_on_exit<lock_type> guard;
detail::interruption_checker check_for_interruption(&internal_mutex,&cond);
guard.activate(m);
res=pthread_cond_wait(&cond,&internal_mutex);
this_thread::interruption_point();
}
if(res)
{
boost::throw_exception(condition_error());
}
}
首先看一下lock_on_exit:
namespace thread_cv_detail
{
template<typename MutexType>
struct lock_on_exit
{
MutexType* m;
lock_on_exit():
m(0)
{}
void activate(MutexType& m_)
{
m_.unlock();
m=&m_;
}
~lock_on_exit()
{
if(m)
{
m->lock();
}
}
};
}
代码很简单,实现了在调用activate时将传入的lock解锁,在该变量生命期结束时将guard的lock加锁。
接下来的detail::interruption_checker check_for_interruption(&internal_mutex,&cond);是什么意思呢?From /boost/thread/pthread/thread_data.hpp
class interruption_checker
{
thread_data_base* const thread_info;
pthread_mutex_t* m;
bool set;
void check_for_interruption()
{
if(thread_info->interrupt_requested)
{
thread_info->interrupt_requested=false;
throw thread_interrupted();
}
}
void operator=(interruption_checker&);
public:
explicit interruption_checker(pthread_mutex_t* cond_mutex,pthread_cond_t* cond):
thread_info(detail::get_current_thread_data()),m(cond_mutex),
set(thread_info && thread_info->interrupt_enabled)
{
if(set)
{
lock_guard<mutex> guard(thread_info->data_mutex);
check_for_interruption();
thread_info->cond_mutex=cond_mutex;
thread_info->current_cond=cond;
BOOST_VERIFY(!pthread_mutex_lock(m));
}
else
{
BOOST_VERIFY(!pthread_mutex_lock(m));
}
}
~interruption_checker()
{
if(set)
{
BOOST_VERIFY(!pthread_mutex_unlock(m));
lock_guard<mutex> guard(thread_info->data_mutex);
thread_info->cond_mutex=NULL;
thread_info->current_cond=NULL;
}
else
{
BOOST_VERIFY(!pthread_mutex_unlock(m));
}
}
代码面前,毫无隐藏。那句话就是此时如果有interrupt,那么就interrupt吧。否则,lock传入的mutex,也是为了res=pthread_cond_wait(&cond,&internal_mutex);做准备。
关于线程的中断点,可以移步《【Boost】boost库中thread多线程详解5——谈谈线程中断》。
对于boost::mutex,大家可以使用同样的方法去解读boost的实现,相对于condition variable,mutex的实现更加直观。代码路径:/boost/thread/pthread/mutex.hpp。
namespace boost
{
class mutex
{
private:
mutex(mutex const&);
mutex& operator=(mutex const&);
pthread_mutex_t m;
public:
mutex()
{
int const res=pthread_mutex_init(&m,NULL);
if(res)
{
boost::throw_exception(thread_resource_error());
}
}
~mutex()
{
BOOST_VERIFY(!pthread_mutex_destroy(&m));
}
void lock()
{
int const res=pthread_mutex_lock(&m);
if(res)
{
boost::throw_exception(lock_error(res));
}
}
void unlock()
{
BOOST_VERIFY(!pthread_mutex_unlock(&m));
}
bool try_lock()
{
int const res=pthread_mutex_trylock(&m);
if(res && (res!=EBUSY))
{
boost::throw_exception(lock_error(res));
}
return !res;
}
typedef pthread_mutex_t* native_handle_type;
native_handle_type native_handle()
{
return &m;
}
typedef unique_lock<mutex> scoped_lock;
typedef detail::try_lock_wrapper<mutex> scoped_try_lock;
};
}
boost对于pthread_mutex_t和pthread_cond_t的封装,方便了开发者的使用的资源的安全有效管理。当然,在不同的公司,可能也都有类似的封装,学习boost的源码,无疑可以加深我们的理解。在某些特定的场合,我们也可以学习boost的封装方法,简化我们的日常开发。
最后,奉上简单的生产者、消费者的boost的实现,和前文《并发编程实战: POSIX 使用互斥量和条件变量实现生产者/消费者问题》相比,我们可以看到boost简化了mutex和condition variable的使用。以下代码引自《Boost程序库完全开发指南》:
#include <boost/thread.hpp>
#include <stack>
using std::stack;
using std::cout;
class buffer
{
private:
boost::mutex mu; // 条件变量需要配合互斥量
boost::condition_variable_any cond_put; // 生产者写入
boost::condition_variable_any cond_get; // 消费者读走
stack<int> stk;
int un_read;
int capacity;
bool is_full()
{
return un_read == capacity;
}
bool is_empty()
{
return 0 == un_read;
}
public:
buffer(size_t capacity) : un_read(0), capacity(capacity)
{}
void put(int x)
{
boost::mutex::scoped_lock lock(mu); // 这里是读锁的门闩类
while (is_full())
{
cout << "full waiting..." << endl;
cond_put.wait(mu); // line:51
}
stk.push(x);
++un_read;
cond_get.notify_one();
}
void get(int *x)
{
boost::mutex::scoped_lock lock(mu); // 这里是读锁的门闩类
while (is_empty())
{
cout << "empty waiting..." << endl;
cond_get.wait(mu);
}
*x = stk.top();
stk.pop();
--un_read;
cond_put.notify_one(); // 通知 51line可以写入了
}
};
buffer buf(5);
void producer(int n)
{
for (int i = 0; i < n; ++i)
{
cout << "put : " << i << endl;
buf.put(i);
}
}
void consumer(int n)
{
int x;
for (int i = 0; i < n; ++i)
{
buf.get(&x);
cout << "get : " << x << endl;
}
}
int main()
{
boost::thread t1(producer, 20);
boost::thread t2(consumer, 10);
boost::thread t3(consumer, 10);
t1.join();
t2.join();
t3.join();
return 0;
}
最后说一句,condition_variable_any == condition, from /
boost/
thread/
condition.hpp
namespace boost
{
typedef condition_variable_any condition;
}
分享到:
相关推荐
软件界面上点“创建线程” 按钮,创建三个生产者线程(P1,P2,P3)和两个消费者线程(C1,C2),生产者和消费者线程共享一个长度为2KB的环型公共缓冲区,生产者向其中投放消息,消费者从中取走消息。只要缓冲区未满...
1、设计目的:通过研究Linux的进程机制和信号量,实现生产者消费者问题的并发控制。 2、说明:有界缓冲区内设有20个存储单元,放入取出的产品设定为1-20个整数。 3、设计要求: 生产者和消费者进程的数目不固定,可...
设计目的:通过研究Linux 的进程机制和信号量实现生产者消费者问题的并发控制。说明:有界缓冲区内设有20 个存储单元,放入/取出的数据项设定为1‐20 这20 个整型数。设计要求:1)每个生产者和消费者对有界缓冲区...
线程并发控制condition 互斥量 多线程写的:生产者、消费者问题
多线程实现生产者消费者模型:锁(Lock)、信号量(Semaphore、BoundedSemaphore)、条件(Condition)、队列(Queue)、事件(Event) 多进程程实现生产者消费者模型:信号量(Semaphore)、条件(Condition)、...
通过研究Linux的线程机制和信号量实现生产者消费者(Bounded Buffer)问题的并发控制。 实验条件要求:每人一台与Linux主机联网的Windows主机,普通用户权限。 (1) 每个生产者和消费者对有界缓冲区进行操作后,...
通过研究进程并发和信号量机制,实现生产者-消费者问题的并发控制。 2、设计要求 1)每个生产者和消费者对有界缓冲区进行操作后,即时显示有界缓冲区的全部内容,当前指针位置和生产者/消费者进程的标识符。 说明:...
本设计通过模拟计算机操作系统中经典的“生产者—消费者问题”,巩固在操作系统原理课上所学的知识,加深对操作系统中进程同步和互斥、临界区管理,管程等问题的认识和理解。前期主要利用P、V信号量来控制各进程间的...
第二步:实现生产者/消费者问题 1)有一群生产者进程在生产产品,并将这些产品提供给消费者进程去消费。为使生产者进程与消费者进程能并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池:生产者进程从文件中读取...
小实验三:根据同步机制的Peterson软件解决方案尝试自己编程实现线程同步机制和用于上述线程并发问题的解决,并基于程序运行时间长短将其与基于Windows互斥信号量的线程同步机制的效率展开比较。 实验要求:线程主体...
实验目的:使用多线程并发程序设计模拟生产者消费者问题,了解P-V操作的实现方法,实现生产者和消费者的同步和互斥,了解各线程如何使用临界资源和临界区。 主要内容:约束条件:生产者进程负责生产产品放到缓冲区,...
(2)信号量机制实现生产者和消费者对缓冲区的互斥访问。 (3)生产者生产产品时,要输出当前缓冲区冲产品的个数和存放产品的位置。 (4)消费者消费产品时,要输出当前缓冲区冲产品的个数和消费产品的位置。 (5)...
在生产者---消费者问题中应注意(信号量名称以多个生产者和多个消费者中的为例):首先,在每个程序中用于互斥的wait(mutex)和signal(mutex)必须成对出现;其次,对资源信号量empty和full的wait和signal操作,同样...
通过研究Linux的线程机制和信号量实现生产者消费者(Bounded Buffer)问题的并发控制。 实验条件要求:每人一台与Linux主机联网的Windows主机,普通用户权限。 (1) 每个生产者和消费者对有界缓冲区进行操作后,即时...
《Java并发编程从入门到精通》作者结合自己10多年Java并发编程经验,详细介绍了Java并发编程的基础概念、工作原理、编程技巧和注意事项,对Java高性能高并发编程有极大的参考价值。 《Java并发编程从入门到精通》...
1、设计目的:通过研究Linux的进程同步机制和信号量,实现生产者消费者问题的并发控制。 2、说明:有界缓冲区内设有20个存储单元,放入取出的产品设定为20个100以内的随机整数。 3、设计要求: 1) 生产者与消费者均...
(1)创建生产者和消费者线程 在Windows2000环境下,创建一个控制台进程,在此进程中创建n个线程来模拟生产者或者消费者。这些线程的信息由本程序定义的“测试用例文件”中予以指定。 该文件的格式和含义如下: 3 1 ...
高并发编程第三阶段11讲 AtomicXXXFieldUpdater源码分析及使用场景分析.mp4 高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4 高并发编程第三阶段13讲 一个JNI程序的编写,通过...
生产者-消费者问题描述的是:有一群生产者进程在生产产品,并将这些产品提供给消费者进程去消费。为使生产者进程与消费者进程能够并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者进程将它所生产的产品...
多线程并发编程-同步与互斥-原⼦变量-并发和⽆锁 数据结构