`

Ketama is an implementation of a consistent hashing algorithm

 
阅读更多
对Ketama的介绍
Ketama is an implementation of a consistent hashing algorithm, meaning you can add or remove servers from the memcached pool without causing a complete remap of all keys.
Here’s how it works:
* Take your list of servers (eg: 1.2.3.4:11211, 5.6.7.8:11211, 9.8.7.6:11211)
* Hash each server string to several (100-200) unsigned ints
* Conceptually, these numbers are placed on a circle called the continuum (imagine a clock face that goes from 0 to 2^32 − 1).
* Each number links to the server it was hashed from, so servers appear at several points on the continuum, by each of the numbers they hashed to.
* To map a key to a server, hash the key to a single unsigned int and find the first point on the continuum whose value is greater than or equal to that hash. The server linked to that point is the correct server for the key.
* If you hash your key to a value near 2^32 and there are no points on the continuum greater than your hash, return the first server in the continuum.
If you then add or remove a server from the list, only a small proportion of keys end up mapping to different servers.

前面介绍了ketama的一些概念和特性,接下来我们废话少说,直接上代码。

具体java实现如下:

/**
 * Locates the node responsible for a key using Ketama-style consistent hashing.
 *
 * <p>Every real node is projected onto a sorted "continuum" of hash points
 * (virtual nodes). A key is served by the node that owns the first point whose
 * value is greater than or equal to the key's hash, wrapping around to the
 * first point of the continuum when no such point exists.
 */
public final class KetamaNodeLocator {

    /** The continuum: hash point -> node that owns this point, sorted by hash point. */
    private final TreeMap<Long, Node> ketamaNodes;

    /** Hash algorithm used both to place virtual nodes and to hash lookup keys. */
    private final HashAlgorithm hashAlg;

    /** Requested number of virtual nodes per real node. */
    private final int numReps;

    /**
     * Builds the continuum for the given nodes.
     *
     * <p>Virtual nodes are created in groups of four, so {@code nodeCopies} is
     * effectively rounded down to a multiple of four; a value below 4 places no
     * points on the continuum at all.
     *
     * @param nodes      the real server nodes
     * @param alg        the hash algorithm to use
     * @param nodeCopies the number of virtual nodes per real node
     */
    public KetamaNodeLocator(List<Node> nodes, HashAlgorithm alg, int nodeCopies) {
        hashAlg = alg;
        ketamaNodes = new TreeMap<Long, Node>();
        numReps = nodeCopies;

        for (Node node : nodes) {
            // One digest per group of four virtual nodes.
            for (int i = 0; i < numReps / 4; i++) {
                // getKeyForNode yields a unique name for this group of virtual nodes.
                byte[] digest = hashAlg.computeMd5(getKeyForNode(node, i));
                // The digest is split into four 4-byte slices (h = 0..3); each slice
                // becomes the continuum position of one virtual node.
                for (int h = 0; h < 4; h++) {
                    long m = hashAlg.hash(digest, h);
                    // If two points collide, the node inserted last owns the point.
                    ketamaNodes.put(m, node);
                }
            }
        }
    }

    /**
     * Gets the primary location for the given key.
     *
     * @param k the key to locate
     * @return the node responsible for the key, or {@code null} when the
     *         continuum contains no points
     */
    public Node getPrimary(final String k) {
        byte[] digest = hashAlg.computeMd5(k);
        return getNodeForKey(hashAlg.hash(digest, 0));
    }

    /**
     * Finds the node owning the first continuum point at or after {@code hash}.
     *
     * @param hash the hash of the lookup key
     * @return the owning node, or {@code null} when the continuum is empty
     */
    private Node getNodeForKey(long hash) {
        // An empty continuum has no firstKey(); report "no node" instead of throwing.
        if (ketamaNodes.isEmpty()) {
            return null;
        }

        // tailMap is inclusive of its lower bound, so an exact hit on a point is
        // covered here as well and needs no separate containsKey check.
        SortedMap<Long, Node> tailMap = ketamaNodes.tailMap(hash);

        // Nothing at or after the hash: wrap around to the start of the circle.
        Long key = tailMap.isEmpty() ? ketamaNodes.firstKey() : tailMap.firstKey();
        return ketamaNodes.get(key);
    }

    /**
     * Returns a uniquely identifying key, suitable for hashing by the
     * KetamaNodeLocator algorithm.
     *
     * <p>The name and the repetition are joined with {@code '-'}. Without a
     * separator different nodes can produce the same key (for example
     * {@code "node1"} with repetition 10 and {@code "node11"} with repetition 0
     * would both yield {@code "node110"}), which makes them fight over the same
     * continuum points.
     *
     * @param node       The Node to use to form the unique identifier
     * @param repetition The repetition number for the particular node in
     *                   question (0 is the first repetition)
     * @return The key that represents the specific repetition of the node
     */
    private String getKeyForNode(Node node, int repetition) {
        return node.getName() + "-" + repetition;
    }
}


public enum HashAlgorithm {

    /**
     * The MD5-based hash used by ketama.
     */
    KETAMA_HASH;

    /**
     * Folds four consecutive digest bytes into an unsigned 32-bit value.
     *
     * <p>The slice starts at index {@code nTime * 4}; the byte at the lowest
     * index ends up in the least significant position of the result.
     *
     * @param digest the digest bytes to read from
     * @param nTime  which 4-byte slice of the digest to use (0 is the first)
     * @return the slice as a non-negative long in the range 0..2^32-1
     */
    public long hash(byte[] digest, int nTime) {
        final int offset = nTime * 4;
        long value = 0L;
        // Walk from the highest index of the slice down to the lowest, shifting
        // the accumulated value left by one byte each step.
        for (int i = 3; i >= 0; i--) {
            value = (value << 8) | (digest[offset + i] & 0xFF);
        }
        return value & 0xffffffffL; /* keep the low 32 bits only */
    }

    /**
     * Computes the MD5 digest of the UTF-8 encoding of the given key.
     *
     * @param k the key to digest
     * @return the MD5 digest bytes
     * @throws RuntimeException if MD5 or UTF-8 is not available at runtime
     */
    public byte[] computeMd5(String k) {
        final MessageDigest digester;
        try {
            digester = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }

        final byte[] encoded;
        try {
            encoded = k.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Unknown string :" + k, e);
        }

        // A freshly created digest needs no reset; feed the bytes and finish in one call.
        return digester.digest(encoded);
    }
}



public class Node {

    private String name;

    public Node(String name) {
        this.name = name;
    }

    /**
     * 可以用机器的ip + port
     *
     * @return
     */
    public String getName() {
        return name;
    }

    public String toString() {
        return "name : " + name;
    }

    public boolean equals(Object obj) {
        if(obj == null) return false;
        if(!(obj instanceof Node)) return false;
        Node node = (Node) obj;
        return this.name.equals(node.getName());
    }
}


下面是测试代码:

分布平均性测试:测试随机生成的众多key是否会平均分布到各个结点上
/**
 * Distribution demo: hashes a large number of random keys and prints how many
 * of them were assigned to each node.
 */
public class HashAlgorithmTest {

    static Random ran = new Random();

    /** Number of random keys to generate. */
    private static final int EXE_TIMES = 100000;

    /** Number of real nodes on the ring. */
    private static final int NODE_COUNT = 5;

    /** Number of virtual nodes requested per real node. */
    private static final int VIRTUAL_NODE_COUNT = 160;

    public static void main(String[] args) {
        HashAlgorithmTest test = new HashAlgorithmTest();

        // How many keys each node received.
        Map<Node, Integer> nodeRecord = new HashMap<Node, Integer>();

        KetamaNodeLocator locator = new KetamaNodeLocator(
                test.getNodes(NODE_COUNT), HashAlgorithm.KETAMA_HASH, VIRTUAL_NODE_COUNT);

        // Simulate the full key set and tally the node chosen for every key.
        for (String key : test.getAllStrings()) {
            Node target = locator.getPrimary(key);
            Integer seen = nodeRecord.get(target);
            nodeRecord.put(target, seen == null ? 1 : seen + 1);
        }

        float expectedShare = (float) 100 / NODE_COUNT;
        System.out.println("Nodes count : " + NODE_COUNT + ", Keys count : " + EXE_TIMES + ", Normal percent : " + expectedShare + "%");
        System.out.println("-------------------- boundary  ----------------------");
        for (Map.Entry<Node, Integer> entry : nodeRecord.entrySet()) {
            int hits = entry.getValue();
            float share = (float) hits / EXE_TIMES * 100;
            System.out.println("Node name :" + entry.getKey() + " - Times : " + hits + " - Percent : " + share + "%");
        }
    }

    /**
     * Creates mock nodes named {@code node1 .. node<nodeCount>}.
     *
     * @param nodeCount how many nodes to create
     * @return the list of created nodes
     */
    private List<Node> getNodes(int nodeCount) {
        List<Node> result = new ArrayList<Node>();
        for (int index = 0; index < nodeCount; index++) {
            result.add(new Node("node" + (index + 1)));
        }
        return result;
    }

    /**
     * Generates the random key set.
     *
     * @return {@code EXE_TIMES} random strings, each 0 to 49 characters long
     */
    private List<String> getAllStrings() {
        List<String> keys = new ArrayList<String>(EXE_TIMES);
        int remaining = EXE_TIMES;
        while (remaining-- > 0) {
            keys.add(generateRandomString(ran.nextInt(50)));
        }
        return keys;
    }

    /**
     * Generates a random string of the given length.
     * <br>
     * Every character is drawn from the code points 32 to 126 inclusive.
     *
     * @param length the number of characters to generate
     * @return the random string
     */
    private String generateRandomString(int length) {
        char[] chars = new char[length];
        for (int pos = 0; pos < chars.length; pos++) {
            chars[pos] = (char) (ran.nextInt(95) + 32);
        }
        return new String(chars);
    }
}


节点增删测试:在环上插入N个节点,每个节点对应nCopies个虚拟节点。随机生成大量key,在增删节点之后,统计同一个key仍被映射到相同节点的比例
/**
 * Node add/remove demo: maps the same random keys against three rings (the
 * baseline, one with extra nodes and one with fewer nodes) and reports how many
 * keys stay on the node they had in the baseline ring.
 */
public class HashAlgorithmPercentTest {

    static Random ran = new Random();

    /** Number of random keys to generate (duplicates are possible). */
    private static final int EXE_TIMES = 100000;

    /** Number of real nodes in the baseline ring. */
    private static final int NODE_COUNT = 50;

    /** Number of virtual nodes requested per real node. */
    private static final int VIRTUAL_NODE_COUNT = 160;

    static List<String> allKeys = null;

    static {
        allKeys = getAllStrings();
    }

    public static void main(String[] args) {

        Map<String, List<Node>> map = generateRecord();

        List<Node> allNodes = getNodes(NODE_COUNT);
        System.out.println("Normal case : nodes count : " + allNodes.size());
        call(allNodes, map);

        allNodes = getNodes(NODE_COUNT + 8);
        System.out.println("Added case : nodes count : " + allNodes.size());
        call(allNodes, map);

        allNodes = getNodes(NODE_COUNT - 10);
        System.out.println("Reduced case : nodes count : " + allNodes.size());
        call(allNodes, map);

        int addCount = 0;
        int reduceCount = 0;
        // Keys for which all three lookups produced a node and were compared.
        int comparedKeys = 0;
        for (Map.Entry<String, List<Node>> entry : map.entrySet()) {
            List<Node> list = entry.getValue();

            if (list.size() == 3) {
                comparedKeys++;

                // Index 0 = baseline ring, 1 = ring with added nodes, 2 = ring with removed nodes.
                if (list.get(0).equals(list.get(1))) {
                    addCount++;
                }

                if (list.get(0).equals(list.get(2))) {
                    reduceCount++;
                }
            } else {
                System.out.println("It's wrong size of list, key is " + entry.getKey() + ", size is " + list.size());
            }
        }

        System.out.println(addCount + "   ---   " + reduceCount);

        // The record map holds each distinct key once, and the random generator
        // repeats short keys often, so the percentage is taken over the keys that
        // were actually compared rather than over EXE_TIMES.
        System.out.println("Same percent in added case : " + percent(addCount, comparedKeys) + "%");
        System.out.println("Same percent in reduced case : " + percent(reduceCount, comparedKeys) + "%");

    }

    /**
     * Computes {@code count} as a percentage of {@code total}.
     *
     * @param count the number of matching items
     * @param total the number of items considered
     * @return the percentage, or 0 when {@code total} is 0
     */
    private static float percent(int count, int total) {
        return total == 0 ? 0f : (float) count * 100 / total;
    }

    /**
     * Looks up every recorded key on a ring built from {@code nodes} and appends
     * the resulting node to that key's list.
     *
     * @param nodes the real nodes forming the ring
     * @param map   key -> nodes found so far, one entry appended per call
     */
    private static void call(List<Node> nodes, Map<String, List<Node>> map) {
        KetamaNodeLocator locator = new KetamaNodeLocator(nodes, HashAlgorithm.KETAMA_HASH, VIRTUAL_NODE_COUNT);

        for (Map.Entry<String, List<Node>> entry : map.entrySet()) {
            Node node = locator.getPrimary(entry.getKey());

            // A key without a node is left short and reported by main as a wrong-size list.
            if (node != null) {
                entry.getValue().add(node);
            }
        }
    }

    /**
     * Creates the record map with one empty result list per distinct key.
     *
     * @return key -> empty list, ready to be filled by {@code call}
     */
    private static Map<String, List<Node>> generateRecord() {
        Map<String, List<Node>> record = new HashMap<String, List<Node>>(EXE_TIMES);

        for (String key : allKeys) {
            // Duplicate keys share a single entry.
            if (!record.containsKey(key)) {
                record.put(key, new ArrayList<Node>());
            }
        }

        return record;
    }


    /**
     * Creates mock nodes named {@code node1 .. node<nodeCount>}.
     *
     * @param nodeCount
     * 		the count of node wanted
     * @return
     * 		the node list
     */
    private static List<Node> getNodes(int nodeCount) {
        List<Node> nodes = new ArrayList<Node>();

        for (int k = 1; k <= nodeCount; k++) {
            nodes.add(new Node("node" + k));
        }

        return nodes;
    }

    /**
     * Generates {@code EXE_TIMES} random keys, each 0 to 49 characters long.
     *
     * @return the generated keys, possibly containing duplicates
     */
    private static List<String> getAllStrings() {
        List<String> allStrings = new ArrayList<String>(EXE_TIMES);

        for (int i = 0; i < EXE_TIMES; i++) {
            allStrings.add(generateRandomString(ran.nextInt(50)));
        }

        return allStrings;
    }

    /**
     * Generates a random string of the given length.
     * <br>
     * Every character is drawn from the code points 32 to 126 inclusive.
     *
     * @param length the number of characters to generate
     * @return the random string
     */
    private static String generateRandomString(int length) {
        StringBuilder sb = new StringBuilder(length);

        for (int i = 0; i < length; i++) {
            sb.append((char) (ran.nextInt(95) + 32));
        }

        return sb.toString();
    }
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics