`
dyccsxg
  • 浏览: 202171 次
  • 性别: Icon_minigender_1
  • 来自: 青岛
社区版块
存档分类

ActiveMQ - TCP 协议接收消息

 
阅读更多
    /**
     * Receives JMS messages from an ActiveMQ broker over the TCP transport.
     *
     * <p>Connects to the broker, registers an asynchronous listener on the queue
     * named by {@code JMSInfo.QUEUE_NAME}, and then starts message delivery.
     * Text messages are printed directly; map messages are printed through
     * {@link #printMapMessage(ActiveMQMapMessage)}.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args)
    {
        Connection conn = null;

        try {
            ConnectionFactory connFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            conn = connFactory.createConnection("user1", "password1");

            // The first argument is false: the session is not transacted.
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(JMSInfo.QUEUE_NAME);
            // destination = session.createTopic(JMSInfo.TOPIC_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener()
            {
                @Override
                public void onMessage(Message msg)
                {
                    try {
                        if (msg instanceof TextMessage) {
                            TextMessage textMsg = (TextMessage)msg;
                            System.out.println("received text: " + textMsg.getText());
                        } else if (msg instanceof MapMessage){
                            // NOTE(review): the cast presumes the broker client hands out
                            // ActiveMQ message implementations; a failure is caught below.
                            ActiveMQMapMessage map = (ActiveMQMapMessage)msg;
                            System.out.println("received map: ");
                            printMapMessage(map);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });

            // Enable delivery only after the listener is in place, so the session
            // is fully configured before the first message can arrive.
            conn.start();

        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed, so no listener will ever be invoked: release the
            // connection instead of leaking it.
            if (null != conn) {
                try {
                    conn.close();
                } catch (Exception closeError) {
                    closeError.printStackTrace();
                }
            }
        }
        // On success the connection is intentionally left open: closing it here
        // would stop the asynchronous listener from receiving messages.
    }
    
    /**
     * Prints a map message to standard output: first every message property
     * under a "JMS Header:" heading, then every body entry under a
     * "JMS Body:" heading, each as {@code name : value}.
     *
     * @param map the map message to print
     * @throws Exception if reading a property or a body entry fails
     */
    @SuppressWarnings("unchecked")
    public static void printMapMessage(ActiveMQMapMessage map) throws Exception
    {
        // Message properties, read as strings.
        System.out.println("JMS Header:");
        for (Enumeration<String> propertyNames = map.getPropertyNames(); propertyNames.hasMoreElements();) {
            String propertyName = propertyNames.nextElement();
            String propertyValue = map.getStringProperty(propertyName);
            System.out.println(propertyName + " : " + propertyValue);
        }

        // Body entries, read as strings.
        System.out.println("JMS Body:");
        for (Enumeration<String> entryNames = map.getMapNames(); entryNames.hasMoreElements();) {
            String entryName = entryNames.nextElement();
            String entryValue = map.getString(entryName);
            System.out.println(entryName + " : " + entryValue);
        }
    }

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics