`
Donald_Draper
  • 浏览: 955146 次
社区版块
存档分类
最新评论

NIO-TCP通信实例(单线程,多线程Server)

    博客分类:
  • NIO
阅读更多
Java Socket通信实例:http://donald-draper.iteye.com/blog/2356695
Java NIO ByteBuffer详解:http://donald-draper.iteye.com/blog/2357084
Java Nio系列教程;http://www.iteye.com/magazines/132-Java-NIO
NIO-TCP简单实例:http://donald-draper.iteye.com/admin/blogs/2369044
在这篇文章之前用BIO实现过TCP的通信,即Java Socket通信实例这篇文章,那边文章
主要利用BIO的ServerSocket和Socket实现加法和乘法的实现,今天我们来用NIO的
ServerSocketChannel和SocketChannel来实现加法和乘法;协议基本一致,做了一点修改
如下:




下面我们来具体的实现:
协议常量类:
package nio.socketchannel;

/**
 * 协议常量
 * @author donald
 * 2017年4月13日
 * 下午10:49:27
 */
public class ProtocolConstants {
	/**
	 * 加法协议编码
	 */
	public static final String SUM_PROTOCOL_300000 = "300000";
	/**
	 * 乘法协议编码
	 */
	public static final String MULTI_PROTOCOL_300100 = "300100";
	/**
	 * 计算结果
	 */
	public static final String ACK_PROTOCOL_300200 = "300200";
	/**
	 * 服务器解析协议失败
	 */
	public static final String ACK_PROTOCOL_300300 = "300300";
	/**
	 * 协议编码长度
	 */
	public static final int PROTOCOL_CODE_LENGTH = 6;
	/**
	 * 协议操作数长度
	 */
	public static final int OPERATE_NUM_LENGTH = 4;
	/**
	 * 字符集
	 */
	public static final String CHARSET_UTF8 = "UTF-8";
}

服务端:
package nio.socketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import socket.ProtocolConstants;


public class NIOServerCalculate {
	private static final String HOST = "192.168.32.126";
	private static final int PORT = 10000;
	//manager the channel
	private Selector selector;
	/**
	 * stat Server
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException{
		NIOServerCalculate server = new NIOServerCalculate();
		server.initServer(HOST,PORT);
		server.listen();
	}
	/**
	 * get the ServerSocket and finish some initial work
	 * @param port
	 * @throws IOException
	 */
	public void initServer(String host, int port) throws IOException{
		//get the ServerSocket
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		// set no blocking mode
		serverChannel.configureBlocking(false);
		//bind the port
		serverChannel.socket().bind(new InetSocketAddress(host, port));
		//get the channel manager
		this.selector = Selector.open();
		//Register the channel to manager and bind the event
		serverChannel.register(selector,SelectionKey.OP_ACCEPT);
		}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings({ "rawtypes" })
	public void listen() throws IOException{
		System.out.println("=========The Server is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
				if(key.isAcceptable()){
					ServerSocketChannel server = (ServerSocketChannel)key.channel();
					SocketChannel channel = server.accept();
					channel.configureBlocking(false);
					System.out.println("=========channel is Connected:"+channel.isConnected());
					System.out.println("=========channel is Open:"+channel.isOpen());
					System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending());
//					channel.register(this.selector, SelectionKey.OP_READ);
					channel.register(this.selector, SelectionKey.OP_READ,"decodeProtol");
				}
				else if (key.isReadable()) read(key);
			}
			
		}
	}
	/**
	 * deal with the data come from the client
	 * @param key
	 * @throws IOException 
	 */
	public void read(SelectionKey key) throws IOException{
		SocketChannel channel = (SocketChannel) key.channel();
		String  attachedInfo = (String) key.attachment();
		System.out.println("========socketChannel attachedInfo:"+attachedInfo);
		ByteBuffer[] proctols = null;//协议
		ByteBuffer proctolCodeBuffer = null;//协议编码
		proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
		ByteBuffer dataBuffer = null;//协议内容:操作数
		dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH);
		proctols = new ByteBuffer[]{proctolCodeBuffer,dataBuffer};
		System.out.println("========read caculate proctol from Client=======");
//		channel.read(proctols);
		while(proctolCodeBuffer.position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && dataBuffer.position() != 2*ProtocolConstants.OPERATE_NUM_LENGTH){
			channel.read(proctols);//待读取完成协议才解析
		}
//		channel.shutdownInput();
		proctolCodeBuffer.flip();
		dataBuffer.flip();
		byte[] proctolCodeBytes = proctolCodeBuffer.array();
		String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim();
		int firstNum = 0;
		int secondNum = 0;
		int result = 0;
		if(proctolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000)){
			System.out.println("========the protocol is sum algorithm=======");
			firstNum = dataBuffer.getInt();
			secondNum = dataBuffer.getInt();
			System.out.println("operate num is:"+firstNum+","+secondNum);
			result = firstNum*secondNum;
			proctolCodeBuffer.clear();
			proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8));
			dataBuffer.clear();
			//针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面
//			dataBuffer.compact()
			dataBuffer.putInt(result);
			proctolCodeBuffer.flip();
			dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中
			channel.write(proctols);
		}
		else if(proctolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100)){
			System.out.println("========the protocol is multiply algorithm=======");
			firstNum = dataBuffer.getInt();
			secondNum = dataBuffer.getInt();
			System.out.println("operate num is:"+firstNum+","+secondNum);
			result = firstNum*secondNum;
			proctolCodeBuffer.clear();
			proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8));
			proctolCodeBuffer.flip();
			dataBuffer.clear();
			//针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面
//			dataBuffer.compact()
			dataBuffer.putInt(result);
			dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中
			channel.write(proctols);
		}
		else{
			System.out.println("========server decode procotol fail......");
			proctolCodeBuffer.clear();
			proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300300.getBytes(ProtocolConstants.CHARSET_UTF8));
			proctolCodeBuffer.flip();
			dataBuffer.clear();
			dataBuffer.putInt(0);
			dataBuffer.flip();
			channel.write(proctols);
		}
		/*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/
//		channel.shutdownOutput();
		/*关闭通道*/
//		channel.close();
		/*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/
	}
	
}


加法客户端:
package nio.socketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import socket.ProtocolConstants;

/**
 * 加法计算
 * @author donald
 * 2017年4月10日
 * 下午9:32:57
 */
public class NIOClientSum {
	private static final String HOST = "192.168.32.126";
	private static final int PORT = 10000;
	//manager the channel
	private Selector selector;
	/**
	 * stat Client
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException{
		NIOClientSum client = new NIOClientSum();
		client.initClient(HOST,PORT);
		client.listen();
	}
	/**
	 * get the Socket and finish some initial work
	 * @param ip Server ip
	 * @param port connect Server port
	 * @throws IOException
	 */
	public void initClient(String ip,int port) throws IOException{
		//get the Socket
		SocketChannel channel = SocketChannel.open();
		// set no blocking mode
		channel.configureBlocking(false);
		//connect the Server
		channel.connect(new InetSocketAddress(ip,port));
		//get the channel manager
		this.selector = Selector.open();
		//Register the channel to manager and bind the event
		channel.register(selector,SelectionKey.OP_CONNECT);
		}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings("rawtypes")
	public void listen() throws IOException{
		System.out.println("===========The Sum Client is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
				if(key.isConnectable()){
					SocketChannel channel = (SocketChannel)key.channel();
                    //during connecting, finish the connect
                    if(channel.isConnectionPending()){
                    	channel.finishConnect();
                    }
					channel.configureBlocking(false);
					System.out.println("=========channel is Connected:"+channel.isConnected());
					System.out.println("=========channel is Open:"+channel.isOpen());
					System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending());
					ByteBuffer[] proctols = null;//协议
					proctols = new ByteBuffer[2];
					ByteBuffer proctolCodeBuffer = null;//协议编码
					proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
//					proctolCodeBuffer = ByteBuffer.wrap(new String("300000").getBytes("UTF-8"));
					System.out.println("ProtocolCode String length:"+ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8).length);
					proctolCodeBuffer.put(ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8));
					System.out.println("ProtocolCode length:"+proctolCodeBuffer.position());
					proctols[0] = proctolCodeBuffer;
					proctolCodeBuffer.flip();
					ByteBuffer dataBuffer = null;//协议内容:操作数
					dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH);
					dataBuffer.putInt(15);
					dataBuffer.putInt(6);
					System.out.println("data length:"+dataBuffer.position());
					proctols[1] = dataBuffer;
					dataBuffer.flip();
					channel.write(proctols);//将缓冲区的内容发送到通道,
//					channel.shutdownOutput();
					System.out.println("=======write proctols to channel");
//					channel.register(this.selector, SelectionKey.OP_READ);
					channel.register(this.selector, SelectionKey.OP_READ,"calculateResult");
				}
				else if (key.isReadable()) read(key);
			}
			
		}
	}
	/**
	 * deal with the data come from the server
	 * @param key
	 * @throws IOException 
	 */
	public void read(SelectionKey key) throws IOException{
		SocketChannel channel = (SocketChannel) key.channel();
		String  attachedInfo = (String) key.attachment();
		System.out.println("========socketChannel attachedInfo:"+attachedInfo);
		ByteBuffer[] proctols = null;
		proctols = new ByteBuffer[]{ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH),ByteBuffer.allocate(ProtocolConstants.OPERATE_NUM_LENGTH)};
		System.out.println("========read caculate result from Server=======");
//		channel.read(proctols);
		while(proctols[0].position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && proctols[1].position() != ProtocolConstants.OPERATE_NUM_LENGTH){
			channel.read(proctols);//待读取完成协议才解析
		}
		proctols[0].flip();
		proctols[1].flip();
		byte[] proctolCodeBytes = proctols[0].array();
		String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim();
		if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300200)){
			int result = proctols[1].getInt();
			System.out.println("========the calculated result from server:"+result);
		}else if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300300)){
			System.out.println("========server decode procotol fail......");
		}
		else {
			System.out.println("========unknow error ...");
		}
		/*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/
//		channel.shutdownOutput();
		/*关闭通道*/
//		channel.close();
		/*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/
	}
	
}



乘法客户端:
package nio.socketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import socket.ProtocolConstants;

/**
 * 加法计算
 * @author donald
 * 2017年4月10日
 * 下午9:32:57
 */
public class NIOClientSum {
	private static final String HOST = "192.168.32.126";
	private static final int PORT = 10000;
	//manager the channel
	private Selector selector;
	/**
	 * stat Client
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException{
		NIOClientSum client = new NIOClientSum();
		client.initClient(HOST,PORT);
		client.listen();
	}
	/**
	 * get the Socket and finish some initial work
	 * @param ip Server ip
	 * @param port connect Server port
	 * @throws IOException
	 */
	public void initClient(String ip,int port) throws IOException{
		//get the Socket
		SocketChannel channel = SocketChannel.open();
		// set no blocking mode
		channel.configureBlocking(false);
		//connect the Server
		channel.connect(new InetSocketAddress(ip,port));
		//get the channel manager
		this.selector = Selector.open();
		//Register the channel to manager and bind the event
		channel.register(selector,SelectionKey.OP_CONNECT);
		}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings("rawtypes")
	public void listen() throws IOException{
		System.out.println("===========The Sum Client is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
				if(key.isConnectable()){
					SocketChannel channel = (SocketChannel)key.channel();
                    //during connecting, finish the connect
                    if(channel.isConnectionPending()){
                    	channel.finishConnect();
                    }
					channel.configureBlocking(false);
					System.out.println("=========channel is Connected:"+channel.isConnected());
					System.out.println("=========channel is Open:"+channel.isOpen());
					System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending());
					ByteBuffer[] proctols = null;//协议
					proctols = new ByteBuffer[2];
					ByteBuffer proctolCodeBuffer = null;//协议编码
					proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
//					proctolCodeBuffer = ByteBuffer.wrap(new String("300000").getBytes("UTF-8"));
					System.out.println("ProtocolCode String length:"+ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8).length);
					proctolCodeBuffer.put(ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8));
					System.out.println("ProtocolCode length:"+proctolCodeBuffer.position());
					proctols[0] = proctolCodeBuffer;
					proctolCodeBuffer.flip();
					ByteBuffer dataBuffer = null;//协议内容:操作数
					dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH);
					dataBuffer.putInt(15);
					dataBuffer.putInt(6);
					System.out.println("data length:"+dataBuffer.position());
					proctols[1] = dataBuffer;
					dataBuffer.flip();
					channel.write(proctols);//将缓冲区的内容发送到通道,
//					channel.shutdownOutput();
					System.out.println("=======write proctols to channel");
//					channel.register(this.selector, SelectionKey.OP_READ);
					channel.register(this.selector, SelectionKey.OP_READ,"calculateResult");
				}
				else if (key.isReadable()) read(key);
			}
			
		}
	}
	/**
	 * deal with the data come from the server
	 * @param key
	 * @throws IOException 
	 */
	public void read(SelectionKey key) throws IOException{
		SocketChannel channel = (SocketChannel) key.channel();
		String  attachedInfo = (String) key.attachment();
		System.out.println("========socketChannel attachedInfo:"+attachedInfo);
		ByteBuffer[] proctols = null;
		proctols = new ByteBuffer[]{ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH),ByteBuffer.allocate(ProtocolConstants.OPERATE_NUM_LENGTH)};
		System.out.println("========read caculate result from Server=======");
//		channel.read(proctols);
		while(proctols[0].position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && proctols[1].position() != ProtocolConstants.OPERATE_NUM_LENGTH){
			channel.read(proctols);//待读取完成协议才解析
		}
		proctols[0].flip();
		proctols[1].flip();
		byte[] proctolCodeBytes = proctols[0].array();
		String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim();
		if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300200)){
			int result = proctols[1].getInt();
			System.out.println("========the calculated result from server:"+result);
		}else if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300300)){
			System.out.println("========server decode procotol fail......");
		}
		else {
			System.out.println("========unknow error ...");
		}
		/*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/
//		channel.shutdownOutput();
		/*关闭通道*/
//		channel.close();
		/*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/
	}
	
}

先启动服务端,再启动加法和乘法客户端,控制台数输出为:
服务端:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is multiply algorithm=======
operate num is:17,8

加法客户端:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90

乘法客户端:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136
在上面的测试中,channel.shutdownOutput()关闭Connection,即关闭到通道的连接,
和channel.close()关闭通道时,SocketChannel通道会有一个OP_READ事件,至于为什么,
暂时不知道,以后我们会在后面的文章中,在研究一下。
另外在操作缓冲区Buffer时,要注意从通道读数据到缓冲区,及写缓冲区,或从缓冲区写数据到通道,即读取缓冲区,缓冲区读写模式转换是要调用flip函数,进行切换模式,
limit定位到position位置,然后position回到0;意思为缓冲区可读可写的数据量。
put操作为写缓存区,get操作为读缓存区,当重用缓冲区,记得clear缓冲区,clear并不为
清空缓冲区,至少将position至少为0,mark为-1,limit为capacity,这个概念,在ByteBuffer详解文章中已经讲过了,不记得可以再看看。

上面的Server端,以单线程处理Client端的计算请求,下面我们把它改写成多线程的形式,
Server端只处理连接请求,计算的处理单独交给一个线程来处理:
多线程Server如下:
package nio.handler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import socket.ProtocolConstants;

/**
 * Server
 * @author donald
 * 2017年4月13日
 * 下午11:14:28
 */
public class NIOServerCalculateX {
	private static final String HOST = "192.168.32.126";
	private static final int PORT = 10000;
	private static ExecutorService exec= null;
	static {
		exec = Executors.newFixedThreadPool(2);
	}
	
	//manager the channel
	private Selector selector;
	/**
	 * stat Server
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException{
		NIOServerCalculateX server = new NIOServerCalculateX();
		server.initServer(HOST,PORT);
		server.listen();
	}
	/**
	 * get the ServerSocket and finish some initial work
	 * @param port
	 * @throws IOException
	 */
	public void initServer(String host, int port) throws IOException{
		//get the ServerSocket
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		// set no blocking mode
		serverChannel.configureBlocking(false);
		//bind the port
		serverChannel.socket().bind(new InetSocketAddress(host, port));
		//get the channel manager
		this.selector = Selector.open();
		//Register the channel to manager and bind the event
		serverChannel.register(selector,SelectionKey.OP_ACCEPT);
		}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings({ "rawtypes" })
	public void listen() throws IOException{
		System.out.println("=========The Server is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
				if(key.isAcceptable()){
					ServerSocketChannel server = (ServerSocketChannel)key.channel();
					SocketChannel channel = server.accept();
					channel.configureBlocking(false);
					System.out.println("=========channel is Connected:"+channel.isConnected());
					System.out.println("=========channel is Open:"+channel.isOpen());
					System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending());
//					channel.register(this.selector, SelectionKey.OP_READ);
					HanlderNioSocketChannel hanlderNioSocketChannel= new HanlderNioSocketChannel();
					channel.register(hanlderNioSocketChannel.getSelector(), SelectionKey.OP_READ,"decodeProtol");
					exec.submit(hanlderNioSocketChannel);
				}
			}
			
		}
	}	
}


计算处理线程:
package nio.handler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import socket.ProtocolConstants;
/**
 * 处理SocketChannel读事件
 * @author donald
 * 2017年4月11日
 * 下午10:32:55
 */
public class HanlderNioSocketChannel implements Runnable{
	private Selector selector;
	private String threadName;
	public HanlderNioSocketChannel() {
		super();
		try {
			this.selector = Selector.open();
		} catch (IOException e) {
			e.printStackTrace();
		}
		threadName = Thread.currentThread().getName();
	}
	public Selector getSelector() {
		return selector;
	}
	public void setSelector(Selector selector) {
		this.selector = selector;
	}

	@Override
	public void run() {
		try {
			listen();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings({ "rawtypes" })
	public void listen() throws IOException{
		System.out.println(threadName+"=========The Server Calculate is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
		        if (key.isReadable()) {
					read(key);
				}
			}
			
		}
	}	
	private void read(SelectionKey key){
		try {
			SocketChannel channel = (SocketChannel) key.channel();
			String  attachedInfo = (String) key.attachment();
			System.out.println(threadName+"========socketChannel attachedInfo:"+attachedInfo);
			ByteBuffer[] proctols = null;//协议
			ByteBuffer proctolCodeBuffer = null;//协议编码
			proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
			ByteBuffer dataBuffer = null;//协议内容:操作数
			dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH);
			proctols = new ByteBuffer[]{proctolCodeBuffer,dataBuffer};
			System.out.println(threadName+"========read caculate proctol from Client=======");
	//		channel.read(proctols);
			while(proctolCodeBuffer.position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && dataBuffer.position() != 2*ProtocolConstants.OPERATE_NUM_LENGTH){
				channel.read(proctols);//待读取完成协议才解析
			}
	//		channel.shutdownInput();
			proctolCodeBuffer.flip();
			dataBuffer.flip();
			byte[] proctolCodeBytes = proctolCodeBuffer.array();
			String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim();
			int firstNum = 0;
			int secondNum = 0;
			int result = 0;
			if(proctolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000)){
				System.out.println(threadName+"========the protocol is sum algorithm=======");
				firstNum = dataBuffer.getInt();
				secondNum = dataBuffer.getInt();
				System.out.println("operate num is:"+firstNum+","+secondNum);
				result = firstNum*secondNum;
				proctolCodeBuffer.clear();
				proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8));
				dataBuffer.clear();
				//针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面
	//			dataBuffer.compact()
				dataBuffer.putInt(result);
				proctolCodeBuffer.flip();
				dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中
				channel.write(proctols);
				
			}
			else if(proctolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100)){
				System.out.println(threadName+"========the protocol is multiply algorithm=======");
				firstNum = dataBuffer.getInt();
				secondNum = dataBuffer.getInt();
				System.out.println("operate num is:"+firstNum+","+secondNum);
				result = firstNum*secondNum;
				proctolCodeBuffer.clear();
				proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8));
				proctolCodeBuffer.flip();
				dataBuffer.clear();
				//针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面
	//			dataBuffer.compact()
				dataBuffer.putInt(result);
				dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中
				channel.write(proctols);
			}
			else{
				System.out.println(threadName+"========server decode procotol fail......");
				proctolCodeBuffer.clear();
				proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300300.getBytes(ProtocolConstants.CHARSET_UTF8));
				proctolCodeBuffer.flip();
				dataBuffer.clear();
				dataBuffer.putInt(0);
				dataBuffer.flip();
				channel.write(proctols);
			}
			/*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/
//			channel.shutdownOutput();
			/*关闭通道*/
	        //channel.close();
			/*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

先启动服务端,再启动加法和乘法客户端,控制台数输出为:
服务端:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is multiply algorithm=======
operate num is:17,8


加法客户端:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90

乘法客户端:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136

总结:
在操作缓冲区Buffer时,要注意从通道读数据到缓冲区,及写缓冲区,或从缓冲区写数据到通道,即读取缓冲区,缓冲区读写模式转换是要调用flip函数,进行切换模式,
limit定位到position位置,然后position回到0;意思为缓冲区可读可写的数据量。put操作为写缓存区,get操作为读缓存区,当重用缓冲区,记得clear缓冲区,clear并不为清空缓冲区,至少将position至少为0,mark为-1,limit为capacity,再次写数据是将覆盖以前的数据。

  • 大小: 51.5 KB
分享到:
评论

相关推荐

    基于naga开发的TCP客户端/服务器程序

    // 还需要一个循环事件,执行事件监听,此处可能需要使用到多线程编程 // 设置接收链接方式 serverSocket.setConnectionAcceptor(ConnectionAcceptor.ALLOW); while (true) { // 循环不断监听...

    JAVA上百实例源码以及开源项目源代码

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    JAVA上百实例源码以及开源项目

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    java开源包1

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包10

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    Java SE实践教程 源代码 下载

    7.1.5 Swing的单线程模型 145 7.2 练习 148 7.2.1 第1个Swing程序 148 7.2.2 外观感觉 150 7.2.3 事件侦听器 151 7.2.4 Swing基本控件和窗口 155 7.2.5 Swing容器 176 7.2.6 Swing高级控件 181 7.3 小结 187...

    java开源包2

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包3

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包6

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包5

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包8

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包7

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包9

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包11

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包4

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    java开源包101

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

    Java SE实践教程 pdf格式电子书 下载(一) 更新

    7.1.5 Swing的单线程模型 145 7.2 练习 148 7.2.1 第1个Swing程序 148 7.2.2 外观感觉 150 7.2.3 事件侦听器 151 7.2.4 Swing基本控件和窗口 155 7.2.5 Swing容器 176 7.2.6 Swing高级控件 181 7.3 小结 187...

    Java SE实践教程 pdf格式电子书 下载(四) 更新

    7.1.5 Swing的单线程模型 145 7.2 练习 148 7.2.1 第1个Swing程序 148 7.2.2 外观感觉 150 7.2.3 事件侦听器 151 7.2.4 Swing基本控件和窗口 155 7.2.5 Swing容器 176 7.2.6 Swing高级控件 181 7.3 小结 187...

    Java资源包01

    xSocket是一个轻量级的基于nio的服务器框架用于开发高性能、可扩展、多线程的服务器。该框架封装了线程处理、异步读/写等方面。 Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的...

Global site tag (gtag.js) - Google Analytics