`
Donald_Draper
  • 浏览: 945580 次
社区版块
存档分类
最新评论

MINA 编解码器实例

    博客分类:
  • Mina
阅读更多
MINA TCP简单通信实例:http://donald-draper.iteye.com/blog/2375297
网络中传输的数据为二进制byte,如何将Java的对象转换成二进制byte序列,这就需要协议编码器和解码器;上一篇文章中我们用的协议编解码器工厂为TextLineCodecFactory,TextLineCodecFactory中有一个
文本行编码器TextLineEncoder(ProtocolEncoderAdapter)和一个文本行解码器TextLineDecoder(ProtocolDecoderAdapter);
今天我们来看一个编解码器的实例:
首先我们定义一个短信发送的协议为:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 13688888888
R: 18866666666
L: 19
Hello Sms Server...

第一行为短信状态行,第二行为发送者,第三行为接收者,第四行为发送内容的字节长度,其后的所有内容为发送内容。
这个协议和HTTP协议有点像,首先是状态行,头部,内容。
定义消息:
package mina.tcp.message;


/**
 * A short message: who sent it, who should receive it, and its text.
 * <p>
 * Plain mutable bean; every property is {@code null} until its setter is called.
 *
 * @author donald
 * 2017-05-19 22:46:36
 */
public class SmsInfo {
	/** Number of the party sending the message. */
	private String sender;
	/** Number of the party the message is addressed to. */
	private String receiver;
	/** Text content of the message. */
	private String message;

	/** @return the sender, or {@code null} if none has been set */
	public String getSender() {
		return this.sender;
	}

	/** @return the receiver, or {@code null} if none has been set */
	public String getReceiver() {
		return this.receiver;
	}

	/** @return the message text, or {@code null} if none has been set */
	public String getMessage() {
		return this.message;
	}

	/** @param sender the sender to store */
	public void setSender(String sender) {
		this.sender = sender;
	}

	/** @param receiver the receiver to store */
	public void setReceiver(String receiver) {
		this.receiver = receiver;
	}

	/** @param message the message text to store */
	public void setMessage(String message) {
		this.message = message;
	}
}

编码器:
package mina.tcp.coder;

import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * Encodes an {@link SmsInfo} into the line-based SMS wire format:
 * a status line, "S: sender", "R: receiver" and "L: content byte length",
 * each terminated by '\n', followed by the raw content bytes.
 * <p>
 * The encoder keeps no per-message state in fields, so a single instance can be
 * used for several sessions at once.
 *
 * @author donald
 * 2017-05-19 22:55:48
 */
public class CumulativeSmsEncoder extends ProtocolEncoderAdapter {
	private static final Logger log = LoggerFactory.getLogger(CumulativeSmsEncoder.class);
	/** Status line written at the start of every message. */
	private static final String STATUS_LINE = "M sip:wap.fetion.com.cn SIP-C/2.0";
	private final Charset charset;

	/**
	 * @param charset charset used for the header lines and the content
	 * @throws NullPointerException if {@code charset} is null
	 * @throws UnsupportedOperationException if {@code charset} cannot encode
	 */
	public CumulativeSmsEncoder(Charset charset) {
		// Fail at construction time (as before) rather than on the first write.
		charset.newEncoder();
		this.charset = charset;
	}

	/**
	 * Writes one complete message to {@code out} as a single buffer.
	 *
	 * @param session the session the message is written to (not used here)
	 * @param message the message to encode; must be an {@link SmsInfo}
	 * @param out receives the encoded buffer
	 * @throws ClassCastException if {@code message} is not an {@link SmsInfo}
	 */
	@Override
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		String smsContent = sms.getMessage();
		// CharsetEncoder is stateful and not safe for concurrent use, and this encoder
		// object may serve many sessions, so every call works with its own encoder.
		CharsetEncoder charsetEncoder = charset.newEncoder();

		IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
		// Header fields are separated by '\n'.
		buffer.putString(STATUS_LINE + '\n', charsetEncoder);
		buffer.putString("S: " + sms.getSender() + '\n', charsetEncoder);
		buffer.putString("R: " + sms.getReceiver() + '\n', charsetEncoder);
		// The length is the number of encoded bytes, not the number of characters.
		buffer.putString("L: " + (smsContent.getBytes(charset).length) + "\n", charsetEncoder);
		buffer.putString(smsContent, charsetEncoder);
		// Switch the buffer from writing to reading before handing it over.
		buffer.flip();
		out.write(buffer);
		log.info("========短信编码器编码完毕....");
	}
}

编码器继承ProtocolEncoderAdapter;
解码器:
package mina.tcp.coder;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * Decodes the line-based SMS wire format into an {@link SmsInfo}.
 * <p>
 * Expected input: four '\n'-terminated lines (status line, "S: sender",
 * "R: receiver", "L: content byte length") followed by exactly that many
 * content bytes.
 * <p>
 * Known limitation: all parsing progress is held in local variables of
 * {@link #doDecode}, and the bytes read are consumed from the input. A message
 * therefore has to be completely available within a single call; if the input
 * ends early, assembling the result fails because the header fields are still
 * empty.
 *
 * @author donald
 * 2017-05-19 23:01:50
 */
public class CumulativeSmsDecoder extends CumulativeProtocolDecoder {
	private static final Logger log = LoggerFactory.getLogger(CumulativeSmsDecoder.class);
	private final Charset charset;

	/**
	 * @param charset charset used to decode header lines and content
	 * @throws NullPointerException if {@code charset} is null
	 */
	public CumulativeSmsDecoder(Charset charset) {
		// Fail at construction time (as before) rather than on the first read.
		charset.newDecoder();
		this.charset = charset;
	}

	/**
	 * Reads one message from {@code in} and writes the resulting {@link SmsInfo}
	 * to {@code out}.
	 *
	 * @return always {@code false}, so this method is not invoked again for the
	 *         same input; bytes left in {@code in} are kept for the next call
	 */
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
		// CharsetDecoder is stateful and not safe for concurrent use, and this decoder
		// object may serve many sessions, so every call works with its own decoder.
		CharsetDecoder charsetDecoder = charset.newDecoder();
		IoBuffer buffer = IoBuffer.allocate(1024).setAutoExpand(true);
		int matchCount = 0; // bytes collected for the part currently being read
		String sender = "", receiver = "", length = "", sms = "";
		int line = 0; // index of the part being read: 0-3 header lines, 4 content
		long contentLength = -1; // parsed from the "L: " line once it has been read
		while (in.hasRemaining()) {
			byte b = in.get();
			buffer.put(b);
			matchCount++;
			// 10 is the ASCII code of '\n': a header line is complete.
			if (b == 10 && line < 4) {
				buffer.flip();
				String text = buffer.getString(matchCount, charsetDecoder);
				// Drop the trailing '\n'.
				text = text.substring(0, text.length() - 1);
				switch (line) {
				case 0: // status line
					log.debug("========短信状态行:" + text);
					break;
				case 1: // sender
					sender = text;
					log.debug("========短信发送者:" + sender);
					break;
				case 2: // receiver
					receiver = text;
					log.debug("========短信接受者:" + receiver);
					break;
				default: // content length
					length = text;
					log.debug("========短信内容长度:" + length.split(": ")[1]);
					// Parse once here instead of once per content byte.
					contentLength = Long.parseLong(length.split(": ")[1]);
					break;
				}
				// Start collecting the next part.
				matchCount = 0;
				buffer.clear();
				line++;
			} else if (line == 4) {
				// Content: decode it once the announced number of bytes has been read.
				if (matchCount == contentLength) {
					buffer.flip();
					sms = buffer.getString(matchCount, charsetDecoder);
					log.debug("========短信内容:" + sms);
					break;
				}
			}
		}
		// Assemble the message.
		SmsInfo smsInfo = new SmsInfo();
		smsInfo.setSender(sender.split(": ")[1]);
		smsInfo.setReceiver(receiver.split(": ")[1]);
		smsInfo.setMessage(sms);
		out.write(smsInfo);
		log.info("========短信解码器解码完毕....");
		// Returning false stops further decode calls for this input. If bytes remain
		// in the input they are kept in the session and merged with the data that
		// arrives next; if everything was read the accumulated buffer is discarded.
		return false;
	}
}

解码器继承的为CumulativeProtocolDecoder,即可累计的协议解码器,这个我们先用,后面再具体讲。
协议编解码器工厂:
package mina.tcp.coder;

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

/**
 * Codec factory that pairs {@link CumulativeSmsEncoder} with
 * {@link CumulativeSmsDecoder}.
 * <p>
 * Both codec objects are created once in the constructor and the same instances
 * are returned for every session.
 *
 * @author donald
 * 2017-05-19 22:53:37
 */
public class CumulativeSmsCodecFactory implements ProtocolCodecFactory {
	private final CumulativeSmsEncoder encoder;
	private final CumulativeSmsDecoder decoder;

	/** Creates a factory whose codecs use the platform default charset. */
	public CumulativeSmsCodecFactory() {
		this(Charset.defaultCharset());
	}

	/**
	 * Creates a factory whose codecs use the given charset.
	 *
	 * @param charSet charset handed to both the encoder and the decoder
	 */
	public CumulativeSmsCodecFactory(Charset charSet) {
		encoder = new CumulativeSmsEncoder(charSet);
		decoder = new CumulativeSmsDecoder(charSet);
	}

	/** @return the encoder created by the constructor */
	@Override
	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
		return this.encoder;
	}

	/** @return the decoder created by the constructor */
	@Override
	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		return this.decoder;
	}
}

server:
package mina.tcp.main;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.coder.CumulativeSmsCodecFactory;
import mina.tcp.handler.SmsServerHandler;
/**
 * Entry point of the SMS demo server: configures a NIO socket acceptor with a
 * logging filter and the SMS codec filter, then binds it to the configured address.
 *
 * @author donald
 * 2017-05-19 22:16:29
 */
public class SmsServer {
	private static final Logger log = LoggerFactory.getLogger(SmsServer.class);
	private static final  String ip = "192.168.31.153";
	private static final  int port = 9122;
	private static final  int readBufferSize = 2048;
	private static final  int idleTime = 10;
	private static final Charset charset = Charset.forName("UTF-8");

	/**
	 * Builds the acceptor and starts listening.
	 *
	 * @param args not used
	 * @throws IOException if binding the address fails
	 */
	public static void main(String[] args) throws IOException {
		IoAcceptor acceptor = new NioSocketAcceptor();

		// Socket session settings.
		SocketSessionConfig sessionConfig = (SocketSessionConfig) acceptor.getSessionConfig();
		sessionConfig.setReadBufferSize(readBufferSize);
		sessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, idleTime);

		// Filter chain: logging first, then the SMS codec.
		DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
		chain.addLast("loggingFilter", new LoggingFilter());
		chain.addLast("protocolCodecFilter",
				new ProtocolCodecFilter(new CumulativeSmsCodecFactory(charset)));

		// Handler for decoded messages.
		acceptor.setHandler(new SmsServerHandler());

		acceptor.bind(new InetSocketAddress(ip, port));
		log.info("=========SmsServer is start============");
	}
}

注意:协议编解码器要包装成协议编解码器工厂ProtocolCodecFactory,然后以ProtocolCodecFilter形式添加到过滤链中。
server handler:
package mina.tcp.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * Server-side handler: logs every received {@link SmsInfo} and answers it with
 * an acknowledgement message whose sender and receiver are swapped.
 *
 * @author donald
 * 2017-05-19 22:45:26
 */
public class SmsServerHandler extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(SmsServerHandler.class);

	/**
	 * Logs the incoming message and writes an acknowledgement back to the session.
	 *
	 * @param session the session the message arrived on
	 * @param message the decoded message; must be an {@link SmsInfo}
	 */
	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		log.info("===message received from {} is:{}", sms.getSender(), sms.getMessage());
		// Reply in the opposite direction.
		SmsInfo ackSms = new SmsInfo();
		ackSms.setSender(sms.getReceiver());
		ackSms.setReceiver(sms.getSender());
		ackSms.setMessage("收到...");
		session.write(ackSms);
		log.info("===回复短信已发送...");
	}

	/**
	 * Logs the failure, including its stack trace, and closes the session.
	 *
	 * @param session the session on which the failure occurred
	 * @param cause the failure
	 */
	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		// Pass the throwable to the logger so the stack trace ends up in the log
		// instead of being printed to standard error.
		log.error("===会话异常:" + cause.getMessage(), cause);
		session.closeNow();
	}
}

这里我们只是实现需要关注的方法

client:
package mina.tcp.main;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.coder.CumulativeSmsCodecFactory;
import mina.tcp.handler.SmsClientHandler;
/**
 * Entry point of the SMS demo client: configures a NIO socket connector with a
 * logging filter and the SMS codec filter, then starts connecting to the server.
 *
 * @author donald
 * 2017-05-19 22:27:30
 */
public class SmsClient {
	private static final Logger log = LoggerFactory.getLogger(SmsClient.class);
	private static final  String ip = "192.168.31.153";
	private static final  int port = 9122;
	private static final  int connectTimeoutMillis = 30000;
	private static final Charset charset = Charset.forName("UTF-8");

	/**
	 * Builds the connector and initiates the connection.
	 *
	 * @param args not used
	 */
	public static void main(String[] args) {
		IoConnector connector = new NioSocketConnector();
		connector.setConnectTimeoutMillis(connectTimeoutMillis);

		// Filter chain: logging first, then the SMS codec.
		DefaultIoFilterChainBuilder chain = connector.getFilterChain();
		chain.addLast("loggingFilter", new LoggingFilter());
		chain.addLast("protocolCodecFilter",
				new ProtocolCodecFilter(new CumulativeSmsCodecFactory(charset)));

		// Handler for session events and decoded messages.
		connector.setHandler(new SmsClientHandler());

		// The result of connect(...) is not awaited here.
		connector.connect(new InetSocketAddress(ip, port));
		log.info("=========SmsClient is start============");
	}
}


client handler:


package mina.tcp.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * Client-side handler: sends one message as soon as the session is opened and
 * logs every {@link SmsInfo} received afterwards.
 *
 * @author donald
 * 2017-05-19 22:30:24
 */
public class SmsClientHandler extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(SmsClientHandler.class);

	/**
	 * Sends the demo message once the session is open.
	 *
	 * @param session the newly opened session
	 */
	@Override
	public void sessionOpened(IoSession session) {
		SmsInfo sms = new SmsInfo();
		sms.setSender("13688888888");
		sms.setReceiver("18866666666");
		sms.setMessage("Hello Sms Server...");
		session.write(sms);
		log.info("===短信已发送...");
	}

	/**
	 * Logs the sender and the text of a received message.
	 *
	 * @param session the session the message arrived on
	 * @param message the decoded message; must be an {@link SmsInfo}
	 */
	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		log.info("===message received from {} is:{}", sms.getSender(), sms.getMessage());
	}

	/**
	 * Logs the failure, including its stack trace, and closes the session.
	 *
	 * @param session the session on which the failure occurred
	 * @param cause the failure
	 */
	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		// Pass the throwable to the logger so the stack trace ends up in the log
		// instead of being printed to standard error.
		log.error("===会话异常:" + cause.getMessage(), cause);
		session.closeNow();
	}
}

启动server,client控制台输出:
server:
[INFO ] 2017-05-21 12:11:40 mina.tcp.main.SmsServer =========SmsServer is start============
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter CREATED
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter OPENED
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=89 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:11:46 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信状态行:M sip:wap.fetion.com.cn SIP-C/2.0
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信发送者:S: 13688888888
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信接受者:R: 18866666666
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信内容长度:19
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信内容:Hello Sms Server...
[INFO ] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信解码器解码完毕....
[INFO ] 2017-05-21 12:11:46 mina.tcp.handler.SmsServerHandler ===message received from 13688888888 is:Hello Sms Server...
[INFO ] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsEncoder ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:11:46 mina.tcp.handler.SmsServerHandler ===回复短信已发送...
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@68fae256
[INFO ] 2017-05-21 12:11:56 org.apache.mina.filter.logging.LoggingFilter IDLE


client:

[INFO ] 2017-05-21 12:11:46 mina.tcp.main.SmsClient =========SmsClient is start============
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter CREATED
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter OPENED
[INFO ] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsEncoder ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:11:46 mina.tcp.handler.SmsClientHandler ===短信已发送...
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@107dfdb8
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=78 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:11:46 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信状态行:M sip:wap.fetion.com.cn SIP-C/2.0
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信发送者:S: 18866666666
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信接受者:R: 13688888888
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信内容长度:9
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信内容:收到...
[INFO ] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信解码器解码完毕....
[INFO ] 2017-05-21 12:11:46 mina.tcp.handler.SmsClientHandler ===message received from 18866666666 is:收到...

在上面的解码器中我们用到了两个状态变量line和matchCount,用于记录解码短信的进度。这两个变量只是doDecode方法中的局部变量,每次调用都会重新初始化;当客户端将一条短信分多次发送时(比如先发短信状态行和发送者,再发接收者、短信内容长度和内容),上一次解码的进度就会丢失,
解码器无法得到一条完整的短信。为了应对这种情况,我们可以将状态变量line和matchCount保存到会话中,下次再接收数据时,恢复会话中的解码器状态,继续解码。
现在对解码器进行改造:
package mina.tcp.coder;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * Decodes the line-based SMS wire format into {@link SmsInfo} objects and keeps
 * its parsing progress in the session, so a message may arrive in any number of
 * fragments.
 * <p>
 * Expected input: four '\n'-terminated lines (status line, "S: sender",
 * "R: receiver", "L: content byte length") followed by exactly that many
 * content bytes.
 *
 * @author donald
 * 2017-05-20 22:55:33
 */
public class CumulativeSmsDecoder2 extends CumulativeProtocolDecoder {
	private static final Logger log = LoggerFactory.getLogger(CumulativeSmsDecoder2.class);
	/** ASCII code of '\n', the terminator of every header line. */
	private static final byte LINE_FEED = 10;
	/** Number of header lines that precede the content. */
	private static final int HEADER_LINE_COUNT = 4;
	/** Separator between a header name and its value. */
	private static final String HEADER_SEPARATOR = ": ";

	private final Charset charset;
	private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");

	/**
	 * @param charset charset used to decode header lines and content
	 * @throws NullPointerException if {@code charset} is null
	 */
	public CumulativeSmsDecoder2(Charset charset) {
		// Fail at construction time (as before) rather than on the first read.
		charset.newDecoder();
		this.charset = charset;
	}

	/**
	 * Consumes bytes from {@code in} until one message is complete or the input
	 * is exhausted.
	 *
	 * @return {@code true} if a message was written to {@code out} (the method is
	 *         then called again while input remains); {@code false} if more data
	 *         is needed, in which case the progress made so far stays in the
	 *         session context
	 * @throws IllegalArgumentException if a header line has no ": " separator or
	 *         announces a negative length
	 * @throws NumberFormatException if the announced length is not an integer
	 */
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
		// Restore the progress made by earlier calls for this session.
		Context ctx = getContext(session);
		while (in.hasRemaining() && !ctx.isComplete()) {
			byte b = in.get();
			ctx.innerBuffer.put(b);
			ctx.matchCount++;
			if (ctx.line < HEADER_LINE_COUNT && b == LINE_FEED) {
				ctx.finishHeaderLine();
			}
		}
		if (!ctx.isComplete()) {
			// More data is needed; everything read so far is kept in the context.
			return false;
		}
		try {
			SmsInfo smsObject = new SmsInfo();
			smsObject.setSender(headerValue(ctx.sender));
			smsObject.setReceiver(headerValue(ctx.receiver));
			smsObject.setMessage(ctx.readContent());
			out.write(smsObject);
			log.info("========短信解码器解码一条短信完毕....");
		} finally {
			// Start from a clean state for the next message, also after a failure.
			ctx.reset();
		}
		// Input was consumed in this call; decoding continues if bytes remain.
		return true;
	}

	/**
	 * Returns the decoding context stored in the session, creating it on first use.
	 *
	 * @param session the session being decoded
	 * @return the context of that session
	 */
	private Context getContext(IoSession session) {
		Context context = (Context) session.getAttribute(CONTEXT);
		if (context == null) {
			context = new Context(charset.newDecoder());
			session.setAttribute(CONTEXT, context);
		}
		return context;
	}

	/**
	 * Extracts the value of a "Name: value" header line.
	 *
	 * @param headerLine the header line without its trailing '\n'
	 * @return the text after the first ": "
	 * @throws IllegalArgumentException if the line contains no ": "
	 */
	private static String headerValue(String headerLine) {
		int separatorAt = headerLine.indexOf(HEADER_SEPARATOR);
		if (separatorAt < 0) {
			throw new IllegalArgumentException("Malformed SMS header line: " + headerLine);
		}
		return headerLine.substring(separatorAt + HEADER_SEPARATOR.length());
	}

	/**
	 * Parsing progress of one session. Static, because it needs nothing from the
	 * enclosing decoder; it owns its own CharsetDecoder because such decoders are
	 * not safe for concurrent use.
	 */
	private static final class Context {
		/** Bytes of the part (header line or content) currently being collected. */
		private final IoBuffer innerBuffer = IoBuffer.allocate(1024).setAutoExpand(true);
		private final CharsetDecoder decoder;
		private String statusLine = "";
		private String sender = "";
		private String receiver = "";
		/** Announced content size in bytes; -1 until the "L: " line has been read. */
		private int contentLength = -1;
		/** Number of bytes collected for the current part. */
		private int matchCount = 0;
		/** Index of the part being read: 0-3 are header lines, 4 is the content. */
		private int line = 0;

		Context(CharsetDecoder decoder) {
			this.decoder = decoder;
		}

		/** @return true once all header lines and all content bytes have been read */
		boolean isComplete() {
			return line == HEADER_LINE_COUNT && matchCount == contentLength;
		}

		/** Decodes the header line just terminated by '\n' and stores it. */
		void finishHeaderLine() throws Exception {
			innerBuffer.flip();
			String text = innerBuffer.getString(matchCount, decoder);
			innerBuffer.clear();
			matchCount = 0;
			if (text.endsWith("\n")) {
				text = text.substring(0, text.length() - 1);
			}
			switch (line) {
			case 0:
				statusLine = text;
				break;
			case 1:
				sender = text;
				break;
			case 2:
				receiver = text;
				break;
			default:
				// Parsed once here rather than for every content byte.
				contentLength = Integer.parseInt(headerValue(text));
				if (contentLength < 0) {
					throw new IllegalArgumentException("Negative SMS content length: " + contentLength);
				}
				break;
			}
			line++;
		}

		/** @return the decoded content; empty when the announced length is 0 */
		String readContent() throws Exception {
			if (contentLength == 0) {
				return "";
			}
			innerBuffer.flip();
			return innerBuffer.getString(contentLength, decoder);
		}

		/** Clears all progress so the next message starts from scratch. */
		void reset() {
			innerBuffer.clear();
			matchCount = 0;
			line = 0;
			contentLength = -1;
			statusLine = "";
			sender = "";
			receiver = "";
		}
	}

}

为了模拟将一条短信分两次发送的情况,我们来改造编码器:
package mina.tcp.coder;

import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * Encodes an {@link SmsInfo} into the line-based SMS wire format, deliberately
 * split over two buffers: the status line and sender line first, then the
 * receiver line, length line and content. The split exists to exercise a
 * decoder that has to resume in the middle of a message.
 * <p>
 * The encoder keeps no per-message state in fields, so a single instance can be
 * used for several sessions at once.
 *
 * @author donald
 * 2017-05-19 22:55:48
 */
public class CumulativeSmsEncoder2 extends ProtocolEncoderAdapter {
	private static final Logger log = LoggerFactory.getLogger(CumulativeSmsEncoder2.class);
	/** Status line written at the start of every message. */
	private static final String STATUS_LINE = "M sip:wap.fetion.com.cn SIP-C/2.0";
	private final Charset charset;

	/**
	 * @param charset charset used for the header lines and the content
	 * @throws NullPointerException if {@code charset} is null
	 * @throws UnsupportedOperationException if {@code charset} cannot encode
	 */
	public CumulativeSmsEncoder2(Charset charset) {
		// Fail at construction time (as before) rather than on the first write.
		charset.newEncoder();
		this.charset = charset;
	}

	/**
	 * Writes one message to {@code out} as two consecutive buffers.
	 *
	 * @param session the session the message is written to (not used here)
	 * @param message the message to encode; must be an {@link SmsInfo}
	 * @param out receives the two encoded buffers, in order
	 * @throws ClassCastException if {@code message} is not an {@link SmsInfo}
	 */
	@Override
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		String smsContent = sms.getMessage();
		// CharsetEncoder is stateful and not safe for concurrent use, and this encoder
		// object may serve many sessions, so every call works with its own encoder.
		CharsetEncoder charsetEncoder = charset.newEncoder();

		// First part: status line and sender line, each terminated by '\n'.
		IoBuffer buffer = IoBuffer.allocate(1024).setAutoExpand(true);
		buffer.putString(STATUS_LINE + '\n', charsetEncoder);
		buffer.putString("S: " + sms.getSender() + '\n', charsetEncoder);
		buffer.flip();
		out.write(buffer);

		// Second part: receiver line, length line and content.
		IoBuffer bufferx = IoBuffer.allocate(1024).setAutoExpand(true);
		bufferx.putString("R: " + sms.getReceiver() + '\n', charsetEncoder);
		// The length is the number of encoded bytes, not the number of characters.
		bufferx.putString("L: " + (smsContent.getBytes(charset).length) + "\n", charsetEncoder);
		bufferx.putString(smsContent, charsetEncoder);
		// Switch the buffer from writing to reading before handing it over.
		bufferx.flip();
		out.write(bufferx);
		log.info("========短信编码器编码完毕....");
	}
}

修改协议编解码器工厂:
package mina.tcp.coder;

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

/**
 * Codec factory that pairs {@link CumulativeSmsEncoder2} with
 * {@link CumulativeSmsDecoder2}.
 * <p>
 * Both codec objects are created once in the constructor and the same instances
 * are returned for every session.
 *
 * @author donald
 * 2017-05-19 22:53:37
 */
public class CumulativeSmsCodecFactory2 implements ProtocolCodecFactory {
	private final CumulativeSmsEncoder2 encoder;
	private final CumulativeSmsDecoder2 decoder;

	/** Creates a factory whose codecs use the platform default charset. */
	public CumulativeSmsCodecFactory2() {
		this(Charset.defaultCharset());
	}

	/**
	 * Creates a factory whose codecs use the given charset.
	 *
	 * @param charSet charset handed to both the encoder and the decoder
	 */
	public CumulativeSmsCodecFactory2(Charset charSet) {
		encoder = new CumulativeSmsEncoder2(charSet);
		decoder = new CumulativeSmsDecoder2(charSet);
	}

	/** @return the encoder created by the constructor */
	@Override
	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
		return this.encoder;
	}

	/** @return the decoder created by the constructor */
	@Override
	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		return this.decoder;
	}
}

修改server:

package mina.tcp.main;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.coder.CumulativeSmsCodecFactory2;
import mina.tcp.handler.SmsServerHandler2;
/**
 * Entry point of the second SMS demo server: same setup as the first one, but
 * wired with {@link CumulativeSmsCodecFactory2} and {@link SmsServerHandler2}.
 *
 * @author donald
 * 2017-05-19 22:16:29
 */
public class SmsServer2 {
	private static final Logger log = LoggerFactory.getLogger(SmsServer2.class);
	private static final  String ip = "192.168.31.153";
	private static final  int port = 9122;
	private static final  int readBufferSize = 2048;
	private static final  int idleTime = 10;
	private static final Charset charset = Charset.forName("UTF-8");

	/**
	 * Builds the acceptor and starts listening.
	 *
	 * @param args not used
	 * @throws IOException if binding the address fails
	 */
	public static void main(String[] args) throws IOException {
		IoAcceptor acceptor = new NioSocketAcceptor();

		// Socket session settings.
		SocketSessionConfig sessionConfig = (SocketSessionConfig) acceptor.getSessionConfig();
		sessionConfig.setReadBufferSize(readBufferSize);
		sessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, idleTime);

		// Filter chain: logging first, then the SMS codec.
		DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
		chain.addLast("loggingFilter", new LoggingFilter());
		chain.addLast("protocolCodecFilter",
				new ProtocolCodecFilter(new CumulativeSmsCodecFactory2(charset)));

		// Handler for decoded messages.
		acceptor.setHandler(new SmsServerHandler2());

		acceptor.bind(new InetSocketAddress(ip, port));
		log.info("=========SmsServer2 is start============");
	}
}

server handler:
package mina.tcp.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * Server-side handler: logs every received {@link SmsInfo} and answers it with
 * an acknowledgement message whose sender and receiver are swapped.
 *
 * @author donald
 * 2017-05-19 22:45:26
 */
public class SmsServerHandler2 extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(SmsServerHandler2.class);

	/**
	 * Logs the incoming message and writes an acknowledgement back to the session.
	 *
	 * @param session the session the message arrived on
	 * @param message the decoded message; must be an {@link SmsInfo}
	 */
	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		log.info("===message received from {} is:{}", sms.getSender(), sms.getMessage());
		// Reply in the opposite direction.
		SmsInfo ackSms = new SmsInfo();
		ackSms.setSender(sms.getReceiver());
		ackSms.setReceiver(sms.getSender());
		ackSms.setMessage("收到...");
		session.write(ackSms);
		log.info("===回复短信已发送...");
	}

	/**
	 * Logs the failure, including its stack trace, and closes the session.
	 *
	 * @param session the session on which the failure occurred
	 * @param cause the failure
	 */
	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		// Pass the throwable to the logger so the stack trace ends up in the log
		// instead of being printed to standard error.
		log.error("===会话异常:" + cause.getMessage(), cause);
		session.closeNow();
	}

}


修改client:

package mina.tcp.main;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.coder.CumulativeSmsCodecFactory2;
import mina.tcp.handler.SmsClientHandler2;
/**
 * Entry point of the second SMS demo client: same setup as the first one, but
 * wired with {@link CumulativeSmsCodecFactory2} and {@link SmsClientHandler2}.
 *
 * @author donald
 * 2017-05-19 22:27:30
 */
public class SmsClient2 {
	private static final Logger log = LoggerFactory.getLogger(SmsClient2.class);
	private static final  String ip = "192.168.31.153";
	private static final  int port = 9122;
	private static final  int connectTimeoutMillis = 30000;
	private static final Charset charset = Charset.forName("UTF-8");

	/**
	 * Builds the connector and initiates the connection.
	 *
	 * @param args not used
	 */
	public static void main(String[] args) {
		IoConnector connector = new NioSocketConnector();
		connector.setConnectTimeoutMillis(connectTimeoutMillis);

		// Filter chain: logging first, then the SMS codec.
		DefaultIoFilterChainBuilder chain = connector.getFilterChain();
		chain.addLast("loggingFilter", new LoggingFilter());
		chain.addLast("protocolCodecFilter",
				new ProtocolCodecFilter(new CumulativeSmsCodecFactory2(charset)));

		// Handler for session events and decoded messages.
		connector.setHandler(new SmsClientHandler2());

		// The result of connect(...) is not awaited here.
		connector.connect(new InetSocketAddress(ip, port));
		log.info("=========SmsClient2 is start============");
	}
}

client handler:
package mina.tcp.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * Client-side handler: sends two messages as soon as the session is opened and
 * logs every {@link SmsInfo} received afterwards.
 *
 * @author donald
 * 2017-05-19 22:30:24
 */
public class SmsClientHandler2 extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(SmsClientHandler2.class);

	/**
	 * Sends the two demo messages once the session is open.
	 *
	 * @param session the newly opened session
	 */
	@Override
	public void sessionOpened(IoSession session) {
		// Each write gets its own object: a message must not be modified after it
		// has been handed to session.write, because the write may be processed later.
		session.write(newSms("first..."));
		session.write(newSms("second..."));
		log.info("===短信已发送...");
	}

	/**
	 * Logs the sender and the text of a received message.
	 *
	 * @param session the session the message arrived on
	 * @param message the decoded message; must be an {@link SmsInfo}
	 */
	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		log.info("===message received from {} is:{}", sms.getSender(), sms.getMessage());
	}

	/**
	 * Logs the failure, including its stack trace, and closes the session.
	 *
	 * @param session the session on which the failure occurred
	 * @param cause the failure
	 */
	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		// Pass the throwable to the logger so the stack trace ends up in the log
		// instead of being printed to standard error.
		log.error("===会话异常:" + cause.getMessage(), cause);
		session.closeNow();
	}

	/** Builds a demo message with the fixed sender and receiver and the given text. */
	private static SmsInfo newSms(String message) {
		SmsInfo sms = new SmsInfo();
		sms.setSender("13688888888");
		sms.setReceiver("18866666666");
		sms.setMessage(message);
		return sms;
	}

}


启动server,client控制台输出:
server:
[INFO ] 2017-05-21 12:27:09 mina.tcp.main.SmsServer2 =========SmsServer2 is start============
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter CREATED
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter OPENED
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=77 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:27:14 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsDecoder2 ========短信解码器解码一条短信完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsServerHandler2 ===message received from 13688888888 is:first...
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsEncoder2 ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsServerHandler2 ===回复短信已发送...
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@4ab24098
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=78 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:27:14 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsDecoder2 ========短信解码器解码一条短信完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsServerHandler2 ===message received from 13688888888 is:second...
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsEncoder2 ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsServerHandler2 ===回复短信已发送...
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@7caee177

client:
[INFO ] 2017-05-21 12:27:14 mina.tcp.main.SmsClient2 =========SmsClient2 is start============
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter CREATED
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter OPENED
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsEncoder2 ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsEncoder2 ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsClientHandler2 ===短信已发送...
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@41785b00
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@41785b00
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=78 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:27:14 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsDecoder2 ========短信解码器解码一条短信完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsClientHandler2 ===message received from 18866666666 is:收到...
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=78 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:27:14 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsDecoder2 ========短信解码器解码一条短信完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsClientHandler2 ===message received from 18866666666 is:收到...
  • lib.rar (1022.7 KB)
  • 下载次数: 2
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics