`
阿尔萨斯
  • 浏览: 4244540 次
社区版块
存档分类
最新评论

眉目传情之并发无锁环形队列的实现

 
阅读更多

眉目传情之并发无锁环形队列的实现

  • Author:Echo Chen(陈斌)

  • Email:chenb19870707@gmail.com

  • Blog:Blog.csdn.net/chen19870707

  • Date:October 10th, 2014

    前面在《眉目传情之匠心独运的kfifo》一文中详细解析了 linux 内核并发无锁环形队列kfifo的原理和实现,kfifo鬼斧神工,博大精深,让人叹为观止,但遗憾的是kfifo为内核提供服务,并未开放出来。剑不试则利钝暗,弓不试则劲挠诬,鹰不试则巧拙惑,马不试则良驽疑,光说不练是不能学到精髓的,下面就动手实现自己的并发无锁队列UnlockQueue(单生产者单消费者)。

    一、UnlockQueue声明

       1: #ifndef _UNLOCK_QUEUE_H
       2: #define _UNLOCK_QUEUE_H
       3: 
       4: class UnlockQueue
       5: {
       6: public:
       7: UnlockQueue(int nSize);
       8: virtual ~UnlockQueue();
       9: 
      10: bool Initialize();
      11: 
      12: unsignedint Put(constunsignedchar *pBuffer, unsignedint nLen);
      13: unsignedint Get(unsignedchar *pBuffer, unsignedint nLen);
      14: 
      15: inlinevoid Clean() { m_nIn = m_nOut = 0; }
      16: inlineunsignedint GetDataLen() const { return m_nIn - m_nOut; }
      17: 
      18: private:
      19: inlinebool is_power_of_2(unsignedlong n) { return (n != 0 && ((n & (n - 1)) == 0)); };
      20: inlineunsignedlong roundup_power_of_two(unsignedlong val);
      21: 
      22: private:
      23: unsignedchar *m_pBuffer; /* the buffer holding the data */
      24: unsignedint m_nSize; /* the size of the allocated buffer */
      25: unsignedint m_nIn; /* data is added at offset (in % size) */
      26: unsignedint m_nOut; /* data is extracted from off. (out % size) */
      27: };
      28: 
      29: #endif

    UnlockQueue与kfifo 结构相同相同,也是由一下变量组成:

    UnlockQueue kfifo 作用
    m_pBuffer buffer 用于存放数据的缓存
    m_nSize size 缓冲区空间的大小,圆整为2的次幂
    m_nIn in 指向buffer中队头
    m_nOut out 指向buffer中的队尾
    UnlockQueue的设计是用在单生产者单消费者情况下,所以不需要锁 lock 如果使用不能保证任何时间最多只有一个读线程和写线程,必须使用该lock实施同步。

    二、UnlockQueue构造函数和初始化

       1: UnlockQueue::UnlockQueue(int nSize)
       2: :m_pBuffer(NULL)
       3: ,m_nSize(nSize)
       4: ,m_nIn(0)
       5: ,m_nOut(0)
       6: {
       7: //round up to the next power of 2
       8: if (!is_power_of_2(nSize))
       9: {
      10: m_nSize = roundup_power_of_two(nSize);
      11: }
      12: }
      13: 
      14: UnlockQueue::~UnlockQueue()
      15: {
      16: if(NULL != m_pBuffer)
      17: {
      18: delete[] m_pBuffer;
      19: m_pBuffer = NULL;
      20: }
      21: }
      22: 
      23: bool UnlockQueue::Initialize()
      24: {
      25: m_pBuffer = newunsignedchar[m_nSize];
      26: if (!m_pBuffer)
      27: {
      28: return false;
      29: }
      30: 
      31: m_nIn = m_nOut = 0;
      32: 
      33: return true;
      34: }
      35: 
      36: unsignedlong UnlockQueue::roundup_power_of_two(unsignedlong val)
      37: {
      38: if((val & (val-1)) == 0)
      39: return val;
      40: 
      41: unsignedlong maxulong = (unsignedlong)((unsignedlong)~0);
      42: unsignedlong andv = ~(maxulong&(maxulong>>1));
      43: while((andv & val) == 0)
      44: andv = andv>>1;
      45: 
      46: return andv<<1;
      47: }

    1.在构造函数中,对传入的size进行2的次幂圆整,圆整的好处是可以将m_nIn % m_nSize 可以转化为 m_nIn & (m_nSize – 1),取模运算”的效率并没有 “位运算” 的效率高。

    2.在构造函数中,未给buffer分配内存,而在Initialize中分配,这样做的原因是:我们知道在new UnlockQueue的时候有两步操作,第一步分配内存,第二步调用构造函数,如果将buffer的分配放在构造函数中,那么就可能 buffer 就可能分配失败,而后面用到buffer,还需要判空。

    三、UnlockQueue入队和出队操作

       1: unsignedint UnlockQueue::Put(constunsignedchar *buffer, unsignedint len)
       2: {
       3: unsignedint l;
       4: 
       5: len = std::min(len, m_nSize - m_nIn + m_nOut);
       6: 
       7: /*
       8:  * Ensure that we sample the m_nOut index -before- we
       9:  * start putting bytes into the UnlockQueue.
      10:  */
      11: __sync_synchronize();
      12: 
      13: /* first put the data starting from fifo->in to buffer end */
      14: l = std::min(len, m_nSize - (m_nIn & (m_nSize - 1)));
      15: memcpy(m_pBuffer + (m_nIn & (m_nSize - 1)), buffer, l);
      16: 
      17: /* then put the rest (if any) at the beginning of the buffer */
      18: memcpy(m_pBuffer, buffer + l, len - l);
      19: 
      20: /*
      21:  * Ensure that we add the bytes to the kfifo -before-
      22:  * we update the fifo->in index.
      23:  */
      24: __sync_synchronize();
      25: 
      26: m_nIn += len;
      27: 
      28: return len;
      29: }
      30: 
      31: unsignedint UnlockQueue::Get(unsignedchar *buffer, unsignedint len)
      32: {
      33: unsignedint l;
      34: 
      35: len = std::min(len, m_nIn - m_nOut);
      36: 
      37: /*
      38:  * Ensure that we sample the fifo->in index -before- we
      39:  * start removing bytes from the kfifo.
      40:  */
      41: __sync_synchronize();
      42: 
      43: /* first get the data from fifo->out until the end of the buffer */
      44: l = std::min(len, m_nSize - (m_nOut & (m_nSize - 1)));
      45: memcpy(buffer, m_pBuffer + (m_nOut & (m_nSize - 1)), l);
      46: 
      47: /* then get the rest (if any) from the beginning of the buffer */
      48: memcpy(buffer + l, m_pBuffer, len - l);
      49: 
      50: /*
      51:  * Ensure that we remove the bytes from the kfifo -before-
      52:  * we update the fifo->out index.
      53:  */
      54: __sync_synchronize();
      55: 
      56: m_nOut += len;
      57: 
      58: return len;
      59: }

    入队和出队操作与kfifo相同,用到的技巧也完全相同,有不理解的童鞋可以参考前面一篇文章《眉目传情之匠心独运的kfifo》。这里需要指出的是__sync_synchronize()函数,由于linux并未开房出内存屏障函数,而在gcc4.2以上版本提供This builtin issues a full memory barrier,有兴趣同学可以参考Built-in functions for atomic memory access

    四、测试程序

    如图所示,我们设计了两个线程,一个生产者随机生成学生信息放入队列,一个消费者从队列中取出学生信息并打印,可以看到整个代码是无锁的

    image

       1: #include "UnlockQueue.h"
       2: #include <iostream>
       3: #include <algorithm>
       4: #include <pthread.h>
       5: #include <time.h>
       6: #include <stdio.h>
       7: #include <errno.h>
       8: #include <string.h>
       9: 
      10: struct student_info
      11: {
      12: long stu_id;
      13: unsignedint age;
      14: unsignedint score;
      15: };
      16: 
      17: void print_student_info(const student_info *stu_info)
      18: {
      19: if(NULL == stu_info)
      20: return;
      21: 
      22: printf("id:%ld\t",stu_info->stu_id);
      23: printf("age:%u\t",stu_info->age);
      24: printf("score:%u\n",stu_info->score);
      25: }
      26: 
      27: student_info * get_student_info(time_t timer)
      28: {
      29: student_info *stu_info = (student_info *)malloc(sizeof(student_info));
      30: if (!stu_info)
      31: {
      32: fprintf(stderr, "Failed to malloc memory.\n");
      33: return NULL;
      34: }
      35: srand(timer);
      36: stu_info->stu_id = 10000 + rand() % 9999;
      37: stu_info->age = rand() % 30;
      38: stu_info->score = rand() % 101;
      39: //print_student_info(stu_info);
      40: return stu_info;
      41: }
      42: 
      43: void * consumer_proc(void *arg)
      44: {
      45: UnlockQueue* queue = (UnlockQueue *)arg;
      46: student_info stu_info;
      47: while(1)
      48: {
      49: sleep(1);
      50: unsignedint len = queue->Get((unsignedchar *)&stu_info, sizeof(student_info));
      51: if(len > 0)
      52: {
      53: printf("------------------------------------------\n");
      54: printf("UnlockQueue length: %u\n", queue->GetDataLen());
      55: printf("Get a student\n");
      56: print_student_info(&stu_info);
      57: printf("------------------------------------------\n");
      58: }
      59: }
      60: return (void *)queue;
      61: }
      62: 
      63: void * producer_proc(void *arg)
      64: {
      65: time_t cur_time;
      66: UnlockQueue *queue = (UnlockQueue*)arg;
      67: while(1)
      68: {
      69: time(&cur_time);
      70: srand(cur_time);
      71: int seed = rand() % 11111;
      72: printf("******************************************\n");
      73: student_info *stu_info = get_student_info(cur_time + seed);
      74: printf("put a student info to queue.\n");
      75: queue->Put( (unsignedchar *)stu_info, sizeof(student_info));
      76: free(stu_info);
      77: printf("UnlockQueue length: %u\n", queue->GetDataLen());
      78: printf("******************************************\n");
      79: sleep(1);
      80: }
      81: return (void *)queue;
      82: }
      83: 
      84: 
      85: int main()
      86: {
      87: UnlockQueue unlockQueue(1024);
      88: if(!unlockQueue.Initialize())
      89: {
      90: return -1;
      91: }
      92: 
      93: pthread_t consumer_tid, producer_tid;
      94: 
      95: printf("multi thread test.......\n");
      96: 
      97: if(0 != pthread_create(&producer_tid, NULL, producer_proc, (void*)&unlockQueue))
      98: {
      99: fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
     100: errno, strerror(errno));
     101: return -1;
     102: }
     103: 
     104: if(0 != pthread_create(&consumer_tid, NULL, consumer_proc, (void*)&unlockQueue))
     105: {
     106: fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
     107: errno, strerror(errno));
     108: return -1;
     109: }
     110: 
     111: pthread_join(producer_tid, NULL);
     112: pthread_join(consumer_tid, NULL);
     113: 
     114: return 0;
     115: }

    运行结果:

    image

    -

    Echo Chen:Blog.csdn.net/chen19870707

    -

  • 分享到:
    评论

    相关推荐

    Global site tag (gtag.js) - Google Analytics