`
aigo
  • 浏览: 2537923 次
  • 性别: Icon_minigender_1
  • 来自: 宜昌
社区版块
存档分类
最新评论

多个生产者多个消费者:boost::lockfree::queue和std::mutex的性能对比(benchmark)

阅读更多

原文作者:@玄冬Wong

key word:Non-blocking、Blocking、multi-productor、multi-customer、benchmark、performance

 

/************************************************************************/
/* 测试多个生产者多个消费者线程环境下,boost::lockfree::queue和std::mutex的性能  */
/************************************************************************/

#define _ENABLE_ATOMIC_ALIGNMENT_FIX

#include <windows.h>
#include <iostream>
#include <time.h>  
#include <thread>
#include <list> 

#include <boost/lockfree/spsc_queue.hpp>  
#include <boost/lockfree/queue.hpp>  
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/circular_buffer.hpp>

#include <mutex>


#define LOOP_COUNT 5000000
#define QUEUE_CAPACITY 65534
#define THREAD_COUNT 4

std::mutex m;
std::condition_variable cv;

boost::circular_buffer<int> cb(QUEUE_CAPACITY);

boost::lockfree::queue<int, boost::lockfree::capacity<QUEUE_CAPACITY>>* mul_queue;


std::atomic<unsigned int> push_count = 0;
std::atomic<unsigned int> pop_count = 0;

void blocking_productor()
{
	while (1)
	{
		std::unique_lock<std::mutex> lk(m);
		cv.wait(lk, [] {return cb.size() < QUEUE_CAPACITY; });
		cb.push_back(push_count);
		cv.notify_one();

		if (++push_count >= LOOP_COUNT * THREAD_COUNT)
		{
			break;
		}
	}
}

void blocking_customer()
{
	while (1)
	{
		std::unique_lock<std::mutex> lk(m);
		cv.wait(lk, [] {return cb.size() > 0; });

		cb.pop_front();
		cv.notify_one();

		if (++pop_count >= LOOP_COUNT * THREAD_COUNT)
		{
			break;
		}
	}
}

void nonblocking_productor()
{
	while (1)
	{
		if (push_count/*.load(std::memory_order_acquire)*/ >= LOOP_COUNT * THREAD_COUNT)
		{
			break;
		}

		if (mul_queue->push(push_count))
		{
			++push_count;
			//push_count.fetch_add(1, std::memory_order_relaxed);
		}
		else
		{
			Sleep(1);
		}
	}
}

void nonblocking_customer()
{
	int val;
	while (1)
	{
		if (pop_count/*.load(std::memory_order_acquire)*/ >= LOOP_COUNT * THREAD_COUNT)
		{
			break;
		}

		if (mul_queue->pop(val))
		{
			++pop_count;
			//pop_count.fetch_add(1, std::memory_order_relaxed);
		}
		else
		{
			Sleep(1); 
		}
	}
}


void test_blocking()
{
	std::thread** carray = new std::thread*[THREAD_COUNT];
	std::thread** parray = new std::thread*[THREAD_COUNT];

	clock_t start = clock();

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		carray[i] = new std::thread((&blocking_customer));
		parray[i] = new std::thread((&blocking_productor));
	}

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		carray[i]->join();
		parray[i]->join();
	}

	clock_t end = clock();
	printf("[test_blocking]\nTHREAD_COUNT:%d\nQUEUE_CAPACITY:%d\ncost:%dms\n", THREAD_COUNT,  QUEUE_CAPACITY, end - start);
	printf("push_count:%d pop_count:%d\n", push_count, pop_count);

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		delete carray[i];
		delete parray[i];
	}
	delete[] carray;
	delete[] parray;
}

void test_nonblocking()
{
	std::thread** carray = new std::thread*[THREAD_COUNT];
	std::thread** parray = new std::thread*[THREAD_COUNT];

	clock_t start = clock();

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		carray[i] = new std::thread((&nonblocking_customer));
		parray[i] = new std::thread((&nonblocking_productor));
	}

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		carray[i]->join();
		parray[i]->join();
	}

	clock_t end = clock();
	printf("[test_nonblocking]\nTHREAD_COUNT:%d\nQUEUE_CAPACITY:%d\ncost:%dms\n", THREAD_COUNT, QUEUE_CAPACITY, end - start);
	printf("push_count:%d pop_count:%d\n", push_count, pop_count);

	for (int i = 0; i < THREAD_COUNT; i++)
	{
		delete carray[i];
		delete parray[i];
	}
	delete[] carray;
	delete[] parray;
}

int main(char* args, int size)
{
	mul_queue = new boost::lockfree::queue<int, boost::lockfree::capacity<QUEUE_CAPACITY>>;
	std::cout << mul_queue->is_lock_free() << std::endl;

	//为了排除测试程序的无关因素,测试时只开启一个:blocking或者nonblocking。
	//test_blocking();
	test_nonblocking();
}

 

输出结果:

生产者和消费者分别4个线程

[test_blocking]

THREAD_COUNT:4

QUEUE_CAPACITY:65534

cost:3598ms

push_count:20000003 pop_count:20000003

 

[test_nonblocking]

THREAD_COUNT:4

QUEUE_CAPACITY:65534

cost:6350ms

push_count:20000003 pop_count:20000003

 

 

生产者和消费者分别8个线程

[test_blocking]

THREAD_COUNT:8

QUEUE_CAPACITY:65534

cost:3620ms

push_count:20000007 pop_count:20000007

 

[test_nonblocking]

THREAD_COUNT:8

QUEUE_CAPACITY:65534

cost:6466ms

push_count:20000007 pop_count:20000004

 

 

生产者和消费者分别16个线程

[test_blocking]

THREAD_COUNT:16

QUEUE_CAPACITY:65534

cost:3590ms

push_count:20000015 pop_count:20000015

 

[test_nonblocking]

THREAD_COUNT:16

QUEUE_CAPACITY:65534

cost:6136ms

push_count:20000007 pop_count:20000007

 

结论:多读多写的多线程环境下,std::mutex的速度比boost::lockfree::queue的性能高一倍。queue的容量限制不得超过65535,具体原因没仔细看,可能是boost为了实现lock-free而采用了算法只能支持这么大的容量。

 PS:上面的不同线程数量下,累加结果是一样的,是因为每个线程的循环次数做了均分,总循环次数为20000000

 

 测试环境:

windows 10 pro x64

VS2015企业版 update2,release x64

CPU:i7二代移动版

 

分享到:
评论

相关推荐

    concurrentqueue:C ++ 11的快速多生产者,多消费者,无锁的并发队列

    注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择。 特征 击倒你的。 单头实现。 只需将其放入您的项目中即可。 完全线程安全的无锁队列。 从任何数量的线程同时使用。 C ++ 11实现-尽...

    python实现生产者消费者并发模型

    多线程实现生产者消费者模型:锁(Lock)、信号量(Semaphore、BoundedSemaphore)、条件(Condition)、队列(Queue)、事件(Event) 多进程程实现生产者消费者模型:信号量(Semaphore)、条件(Condition)、...

    proirity_queue性能分析测试源码

    proirity_queue性能分析源码 比较了三种queue:std::multimap、std::multiset、std::priority_queue 的性能

    lockfree queue

    无锁队列适用于多生产多消费者环境,快速队列真正无锁适用于单生产单消费环境; 消息队列,消息托管队列;

    c++11 std::move() 的使用

    std::move是将对象的状态或者所有权从一个对象转移到另一个对象,只是转移,没有内存的搬迁或者内存拷贝 #pragma once #include pch.h #include #include #include #include int main() { std::string str =

    cpp-一个快速多生产者多消费者的C11无锁并发队列

    一个快速多生产者,多消费者的C 11无锁并发队列

    lockfree-queue:基于数组的无锁队列

    固定大小的无锁队列(2010) 队列是在生产者/消费者算法中广泛发现的抽象数据类型。 许多良好的实现都是锁死的,并且可能会与数千个并发线程在其上添加和使用数据的情况发生激烈争用。 上述技术可用于实现完全无锁的...

    lock-free circular array queue.

    Yet another implementation of a lock-free circular array queue

    atomic_queue:C ++无锁队列

    atomic_queue 基于带有循环缓冲区的C ++ 14多生产者多消费者无锁队列。 这些队列遵循的主要设计原理是极简主义:原子操作的最基本要求,固定大小的缓冲区,值语义。 这些品质也有局限性: 最大队列大小必须在编译...

    concurrentqueue, 一种快速多消费者多消费者锁空闲并发队列.zip

    concurrentqueue, 一种快速多消费者多消费者锁空闲并发队列 moodycamel::ConcurrentQueue...注意:如果你只需要一个生产者,单个消费者队列,我有其中的一个太多。特性Knock-your-socks-off 刻录速度快。单头实现。只需

    lock_free_queue.rar

    无锁队列实现,lock_free_queue,大大提高了多线程的处理性能

    concurrent-queue:并发多生产者多消费者队列

    并发多生产者多消费者队列。 有两种队列: 有限制的队列,容量有限。 无限队列无限制容量。 队列还可以随时关闭。 关闭后,尽管仍然可以弹出其余项目,但无法将更多项目推入队列。 这些功能使在此板条箱顶部...

    Lock-free Queue and Ring Buffer

    Lock-free Queue and Ring Buffer

    stl-views.gdb

    # std::queue&lt;T&gt; -- via pqueue command # std::priority_queue&lt;T&gt; -- via ppqueue command # std::bitset&lt;n&gt; -- via pbitset command # std::string -- via pstring command # std::widestring -- via pwstring ...

    queue_atomic:使用C ++ 11原子的多生产者多消费者队列模板

    queue_atomic 使用C ++ 11原子的多生产者多消费者队列模板。 通过将单调递增的版本号打包到队列的前后偏移量中,解决了ABA问题并实现了两阶段有序更新。 通过检查预期的版本计数器在打包的前后偏移量中是否可见来...

    SafeQueue:C++ 中的线程安全队列实现

    这个类是一个多生产者、多消费者队列。 它同时提供阻塞和非阻塞消费,而生产总是阻塞。 容量是动态调整的。 实现基于std::queue ,使用std::mutex和std::condition_variable实现线程安全。 可以使用右值引用调用...

    spsc_queue:用C ++ 11编写的单生产者单消费者队列实现

    这是用C ++ 11编写的简单的仅标头单生产者单消费者队列实现。 它旨在将动态分配的对象的所有权从一个线程显式转移到另一个线程。 警告您,它并非设计为具有很高的性能或可扩展性! 如果您对高性能线程安全队列感兴趣...

    C++ Queue(带上限的)

    C++ Queue(带上限的) 上限可以改,做小题可以

    适用于C ++ 11的快速多生产者,多消费者,无锁定并发队列-C/C++开发

    注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择其中之一。 特色快如闪电般的快速pe moodycamel :: ConcurrentQueue C ++的工业级无锁队列。 注意:如果您需要的只是一个单一生产者,单一...

    advprogram-2021

    这是UET课程中高级课程实践的描述 INT2215-02:N1,N2 PM307-G2,星期...std :: stack和std :: queue关联容器:std :: map和std :: set作业05容器适配器:std :: stack和std :: queue关联容器:std :: map和std :: set

Global site tag (gtag.js) - Google Analytics