`
Donald_Draper
  • 浏览: 949594 次
社区版块
存档分类
最新评论

Mina socket连接器(NioSocketConnector)

    博客分类:
  • Mina
阅读更多
Mina 抽象Polling连接器(AbstractPollingIoConnector):http://donald-draper.iteye.com/blog/2378978
引言:
   上一盘文章我们看了抽象Polling连接器,先来回顾一下:
     抽象拉取连接器内部有一个连接请求队列connectQueue,连接请求取消队列cancelQueue,Io处理器和连接线程引用connectorRef。拉取连接器构造主要初始化会话配置,IO事件执行器和IO处理器。连接操作,首先根据本地socket地址创建SocketChannel,连接远端socket地址,根据IO处理器和SocketChannel构建Io会话,将会话添加到会话关联的IO处理器中,根据SocketChannel和会话初始化sessionInitializer构建连接请求,添加到连接请求队列,最后启动连接器线程。
     连接器线程首先计算选择超时时间,执行超时选择操作,注册连接请求SocketChannel连接事件到选择器;如果没有任何连接请求SocketChannel需要处理,置空连接器连接线程引用,清空连接请求队列,如果有连接请求已经连接完成,即触发SocketChannel兴趣连接事件,处理连接事件就绪的连接请求,这个过程首先调用finishConnect完成SocketChannel连接后续工作,根据Io处理器和SocketChannel创建会话,初始化会话,添加会话到会话关联的IO处理器;然后处理连接超时的连接请求,即设置连接结果超时异常,添加到连接请求到取消队列;处理取消连接的连接请求,即关闭连接请求关联的SocketChannel。
今天我们来看连接器的具体实现NioSocketConnector:
/**
 * {@link IoConnector} for socket transport (TCP/IP).
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements
SocketConnector {
    private volatile Selector selector;//选择器
}

从socket连接器来看,选择器selector为内部唯一变量。
/**
 * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model).
 默认构造多线程模型
 */
public NioSocketConnector() {
    super(new DefaultSocketSessionConfig(), NioProcessor.class);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}

/**
 * Constructor for {@link NioSocketConnector} with default configuration, and
 * given number of {@link NioProcessor} for multithreading I/O operations
 * @param processorCount the number of processor to create and place in a
 * {@link SimpleIoProcessorPool}
 多线程模型,指定Io处理器线程池大小
 */
public NioSocketConnector(int processorCount) {
    super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
/**
 *  Constructor for {@link NioSocketConnector} with default configuration but a
 *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
 *  {@link IoService} of the same type.
 多Io服务共享Io处理器模式构造
 * @param processor the processor to use for managing I/O events
 */
public NioSocketConnector(IoProcessor<NioSession> processor) {
    super(new DefaultSocketSessionConfig(), processor);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}

/**
 *  Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling
 *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing
 *  the same processor and executor over multiple {@link IoService} of the same type.
 * @param executor the executor for connection
 * @param processor the processor for I/O operations
 与上一个方法不同的,添加了Io事件执行器参数executor
 */
public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
    super(new DefaultSocketSessionConfig(), executor, processor);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}

/**
 * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in
 * thread pool executor to manage the given number of processor instances. The processor class must have
 * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
 * no-arg constructor.
 * 使用内部IO处理器线程池SimpleIoProcessorPool,管理Io处理器实例
 * @param processorClass the processor class.
 * @param processorCount the number of processors to instantiate.
 * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
 * @since 2.0.0-M4
 */
public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) {
    super(new DefaultSocketSessionConfig(), processorClass, processorCount);
}

/**
 * Constructor for {@link NioSocketConnector} with default configuration with default configuration which will use a built-in
 * thread pool executor to manage the default number of processor instances. The processor class must have
 * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
 * no-arg constructor. The default number of instances is equal to the number of processor cores
 * in the system, plus one.
 * 多线程IO处理器模式,IO处理器实例默认为系统核心处理器数量+1
 * @param processorClass the processor class.
 * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
 * @since 2.0.0-M4
 */
public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
    super(new DefaultSocketSessionConfig(), processorClass);
}

上面的构造函数和AbstractPollingIoConnector的构造基本相同。
再来看socket连接器的其他操作:
/**
 * {@inheritDoc}
 初始化,打开一个选择器
 */
@Override
protected void init() throws Exception {
    this.selector = Selector.open();
}
/**                                         
 * {@inheritDoc}     
 销毁socket连接器,关闭选择器
 */                                         
@Override                                   
protected void destroy() throws Exception { 
    if (selector != null) {                 
        selector.close();                   
    }                                       
}                                           
 /**
 * {@inheritDoc}
 连接远端地址,即委托SocketChannel的连接操作
 */
@Override
protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
    return handle.connect(remoteAddress);
}
/**
 * {@inheritDoc}
 获取socket通道的连接请求
 */
@Override
protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
    //获取通道的选择key
    SelectionKey key = handle.keyFor(selector);
    if ((key == null) || (!key.isValid())) {
        return null;
    }
    //返回选择key附加物
    return (ConnectionRequest) key.attachment();
}
/**
 * {@inheritDoc}
 关闭socket通道
 */
@Override
protected void close(SocketChannel handle) throws Exception {
    //取消通道关联的选择key,关闭通道
    SelectionKey key = handle.keyFor(selector);
    if (key != null) {
        key.cancel();
    }
    handle.close();
}
/**
 * {@inheritDoc}
 完成连接
 */
@Override
protected boolean finishConnect(SocketChannel handle) throws Exception {
    //已完成连接,则取消通道关联的选择key(连接操作)
    if (handle.finishConnect()) {
        SelectionKey key = handle.keyFor(selector);
        if (key != null) {
            key.cancel();
        }
        return true;
    }
    return false;
}

来看创建socket通道
/**
 * {@inheritDoc}
 */
@Override
protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
   //打开一个socket通道,配置接收缓存区size
    SocketChannel ch = SocketChannel.open();
    int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
    if (receiveBufferSize > 65535) {
        ch.socket().setReceiveBufferSize(receiveBufferSize);
    }
    if (localAddress != null) {
        try {
	    //绑定地址
            ch.socket().bind(localAddress);
        } catch (IOException ioe) {
            // Add some info regarding the address we try to bind to the
            // message
            String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
                    + ioe.getMessage();
            Exception e = new IOException(newMessage);
            e.initCause(ioe.getCause());

            // Preemptively close the channel
            ch.close();
            throw e;
        }
    }
    //配置通道为非阻塞模式
    ch.configureBlocking(false);
    return ch;
}

从创建socket通道来看,首先打开一个socket通道,配置接收缓存区size,绑定本地socket地址,配置通道为非阻塞模式。
/**
 * {@inheritDoc}
 注册socket通道的连接事件到选择器
 */
@Override
protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
    handle.register(selector, SelectionKey.OP_CONNECT, request);
}
 /**
 * {@inheritDoc}
 创建socket会话
 */
@Override
protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
    return new NioSocketSession(this, processor, handle);
}
 /**
 * {@inheritDoc}
 选择操作
 */
@Override
protected int select(int timeout) throws Exception {
    return selector.select(timeout);
}
/**
 * {@inheritDoc}
 唤醒操作
 */
@Override
protected void wakeup() {
    selector.wakeup();
}

从上面一个方法来看注册操作,即注册socket通道的连接事件到选择器。
选择操作和唤醒操作直接委托给内部选择器。
再来看剩下的操作,很简单,看一下明白,不再讲
/**
 * {@inheritDoc}
 */
@Override
public TransportMetadata getTransportMetadata() {
    return NioSocketSession.METADATA;
}
/**
 * {@inheritDoc}
 */
@Override
public SocketSessionConfig getSessionConfig() {
    return (SocketSessionConfig) sessionConfig;
}
/**
 * {@inheritDoc}
 */
@Override
public InetSocketAddress getDefaultRemoteAddress() {
    return (InetSocketAddress) super.getDefaultRemoteAddress();
}
/**
 * {@inheritDoc}
 */
@Override
public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
    super.setDefaultRemoteAddress(defaultRemoteAddress);
}
/**
 * {@inheritDoc}
 连接操作事件就绪的Socket通道
 */
@Override
protected Iterator<SocketChannel> selectedHandles() {
    return new SocketChannelIterator(selector.selectedKeys());
}

/**
 * {@inheritDoc}
 选择器所有关联通道
 */
@Override
protected Iterator<SocketChannel> allHandles() {
    return new SocketChannelIterator(selector.keys());
}
private static class SocketChannelIterator implements Iterator<SocketChannel> {
    private final Iterator<SelectionKey> i;
    private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
        this.i = selectedKeys.iterator();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean hasNext() {
        return i.hasNext();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public SocketChannel next() {
        SelectionKey key = i.next();
        return (SocketChannel) key.channel();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void remove() {
        i.remove();
    }
}

总结:
socket连接器NioSocketConnector内部关联一个选择器;初始化连接器,为打开一个选择器;连接远端地址,即委托SocketChannel的连接操作;创建socket通道来看,首先打开一个socket通道,配置接收缓存区size,绑定本地socket地址,配置通道为非阻塞模式;注册操作,即注册socket通道的连接事件到选择器。选择操作和唤醒操作直接委托给内部选择器。销毁socket连接器,即关闭选择器。
0
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics