`
dyccsxg
  • 浏览: 202176 次
  • 性别: Icon_minigender_1
  • 来自: 青岛
社区版块
存档分类

ActiveMQ - stomp 通过自定义转换器来支持 Map 消息传递

 
阅读更多

1. 添加配置文件

src\META-INF\services\org\apache\activemq\transport\frametranslator\jms-stomp-map-json

class=org.demo.jms.stomp.StompTranslator

2. Java 代码

以 org\apache\activemq\transport\stomp\LegacyFrameTranslator.java 为基础,复制为 org.demo.jms.stomp.StompTranslator 并做如下修改(改动处以 modify start / modify end 标出)

package org.demo.jms.stomp;
// ...
import org.codehaus.jackson.map.ObjectMapper;
// ...
/**
 * STOMP frame translator that carries JMS map messages as JSON.
 *
 * <p>An incoming STOMP frame whose message-type header is "map" has its body parsed as a
 * JSON object and stored in an {@code ActiveMQMapMessage}; an outgoing
 * {@code ActiveMQMapMessage} is serialized to a JSON body.
 *
 * <p>Sections shown as {@code // ...} are elided in this listing; only the parts between the
 * "modify start" / "modify end" markers are specific to this class.
 */
public class StompTranslator implements FrameTranslator {

    // Object <-> JSON mapper shared by both conversion directions.
    private final ObjectMapper objMapper = new ObjectMapper();

    /**
     * Converts an incoming STOMP frame into an ActiveMQ message.
     *
     * <p>When the frame carries the {@code Stomp.Headers.AMQ_MESSAGE_TYPE} header, its value
     * (compared case-insensitively) selects the message kind: "text", "bytes" or "map".
     * For "map", the frame body is read as JSON and its entries are copied into a new
     * {@code ActiveMQMapMessage}.
     *
     * @param converter the protocol converter handling the connection
     * @param command the STOMP frame to convert
     * @return the resulting ActiveMQ message
     * @throws ProtocolException if the requested message type is unsupported, or the body of a
     *         "map" frame cannot be turned into map content
     */
    public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
        // ...
        if (headers.containsKey(Stomp.Headers.AMQ_MESSAGE_TYPE)) {
            String intendedType = (String)headers.get(Stomp.Headers.AMQ_MESSAGE_TYPE);
            if(intendedType.equalsIgnoreCase("text")){
                // ...
            } else if(intendedType.equalsIgnoreCase("bytes")) {
                // ...
            } else if (intendedType.equalsIgnoreCase("map")) {
                // modify start [stompFrame to map]
                ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
                try {
                    // Map.class is raw, so the assignment is unchecked; the value types are
                    // whatever the JSON parser produces for the body.
                    @SuppressWarnings("unchecked")
                    Map<String,Object> map = objMapper.readValue(command.getBody(), Map.class);
                    mapMsg.getContentMap().putAll(map);
                } catch (Exception e) {
                    // Any parse or copy failure is reported to the client as a protocol error,
                    // with the original exception kept as the cause.
                    throw new ProtocolException("Map could not be set: " + e, false, e);
                }
                msg = mapMsg;
                // modify end [stompFrame to map]
            } else {
                throw new ProtocolException("Unsupported message type '"+intendedType+"'",false);
            }
        }
        // ...
        return msg;
    }

    /**
     * Converts an outgoing ActiveMQ message into a STOMP frame.
     *
     * <p>For an {@code ActiveMQMapMessage}, every named entry is collected into a
     * {@code java.util.Map}, serialized to JSON bytes and set as the frame content. The
     * headers {@code dataType=json} and {@code className=java.util.Map} are added, and the
     * content-length header is set to the number of JSON bytes.
     *
     * @param converter the protocol converter handling the connection
     * @param message the ActiveMQ message to convert
     * @return the STOMP frame to send
     */
    public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
        // ...
        } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
                AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
            // ...
        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
            // modify start [map to stompFrame]
            Map<String, Object> map = new HashMap<String, Object>();
            ActiveMQMapMessage msg = (ActiveMQMapMessage)message;
            Enumeration<String> nameEnum = msg.getMapNames();
            String key;
            Object value;
            // Copy each named entry of the map message into a plain map for serialization.
            while (nameEnum.hasMoreElements()) {
                key = nameEnum.nextElement();
                value = msg.getObject(key);
                map.put(key, value);
            }
            
            // to json 
            byte[] data = objMapper.writeValueAsBytes(map);
            
            // Extra headers describing the payload, plus the byte length of the JSON body.
            headers.put("dataType", "json");
            headers.put("className", "java.util.Map");
            headers.put(Stomp.Headers.CONTENT_LENGTH, data.length + "");
            command.setContent(data);
            // modify end [map to stompFrame]
        }
        return command;
    }
}
分享到:
评论
1 楼 guojigjkill 2014-06-30  

    [111]

相关推荐

Global site tag (gtag.js) - Google Analytics