`

消息队列

 
阅读更多
msgget函数用于创建一个新的消息队列或访问一个已存在的消息队列

IPC_NOWAIT标志使得msgsend调用非阻塞:如果没有存放新消息的可用空间,该函数就马上返回.这个条件可能发生的情况包括:
1.在指定的队列中已有太多的字节
2.在系统范围存在太多的消息
如果这两个条件中有一个存在,而且IPC_NOWAIT标志已指定,msgsnd就返回一个EAGAIN错误.如果这两个条件中有一个存在,但是IPC_NOWAIT标志未指定,那么调用纯种被投入睡眠.直到:
1.具备存放新消息的空间
2.由msqid标识的消息队列从系统中删除(这种情况下返回一个EIDRM错误)
3.调用线程被某个捕获的信号反中断(这种情况下返回一个EINTR错误)

msgctl函数提供一个消息队列上的各种控制操作

http://dash1982.iteye.com/blog/296583

MsgQueue.h
#ifndef MSGQUEUE_H
#define MSGQUEUE_H

namespace Utility
{

#define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)

#define COMMON_MESSAGE 1

struct Message
{
    long msgType;
    int bufLen;
    int addr;//地址
    int reserved1;
    int reserved2;
};

//初始化消息队列
int initMessageQueue(const char* msgfile,int& msgId,int msgType);

//关闭消息队列
int closeMessageQueue(int& msgId);

//清空消息
void clearMessage(int key,int& msgId,int msgType);

//发送普通消息
int sendMessage(const Message &msg,int msgId);

}//end of namespace Utility

#endif // MSGQUEUE_H




MsgQueue.cpp
#include "MsgQueue.h"
#include <sys/msg.h>
#include <string.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <iostream>
using namespace std;

namespace Utility
{

/*
    清空消息
*/
void clearMessage(int key,int& msgId,int msgType)
{
    int ret = 0;
    msgId = msgget(key, 0660);
    if (msgId<0){
        return;
    }
    while (1)
    {
        if(msgType==COMMON_MESSAGE)
        {
            Message msg;
            bzero(&msg,sizeof(msg));
            ret=msgrcv(msgId, &msg, sizeof(msg), 0, IPC_NOWAIT);
            if(ret<0){
                break;
            }
        }
    }
}


/*
    初始化消息队列
*/
int initMessageQueue(const char* msgFile,int & msgId,int msgType)
{
    //删除文件,避免引起不必要的问题,不删除可能会造成msgsend时卡死或发送失败的现象
    unlink(msgFile);
    msgId = -1;
    int fid = open(msgFile, O_RDWR | O_CREAT, FILE_MODE);
    if(fid<0)
    {
        cout << "创建文件:" << msgFile << "失败!" << endl;
        return fid;
    }
    close(fid);

    int key = ftok(msgFile, 'a');
    if(key<0)
    {
        cout << "创建消息队列失败!" << endl;
        return key;
    }

    if((msgId = msgget(key, IPC_CREAT | IPC_EXCL | 0660))==-1)
    {
        clearMessage(key,msgId,msgType);
        if (msgId<0)
        {
            cout << "创建消息队列失败!" << strerror(errno) << endl;
            return msgId;
        }
    }
    cout << "初始化消息队列成功.msgFile:" << msgFile << ",msgId:" << msgId << endl;
    return 0;
}

/*
    关闭消息队列
*/
int closeMessageQueue(int& msgId)
{
    int tmpMsgId = msgId;
    int ret = msgctl(msgId, IPC_RMID, 0);
    if(ret<0){
        cout << "关闭消息队列失败" << endl;
    }else{
        cout << "关闭消息队列成功,msgId:" << tmpMsgId << endl;
    }
    return ret;
}

/*
    发送普通信息
*/
int sendMessage(const Message &msg,int msgId)
{
    int ret = msgsnd(msgId, &msg, sizeof(msg), IPC_NOWAIT);
    if(ret<0){
        cout << "向消息队列发送消息失败,msgId:" << msgId << endl;
    }else{
        cout << "向消息队列发送消息成功,msgId:" << msgId << ",长度:" << msg.bufLen << endl;
    }
    return ret;
}

}//end of namespace Utility






test
#include "MsgQueue.h"
#include <pthread.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>
#include <iostream>
using namespace std;
using namespace Utility;

int g_msgId = 0;

class A
{
public:
    int value;
};

void* testRecvMessage(void*)
{
    Message msg;//收数据的结构体
    char rcvbuf[100];
    bzero(rcvbuf,100);
    int ret = 0;
    while(true)
    {
        ret = msgrcv(g_msgId, &msg, sizeof(msg), 0, IPC_NOWAIT);
        if (-1 != ret)
        {
            if(msg.msgType==COMMON_MESSAGE)
            {
                A** a = (A**)(msg.addr);
                cout << "bufLen:" << msg.bufLen << ",value:" << (*a)->value << endl;
                delete *a;
                *a = NULL;
                msg.addr = 0;
            }
            break;
        }else
        {
            usleep(1000);
        }
    }
    pthread_detach(pthread_self());
    return 0;
}

int main()
{
    pthread_t pid;
    pthread_create(&pid,NULL,testRecvMessage,NULL);
    int msgType = COMMON_MESSAGE;
    initMessageQueue("/tmp/test1.msg",g_msgId,msgType);
    A* a = new A;
    a->value = 10;
    Message message;
    bzero(&message,sizeof(message));
    message.msgType = COMMON_MESSAGE;
    message.bufLen = sizeof(message);
    message.addr = (int)&a;
    sendMessage(message,g_msgId);
    sleep(5);
    closeMessageQueue(g_msgId);
}


初始化消息队列成功.msgFile:/tmp/test1.msg,msgId:32769
向消息队列发送消息成功,msgId:32769,长度:20
bufLen:20,value:10
关闭消息队列成功,msgId:32769
分享到:
评论

相关推荐

    C#消息队列,windows使用消息队列,Queue消息队列

    此文档是C#开发的消息队列系统,适用于消息队列入门与新手。 在Windows 7 上安装消息队列的步骤 打开“控制面板”。 单击“程序”,然后在“程序和功能”下, 单击“打开或关闭 Windows 功能”。 -或者-单击“经典...

    linux c消息队列实现

    发送端读取指定的文件,并且按照环境变量中设置的消息队列键值进行发送。如果要改代码,只要把键值改一下,结构体储存要发送的消息的那个数组对应改成自己想发送的值,就可以很好的实现功能。接收端同样按环境变量...

    C++基于消息队列的多线程实现示例代码

    实现消息队列的关键因素是考量不同线程访问消息队列的同步问题。本实现涉及到几个知识点 std::lock_guard 介绍 std::lock_gurad 是 C++11 中定义的模板类。定义如下: template &lt;class&gt; class lock_guard; lock_...

    进程间通信之消息队列 ( message queue )——完整代码

    进程间通信之消息队列 ( message queue ) 消息队列是消息的链表,具有特定的格式,并由消息队列标识符标识. 七种进程间通信方式: 一.无名管道( pipe ) 二.有名管道( fifo ) 三.共享内存 ( shared memory ) 四....

    C#消息队列发送及接收

    C#消息队列发送及接收

    各种消息队列对比

    消息队列中间件调研文档。ActiveMQ、RabbitMQ、RocketMq、Joram、HornetQ、OpenMQ等的对比。

    FreeRTOS PPT_消息队列

    FreeRTOS PPT_消息队列的介绍,本章节为大家讲解 FreeRTOS 的一个重要的通信机制----消息队列,初学者要熟练掌握,因为消息队列在实际项目中应用较多。 消息队列的概念及其作用 消息队列就是通过RTOS 内核提供的...

    亲测可用基于Linux消息队列的简易聊天室(C语言).zip

    亲测可用的基于Linux消息队列的简易聊天室(C语言)(附源代码)采用客户-服务器结构,其中服务器实现各个用户的登录并存储相关信息,客户端通过服务器端获取当前登录用户信息,然后各客户进程通过消息队列实现双向通信...

    C++ 跨平台 异步消息队列

    C++封装实现的异步加锁消息队列,支持多线程,完美封装,可用于消息接收、处理

    Java消息队列常见面试题2022

    Java消息队列常见面试题,2022年最新,104页,祝您斩获高薪offer! Java消息队列常见面试题,2022年最新,104页,祝您斩获高薪offer! Java消息队列常见面试题,2022年最新,104页,祝您斩获高薪offer! Java消息...

    Redis 用于消息队列的存储

    博文a 中的老师,提供了Redis 实现消息队列的整体思路,言简意赅,但部分类库a 老师并未提供,因此我参照了博文b 中老师的RedisHelper 类,主要借鉴的方法为ListLeftPop及ListRightPush,及实现消息队列的核心思想,...

    ucOS消息队列使用

    描述ucOS消息队列的使用,此文网络截文,感谢原创作者。

    解决 消息队列不可用 附加Demo

    解决 “消息队列不可用 ” 附加案例 :消息队列异步接收 消息队列局域网发送与接收

    RabbitMQ实战 高效部署分布式消息队列

    RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列

    查看消息队列软件,消息队列工具

    消息队列工具,这是一个在查看消息队列中消息的一个很好的工具软件。

    进程与消息队列进程与消息队列简单例子

    进程与消息队列进程与消息队列简单例进程与消息队列进程与消息队列简单例进程与消息队列进程与消息队列简单例进程与消息队列进程与消息队列简单例进程与消息队列进程与消息队列简单例

    基于Linux消息队列的简易聊天室(C语言)(附源代码)

    采用客户-服务器结构,其中服务器实现各个用户的登录并存储相关信息,客户端通过服务器端获取当前登录用户信息,然后各客户进程通过消息队列实现双向通信。 Linux IPC通信利用消息队列消息机制,多线程通信,字符串...

    消息队列 Queue与Topic区别.docx

    消息队列 Queue与Topic区别

    C++多线程,消息队列用法

    C++多线程,消息队列用法,为了凑够20个字,拼了。

Global site tag (gtag.js) - Google Analytics