原文作者:@玄冬Wong
key word:Non-blocking、Blocking、multi-productor、multi-customer、benchmark、performance
/************************************************************************/ /* 测试多个生产者多个消费者线程环境下,boost::lockfree::queue和std::mutex的性能 */ /************************************************************************/ #define _ENABLE_ATOMIC_ALIGNMENT_FIX #include <windows.h> #include <iostream> #include <time.h> #include <thread> #include <list> #include <boost/lockfree/spsc_queue.hpp> #include <boost/lockfree/queue.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition_variable.hpp> #include <boost/circular_buffer.hpp> #include <mutex> #define LOOP_COUNT 5000000 #define QUEUE_CAPACITY 65534 #define THREAD_COUNT 4 std::mutex m; std::condition_variable cv; boost::circular_buffer<int> cb(QUEUE_CAPACITY); boost::lockfree::queue<int, boost::lockfree::capacity<QUEUE_CAPACITY>>* mul_queue; std::atomic<unsigned int> push_count = 0; std::atomic<unsigned int> pop_count = 0; void blocking_productor() { while (1) { std::unique_lock<std::mutex> lk(m); cv.wait(lk, [] {return cb.size() < QUEUE_CAPACITY; }); cb.push_back(push_count); cv.notify_one(); if (++push_count >= LOOP_COUNT * THREAD_COUNT) { break; } } } void blocking_customer() { while (1) { std::unique_lock<std::mutex> lk(m); cv.wait(lk, [] {return cb.size() > 0; }); cb.pop_front(); cv.notify_one(); if (++pop_count >= LOOP_COUNT * THREAD_COUNT) { break; } } } void nonblocking_productor() { while (1) { if (push_count/*.load(std::memory_order_acquire)*/ >= LOOP_COUNT * THREAD_COUNT) { break; } if (mul_queue->push(push_count)) { ++push_count; //push_count.fetch_add(1, std::memory_order_relaxed); } else { Sleep(1); } } } void nonblocking_customer() { int val; while (1) { if (pop_count/*.load(std::memory_order_acquire)*/ >= LOOP_COUNT * THREAD_COUNT) { break; } if (mul_queue->pop(val)) { ++pop_count; //pop_count.fetch_add(1, std::memory_order_relaxed); } else { Sleep(1); } } } void test_blocking() { std::thread** carray = new std::thread*[THREAD_COUNT]; std::thread** parray = new std::thread*[THREAD_COUNT]; clock_t start = clock(); for (int i = 0; i < THREAD_COUNT; i++) { carray[i] = new std::thread((&blocking_customer)); parray[i] = new std::thread((&blocking_productor)); } for (int i = 0; i < THREAD_COUNT; i++) { carray[i]->join(); parray[i]->join(); } clock_t end = clock(); printf("[test_blocking]\nTHREAD_COUNT:%d\nQUEUE_CAPACITY:%d\ncost:%dms\n", THREAD_COUNT, QUEUE_CAPACITY, end - start); printf("push_count:%d pop_count:%d\n", push_count, pop_count); for (int i = 0; i < THREAD_COUNT; i++) { delete carray[i]; delete parray[i]; } delete[] carray; delete[] parray; } void test_nonblocking() { std::thread** carray = new std::thread*[THREAD_COUNT]; std::thread** parray = new std::thread*[THREAD_COUNT]; clock_t start = clock(); for (int i = 0; i < THREAD_COUNT; i++) { carray[i] = new std::thread((&nonblocking_customer)); parray[i] = new std::thread((&nonblocking_productor)); } for (int i = 0; i < THREAD_COUNT; i++) { carray[i]->join(); parray[i]->join(); } clock_t end = clock(); printf("[test_nonblocking]\nTHREAD_COUNT:%d\nQUEUE_CAPACITY:%d\ncost:%dms\n", THREAD_COUNT, QUEUE_CAPACITY, end - start); printf("push_count:%d pop_count:%d\n", push_count, pop_count); for (int i = 0; i < THREAD_COUNT; i++) { delete carray[i]; delete parray[i]; } delete[] carray; delete[] parray; } int main(char* args, int size) { mul_queue = new boost::lockfree::queue<int, boost::lockfree::capacity<QUEUE_CAPACITY>>; std::cout << mul_queue->is_lock_free() << std::endl; //为了排除测试程序的无关因素,测试时只开启一个:blocking或者nonblocking。 //test_blocking(); test_nonblocking(); }
输出结果:
生产者和消费者分别4个线程
[test_blocking]
THREAD_COUNT:4
QUEUE_CAPACITY:65534
cost:3598ms
push_count:20000003 pop_count:20000003
[test_nonblocking]
THREAD_COUNT:4
QUEUE_CAPACITY:65534
cost:6350ms
push_count:20000003 pop_count:20000003
生产者和消费者分别8个线程
[test_blocking]
THREAD_COUNT:8
QUEUE_CAPACITY:65534
cost:3620ms
push_count:20000007 pop_count:20000007
[test_nonblocking]
THREAD_COUNT:8
QUEUE_CAPACITY:65534
cost:6466ms
push_count:20000007 pop_count:20000004
生产者和消费者分别16个线程
[test_blocking]
THREAD_COUNT:16
QUEUE_CAPACITY:65534
cost:3590ms
push_count:20000015 pop_count:20000015
[test_nonblocking]
THREAD_COUNT:16
QUEUE_CAPACITY:65534
cost:6136ms
push_count:20000007 pop_count:20000007
结论:多读多写的多线程环境下,std::mutex的速度比boost::lockfree::queue的性能高一倍。queue的容量限制不得超过65535,具体原因没仔细看,可能是boost为了实现lock-free而采用了算法只能支持这么大的容量。
PS:上面的不同线程数量下,累加结果是一样的,是因为每个线程的循环次数做了均分,总循环次数为20000000
测试环境:
windows 10 pro x64
VS2015企业版 update2,release x64
CPU:i7二代移动版
相关推荐
注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择。 特征 击倒你的。 单头实现。 只需将其放入您的项目中即可。 完全线程安全的无锁队列。 从任何数量的线程同时使用。 C ++ 11实现-尽...
多线程实现生产者消费者模型:锁(Lock)、信号量(Semaphore、BoundedSemaphore)、条件(Condition)、队列(Queue)、事件(Event) 多进程程实现生产者消费者模型:信号量(Semaphore)、条件(Condition)、...
proirity_queue性能分析源码 比较了三种queue:std::multimap、std::multiset、std::priority_queue 的性能
无锁队列适用于多生产多消费者环境,快速队列真正无锁适用于单生产单消费环境; 消息队列,消息托管队列;
std::move是将对象的状态或者所有权从一个对象转移到另一个对象,只是转移,没有内存的搬迁或者内存拷贝 #pragma once #include pch.h #include #include #include #include int main() { std::string str =
一个快速多生产者,多消费者的C 11无锁并发队列
atomic_queue 基于带有循环缓冲区的C ++ 14多生产者多消费者无锁队列。 这些队列遵循的主要设计原理是极简主义:原子操作的最基本要求,固定大小的缓冲区,值语义。 这些品质也有局限性: 最大队列大小必须在编译...
固定大小的无锁队列(2010) 队列是在生产者/消费者算法中广泛发现的抽象数据类型。 许多良好的实现都是锁死的,并且可能会与数千个并发线程在其上添加和使用数据的情况发生激烈争用。 上述技术可用于实现完全无锁的...
Yet another implementation of a lock-free circular array queue
concurrentqueue, 一种快速多消费者多消费者锁空闲并发队列 moodycamel::ConcurrentQueue...注意:如果你只需要一个生产者,单个消费者队列,我有其中的一个太多。特性Knock-your-socks-off 刻录速度快。单头实现。只需
无锁队列实现,lock_free_queue,大大提高了多线程的处理性能
并发多生产者多消费者队列。 有两种队列: 有限制的队列,容量有限。 无限队列无限制容量。 队列还可以随时关闭。 关闭后,尽管仍然可以弹出其余项目,但无法将更多项目推入队列。 这些功能使在此板条箱顶部...
Lock-free Queue and Ring Buffer
# std::queue<T> -- via pqueue command # std::priority_queue<T> -- via ppqueue command # std::bitset<n> -- via pbitset command # std::string -- via pstring command # std::widestring -- via pwstring ...
queue_atomic 使用C ++ 11原子的多生产者多消费者队列模板。 通过将单调递增的版本号打包到队列的前后偏移量中,解决了ABA问题并实现了两阶段有序更新。 通过检查预期的版本计数器在打包的前后偏移量中是否可见来...
这个类是一个多生产者、多消费者队列。 它同时提供阻塞和非阻塞消费,而生产总是阻塞。 容量是动态调整的。 实现基于std::queue ,使用std::mutex和std::condition_variable实现线程安全。 可以使用右值引用调用...
这是用C ++ 11编写的简单的仅标头单生产者单消费者队列实现。 它旨在将动态分配的对象的所有权从一个线程显式转移到另一个线程。 警告您,它并非设计为具有很高的性能或可扩展性! 如果您对高性能线程安全队列感兴趣...
C++ Queue(带上限的) 上限可以改,做小题可以
注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择其中之一。 特色快如闪电般的快速pe moodycamel :: ConcurrentQueue C ++的工业级无锁队列。 注意:如果您需要的只是一个单一生产者,单一...
这是UET课程中高级课程实践的描述 INT2215-02:N1,N2 PM307-G2,星期...std :: stack和std :: queue关联容器:std :: map和std :: set作业05容器适配器:std :: stack和std :: queue关联容器:std :: map和std :: set