`
Donald_Draper
  • 浏览: 951677 次
社区版块
存档分类
最新评论

Memcached分布式客户端(Xmemcached)

阅读更多
memcached 安装配置:http://donald-draper.iteye.com/blog/2395580
memcached 命令:http://donald-draper.iteye.com/blog/2395628
Memcached Standby客户端:http://donald-draper.iteye.com/blog/2396260
上一篇MemcachedStandbyClient文章,我们测试了XMemcached的Standby模式客户端,
今天来看一下XMemcached分布式客户端:
Memcached的分布是通过客户端实现的,客户端根据key的哈希值得到将要存储的memcached节点,并将对应的value存储到相应的节点。XMemcached同样支持客户端的分布策略,
默认分布的策略是按照key的哈希值模以连接数得到的余数,对应的连接就是将要存储的节点。如果使用默认的分布策略,你不需要做任何配置或者编程。
XMemcached同样支持一致性哈希(consistent hash),通过编程设置:
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil
		.getAddresses(properties.getProperty("test.memcached.servers"))
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
MemcachedClient client=builder.build();

一致性hash可以参见这篇文章:
MemCache超详细解读:http://www.csdn.net/article/2016-03-16/2826609
XMemcached还提供了额外的一种哈希算法——选举散列,在某些场景下可以替代一致性哈希
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
                                        AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
builder.setSessionLocator(new ElectionMemcachedSessionLocator());
MemcachedClient mc = builder.build();

我们选用用一致性hash算法:
本文的源代码可以从以下地址load:
https://github.com/Donaldhan/memcached-demo
启动3个memcached实例:
[memcached@donald ~]$ memcached -p 11211 -d -u memcached -l 192.168.126.128 -c 1024 
[memcached@donald ~]$ memcached -p 11212 -d -u memcached -l 192.168.126.128 -c 1024 
[memcached@donald ~]$ memcached -p 11213 -d -u memcached -l 192.168.126.128 -c 1024 
[memcached@donald ~]$ ps -ef | grep memcached
root       4514   1248  0 22:01 ?        00:00:00 sshd: memcached [priv]
memcach+   4517   4514  0 22:01 ?        00:00:00 sshd: memcached@pts/1
memcach+   5172      1  0 22:04 ?        00:00:00 memcached -p 11211 -d -u memcached -l 192.168.126.128 -c 1024
memcach+   5208      1  0 22:07 ?        00:00:00 memcached -p 11212 -d -u memcached -l 192.168.126.128 -c 1024
memcach+   5219      1  0 22:07 ?        00:00:00 memcached -p 11213 -d -u memcached -l 192.168.126.128 -c 1024
memcach+   5238   4522  0 22:07 pts/1    00:00:00 grep --color=auto memcached
[memcached@donald ~]$ 


创建memcache属性文件memcached.properties:

############################
##memcached server ip address list
############standby############## 
standbyServerList=192.168.126.128:11211
############Distribute############## 
distributeServerList=192.168.126.128:11211 192.168.126.128:11212 192.168.126.128:11213
poolName=sidsock
poolSize=16


创建属性文件工具类:


package util;


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * 
 * @author donald
 * 2017年9月29日
 * 下午10:21:34
 */
public class PropertiesUtil {
	private static final Logger log  = LoggerFactory.getLogger(PropertiesUtil.class);
	private static final String MEMCACHED_CONFIG_FILE = "memcached.properties";
    private static volatile PropertiesUtil instance = null;
    private static Properties properties = null;
    static{
    	 if (properties == null) {
    		 properties = new Properties();
         }
         try {
        	 InputStream inputStream = Thread.currentThread().getContextClassLoader()
        	            .getResourceAsStream(MEMCACHED_CONFIG_FILE);
        	 properties.load(inputStream);
         } catch (IOException e1) {
             e1.printStackTrace();
         }
    }
    
    /**
     * 
     * @return
     */
    public static synchronized PropertiesUtil getInstance() {
        if (instance == null) {
        	instance = new PropertiesUtil();
        }
        return instance;
    }
    /**
     * 
     * @param key
     * @return
     */
    public String getProperty(String key) {
        return properties.getProperty(key);
    }
    
    /**
     * 获取属性int值
     * @param key
     * @return
     */
    public Integer getInteger(String key) {
    	String value = properties.getProperty(key);
        return Integer.valueOf(value);
    }
    
    
    public static void main(String[] args) {
		log.info("serverList:"+PropertiesUtil.getInstance().getProperty("standbyServerList"));
	}
}

创建memcache分布式客户端:

package bootstrap;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.rubyeye.xmemcached.Counter;
import net.rubyeye.xmemcached.GetsResponse;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.exception.MemcachedException;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import net.rubyeye.xmemcached.utils.AddrUtil;
import util.PropertiesUtil;

/**
 * Memcached distribution 客户端
 * Memcached的分布是通过客户端实现的,客户端根据key的哈希值得到将要存储的memcached节点,
 * 并将对应的value存储到相应的节点。
 * XMemcached同样支持客户端的分布策略,默认分布的策略是按照key的哈希值模以连接数得到的余数,
 * 对应的连接就是将要存储的节点。如果使用默认的分布策略,你不需要做任何配置或者编程。
 * XMemcached同样支持一致性哈希(consistent hash),通过编程设置:
 * MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil
 *				.getAddresses(properties.getProperty("test.memcached.servers"))
 * builder.setSessionLocator(new KetamaMemcachedSessionLocator());
 * MemcachedClient client=builder.build();
 * 具体一致性哈希算法原理见:http://www.csdn.net/article/2016-03-16/2826609
 * XMemcached还提供了额外的一种哈希算法——选举散列,在某些场景下可以替代一致性哈希
 * MemcachedClientBuilder builder = new XMemcachedClientBuilder(
 *                AddrUtil.getAddresses("server1:11211 server2:11211 server3:11211"));
 * builder.setSessionLocator(new ElectionMemcachedSessionLocator());
 * MemcachedClient mc = builder.build();
 * @author donald
 * 2017年10月10日
 * 下午12:49:45
 */
public class MemcachedDistributeClient {
	private static final Logger log = LoggerFactory.getLogger(MemcachedDistributeClient.class);
	private static final String MEMCACHED_SERVER_LIST = "distributeServerList";
	private static PropertiesUtil  propertiesUtil = PropertiesUtil.getInstance();
	private static volatile MemcachedDistributeClient instance;
	private static MemcachedClientBuilder builder;
	private static MemcachedClient memcachedClient;
	static{
		String distributeServerList = propertiesUtil.getProperty(MEMCACHED_SERVER_LIST);
		List<InetSocketAddress> serverAddresses = AddrUtil.getAddresses(distributeServerList);
		builder = new XMemcachedClientBuilder(serverAddresses);
		builder.setSessionLocator(new KetamaMemcachedSessionLocator());
		try {
			memcachedClient = builder.build();
		} catch (IOException e) {
			log.error("连接异常");
			e.printStackTrace();
		}
	}
	public static synchronized MemcachedDistributeClient getInstance() {
		if (instance == null) {
			instance = new MemcachedDistributeClient();
		}
		return instance;
	}

	/**
	 * 
	 * @param key
	 * @param value
	 * @return
	 */
	public boolean set(String key,Object value){
		return set(key, 0, value);
	}
	/**
	 * 
	 * @param key
	 * @param expire 过期时间秒
	 * @param value
	 * @return
	 */
	public boolean set(String key,int expire,Object value){
		boolean finish = false;
		try {
			finish = memcachedClient.set(key, expire, value);
		} catch (TimeoutException e) {
			log.error("set超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("set中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("set错误");
			e.printStackTrace();
		}
		return finish;
	}
	/**
	 * 
	 * @param key
	 * @param value
	 * @return
	 */
	public boolean add(String key,Object value){
		return add(key, 0, value);
	}
	/**
	 * 
	 * @param key
	 * @param expire 过期时间秒
	 * @param value
	 * @return
	 */
	public boolean add(String key,int expire,Object value){
		boolean finish = false;
		try {
			finish = memcachedClient.add(key, expire, value);
		} catch (TimeoutException e) {
			log.error("add超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("add中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("add错误");
			e.printStackTrace();
		}
		return finish;
	}
	/**
	 * 
	 * @param key
	 * @return
	 */
	public Object get(String key){
		Object value = null;
		try {
			value = memcachedClient.get(key);
		} catch (TimeoutException e) {
			log.error("get超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("get中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("get操作错误");
			e.printStackTrace();
		}
		return value;
	}
	/**
	 * 
	 * @param key
	 * @param appendValue
	 * @return
	 */
	public boolean append(String key,Object appendValue){
		boolean finish = false;
		try {
			finish = memcachedClient.append(key, appendValue);
		} catch (TimeoutException e) {
			log.error("append超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("append中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("append操作错误");
			e.printStackTrace();
		}
		return finish;
	}
	/**
	 * 
	 * @param key
	 * @param prependValue
	 * @return
	 */
	public boolean prepend(String key,Object prependValue){
		boolean finish = false;
		try {
			finish = memcachedClient.prepend(key, prependValue);
		} catch (TimeoutException e) {
			log.error("prepend超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("prepend中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("prepend操作错误");
			e.printStackTrace();
		}
		return finish;
	}
	/**
	 * 
	 * @param key
	 * @param value
	 * @return
	 */
	public boolean replace(String key,Object value){
		return replace(key, 0, value);
	}
	/**
	 * 
	 * @param key
	 * @param expire 过期时间秒
	 * @param value
	 * @return
	 */
	public boolean replace(String key,int expire,Object value){
		boolean finish = false;
		try {
			finish = memcachedClient.replace(key, expire, value);
		} catch (TimeoutException e) {
			log.error("replace超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("replace中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("replace错误");
			e.printStackTrace();
		}
		return finish;
	}
	/**
	 * 
	 * @param key
	 * @param expire
	 * @return
	 */
	public boolean touch(String key,int expire){
		boolean finish = false;
		try {
			finish = memcachedClient.touch(key, expire);
		} catch (TimeoutException e) {
			log.error("touch超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("touch中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("touch操作错误");
			e.printStackTrace();
		}
		return finish;
	}
	/**
	 * @param key
	 * @param step
	 * @param defalut
	 * @return
	 */
	public long incr(String key,long step,long defalut){
		long value = 0;
		try {
			value = memcachedClient.incr(key, step, defalut);
		} catch (TimeoutException e) {
			log.error("incr超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("incr中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("incr操作错误");
			e.printStackTrace();
		}
		return value;
	}
	/**
	 * 
	 * @param key
	 * @param step
	 * @return
	 */
	public long incr(String key,long step){
		long value = 0;
		try {
			value = memcachedClient.incr(key, step);
		} catch (TimeoutException e) {
			log.error("incr超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("incr中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("incr操作错误");
			e.printStackTrace();
		}
		return value;
	}
	/**
	 * 
	 * @param key
	 * @param step
	 * @return
	 */
	public long decr(String key,long step){
		long value = 0;
		try {
			value = memcachedClient.decr(key, step);
		} catch (TimeoutException e) {
			log.error("decr超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("decr中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("decr操作错误");
			e.printStackTrace();
		}
		return value;
	}
	/**
	 * 
	 * @param key
	 * @return
	 */
	public long gets(String key){
		long sid = 0;
		try {
			GetsResponse<Integer> result = memcachedClient.gets(key);
			sid = result.getCas(); 
		} catch (TimeoutException e) {
			log.error("gets超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("gets中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("gets操作错误");
			e.printStackTrace();
		}
		return sid;
	}
	/**
	 * 
	 * @param key
	 * @param obj
	 * @return
	 */
	public boolean cas(String key,Object obj){
		boolean finish = false;
		long sid = gets(key);
		finish = cas(key, 0, obj, sid);
		return finish;
	}
	/**
	 * 
	 * @param key
	 * @param expire
	 * @param obj
	 * @param sid key当前版本id
	 * @return
	 */
	public boolean cas(String key, int expire, Object obj, long sid){
		boolean finish = false;
		try {
			finish = memcachedClient.cas(key, expire, obj, sid);
		} catch (TimeoutException e) {
			log.error("cas超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("cas中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("cas操作错误");
			e.printStackTrace();
		}
		return finish;
	}
	/**
	 * @param key
	 * @return
	 */
	public boolean delete(String key){
		boolean finish = false;
		try {
			finish = memcachedClient.delete(key);
		} catch (TimeoutException e) {
			log.error("delete超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("delete中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("delete操作错误");
			e.printStackTrace();
		}
		return finish;
	}
	/**
	 * 
	 * @param key
	 * @return
	 */
	public void deleteWithNoReply(String key){
		try {
			memcachedClient.deleteWithNoReply(key);
		} catch (InterruptedException e) {
			log.error("deleteWithNoReply中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("deleteWithNoReply操作错误");
			e.printStackTrace();
		}
	}
	/**
	 * @param key
	 * @return
	 */
	public Counter getCounter(String key) {
		return getCounter(key,0);
	}
	/**
	 * @param key
	 * @param init
	 * @return
	 */
	public Counter getCounter(String key,int init) {
		Counter counter = memcachedClient.getCounter(key,init);
		return counter;
	}
	/**
	 * 
	 */
	public void flushAll(){
		try {
			memcachedClient.flushAll();
		} catch (TimeoutException e) {
			log.error("flushAll超时");
			e.printStackTrace();
		} catch (InterruptedException e) {
			log.error("flushAll中断异常");
			e.printStackTrace();
		} catch (MemcachedException e) {
			log.error("flushAll操作错误");
			e.printStackTrace();
		}
	}
	public void shutdown(){
		try {
			memcachedClient.shutdown();
		} catch (IOException e) {
			log.error("客户端关闭异常");
			e.printStackTrace();
		}
	}
	/* (non-Javadoc)
	 * @see java.lang.Object#finalize()
	 */
	@Override
	protected void finalize() {
		try {
			
			memcachedClient.shutdown();
		} catch (IOException e) {
			log.error("memcached 关闭客户端连接失败!");
			e.printStackTrace();
		}
	}
}


创建测试类:

package bootstrap;

import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.rubyeye.xmemcached.Counter;
import net.rubyeye.xmemcached.exception.MemcachedException;

/**
 * Memcached 分布式客户端测试类
 * @author 
 * donald 
 * 2017年10月10日 
 * 下午12:49:45
 */
public class MemcachedDistributeClientTest {
	private static final Logger log = LoggerFactory.getLogger(MemcachedDistributeClientTest.class);
	public static void main(String[] args) {
		MemcachedDistributeClient memcachedClient = MemcachedDistributeClient.getInstance();
		memcachedClient.set("name", 0, "donald");
		String value = (String) memcachedClient.get("name");
		log.info("set name={}", value);
		memcachedClient.delete("name");
		value = (String) memcachedClient.get("name");
		log.info("delete name={}", value);
		if (!memcachedClient.set("name", 0, "jamel")) {
			log.error("set error");
		}
		value = (String) memcachedClient.get("name");
		log.info("set name={}", value);
		if (memcachedClient.add("name", 0, "donald")) {
			log.error("Add error,key is existed");
		}
		value = (String) memcachedClient.get("name");
		log.info("name={}", value);
		if (!memcachedClient.replace("name", 0, "rain")) {
			log.error("replace error");
		}
		value = (String) memcachedClient.get("name");
		log.info("repalce name={}", value);
		memcachedClient.append("name", "-han");
		value = (String) memcachedClient.get("name");
		log.info("append name={}", value);
		memcachedClient.prepend("name", "0-");
		value = (String) memcachedClient.get("name");
		log.info("prepend name={}", value);
		memcachedClient.touch("name", 3);
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		value = (String) memcachedClient.get("name");
		log.info("after touch name={}", value);
		memcachedClient.deleteWithNoReply("name");
		memcachedClient.set("age", 0, "27");
		log.info("age={}", memcachedClient.get("age"));
		memcachedClient.incr("age", 2, 1);// age 增加2,age不存在,则为1
		memcachedClient.incr("age", 1);
		log.info("incr age={}", memcachedClient.get("age"));
		memcachedClient.decr("age", 2);
		log.info("decr age={}", memcachedClient.get("age"));
		if (!memcachedClient.cas("age", 27)) {
			log.error("cas error");
		}
		log.info("cas age={}", memcachedClient.get("age"));
		Counter counter = memcachedClient.getCounter("counter", 0);
		try {
			log.info("incrementAndGet counter,{}", counter.incrementAndGet());
			log.info("decrementAndGet counter,{}", counter.decrementAndGet());
			log.info("addAndGet counter,{}", counter.addAndGet(3));
		} catch (MemcachedException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		}
		memcachedClient.shutdown();
	}
}

运行测试类,控制台输出如下:

22:32:41.794 [main] INFO  net.rubyeye.xmemcached.XMemcachedClient 729- XMemcachedClient is using Text protocol
22:32:41.841 [main] INFO  com.google.code.yanf4j.nio.impl.SelectorManager 37- Creating 4 reactors...
22:32:41.859 [main] INFO  com.google.code.yanf4j.core.impl.AbstractController 377- The Controller started at localhost/127.0.0.1:0 ...
22:32:50.918 [Xmemcached-Reactor-0] INFO  com.google.code.yanf4j.core.impl.AbstractController 253- Add a session: 192.168.126.128:11211
22:33:00.033 [Xmemcached-Reactor-0] INFO  com.google.code.yanf4j.core.impl.AbstractController 253- Add a session: 192.168.126.128:11212
22:33:09.118 [Xmemcached-Reactor-0] INFO  com.google.code.yanf4j.core.impl.AbstractController 253- Add a session: 192.168.126.128:11213
22:33:09.185 [main] INFO  bootstrap.MemcachedDistributeClientTest 24- set name=donald
22:33:09.196 [main] INFO  bootstrap.MemcachedDistributeClientTest 27- delete name=null
22:33:09.202 [main] INFO  bootstrap.MemcachedDistributeClientTest 32- set name=jamel
22:33:09.210 [main] INFO  bootstrap.MemcachedDistributeClientTest 37- name=jamel
22:33:09.216 [main] INFO  bootstrap.MemcachedDistributeClientTest 42- repalce name=rain
22:33:09.221 [main] INFO  bootstrap.MemcachedDistributeClientTest 45- append name=rain-han
22:33:09.227 [main] INFO  bootstrap.MemcachedDistributeClientTest 48- prepend name=0-rain-han
22:33:12.234 [main] INFO  bootstrap.MemcachedDistributeClientTest 56- after touch name=null
22:33:12.240 [main] INFO  bootstrap.MemcachedDistributeClientTest 59- age=27
22:33:12.250 [main] INFO  bootstrap.MemcachedDistributeClientTest 62- incr age=30
22:33:12.255 [main] INFO  bootstrap.MemcachedDistributeClientTest 64- decr age=28
22:33:12.267 [main] INFO  bootstrap.MemcachedDistributeClientTest 68- cas age=27
22:33:12.275 [main] INFO  bootstrap.MemcachedDistributeClientTest 71- incrementAndGet counter,1
22:33:12.278 [main] INFO  bootstrap.MemcachedDistributeClientTest 72- decrementAndGet counter,0
22:33:12.281 [main] INFO  bootstrap.MemcachedDistributeClientTest 73- addAndGet counter,3
22:33:12.320 [Xmemcached-Reactor-3] INFO  com.google.code.yanf4j.core.impl.AbstractController 368- Remove a session: 192.168.126.128:11212
22:33:12.326 [Xmemcached-Reactor-1] INFO  com.google.code.yanf4j.core.impl.AbstractController 368- Remove a session: 192.168.126.128:11213
22:33:12.328 [Xmemcached-Reactor-2] INFO  com.google.code.yanf4j.core.impl.AbstractController 368- Remove a session: 192.168.126.128:11211
22:33:12.330 [main] INFO  com.google.code.yanf4j.core.impl.AbstractController 486- Controller has been stopped.

从控制台输出来看,与Standby客户端的测试类控制台输出基本没有区别,唯一不同的是,创建了三个会话,三个会话的Memcached实例为:
192.168.126.128:11211
192.168.126.128:11212
192.168.126.128:11213

关闭客户端时,同时关闭了三个会话:
分别连接3个Memcached实例,分别获取name,age,counter属性:
[memcached@donald ~]$ telnet 192.168.126.128 11211
Trying 192.168.126.128...
Connected to 192.168.126.128.
Escape character is '^]'.
get name
END
get age
END
get counter
VALUE counter 0 1
3
END


Last login: Wed Oct 11 22:22:25 2017 from 192.168.126.1
[memcached@donald ~]$ telnet 192.168.126.128 11212
Trying 192.168.126.128...
Connected to 192.168.126.128.
Escape character is '^]'.
get name
END
get age
END
get counter
END




[memcached@donald ~]$ telnet 192.168.126.128 11213
Trying 192.168.126.128...
Connected to 192.168.126.128.
Escape character is '^]'.
get name
END
get age
VALUE age 512 1

END
get counter
END


从上面可以看出,由于我们name属性已经删除,所有name为空,counter在11211上,
age在11213上。

创建测试实例2:

package bootstrap;

import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.rubyeye.xmemcached.Counter;
import net.rubyeye.xmemcached.exception.MemcachedException;

/**
 * Memcached 分布式客户端测试类
 * @author 
 * donald 
 * 2017年10月10日 
 * 下午12:49:45
 */
public class MemcachedDistributeClientTest2 {
	private static final Logger log = LoggerFactory.getLogger(MemcachedDistributeClientTest2.class);
	public static void main(String[] args) {
		MemcachedDistributeClient memcachedClient = MemcachedDistributeClient.getInstance();
		memcachedClient.flushAll();
		memcachedClient.set("name", 0, "donald");
		String value = (String) memcachedClient.get("name");
		log.info("set name={}", value);
		memcachedClient.set("age", 0, "27");
		log.info("age={}", memcachedClient.get("age"));
		memcachedClient.incr("age", 1);
		log.info("incr age={}", memcachedClient.get("age"));
		Counter counter = memcachedClient.getCounter("counter", 0);
		try {
			log.info("incrementAndGet counter,{}", counter.incrementAndGet());
			log.info("decrementAndGet counter,{}", counter.decrementAndGet());
			log.info("addAndGet counter,{}", counter.addAndGet(3));
		} catch (MemcachedException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		}
		memcachedClient.shutdown();
	}
}


运行测试类控制台输出:
22:53:04.215 [main] INFO  net.rubyeye.xmemcached.XMemcachedClient 729- XMemcachedClient is using Text protocol
22:53:04.257 [main] INFO  com.google.code.yanf4j.nio.impl.SelectorManager 37- Creating 4 reactors...
22:53:04.275 [main] INFO  com.google.code.yanf4j.core.impl.AbstractController 377- The Controller started at localhost/127.0.0.1:0 ...
22:53:13.334 [Xmemcached-Reactor-0] INFO  com.google.code.yanf4j.core.impl.AbstractController 253- Add a session: 192.168.126.128:11211
22:53:22.375 [Xmemcached-Reactor-0] INFO  com.google.code.yanf4j.core.impl.AbstractController 253- Add a session: 192.168.126.128:11212
22:53:31.397 [Xmemcached-Reactor-0] INFO  com.google.code.yanf4j.core.impl.AbstractController 253- Add a session: 192.168.126.128:11213
22:53:31.544 [main] INFO  bootstrap.MemcachedDistributeClientTest2 25- set name=donald
22:53:31.553 [main] INFO  bootstrap.MemcachedDistributeClientTest2 27- age=27
22:53:31.562 [main] INFO  bootstrap.MemcachedDistributeClientTest2 29- incr age=28
22:53:31.570 [main] INFO  bootstrap.MemcachedDistributeClientTest2 32- incrementAndGet counter,1
22:53:31.573 [main] INFO  bootstrap.MemcachedDistributeClientTest2 33- decrementAndGet counter,0
22:53:31.578 [main] INFO  bootstrap.MemcachedDistributeClientTest2 34- addAndGet counter,3
22:53:31.625 [Xmemcached-Reactor-1] INFO  com.google.code.yanf4j.core.impl.AbstractController 368- Remove a session: 192.168.126.128:11213
22:53:31.629 [Xmemcached-Reactor-2] INFO  com.google.code.yanf4j.core.impl.AbstractController 368- Remove a session: 192.168.126.128:11211
22:53:31.631 [Xmemcached-Reactor-3] INFO  com.google.code.yanf4j.core.impl.AbstractController 368- Remove a session: 192.168.126.128:11212
22:53:31.632 [main] INFO  com.google.code.yanf4j.core.impl.AbstractController 486- Controller has been stopped.


分别连接3个Memcached实例,分别获取name,age,counter属性:


[memcached@donald ~]$ telnet 192.168.126.128 11211
Trying 192.168.126.128...
Connected to 192.168.126.128.
Escape character is '^]'.
get name
END
get age
END
get counter
VALUE counter 0 1
3
END


[memcached@donald ~]$ telnet 192.168.126.128 11212
Trying 192.168.126.128...
Connected to 192.168.126.128.
Escape character is '^]'.
get name
END
get age
END
get counter
END
quit

[memcached@donald ~]$ telnet 192.168.126.128 11212
Trying 192.168.126.128...
Connected to 192.168.126.128.
Escape character is '^]'.
get name
END
get age
END
get counter
END

[memcached@donald ~]$ telnet 192.168.126.128 11213
Trying 192.168.126.128...
Connected to 192.168.126.128.
Escape character is '^]'.
get name
VALUE name 0 6
donald
END
get age
VALUE age 0 2
28
END
get counter
END

从上面可以看出,name,age,counter分布在11211,11212,11213上。
0
0
分享到:
评论
1 楼 flyfeifei66 2018-07-09  
打算使用xmemcache作为memcache的客户端,由于xmemcache是基于NIO的异步多路复用IO技术,网上的资料都说只需要一个客户端连接即可(线程安全?)。做法基本就是实例化出来XMemcachedClientBuilder,然后利用这个builder创建一个memcachedClient就可以操作缓存了。
       在其他资料上看到并发量大时一个连接性能也是有问题的,容易timeout,其实也可以用多个client。且XMemcachedClientBuilder确实有个setConnectionPoolSize方法。但是我看了下XMemcachedClientBuilder确没有其他的设置参数的方法,一个真正意义上的池化技术,至少得有连接最大数,默认初始数量,最大空闲等配置,还得有资源的回收/释放方法,我找了下这些XMemcachedClientBuilder统统没有。并且memcachedClient也没有close 、disconnect类似的方法?难道XMemcachedClientBuilder只是个伪池子?设置PoolSize仅仅是设置它能生产client的最大数量,然后自己没有管理这些client的功能?
           我尝试读了下源码,看的非常晕,没找到头绪

另外就是服务器的参数顺序不同,一个度一个写是否会有问题?

相关推荐

Global site tag (gtag.js) - Google Analytics