- 浏览: 949527 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Mina 过滤器定义:http://donald-draper.iteye.com/blog/2376161
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
引言:
在前一篇文章中我们看了一下过滤链的抽象实现,先来回顾一下
AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry。
EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。过滤链头为HeadFilter,链尾为TailFilter。
HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。
TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。
添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同;添加前触发过滤器onPreAdd事件,添加后触发过滤器onPostAdd事件;移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
过滤链处理相关事件策略为:与IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,关闭会话,handler并不处理会话事件) 。
今天我们来看具体的过滤链实现SocketFilterChain和DatagramFilterChain,在AbstractIoFilterChain
这篇文章中我们我们过滤链触发IoSession的write/close事件除了传递事件外,
调用实际的事件操作doWrite/doClose,这两个方法为抽象方法,需要子类扩展实现。
在SocketFilterChain和DatagramFilterChain具体中主要是doWrite/doClose这两个方法的具体实现。
先来看一下SocketFilterChain
来看SocketFilterChain实际会话关闭工作
//关闭会话
//SocketIoProcessor
小节:
SocketFilterChain发送消息首先获取Socket会话的的写请求队列;mark buffer的位置,
主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法
将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作。关闭session,即将会话添加到关联的IoProcessor待移除会话队列。
再来看一下DatagramSessionImpl:
从上面DatagramSessionImpl关联的managerDelegate(DatagramService)两种分别为DatagramConnectorDelegate和DatagramAcceptorDelegate
//DatagramAcceptorDelegate
//DatagramConnectorDelegate
今天这篇文章只是简单做一个简单的介绍,主要是对上一篇过滤链抽象实现的补充,文章中涉及的IoService和
IoProcessor我们还有讲到,后面讲到是在具体的说。
从上面可以看出,DatagramFilterChain发送消息首先获取报文会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作。关闭会话委托给session关联的managerDelegate(DatagramService),如果managerDelegate为DatagramConnectorDelegate者直接关闭,如果为DatagramAcceptorDelegate,通知DatagramAcceptorDelegate的监听器会话已关闭,设置会话CloseFuture为已关闭状态。
总结:
SocketFilterChain发送消息首先获取Socket会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作。关闭session,即将会话添加到关联的IoProcessor待移除会话队列。
DatagramFilterChain发送消息首先获取报文会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作。
关闭会话委托给session关联的managerDelegate(DatagramService),如果managerDelegate为DatagramConnectorDelegate者直接关闭,如果为DatagramAcceptorDelegate,通知DatagramAcceptorDelegate的监听器会话已关闭,设置会话CloseFuture为已关闭状态。
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
引言:
在前一篇文章中我们看了一下过滤链的抽象实现,先来回顾一下
AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry。
EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。过滤链头为HeadFilter,链尾为TailFilter。
HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。
TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。
添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同;添加前触发过滤器onPreAdd事件,添加后触发过滤器onPostAdd事件;移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
过滤链处理相关事件策略为:与IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,关闭会话,handler并不处理会话事件) 。
今天我们来看具体的过滤链实现SocketFilterChain和DatagramFilterChain,在AbstractIoFilterChain
这篇文章中我们我们过滤链触发IoSession的write/close事件除了传递事件外,
调用实际的事件操作doWrite/doClose,这两个方法为抽象方法,需要子类扩展实现。
在SocketFilterChain和DatagramFilterChain具体中主要是doWrite/doClose这两个方法的具体实现。
先来看一下SocketFilterChain
package org.apache.mina.transport.socket.nio; import java.io.IOException; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoSession; import org.apache.mina.common.IoFilter.WriteRequest; import org.apache.mina.common.support.AbstractIoFilterChain; import org.apache.mina.util.Queue; /** * An {@link IoFilterChain} for socket transport (TCP/IP). * * @author The Apache Directory Project (mina-dev@directory.apache.org) */ class SocketFilterChain extends AbstractIoFilterChain { SocketFilterChain(IoSession parent) { super(parent); } protected void doWrite(IoSession session, WriteRequest writeRequest) { SocketSessionImpl s = (SocketSessionImpl) session; //获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲 Queue writeRequestQueue = s.getWriteRequestQueue(); // SocketIoProcessor.doFlush() will reset it after write is finished // because the buffer will be passed with messageSent event. //这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件, //待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置 ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage(); buffer.mark(); int remaining = buffer.remaining(); if (remaining == 0) { //BaseIoSession // private final AtomicInteger scheduledWriteRequests = new AtomicInteger(); //更新调度请求计数器+1 s.increaseScheduledWriteRequests(); } else { //BaseIoSession //private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); //更新调度写字节计数器+buffer.remaining() s.increaseScheduledWriteBytes(buffer.remaining()); } synchronized (writeRequestQueue) { //将写请求添加到session写请求队列中 writeRequestQueue.push(writeRequest); } //如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作,这个在以后在具体详说 if (session.getTrafficMask().isWritable()) { s.getIoProcessor().flush(s); } } //关闭会话 protected void doClose(IoSession session) throws IOException { SocketSessionImpl s = (SocketSessionImpl) session; s.getIoProcessor().remove(s);//委托给session关联的IoProcessor } }
来看SocketFilterChain实际会话关闭工作
//关闭会话
protected void doClose(IoSession session) throws IOException { SocketSessionImpl s = (SocketSessionImpl) session; s.getIoProcessor().remove(s); }
//SocketIoProcessor
class SocketIoProcessor { ... private final Queue removingSessions = new Queue();//存放关闭的会话队列 void remove(SocketSessionImpl session) throws IOException { scheduleRemove(session);//将会话添加到待移除会话队列 startupWorker();//这一步我们在后面将SocketIoProcessor的时候再说 } private void scheduleRemove(SocketSessionImpl session) { synchronized (removingSessions) { removingSessions.push(session); } } ... }
小节:
SocketFilterChain发送消息首先获取Socket会话的的写请求队列;mark buffer的位置,
主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法
将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作。关闭session,即将会话添加到关联的IoProcessor待移除会话队列。
再来看一下DatagramSessionImpl:
package org.apache.mina.transport.socket.nio.support; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoSession; import org.apache.mina.common.IoFilter.WriteRequest; import org.apache.mina.common.support.AbstractIoFilterChain; import org.apache.mina.util.Queue; /** * An {@link IoFilterChain} for datagram transport (UDP/IP). * * @author The Apache Directory Project (mina-dev@directory.apache.org) */ class DatagramFilterChain extends AbstractIoFilterChain { DatagramFilterChain(IoSession parent) { super(parent); } protected void doWrite(IoSession session, WriteRequest writeRequest) { DatagramSessionImpl s = (DatagramSessionImpl) session; //获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲 Queue writeRequestQueue = s.getWriteRequestQueue(); // SocketIoProcessor.doFlush() will reset it after write is finished // because the buffer will be passed with messageSent event. //这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件, //待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置 ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage(); buffer.mark(); int remaining = buffer.remaining(); if (remaining == 0) { //BaseIoSession // private final AtomicInteger scheduledWriteRequests = new AtomicInteger(); //更新调度请求计数器+1 s.increaseScheduledWriteRequests(); } else { //BaseIoSession //private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); //更新调度写字节计数器+buffer.remaining() s.increaseScheduledWriteBytes(buffer.remaining()); s.increaseScheduledWriteBytes(buffer.remaining()); } synchronized (writeRequestQueue) { //将写请求添加到session写请求队列中 writeRequestQueue.push(writeRequest); } if (session.getTrafficMask().isWritable()) { //DatagramSessionImpl //private final DatagramService managerDelegate; //如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作, //这个在以后在具体详说 s.getManagerDelegate().flushSession(s); } } protected void doClose(IoSession session) { DatagramSessionImpl s = (DatagramSessionImpl) session; DatagramService manager = s.getManagerDelegate(); ////委托给session关联的managerDelegate(DatagramService)关闭会话 if (manager instanceof DatagramConnectorDelegate) { //如果是DatagramConnectorDelegate者直接关闭会话,则在后面具体再看 ((DatagramConnectorDelegate) manager).closeSession(s); } else { //通知DatagramAcceptorDelegate的监听器会话已关闭 ((DatagramAcceptorDelegate) manager).getListeners() .fireSessionDestroyed(session); //设置会话CloseFuture为已关闭状态 session.getCloseFuture().setClosed(); } } }
从上面DatagramSessionImpl关联的managerDelegate(DatagramService)两种分别为DatagramConnectorDelegate和DatagramAcceptorDelegate
//DatagramAcceptorDelegate
public class DatagramAcceptorDelegate extends BaseIoAcceptor implements IoAcceptor, DatagramService {
//DatagramConnectorDelegate
public class DatagramConnectorDelegate extends BaseIoConnector implements DatagramService {
今天这篇文章只是简单做一个简单的介绍,主要是对上一篇过滤链抽象实现的补充,文章中涉及的IoService和
IoProcessor我们还有讲到,后面讲到是在具体的说。
从上面可以看出,DatagramFilterChain发送消息首先获取报文会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作。关闭会话委托给session关联的managerDelegate(DatagramService),如果managerDelegate为DatagramConnectorDelegate者直接关闭,如果为DatagramAcceptorDelegate,通知DatagramAcceptorDelegate的监听器会话已关闭,设置会话CloseFuture为已关闭状态。
总结:
SocketFilterChain发送消息首先获取Socket会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作。关闭session,即将会话添加到关联的IoProcessor待移除会话队列。
DatagramFilterChain发送消息首先获取报文会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作。
关闭会话委托给session关联的managerDelegate(DatagramService),如果managerDelegate为DatagramConnectorDelegate者直接关闭,如果为DatagramAcceptorDelegate,通知DatagramAcceptorDelegate的监听器会话已关闭,设置会话CloseFuture为已关闭状态。
发表评论
-
Mina 报文连接器(NioDatagramConnector)
2017-06-14 08:46 1336Mina 抽象Polling连接器(A ... -
Mina 报文监听器NioDatagramAcceptor二(发送会话消息等)
2017-06-13 16:01 1487Mina 报文监听器NioDatagramAcceptor一( ... -
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器)
2017-06-13 09:51 2499Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina 报文通信简单示例
2017-06-12 09:01 2481MINA TCP简单通信实例:http://donald-dr ... -
Mina socket连接器(NioSocketConnector)
2017-06-12 08:37 4638Mina 抽象Polling连接器(AbstractPolli ... -
Mina 抽象Polling连接器(AbstractPollingIoConnector)
2017-06-11 21:29 958Mina 连接器接口定义及抽象实现(IoConnector ) ... -
Mina 连接器接口定义及抽象实现(IoConnector )
2017-06-11 13:46 1769Mina IoService接口定义及抽象实现:http:// ... -
Mina socket监听器(NioSocketAcceptor)
2017-06-09 08:44 3332Mina IoService接口定义及抽象实现:http:// ... -
Mina 抽象polling监听器
2017-06-08 22:32 707Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina Io监听器接口定义及抽象实现
2017-06-07 13:02 1294Mina IoService接口定义及抽象实现:http:// ... -
Mina IoService接口定义及抽象实现
2017-06-06 23:44 1122Mina IoHandler接口定义:http://donal ... -
Mina Nio会话(Socket,DataGram)
2017-06-06 12:53 1142Mina Socket会话配置:http://donald-d ... -
Mina 抽象Io会话
2017-06-05 22:45 966Mina Io会话接口定义:http://donald-dra ... -
Mina Io会话接口定义
2017-06-04 23:15 1093Mina Nio处理器:http://donald-drape ... -
Mina Nio处理器
2017-06-04 22:19 690Mina Io处理器抽象实现:http://donald-dr ... -
Mina Io处理器抽象实现
2017-06-03 23:52 1081Mina 过滤链抽象实现:http://donald-drap ... -
Mina IoHandler接口定义
2017-06-01 21:30 1669Mina 过滤链抽象实现:http://donald-drap ... -
MINA 多路复用协议编解码器工厂二(多路复用协议解码器)
2017-06-01 12:52 2191MINA 多路复用协议编解码器工厂一(多路复用协议编码器): ... -
MINA 多路复用协议编解码器工厂一(多路复用协议编码器)
2017-05-31 22:22 1790MINA 多路分离解码器实例:http://donald-dr ... -
Mina 累计协议解码器
2017-05-31 00:09 1153MINA 编解码器实例:http://donald-drape ...
相关推荐
本库是对我在项目中使用的Mina和长连接的一个封装,亲测有效,在网络良好的情况下,几乎能够保证100%的连接和通讯;
mina socket 的一个简单示例!
通过Mina与Socket实现通信,其包含客户端与服务端的实现代码
java mina 服务端,socket客服端传信息,客服端上传消息
mina带心跳长链接,可实现服务间通信。socket长连接实现客户端与服务端的通信。对于通信技术学习是非常好的资料。改造后可实现企业应用
Apache mina socket通信源代码,版本为2.0.7
1.mina socket客户度工程相关类,添加mina jar包后可独立运行。 2.mina若有空闲连接则使用已有连接,若无则新建mina连接; 3.mina空闲连接超过保活时间25分钟后,自动删除; 4.mina发送指令后,接收指定时长内收到的...
mina简单示例,Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速...
java客户端socket与mina服务端通信,保持长连接
是跟手机进行推送功能的时候整理的工具,有长连接也有socket短连接,代码能直接运行,jar包和代码都有 直接放到项目里可以用,有mian测试方法
mina测试框架 mina测试框架 mina测试框架
mina框架中socket使用,有服务端和客户端。这是比较完整的项目。希望能给后人点参考
Socket与mina交互,如果熟悉了解码协议,其他的就OK了。
通过mina实现定长报文传输(报文使用ssl加密-mian自带的ssl过滤器)。
mina socket 代码
集成框架SpringBoot+myBatis+Mina,内附CRC校验工具类和进制转换工具类 SpringBoot集成Socket通讯之Mina框架
NULL 博文链接:https://wwwzhouhui.iteye.com/blog/455628
NULL 博文链接:https://ginge.iteye.com/blog/363178
androidpn项目基础,socket和mina,详情见博客http://blog.csdn.net/specialshoot/article/details/50678374
mina 包 socket 框架