HornetQ集群广播解析
HornetQ支持集群方式来支持扩展性,集群中的节点借助JGroup组件来进行节点间的通信。在UTP方式中,节点在间隔时间内不断的像广播地址中发送消息,而客户端的程序就可以通过监听这个广播地址,解析广播的内容就能够知道集群中有哪些主机。对于使用Java语言的开发者来说,HornetQ的JAVA客户端相关类屏蔽了这些底层代码,使得开发变得简单。然而对于其他语言,需要自己手动解析相关内容。
以Hornet2.1.2版本为准,HornetQ每个节点像广播地址广播的信息遵循以下格式:
节点名称的长度+节点名称+唯一标识长度+唯一标识+连接器个数+各个连接器信息
下面是解析的JAVA代码,包含的内部类主要负责控制读取。
客户端的调用只需要启动该类,然后获取hostInfoMap就能够获得集群中的主机和他们的连接信息以及备用连接信息。如果要客户端的主机信息能够实时更新,可以通过修改主机集合向客户端发消息的方式实现
/* * Discovery .java * 本类非线程安全 */package com.socket.multisocket;import java.io.IOException;import java.net.DatagramPacket;import java.net.InetAddress;import java.net.MulticastSocket;import java.net.UnknownHostException;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.Map.Entry;public class Discovery extends Thread { private static final String HOST = "host"; private static final String PORT = "port"; private static final long timeout = Integer.valueOf(System.getProperty("broadcast_host_timeout", "10000")); private static final String spliter = ":"; //存储节点信息 private static Map<String,List<Map<String,String>>> hostInfoMap = new HashMap<String,List<Map<String,String>>>(); //存储节点名称,主要是定时刷新 public final static Map<String, Long> connectors = new HashMap<String, Long>(); /** * 当前读的位置 */ private static int currentIndex = 0; public static int connectorPairsSize = 0; private String broadcastAddress; private int broadcastPort; public Discovery(String broadcastAddress, int broadcastPort) { super(); this.broadcastAddress = broadcastAddress; this.broadcastPort = broadcastPort; } public static Map<String,List<Map<String,String>>> getHost(){ return hostInfoMap; } /** * 重置当前读指针 ** */ public void putConnector(String key, Long currentTime) { connectors.put(key, currentTime); } public void validateConnectors() { Set<Entry<String, Long>> set = new HashSet<Entry<String,Long>>(connectors.entrySet()); for (Entry<String, Long> entry : set) { if (entry.getValue() + timeout < System.currentTimeMillis()) { connectors.remove(entry.getKey()); hostInfoMap.remove(entry.getKey()); } } } public void run() { InetAddress group = null; MulticastSocket server = null; try { group = InetAddress.getByName(broadcastAddress); server = new MulticastSocket(broadcastPort); server.joinGroup(group); final byte[] data = new byte[65535]; DatagramPacket recv = new DatagramPacket(data, data.length); for (;;) { server.receive(recv); String uniqueName = ParseRecieveData.parsebytes(recv .getData()); ParseRecieveData.release(); putConnector(uniqueName, System .currentTimeMillis()); validateConnectors(); } } catch (UnknownHostException e) { throw new RuntimeException("can not find this broadcast address:"+broadcastAddress); } catch (IOException e) { e.printStackTrace(); } finally { if (server != null) try { server.leaveGroup(group); } catch (IOException e) { } } } private static class ParseRecieveData { private static final byte TYPE_BOOLEAN = 0; private static final byte TYPE_INT = 1; private static final byte TYPE_LONG = 2; private static final byte TYPE_STRING = 3; /** * 解析报文 * * @author Jombo * @param bytes */ public static String parsebytes(byte[] bytes) { int nodeIdLength = getInt(bytes, currentIndex); getString(nodeIdLength, bytes, currentIndex); int uniqueIdLength = getInt(bytes, currentIndex); String uniqueName = getString(uniqueIdLength, bytes, currentIndex); connectorPairsSize = getInt(bytes, currentIndex); List<Map<String,String>> connectorList = new ArrayList<Map<String,String>>(); // connectorPairsSize可能有多个 for(int i =0 ;i<connectorPairsSize;i++){ Map<String, String> connectorPair = parseConnectorPairInfo(bytes); connectorList.add(connectorPair); } hostInfoMap.put(uniqueName, connectorList); return uniqueName; } /** * 解析连接器信息 * * @author Jombo * @param bytes */ private static Map<String, String> parseConnectorPairInfo(byte[] bytes) { Map<String, String> ConnectorPairMap = parseConnectorInfo(bytes); Map<String, String> mainConnectorMap = parseConnectorInfo(bytes); byte existBackup = getByte(bytes, currentIndex); boolean existbackupConnector = existBackup == 0 ? false : true; Map<String, String> backConnectorMap = null; if(existbackupConnector){ backConnectorMap = parseConnectorInfo(bytes); } //在这里我们只想看host+port的形式,有兴趣的朋友可以把全部信息放进去 //这是为了获取主备连接 String mainconnector = mainConnectorMap.get(HOST) + spliter + mainConnectorMap.get(PORT); String backconnector = backConnectorMap ==null ? null:backConnectorMap.get(HOST) + spliter + backConnectorMap.get(PORT); ConnectorPairMap.put(mainconnector, backconnector); return ConnectorPairMap; }private static Map<String, String> parseConnectorInfo(byte[] bytes) {Map<String, String> paramMap = new HashMap<String, String>(); int namelength = getInt(bytes, currentIndex); getString(namelength, bytes, currentIndex); int factorylength = getInt(bytes, currentIndex); getString(factorylength, bytes, currentIndex); int paramPairsSize = getInt(bytes, currentIndex); for (int j = 0; j < paramPairsSize; j++) { int keylength = getInt(bytes, currentIndex); String key = getString(keylength, bytes, currentIndex); byte valuetype = getByte(bytes, currentIndex); String value = ""; if (valuetype == TYPE_STRING) { int valuelength = getInt(bytes, currentIndex); value = getString(valuelength, bytes, currentIndex); } else if (valuetype == TYPE_BOOLEAN) { byte booleanvalue = getByte(bytes, currentIndex); value = booleanvalue == 0 ? "false" : "true"; } else if (valuetype == TYPE_INT) { value = String.valueOf(getInt(bytes, currentIndex)); } else if (valuetype == TYPE_LONG) { value = String.valueOf(getLong(bytes, currentIndex)); } else { throw new RuntimeException("invalid type"); } paramMap.put(key, value); }return paramMap;} /** * 读取一个Int * * @author Jombo * @param bytes * @param index * begin index * @return */ public static int getInt(byte[] bytes, int index) { int val = (bytes[index] & 0xff) << 24 | (bytes[index + 1] & 0xff) << 16 | (bytes[index + 2] & 0xff) << 8 | (bytes[index + 3] & 0xff) << 0; setCurrentIndex(index + 4); return val; } /** * 读取一个Byte * * @author Jombo * @param bytes * @param index * @return */ public static byte getByte(byte[] bytes, int index) { byte b = bytes[index]; setCurrentIndex(index + 1); return b; } /** * 读取Short * * @author Jombo * @param bytes * @param index * @return */ public static short getShort(byte[] bytes, int index) { short indexshort = (short) (bytes[index] << 8 | bytes[index + 1] & 0xFF); setCurrentIndex(index + 2); return indexshort; } /** * 读取字符串,根据字符串的长度采取不同的格式 * * @author Jombo * @param length * @param bytes * @param index * @return */ public static String getString(int length, byte[] bytes, int index) { StringBuffer stringBuffer = new StringBuffer(); if (length < 9) { for (int i = 0; i < length; i++) { short indexshort = getShort(bytes, index); stringBuffer.append((char) indexshort); index = index + 2; } } else if (length < 0xfff) { short utflen = getShort(bytes, index); index = index + 2; byte[] strbyte = new byte[utflen]; System.arraycopy(bytes, index, strbyte, 0, utflen); stringBuffer.append(new String(strbyte)); setCurrentIndex(index + utflen); } else { int longlength = getInt(bytes, index); index = index + 4; byte[] strbyte = new byte[longlength]; System.arraycopy(bytes, index, strbyte, 0, longlength); stringBuffer.append(new String(strbyte)); setCurrentIndex(index + longlength); } return stringBuffer.toString(); } /** * 读取Long * * @author Jombo * @param bytes * @param index * @return */ public static long getLong(byte[] bytes, int index) { setCurrentIndex(index + 8); return ((long) bytes[index] & 0xff) << 56 | ((long) bytes[index + 1] & 0xff) << 48 | ((long) bytes[index + 2] & 0xff) << 40 | ((long) bytes[index + 3] & 0xff) << 32 | ((long) bytes[index + 4] & 0xff) << 24 | ((long) bytes[index + 5] & 0xff) << 16 | ((long) bytes[index + 6] & 0xff) << 8 | ((long) bytes[index + 7] & 0xff) << 0; } /** * 重置当前读指针 ** */ public static void release() { currentIndex = 0; } public static void setCurrentIndex(int index) { currentIndex = index; } }}