boost方案
boost提供了三种无锁方案
boost::lockfree::queue:
支持多个生产者和多个消费者线程的无锁队列。
boost::lockfree::stack:
支持多个生产者和多个消费者线程的无锁栈。
boost::lockfree::spsc_queue:
仅支持单个生产者和单个消费者线程的无锁队列。相比boost::lockfree::queue,其效率更高。
注:这些API内部是通过轻量级原子锁实现的lock-free,不是真正意义的无锁。我看到的资料中,貌似只有linux kernel中fifo实现了真正意义上的无锁,但是仅用于与单个消费者单个生产者的环境。
boost官方文档:
http://www.boost.org/doc/libs/1_60_0/doc/html/lockfree.html
queue容量和自增长的问题
可以设置初始容量,添加新元素时如果容量不够,则总容量可能自动增长:queue在当前操作系统上如果支持lock-free,则不会自动增长,如果不支持lock-free,才会自动增长。不同的操作系统其内存分配机制不同,这样会导致在某些操作系统上的queue不支持lockfree
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<2>> q; printf("boost::lockfree:queue is lock free:%s", q.is_lock_free() ? "true" : "false"); //true //push的返回值:1,push成功;0,push失败。 size_t s1 = q.push(9); //1 size_t s2 = q.push(9); //1 size_t s3 = q.push(9); //0 boost::lockfree::queue<int, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<2>> q2; size_t s2_1 = q2.push(9); //1 size_t s2_2 = q2.push(9); //1 size_t s2_3 = q2.push(9); //0 boost::lockfree::queue<int, boost::lockfree::fixed_sized<false>, boost::lockfree::capacity<2>> q3; size_t s3_1 = q3.push(9); //1 size_t s3_2 = q3.push(9); //1 size_t s3_3 = q3.push(9); //0 size_t s3_4 = q3.push(9); //0
如果不需要考虑多线程或者自己实现同步,还有一种方案:boost::circular_buffer
http://www.boost.org/doc/libs/1_60_0/doc/html/circular_buffer.html
C++11 std::atomic方案
网上有人借用std::atomic实现的一套无锁队列,其内部实现参考了boost::lockfree::queue的设计思路,可以适用于多个消费者多个生产者线程。
A High Performance Lock Free Ring Queue
http://www.codeproject.com/Tips/754393/A-High-Performance-Lock-Free-Ring-Queue
下面代码我在原文基础上做了修改:最新的编译器已不支持std::atomic_flag在构造函数中初始化。
lfringqueue.h
#ifndef INCLUDED_UTILS_LFRINGQUEUE #define INCLUDED_UTILS_LFRINGQUEUE #define _ENABLE_ATOMIC_ALIGNMENT_FIX #define ATOMIC_FLAG_INIT 0 #pragma once #include <vector> #include <mutex> #include <thread> #include <atomic> #include <chrono> #include <cstring> #include <iostream> // Lock free ring queue template < typename _TyData, long _uiCount = 100000 > class lfringqueue { public: lfringqueue( long uiCount = _uiCount ) : m_lTailIterator(0), m_lHeadIterator(0), m_uiCount( uiCount ) { m_queue = new _TyData*[m_uiCount]; memset( m_queue, 0, sizeof(_TyData*) * m_uiCount ); } ~lfringqueue() { if ( m_queue ) delete [] m_queue; } bool enqueue( _TyData *pdata, unsigned int uiRetries = 1000 ) { if ( NULL == pdata ) { // Null enqueues are not allowed return false; } unsigned int uiCurrRetries = 0; while ( uiCurrRetries < uiRetries ) { // Release fence in order to prevent memory reordering // of any read or write with following write std::atomic_thread_fence(std::memory_order_release); long lHeadIterator = m_lHeadIterator; if ( NULL == m_queue[lHeadIterator] ) { long lHeadIteratorOrig = lHeadIterator; ++lHeadIterator; if ( lHeadIterator >= m_uiCount ) lHeadIterator = 0; // Don't worry if this CAS fails. It only means some thread else has // already inserted an item and set it. if ( std::atomic_compare_exchange_strong( &m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator ) ) { // void* are always atomic (you wont set a partial pointer). m_queue[lHeadIteratorOrig] = pdata; if ( m_lEventSet.test_and_set( )) { m_bHasItem.test_and_set(); } return true; } } else { // The queue is full. Spin a few times to check to see if an item is popped off. ++uiCurrRetries; } } return false; } bool dequeue( _TyData **ppdata ) { if ( !ppdata ) { // Null dequeues are not allowed! return false; } bool bDone = false; bool bCheckQueue = true; while ( !bDone ) { // Acquire fence in order to prevent memory reordering // of any read or write with following read std::atomic_thread_fence(std::memory_order_acquire); //MemoryBarrier(); long lTailIterator = m_lTailIterator; _TyData *pdata = m_queue[lTailIterator]; //volatile _TyData *pdata = m_queue[lTailIterator]; if ( NULL != pdata ) { bCheckQueue = true; long lTailIteratorOrig = lTailIterator; ++lTailIterator; if ( lTailIterator >= m_uiCount ) lTailIterator = 0; //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig )) if ( std::atomic_compare_exchange_strong( &m_lTailIterator, &lTailIteratorOrig, lTailIterator )) { // Sets of sizeof(void*) are always atomic (you wont set a partial pointer). m_queue[lTailIteratorOrig] = NULL; // Gets of sizeof(void*) are always atomic (you wont get a partial pointer). *ppdata = (_TyData*)pdata; return true; } } else { bDone = true; m_lEventSet.clear(); } } *ppdata = NULL; return false; } long countguess() const { long lCount = trycount(); if ( 0 != lCount ) return lCount; // If the queue is full then the item right before the tail item will be valid. If it // is empty then the item should be set to NULL. long lLastInsert = m_lTailIterator - 1; if ( lLastInsert < 0 ) lLastInsert = m_uiCount - 1; _TyData *pdata = m_queue[lLastInsert]; if ( pdata != NULL ) return m_uiCount; return 0; } long getmaxsize() const { return m_uiCount; } bool HasItem() { return m_bHasItem.test_and_set(); } void SetItemFlagBack() { m_bHasItem.clear(); } private: long trycount() const { long lHeadIterator = m_lHeadIterator; long lTailIterator = m_lTailIterator; if ( lTailIterator > lHeadIterator ) return m_uiCount - lTailIterator + lHeadIterator; // This has a bug where it returns 0 if the queue is full. return lHeadIterator - lTailIterator; } private: std::atomic<long> m_lHeadIterator; // enqueue index std::atomic<long> m_lTailIterator; // dequeue index _TyData **m_queue; // array of pointers to the data long m_uiCount; // size of the array std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT; // a flag to use whether we should change the item flag std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT; // a flag to indicate whether there is an item enqueued }; #endif //INCLUDED_UTILS_LFRINGQUEUE
测试:
/* * File: main.cpp * Author: Peng * * Created on February 22, 2014, 9:55 PM */ #include <cstdlib> #include "lfringqueue.h" #include <mutex> #include <stdio.h> #include <string> #include <set> #include <random> #include <chrono> #include <iostream> #include <ctime> #include <atomic> #include <sstream> #include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #include <iostream> #include <boost/atomic.hpp> const long NUM_DATA = 10; const int NUM_ENQUEUE_THREAD = 1; const int NUM_DEQUEUE_THREAD = 1; const long NUM_ITEM = 1000000; using namespace std; class Data { public: Data( int i = 0 ) : m_iData(i) { stringstream ss; ss << i; m_szDataString = ss.str(); //sprintf( m_szDataString, "%l-d", i); } bool operator< ( const Data & aData) const { if ( m_iData < aData.m_iData) return true; else return false; } int& GetData() { return m_iData; } private: int m_iData; string m_szDataString; //char m_szDataString[MAX_DATA_SIZE]; }; Data DataArray[NUM_DATA]; constexpr long size = 0.5 * NUM_DATA; lfringqueue < Data, 1000> LockFreeQueue; boost::lockfree::queue<Data*> BoostQueue(1000); // Since there is a chance that the searched number cannot be found, so the function should return boolean bool BinarySearchNumberInSortedArray( Data datas[], int iStart, int iEnd, int SearchedNum, int &iFound ) { if ( iEnd - iStart <= 1 ) { if ( datas[iStart].GetData() == SearchedNum ) { iFound = iStart; return true; } else if ( datas[iEnd].GetData() == SearchedNum ) { iFound = iEnd; return true; } else return false; } int mid = 0.5 * ( iStart + iEnd ); if ( datas[mid].GetData() == SearchedNum ) { iFound = mid; return true; } if ( datas[mid].GetData() > SearchedNum ) { if ( mid - 1 >= 0) return BinarySearchNumberInSortedArray ( datas, iStart, mid - 1, SearchedNum, iFound); else return false; } else { if ( mid + 1 <= iEnd ) return BinarySearchNumberInSortedArray ( datas, mid + 1, iEnd, SearchedNum, iFound); else return false; } } bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue() { std::uniform_int_distribution<int> dis(1, NUM_DATA); default_random_engine engine{}; for ( long i = 0; i < NUM_ITEM; i++ ) { int x = dis ( engine ); int iFoundIndex; if ( BinarySearchNumberInSortedArray(DataArray, 0, NUM_DATA - 1, x, iFoundIndex ) ) { Data* pData = &DataArray[iFoundIndex]; LockFreeQueue.enqueue( pData ); //BoostQueue.push( pData ); } } } bool Dequeue() { Data *pData; for ( long i = 0; i < NUM_ITEM; i ++) { while ( LockFreeQueue.dequeue( &pData ) ); //while ( BoostQueue.pop( pData ) ) ; } } int main(int argc, char** argv) { for ( int i = 1; i < NUM_DATA + 1; i++ ) { Data data(i); DataArray[i-1] = data; } std::thread PublishThread[NUM_ENQUEUE_THREAD]; std::thread ConsumerThread[NUM_DEQUEUE_THREAD]; std::chrono::duration<double> elapsed_seconds; for ( int i = 0; i < NUM_ENQUEUE_THREAD; i++ ) { PublishThread[i] = std::thread( GenerateRandomNumber_FindPointerToTheNumber_EnQueue ); } auto start = std::chrono::high_resolution_clock::now(); for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ ) { ConsumerThread[i] = std::thread{ Dequeue}; } for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ ) { ConsumerThread[i].join(); } auto end = std::chrono::high_resolution_clock::now(); elapsed_seconds = end - start; std::cout << "Enqueue and Dequeue 1 million item in:" << elapsed_seconds.count() << std::endl; for ( int i = 0; i < NUM_ENQUEUE_THREAD; i++ ) { PublishThread[i].join(); } return 0; }
相关推荐
在C++中,无锁数据结构的实现通常需要对C++11标准库中的原子操作类(std::atomic)有深入理解。concurrentqueue是一种常见的无锁队列实现,它通常用于多线程环境中的消息传递或任务调度。在这个场景中,concurrent...
C++标准库中的`std::queue`或第三方库如Boost的`boost::lockfree::queue`可以用于实现无锁数据传输。 3. **原子操作**:对于简单的数据更新,C++提供了一组原子操作(如`std::atomic`),它们在多线程环境下保证了...
7. **免锁消息队列**:为了在多线程间安全地传递消息,免锁(lock-free)数据结构如无锁队列能提高并发性能。无锁算法需要利用原子操作(atomic operations),如C++的`std::atomic`库。 8. **免锁数据缓冲区**:免...
少儿编程scratch项目源代码文件案例素材-纸人伙计.zip
scratch少儿编程逻辑思维游戏源码-忍者罗伊 V5.zip
scratch少儿编程逻辑思维游戏源码-跑和枪.zip
前端开发_基于jQuery和EasyUI框架_企业级Web应用UI组件库与后台管理系统模板_提供GPL开源版本和商业授权版本的双重授权模式_适用于快速构建响应式管理后台和复杂数据可
少儿编程scratch项目源代码文件案例素材-纸格通关 云变量.zip
微信机器人开发_Wechaty框架_百度云主机部署_自然语言处理_消息自动化处理_多媒体文件管理_聊天记录持久化_表情包导出_语音视频自动保存_文件管理系统集成_跨平台数据同步_个
少儿编程scratch项目源代码文件案例素材-钻机机器人.zip
少儿编程scratch项目源代码文件案例素材-作战基地.zip
云计算_微服务分布式架构SpringCloudSpringBootDubboVuejs_互联网云快速开发框架敏捷开发系统代码生成工作流CMS图表统计地图统计_免费开源JAVA企业
scratch少儿编程逻辑思维游戏源码-日落塔.zip
Tobapuww_GPT-Recovery-Files_12888_1745866661386
少儿编程scratch项目源代码文件案例素材-战斗竞技场.zip
scratch少儿编程逻辑思维游戏源码-球球大作战.zip
聚合支付系统/官方个人免签系统/三方支付系统稳定安全高并发 附教程 系统采用FastAdmin框架独立全新开发,安全稳定,系统支持代理、商户、码商等业务逻辑。 针对最近一些JD,TB等业务定制,子账号业务逻辑API 非常详细,方便内置对接! IP白名单 业务逻辑 支持IP白名单,黑名单,全局白名单,全局黑名单,保障系统的安全。 接口验签名 采用支付宝RSA加密接口方式,防止篡改数据,导致对账困难,资金大量损失,无故少钱 对接灵活 全部对接参数灵活操作 风控完善 轮询、交易金额、随机金额、最大金额、最小金额等 测试环境: Nginx+PHP7.0+MySQL5.6 网站运行目录:/public 伪静态设置为:thinkphp规则 数据库信息修改路径:/application/database.php
校园社交服务_微信小程序云开发_公告资讯失物招领二手交易兼职招聘表白墙_为高校师生提供一站式校园生活服务平台包含校园动态通知课程表查询失物发布与认领二手物品交易平台兼职信息发布与求
yinghuayu2377_myFTPDemo_32152_1745866651913
scratch少儿编程逻辑思维游戏源码-魔法球.zip