`
潜心修炼
  • 浏览: 18687 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HornetQ集群广播解析

阅读更多
HornetQ通过集群来支持扩展性,集群中的节点借助JGroups组件进行节点间的通信。在UDP方式中,节点每隔一段时间就向广播地址发送一次消息,而客户端程序可以通过监听这个广播地址、解析广播的内容来得知集群中有哪些主机。对于使用Java语言的开发者来说,HornetQ的Java客户端相关类屏蔽了这些底层细节,使得开发变得简单;然而对于其他语言,则需要自己手动解析相关内容。
以HornetQ 2.1.2版本为准,每个节点向广播地址广播的信息遵循以下格式:

节点名称的长度+节点名称+唯一标识长度+唯一标识+连接器个数+各个连接器信息

下面是解析的JAVA代码,包含的内部类主要负责控制读取。

客户端的调用只需要启动该类,然后获取hostInfoMap就能够获得集群中的主机和他们的连接信息以及备用连接信息。如果要客户端的主机信息能够实时更新,可以通过修改主机集合向客户端发消息的方式实现



/*
 * Discovery .java
 * 本类非线程安全
 */
package com.socket.multisocket;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

/**
 * Listens on a multicast broadcast address, decodes every datagram it receives
 * as a node announcement, and keeps a JVM-wide registry of the announced nodes
 * together with their main/backup connector addresses.
 *
 * <p>Typical use: construct with the broadcast address and port, call
 * {@link #start()}, then read the registry through {@link #getHost()}.
 *
 * <p>The registries are static and therefore shared by every instance. They are
 * concurrent maps, so the listener thread may update them while other threads
 * read them; the lists and maps stored inside are replaced wholesale on every
 * announcement and are never modified by this class after publication.
 */
public class Discovery extends Thread {

  /** Connector parameter key holding the host name. */
  private static final String HOST = "host";

  /** Connector parameter key holding the port. */
  private static final String PORT = "port";

  /**
   * Milliseconds a node may stay silent before it is evicted. Read once from
   * the system property {@code broadcast_host_timeout} (default 10000).
   */
  private static final long timeout = Long.parseLong(System.getProperty("broadcast_host_timeout", "10000"));

  /** Separator placed between host and port in connector addresses. */
  private static final String spliter = ":";

  /**
   * Node registry: unique node id -> one map per connector pair, each mapping
   * the main connector address ("host:port") to the backup connector address
   * (or {@code null} when the pair has no backup).
   */
  private static final Map<String, List<Map<String, String>>> hostInfoMap =
      new java.util.concurrent.ConcurrentHashMap<String, List<Map<String, String>>>();

  /** Unique node id -> time (epoch millis) of its last announcement; drives eviction. */
  public final static Map<String, Long> connectors =
      new java.util.concurrent.ConcurrentHashMap<String, Long>();

  /** Number of connector pairs declared by the most recently decoded packet. */
  public static volatile int connectorPairsSize = 0;

  private String broadcastAddress;

  private int broadcastPort;

  /**
   * @param broadcastAddress multicast group address (name or literal) to join
   * @param broadcastPort    UDP port the group broadcasts on
   */
  public Discovery(String broadcastAddress, int broadcastPort) {
    super();
    this.broadcastAddress = broadcastAddress;
    this.broadcastPort = broadcastPort;
  }

  /**
   * Returns the live node registry (not a copy).
   *
   * @return unique node id -> list of connector-pair maps
   *         ("mainHost:mainPort" -> "backupHost:backupPort" or {@code null})
   */
  public static Map<String, List<Map<String, String>>> getHost() {
    return hostInfoMap;
  }

  /**
   * Records the time at which the node identified by {@code key} was last heard from.
   *
   * @param key         unique node id; must not be {@code null}
   * @param currentTime time of the announcement in epoch milliseconds; must not be {@code null}
   */
  public void putConnector(String key, Long currentTime) {
    connectors.put(key, currentTime);
  }

  /**
   * Evicts every node whose last announcement is older than the configured
   * timeout, from both the timestamp map and the node registry.
   */
  public void validateConnectors() {
    final long now = System.currentTimeMillis();
    // Iterators of a concurrent map tolerate removal during traversal.
    for (Entry<String, Long> entry : connectors.entrySet()) {
      if (entry.getValue().longValue() + timeout < now) {
        String staleNode = entry.getKey();
        connectors.remove(staleNode);
        hostInfoMap.remove(staleNode);
      }
    }
  }

  /**
   * Decodes one announcement and stores its connector pairs in the node registry.
   *
   * @param data   buffer holding the datagram
   * @param offset index of the first datagram byte in {@code data}
   * @param length number of datagram bytes
   * @return the unique id of the announcing node
   * @throws RuntimeException if the datagram is truncated or malformed; the
   *         registry is left untouched in that case
   */
  static String processPacket(byte[] data, int offset, int length) {
    return ParseRecieveData.parsebytes(data, offset, length);
  }

  /**
   * Joins the broadcast group and processes announcements until an I/O error
   * occurs. A malformed datagram is reported on {@code System.err} and skipped;
   * it does not stop the listener.
   *
   * @throws RuntimeException if the broadcast address cannot be resolved
   */
  @Override
  public void run() {
    final InetAddress group;
    try {
      group = InetAddress.getByName(broadcastAddress);
    } catch (UnknownHostException e) {
      throw new RuntimeException("can not find this broadcast address:"+broadcastAddress, e);
    }

    MulticastSocket server = null;
    boolean joined = false;
    try {
      server = new MulticastSocket(broadcastPort);
      server.joinGroup(group);
      joined = true;
      final byte[] data = new byte[65535];
      final DatagramPacket recv = new DatagramPacket(data, data.length);
      for (;;) {
        // Restore the full capacity before reusing the packet object.
        recv.setLength(data.length);
        server.receive(recv);
        String uniqueName = null;
        try {
          uniqueName = processPacket(recv.getData(), recv.getOffset(), recv.getLength());
        } catch (RuntimeException e) {
          // Anything may show up on a multicast group; one bad datagram must not kill the listener.
          System.err.println("Discovery: ignoring malformed broadcast packet from "
              + recv.getAddress() + ": " + e);
        }
        if (uniqueName != null) {
          putConnector(uniqueName, System.currentTimeMillis());
        }
        validateConnectors();
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (server != null) {
        if (joined) {
          try {
            server.leaveGroup(group);
          } catch (IOException ignored) {
            // Best effort: the socket is closed right below anyway.
          }
        }
        server.close();
      }
    }
  }

  /**
   * Cursor-based decoder for a single announcement datagram. All multi-byte
   * integers are read big-endian.
   *
   * <p>Layout decoded here: node id string, unique id string, connector-pair
   * count (int), then for each pair: main connector, one flag byte, and - when
   * the flag is non-zero - a backup connector. A connector is: name string,
   * factory string, parameter count (int), then per parameter a key string, a
   * type byte and a typed value.
   *
   * <p>Each instance decodes exactly one datagram, so no state leaks between
   * packets or between {@code Discovery} instances.
   */
  private static final class ParseRecieveData {
    private static final byte TYPE_BOOLEAN = 0;

    private static final byte TYPE_INT = 1;

    private static final byte TYPE_LONG = 2;

    private static final byte TYPE_STRING = 3;

    /** Charset used for the UTF-encoded (medium length) string form. */
    private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");

    private final byte[] bytes;

    /** Index one past the last byte that belongs to the datagram. */
    private final int limit;

    /** Index of the next byte to read. */
    private int position;

    private ParseRecieveData(byte[] bytes, int offset, int length) {
      if (bytes == null) {
        throw new IllegalArgumentException("packet buffer is null");
      }
      if (offset < 0 || length < 0 || offset > bytes.length - length) {
        throw new IllegalArgumentException("invalid packet bounds: offset=" + offset
            + ", length=" + length + ", buffer=" + bytes.length);
      }
      this.bytes = bytes;
      this.position = offset;
      this.limit = offset + length;
    }

    /**
     * Decodes one datagram and publishes its connector pairs under the node's unique id.
     *
     * @return the unique id carried by the datagram
     * @throws RuntimeException if the datagram is truncated or malformed
     */
    static String parsebytes(byte[] bytes, int offset, int length) {
      ParseRecieveData reader = new ParseRecieveData(bytes, offset, length);
      reader.readString(); // node id: present on the wire but not used
      String uniqueName = reader.readString();

      int pairCount = reader.readCount("connector pair");
      connectorPairsSize = pairCount;
      List<Map<String, String>> connectorList = new ArrayList<Map<String, String>>();
      for (int i = 0; i < pairCount; i++) {
        connectorList.add(reader.readConnectorPair());
      }

      // Publish only after the whole datagram decoded successfully.
      hostInfoMap.put(uniqueName, connectorList);
      return uniqueName;
    }

    /**
     * Reads one connector pair and reduces it to a single-entry map
     * "mainHost:mainPort" -> "backupHost:backupPort" (value {@code null} without backup).
     */
    private Map<String, String> readConnectorPair() {
      Map<String, String> mainConnector = readConnector();
      Map<String, String> backupConnector = readByte() != 0 ? readConnector() : null;

      String mainAddress = mainConnector.get(HOST) + spliter + mainConnector.get(PORT);
      String backupAddress = backupConnector == null
          ? null
          : backupConnector.get(HOST) + spliter + backupConnector.get(PORT);

      Map<String, String> pair = new HashMap<String, String>();
      pair.put(mainAddress, backupAddress);
      return pair;
    }

    /** Reads one connector and returns its parameters as strings. */
    private Map<String, String> readConnector() {
      readString(); // connector name: not used
      readString(); // factory name: not used

      int paramCount = readCount("connector parameter");
      Map<String, String> params = new HashMap<String, String>();
      for (int i = 0; i < paramCount; i++) {
        String key = readString();
        byte valueType = readByte();
        String value;
        switch (valueType) {
          case TYPE_STRING:
            value = readString();
            break;
          case TYPE_BOOLEAN:
            value = readByte() == 0 ? "false" : "true";
            break;
          case TYPE_INT:
            value = String.valueOf(readInt());
            break;
          case TYPE_LONG:
            value = String.valueOf(readLong());
            break;
          default:
            throw new RuntimeException("invalid type");
        }
        params.put(key, value);
      }
      return params;
    }

    /** Fails unless {@code count} more bytes are available inside the datagram. */
    private void require(int count) {
      if (count < 0 || count > limit - position) {
        throw new IllegalArgumentException("truncated or malformed packet: need " + count
            + " byte(s) at index " + position + ", limit " + limit);
      }
    }

    /** Reads a non-negative element count. */
    private int readCount(String what) {
      int count = readInt();
      if (count < 0) {
        throw new IllegalArgumentException("negative " + what + " count: " + count);
      }
      return count;
    }

    private byte readByte() {
      require(1);
      return bytes[position++];
    }

    private short readShort() {
      require(2);
      short value = (short) (bytes[position] << 8 | bytes[position + 1] & 0xFF);
      position += 2;
      return value;
    }

    private int readInt() {
      require(4);
      int value = (bytes[position] & 0xff) << 24
          | (bytes[position + 1] & 0xff) << 16
          | (bytes[position + 2] & 0xff) << 8
          | (bytes[position + 3] & 0xff);
      position += 4;
      return value;
    }

    private long readLong() {
      require(8);
      long value = 0L;
      for (int i = 0; i < 8; i++) {
        value = value << 8 | (bytes[position++] & 0xffL);
      }
      return value;
    }

    /**
     * Reads a length-prefixed string. The encoding depends on the character count:
     * fewer than 9 characters are stored as one 16-bit value per character;
     * fewer than 0xfff characters as a 16-bit byte count followed by UTF bytes;
     * anything longer as a 32-bit byte count followed by raw bytes.
     */
    private String readString() {
      int length = readInt();
      if (length < 0) {
        throw new IllegalArgumentException("negative string length: " + length);
      }
      if (length < 9) {
        StringBuilder text = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
          text.append((char) readShort());
        }
        return text.toString();
      }
      if (length < 0xfff) {
        // The byte count is an unsigned 16-bit value; reading it signed breaks above 32767.
        int byteCount = readShort() & 0xFFFF;
        require(byteCount);
        String text = new String(bytes, position, byteCount, UTF8);
        position += byteCount;
        return text;
      }
      int byteCount = readInt();
      require(byteCount);
      // NOTE(review): decoded with the platform charset exactly as before; the broker
      // presumably writes these very long strings as two bytes per character - confirm
      // against the broker version in use before relying on this branch.
      String text = new String(bytes, position, byteCount);
      position += byteCount;
      return text;
    }
  }
}




0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics