`
liudunxu2
  • 浏览: 30765 次
  • 性别: Icon_minigender_1
  • 来自: 青岛
文章分类
社区版块
存档分类
最新评论

xmemcached中的一致性hash实现

 
阅读更多
1.内部使用TreeMap<Long, List<Session>>实现session存储和定位,其中key为hash值,value为这个hash值对应的session存储集合。其中并不是有多少session就有几个键值,而是会有NUM_REPS(默认为160)*session.size个键值。还需要与session的权重相乘
2.使用treemap的tailMap方法高效定位session位置(为的是与jdk1.5兼容,jdk6使用的是ceilingKey,metaq做消费者负载均衡默认使用的就是ceilingKey)。
3.性能与hash算法密切相关,因为使用的是sessionList.get(this.random.nextInt(size))方式,如果在buildmap时,多个session的hash映射到同一个key,使用get方式获取时,可能得不到正确的缓存位置。
4.使用虚拟节点,默认个数为session的个数的160倍,也跟session的权重有关,具体一致性hash的虚拟节点介绍,可以参考http://blog.csdn.net/x15594/article/details/6270242
/**
 *Copyright [2009-2010] [dennis zhuang(killme2008@gmail.com)]
 *Licensed under the Apache License, Version 2.0 (the "License");
 *you may not use this file except in compliance with the License.
 *You may obtain a copy of the License at
 *             http://www.apache.org/licenses/LICENSE-2.0
 *Unless required by applicable law or agreed to in writing,
 *software distributed under the License is distributed on an "AS IS" BASIS,
 *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 *either express or implied. See the License for the specific language governing permissions and limitations under the License
 */
package net.rubyeye.xmemcached.impl;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;

import net.rubyeye.xmemcached.HashAlgorithm;
import net.rubyeye.xmemcached.networking.MemcachedSession;

import com.google.code.yanf4j.core.Session;

/**
 * ConnectionFactory instance that sets up a ketama compatible connection.
 *
 * <p>
 * This implementation piggy-backs on the functionality of the
 * <code>DefaultConnectionFactory</code> in terms of connections and queue
 * handling. Where it differs is that it uses both the <code>
 * KetamaNodeLocator</code>
 * and the <code>HashAlgorithm.KETAMA_HASH</code> to provide consistent node
 * hashing.
 *
 * @see http://www.last.fm/user/RJ/journal/2007/04/10/392555/
 *
 * </p>
 */
/**
 * Consistent Hash Algorithm implementation,based on TreeMap.tailMap(hash)
 * method.
 * 
 * @author dennis
 * 
 */
public class KetamaMemcachedSessionLocator extends
AbstractMemcachedSessionLocator {

	static final int NUM_REPS = 160;
	private transient volatile TreeMap<Long, List<Session>> ketamaSessions = new TreeMap<Long, List<Session>>();
	private final HashAlgorithm hashAlg;
	private volatile int maxTries;
	private final Random random = new Random();

	/**
	 * compatible with nginx-upstream-consistent,patched by wolfg1969
	 */
	static final int DEFAULT_PORT = 11211;
	private final boolean cwNginxUpstreamConsistent;

	public KetamaMemcachedSessionLocator() {
		this.hashAlg = HashAlgorithm.KETAMA_HASH;
		this.cwNginxUpstreamConsistent = false;
	}

	public KetamaMemcachedSessionLocator(boolean cwNginxUpstreamConsistent) {
		this.hashAlg = HashAlgorithm.KETAMA_HASH;
		this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
	}

	public KetamaMemcachedSessionLocator(HashAlgorithm alg) {
		this.hashAlg = alg;
		this.cwNginxUpstreamConsistent = false;
	}

	public KetamaMemcachedSessionLocator(HashAlgorithm alg,
			boolean cwNginxUpstreamConsistent) {
		this.hashAlg = alg;
		this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
	}

	public KetamaMemcachedSessionLocator(List<Session> list, HashAlgorithm alg) {
		super();
		this.hashAlg = alg;
		this.cwNginxUpstreamConsistent = false;
		this.buildMap(list, alg);
	}

	private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
		TreeMap<Long, List<Session>> sessionMap = new TreeMap<Long, List<Session>>();

		for (Session session : list) {
			String sockStr = null;
			if (this.cwNginxUpstreamConsistent) {
				InetSocketAddress serverAddress = session
						.getRemoteSocketAddress();
				sockStr = serverAddress.getAddress().getHostAddress();
				if (serverAddress.getPort() != DEFAULT_PORT) {
					sockStr = sockStr + ":" + serverAddress.getPort();
				}
			} else {
				if (session instanceof MemcachedTCPSession) {
					// Always use the first time resolved address.
					sockStr = ((MemcachedTCPSession) session)
							.getInetSocketAddressWrapper()
							.getRemoteAddressStr();
				}
				if (sockStr == null) {
					sockStr = String.valueOf(session.getRemoteSocketAddress());
				}
			}
			/**
			 * Duplicate 160 X weight references
			 */
			int numReps = NUM_REPS;
			if (session instanceof MemcachedTCPSession) {
				numReps *= ((MemcachedSession) session).getWeight();
			}
			if (alg == HashAlgorithm.KETAMA_HASH) {
				for (int i = 0; i < numReps / 4; i++) {
					byte[] digest = HashAlgorithm.computeMd5(sockStr + "-" + i);
					for (int h = 0; h < 4; h++) {
						long k = (long) (digest[3 + h * 4] & 0xFF) << 24
								| (long) (digest[2 + h * 4] & 0xFF) << 16
								| (long) (digest[1 + h * 4] & 0xFF) << 8
								| digest[h * 4] & 0xFF;
						this.getSessionList(sessionMap, k).add(session);
					}

				}
			} else {
				for (int i = 0; i < numReps; i++) {
					long key = alg.hash(sockStr + "-" + i);
					this.getSessionList(sessionMap, key).add(session);
				}
			}
		}
		this.ketamaSessions = sessionMap;
		this.maxTries = list.size();
	}

	private List<Session> getSessionList(
			TreeMap<Long, List<Session>> sessionMap, long k) {
		List<Session> sessionList = sessionMap.get(k);
		if (sessionList == null) {
			sessionList = new ArrayList<Session>();
			sessionMap.put(k, sessionList);
		}
		return sessionList;
	}

	public final Session getSessionByKey(final String key) {
		if (this.ketamaSessions == null || this.ketamaSessions.size() == 0) {
			return null;
		}
		long hash = this.hashAlg.hash(key);
		Session rv = this.getSessionByHash(hash);
		int tries = 0;
		while (!this.failureMode && (rv == null || rv.isClosed())
				&& tries++ < this.maxTries) {
			hash = this.nextHash(hash, key, tries);
			rv = this.getSessionByHash(hash);
		}
		return rv;
	}

	public final Session getSessionByHash(final long hash) {
		TreeMap<Long, List<Session>> sessionMap = this.ketamaSessions;
		if (sessionMap.size() == 0) {
			return null;
		}
		Long resultHash = hash;
		if (!sessionMap.containsKey(hash)) {
			// Java 1.6 adds a ceilingKey method, but xmemcached is compatible
			// with jdk5,So use tailMap method to do this.
			SortedMap<Long, List<Session>> tailMap = sessionMap.tailMap(hash);
			if (tailMap.isEmpty()) {
				resultHash = sessionMap.firstKey();
			} else {
				resultHash = tailMap.firstKey();
			}
		}
		//
		// if (!sessionMap.containsKey(resultHash)) {
		// resultHash = sessionMap.ceilingKey(resultHash);
		// if (resultHash == null && sessionMap.size() > 0) {
		// resultHash = sessionMap.firstKey();
		// }
		// }
		List<Session> sessionList = sessionMap.get(resultHash);
		if (sessionList == null || sessionList.size() == 0) {
			return null;
		}
		int size = sessionList.size();
		return sessionList.get(this.random.nextInt(size));
	}

	public final long nextHash(long hashVal, String key, int tries) {
		long tmpKey = this.hashAlg.hash(tries + key);
		hashVal += (int) (tmpKey ^ tmpKey >>> 32);
		hashVal &= 0xffffffffL; /* truncate to 32-bits */
		return hashVal;
	}

	public final void updateSessions(final Collection<Session> list) {
		this.buildMap(list, this.hashAlg);
	}
}


分享到:
评论

相关推荐

    xmemcached 中文开发手册

    xmemcached中文开发手册,提供的版本较老,但针对2.0的版本次文档同样适用,可以入门

    Xmemcached一个java实现的分布式缓存

    Xmemcached一个java实现的分布式缓存Xmemcached一个java实现的分布式缓存Xmemcached一个java实现的分布式缓存Xmemcached一个java实现的分布式缓存

    xmemcached

    Java的memcached客户端,支持一致性hash算法,支持动态加服务器,减服务器

    xmemcached-1.2.4源码

    xmemcached-1.2.4的官方源码。 xmemcached XMemcached is a high performance, easy to use blocking multithreaded memcached client in java. It's nio based (using my opensource nio framework :yanf4j), ...

    xmemcached源码

    java的memcached客户端,支持一致性hash,支持动态增删服务器,客户端源码

    Xmemcached官方中文手册

    官方中文手册,各种开发实例,集成spring等详尽配置信息,不错的资料!

    xmemcached 2.4.6.rar

    repository\com\googlecode\xmemcached\xmemcached 2.4.6

    xmemcached-1.2.6.2

    xmemcached-1.2.6.2

    xmemcached.chm文档

    xmemcached.chm帮助文档

    xmemcached-1.4.3.jar

    xmemcached-1.4.3.jar

    Xmemcached测试实例

    资源中包括测试类以及依赖的jar包,绝对实用。 测试类包括Xmemcached客户端与memcached client for java两者,可运行比较性能。 XMemcached简介: XMemcached是基于 java nio的Memcached客户端,java nio相比于...

    Xmemcached memcached 实例

    类包括Xmemcached客户端实现和builder实现以及memcached client for java实现,对初学者有借鉴作用,特别是在开发简单例子时出现的超时情况的可以看看是否是同本事例相同。 xmemcached time out 5000 1000

    spring-xmemcached

    spring+xmemcached aop切面 需要xmemcached-1.2.5+spring-2.5.6 class="net.rubyeye.xmemcached.utils.XMemcachedClientFactoryBean" destroy-method="shutdown"&gt; ${XMemcached_servers} &lt;!-- ...

    Xmemcached 缓存开源项目源码及API

    基于java nio实现的高性能可扩展的memcached客户端。虽然Java的memcached库已经很多,但是这些Java开源memcached库并没有一个是基于NIO框架编写,因此并不能够充分发挥Java NIO的性能优势.... xmemcached的项目主页...

    xmemcached api doc

    xmemcached 接口文档,方便开发和学习

    xmemcached jar包,源文件,api

    memcached 客户端 xmemcached jar 包,xmemcached 源文件 及 api

    Xmemcached用户指南

    value缓存系统,用于动态Web应用以减轻数据库负载,现在也有很多人将它作为内存式数据库在使用,memcached通过它的自定义协议与客户端交互,而XMemcached就是它的一个java客户端实现。 Memcached的java客户端已经...

    xmemcached-1.4.2

    xmemcached-1.4.2最新版,可用。memcached java客户端

Global site tag (gtag.js) - Google Analytics