`

spymemcached CAS操作

 
阅读更多
package spymemcached;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.OperationFuture;

/**
 * Demo of optimistic concurrency on a single memcached key using spymemcached's
 * {@code gets} / {@code cas} operations.
 *
 * <p>{@link #CONCURRENT_THREAD_NUM} workers each try to write one random number to the same
 * key. A worker reads the current value together with its CAS id, then attempts a
 * compare-and-set; on a conflict it re-reads and retries. When the key does not exist yet no
 * CAS id is available, so the worker creates the key with {@code add}; a worker whose
 * {@code add} is rejected goes back to the {@code gets}/{@code cas} path.
 *
 * @author fairjm
 * fair-jm.iteye.com
 *
 */
public class MemcachedCASSpec {

    public static final int CONCURRENT_THREAD_NUM = 100;

    /** The single key every worker competes for. */
    private static final String KEY = "testKey";

    /**
     * Starts the workers, waits for all of them to finish, then prints the final value of
     * the key and the number of successful writes.
     *
     * @param args unused
     * @throws IOException if the memcached client cannot be created
     * @throws InterruptedException if the main thread is interrupted while waiting for the workers
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        final MemcachedClient mc = new MemcachedClient(new InetSocketAddress("10.1.2.194", 11211));
        final ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREAD_NUM);

        try {
            final CountDownLatch countDown = new CountDownLatch(CONCURRENT_THREAD_NUM);
            final AtomicInteger oks = new AtomicInteger(0);
            final AtomicInteger okThenModifieds = new AtomicInteger(0);

            for (int i = 0; i < CONCURRENT_THREAD_NUM; i++) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            writeWithCas(mc, oks, okThenModifieds);
                        } finally {
                            // Always release the latch: if a worker dies with an unchecked
                            // exception, main must not block in await() forever.
                            countDown.countDown();
                        }
                    }
                });
            }
            countDown.await();

            System.out.println("testKey:" + mc.get(KEY));
            System.out.println("count:" + (oks.get() + okThenModifieds.get()));
        } finally {
            // Release the client and the pool threads even when main fails or is interrupted;
            // otherwise the non-daemon pool threads keep the JVM alive.
            mc.shutdown();
            executor.shutdown();
        }
    }

    /**
     * Writes one random number to {@link #KEY}, retrying until the write is accepted or an
     * unrecoverable response is seen.
     *
     * @param mc client used for all memcached operations
     * @param oks incremented for every accepted write
     * @param okThenModifieds incremented when the write succeeded but the value was reported
     *        as modified afterwards
     */
    private static void writeWithCas(MemcachedClient mc, AtomicInteger oks,
            AtomicInteger okThenModifieds) {
        Random ran = new Random(Thread.currentThread().getId() + System.currentTimeMillis());
        int num = ran.nextInt(10000);

        boolean done = false;
        while (!done) {
            CASValue<Object> oldValue = mc.gets(KEY);
            if (oldValue == null) {
                // Key is missing, so there is no CAS id to compare against; create it instead.
                done = addIfAbsent(mc, num, oks);
                continue;
            }

            switch (mc.cas(KEY, oldValue.getCas(), num)) {
            case OK:
                oks.incrementAndGet();
                System.out.println("OK");
                done = true;
                break;
            // Key disappeared between gets and cas; the next iteration takes the add path.
            case NOT_FOUND:
                System.out.println("NOT_FOUND");
                break;
            // Conflict: the stored value changed since our gets, so our CAS id is stale.
            case EXISTS:
                System.out.println("Exists retry");
                break;
            // Invalid arguments: retrying with the same arguments cannot succeed, so give up
            // instead of spinning forever.
            case OBSERVE_ERROR_IN_ARGS:
                System.out.println("error in args");
                done = true;
                break;
            // Operation timed out; try again.
            case OBSERVE_TIMEOUT:
                System.out.println("timeout");
                break;
            // Write succeeded but the value was modified again during the observe period.
            case OBSERVE_MODIFIED:
                System.out.println("succeed but modified then");
                okThenModifieds.incrementAndGet();
                done = true;
                break;
            // Unknown response (e.g. from a newer client library): stop rather than loop.
            default:
                System.out.println("unexpected CAS response");
                done = true;
                break;
            }
        }
    }

    /**
     * Tries to create {@link #KEY} with {@code add}, which is rejected when the key already
     * exists, so concurrent workers do not silently overwrite each other as they would with
     * {@code set}.
     *
     * @param mc client used for the add operation
     * @param num value to store
     * @param oks incremented when this call created the key
     * @return {@code true} if the worker should stop (the key was created, or the operation
     *         failed or was interrupted); {@code false} if the key already existed and the
     *         worker should retry through gets/cas
     */
    private static boolean addIfAbsent(MemcachedClient mc, int num, AtomicInteger oks) {
        OperationFuture<Boolean> future = mc.add(KEY, 0, num);
        try {
            if (Boolean.TRUE.equals(future.get())) {
                oks.incrementAndGet();
                System.out.println("OK");
                return true;
            }
            // Another worker created the key first; fall back to the CAS path.
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the executor and stop this worker.
            Thread.currentThread().interrupt();
            return true;
        } catch (ExecutionException e) {
            e.printStackTrace();
            return true;
        }
    }
}

对 memcached 的版本有要求：低版本不支持 CAS。

感觉有点怪：key 不存在的时候 gets 拿不到 CAS ID，也就没法使用 CAS，只能自己先 add(或 set) 把 key 建出来。并发场景下建议用 add：它在服务端是原子的，只有一个客户端会成功，其余客户端失败后重新 gets 再走 CAS 即可；如果用 set，则需要自己加锁来避免相互覆盖。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics