`
Donald_Draper
  • 浏览: 951781 次
社区版块
存档分类
最新评论

netty NioSocketChannel解析

阅读更多
netty 抽象BootStrap定义:http://donald-draper.iteye.com/blog/2392492
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
netty Bootstrap解析:http://donald-draper.iteye.com/blog/2392593
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
netty NioServerSocketChannel解析:http://donald-draper.iteye.com/blog/2393443
netty 通道配置接口定义:http://donald-draper.iteye.com/blog/2393484
netty 默认通道配置初始化:http://donald-draper.iteye.com/blog/2393504
netty 默认通道配置后续:http://donald-draper.iteye.com/blog/2393510
netty 字节buf定义:http://donald-draper.iteye.com/blog/2393813
netty 资源泄漏探测器:http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析:http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器:http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念:http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器:http://donald-draper.iteye.com/blog/2394419
netty Unpooled字节buf分配器:http://donald-draper.iteye.com/blog/2394619
netty Pooled字节buf分配器:http://donald-draper.iteye.com/blog/2394814
引言:
上一篇文章我们看了Pooled字节buf分配器,先来回顾一下:
    Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区(PoolArena),每个Region的内存块(PoolChunk)size为chunkSize,每个内存块内存页(PoolSubpage)大小,默认为8k。Pooled 堆buf是基于字节数组,而direct buf是基于nio 字节buf。Pooled字节分配器分配heap和direct buf时,首先获取线程本地buf缓存PoolThreadCache,从buf获取对应的heap或direct分配区,分配区创建buf(PooledByteBuf),然后将buf放到内存块中管理,根据buf的容量,将放到相应tiny,small,normal Memory Region Cache(MemoryRegionCache)中。每个Pooled buf通过内存的Recycler,重用buf。Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将对象放在一个线程本地栈中管理。
从看了ServerSocket通道之后,我们把字节buf(heap,direct),及字节buf分配器(Unpooled,Pooled),
今天我们回到Socket通道,由于socket通道我们讲了好久,先来把Nio socket通道的父类抽象字节通道回顾一下:
    写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。
    nio字节Unsafe读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则读取缓存区中没有读完的数据,并通道通道处理剩余数据。

现在来看socket通道:
/**
 * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
 */
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
    //选择器提供者
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    private final SocketChannelConfig config;//socket通道配置
    /**
     * Create a new instance
     */
    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }
    /**
     * Create a new instance using the given {@link SelectorProvider}.
     */
    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }
    /**
     * Create a new instance using the given {@link SocketChannel}.
     */
    public NioSocketChannel(SocketChannel socket) {
        this(null, socket);
    }
    /**
     * Create a new instance
     *
     * @param parent    the {@link Channel} which created this instance or {@code null} if it was created by the user
     * @param socket    the {@link SocketChannel} which will be used
     */
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }
}

来看创建socket通道:
private static SocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
         *委托给选择提供者,打开一个socket通道
         *  See [url=https://github.com/netty/netty/issues/2308]#2308[/url].
         */
        return provider.openSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}

socket通道配置:
private final class NioSocketChannelConfig  extends DefaultSocketChannelConfig {
    private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
        super(channel, javaSocket);
    }
    @Override
    protected void autoReadCleared() {
        clearReadPending();
    }
}

从上面可以看出,socket通道初始化,主要是创建socket通道,初始化socket通道配置NioSocketChannelConfig。

来看绑定地址:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    doBind0(localAddress);
}

private void doBind0(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        //jdk >= 1.7 则使用socket通道绑定地址
        SocketUtils.bind(javaChannel(), localAddress);
    } else {
        //否则委托通道内关联的socket
        SocketUtils.bind(javaChannel().socket(), localAddress);
    }
}


//SocketUtils
public static void bind(final SocketChannel socketChannel, final SocketAddress address) throws IOException {
    try {
        //在当前访问控制权限下,委托给socket通道绑定socket地址
        AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws IOException {
                socketChannel.bind(address);
                return null;
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}
public static void bind(final Socket socket, final SocketAddress bindpoint) throws IOException {
    try {
        AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws IOException {
                socket.bind(bindpoint);
                return null;
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

从上面来看地址绑定,如果jdk >= 1.7 则使用socket通道绑定地址,否则委托通道内关联的socket。
再来看连接操作:
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        //本地地址不为空,则绑定本地socket地址
        doBind0(localAddress);
    }

    boolean success = false;
    try {
       //否则委托SocketUtils
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
	    //如果连接操作没完成,则添加连接事件到选择key兴趣事件集
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}


//SocketUtils
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
        throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
	        //直接委托给socket通道
                return socketChannel.connect(remoteAddress);
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

//完成连接:
@Override
protected void doFinishConnect() throws Exception {
    //直接调用内部socket通道的完成连接方法
    if (!javaChannel().finishConnect()) {
        throw new Error();
    }
}

再来看读数据到字节buf
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    //从unsafe获取字节buf分配器Handle
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    //设置需要尝试读字节数
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    //委托给字节buf,从Socket通道读取数据,写到当前buf
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}


由于在netty中有Pooled、Unpooled两种字节buf,每种又有heap和direct两种实际buf,我们以
UnpooledHeapByteBuf为例:
//AbstractByteBuf
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

//UnpooledHeapByteBuf
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    try {
        return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}

从上面可以看出,socket通道读取操作,实际委托给socket通道的read操作,从Socket通道读取数据,写到当前buf。

再来看写操作:
先来看写字节buf
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    //从当前buf读取数据,写socket通道中
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

从上面可以看出

//UnpooledHeapByteBuf
 @Override
public int readBytes(GatheringByteChannel out, int length) throws IOException {
    checkReadableBytes(length);
    int readBytes = getBytes(readerIndex, out, length, true);
    readerIndex += readBytes;
    return readBytes;
}
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf;
    if (internal) {
        tmpBuf = internalNioBuffer();
    } else {
        tmpBuf = ByteBuffer.wrap(array);
    }
    return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
}

从上面可以看出写字节buf,实际委托给socket通道的写操作,从当前buf读取数据,写socket通道中。

再来看写文件Region
@Override
protected long doWriteFileRegion(FileRegion region) throws Exception {
    final long position = region.transferred();
    return region.transferTo(javaChannel(), position);
}

//DefaultFileRegion
public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultFileRegion.class);
    private final File f;//关联文件
    private final long position;
    private final long count;
    private long transferred;
    private FileChannel file;//关联文件通道
    /**
     * Explicitly open the underlying file-descriptor if not done yet.
     打开一个文件通道
     */
    public void open() throws IOException {
        if (!isOpen() && refCnt() > 0) {
            // Only open if this DefaultFileRegion was not released yet.
            file = new RandomAccessFile(f, "r").getChannel();
        }
    }
    //转移文件region的内容到可写字节通道
    @Override
    public long transferTo(WritableByteChannel target, long position) throws IOException {
        long count = this.count - position;
        if (count < 0 || position < 0) {
            throw new IllegalArgumentException(
                    "position out of range: " + position +
                    " (expected: 0 - " + (this.count - 1) + ')');
        }
        if (count == 0) {
            return 0L;
        }
        if (refCnt() == 0) {
            throw new IllegalReferenceCountException(0);
        }
        // Call open to make sure fc is initialized. This is a no-oop if we called it before.
	//确保通道打开
        open();
        //从文件Region读取数据,写到通道中
        long written = file.transferTo(this.position + position, count, target);
        if (written > 0) {
            transferred += written;
        }
        return written;
    }
    ...
}


从上面可以看,socket通道写文件region,委托给文件Region的转移数据操作transferTo,
从文件Region读取数据,写到通道中。

如果前面的文章忘记可以看一下这篇文章:
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098

再看写通道Outbound缓冲区:
 @Override
 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
     for (;;) {
         int size = in.size();//获取刷新写请求size
         if (size == 0) {
             // All written so clear OP_WRITE
	     //如果所有写请求都已经刷新,则清除选择key兴趣事件集,移除写事件
             clearOpWrite();
             break;
         }
         long writtenBytes = 0;
         boolean done = false;
         boolean setOpWrite = false;

         // Ensure the pending writes are made of ByteBufs only.
	 //获取Outbound缓冲中的刷新队列写请求对应的nio字节buf
         ByteBuffer[] nioBuffers = in.nioBuffers();
         int nioBufferCnt = in.nioBufferCount();//nio字节buf数量
         long expectedWrittenBytes = in.nioBufferSize();//需要些的字节数
         SocketChannel ch = javaChannel();//获取关联通道

         // Always us nioBuffers() to workaround data-corruption.
         // See https://github.com/netty/netty/issues/2761
         switch (nioBufferCnt) {
             case 0:
                 // We have something else beside ByteBuffers to write so fallback to normal writes.
                 super.doWrite(in);
                 return;
             case 1:
                 // Only one ByteBuf so use non-gathering write
                 ByteBuffer nioBuffer = nioBuffers[0];
                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
		     //委托给内部通道的写操作
                     final int localWrittenBytes = ch.write(nioBuffer);
                     if (localWrittenBytes == 0) {
                         setOpWrite = true;
                         break;
                     }
                     expectedWrittenBytes -= localWrittenBytes;
                     writtenBytes += localWrittenBytes;
                     if (expectedWrittenBytes == 0) {
                         done = true;
                         break;
                     }
                 }
                 break;
             default:
                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
		     //委托给内部通道的写操作
                     final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                     if (localWrittenBytes == 0) {
                         setOpWrite = true;
                         break;
                     }
                     expectedWrittenBytes -= localWrittenBytes;
                     writtenBytes += localWrittenBytes;
                     if (expectedWrittenBytes == 0) {
                         done = true;
                         break;
                     }
                 }
                 break;
         }

         // Release the fully written buffers, and update the indexes of the partially written buffer.
	 //根据已写字节数,从Outbound刷新队列中,移除已经刷新成功的写请求
         in.removeBytes(writtenBytes);

         if (!done) {
             // Did not write all buffers completely.
             incompleteWrite(setOpWrite);
             break;
         }
     }
 }

上面与ChannelOutboundBuffer相关的文章,见上面的连接,这里不再重复,附篇中也有。
从上面可以看出,写通道Outbound缓存区,首先从Outbound缓存区获取刷新链上的写请求对应的
字节buf,然后委托给socket通道的写操作,发送数据,发送成功后,从刷新链上移除已经发送的写请求。

再来看关闭socket通道输入流:
@Override
public ChannelFuture shutdownInput() {
    return shutdownInput(newPromise());
}

@Override
public ChannelFuture shutdownInput(final ChannelPromise promise) {
    //从事件循环反注册
    Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose();
    if (closeExecutor != null) {//如果关闭执行器不为空
        closeExecutor.execute(new Runnable() {
            @Override
            public void run() {
	        //执行实际关闭输入流工作
                shutdownInput0(promise);
            }
        });
    } else {
        //在当前事件循环中在,则直接执行,否则创建一个线程,完成实际关闭输入流工作
        EventLoop loop = eventLoop();
        if (loop.inEventLoop()) {
            shutdownInput0(promise);
        } else {
            loop.execute(new Runnable() {
                @Override
                public void run() {
                    shutdownInput0(promise);
                }
            });
        }
    }
    return promise;
}

来看从事件循环反注册

//创建Unsafe
 @Override
 protected AbstractNioUnsafe newUnsafe() {
     return new NioSocketChannelUnsafe();
 }

//NioSocketChannelUnsafe
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
    @Override
    protected Executor prepareToClose() {
        try {
            if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                // We need to cancel this key of the channel so we may not end up in a eventloop spin
                // because we try to read or write until the actual close happens which may be later due
                // SO_LINGER handling.
                // See https://github.com/netty/netty/issues/4449
		//从事件循环反注册
                doDeregister();
                return GlobalEventExecutor.INSTANCE;
            }
        } catch (Throwable ignore) {
            // Ignore the error as the underlying channel may be closed in the meantime and so
            // getSoLinger() may produce an exception. In this case we just return null.
            // See https://github.com/netty/netty/issues/4449
        }
        return null;
    }
}


//AbstractNioChannel
 @Override
protected void doDeregister() throws Exception {
    //事件循环,取消选择key
    eventLoop().cancel(selectionKey());
}

再来看实际关闭输入流:
 private void shutdownInput0(final ChannelPromise promise) {
    try {
        shutdownInput0();
        promise.setSuccess();
    } catch (Throwable t) {
        promise.setFailure(t);
    }
}
private void shutdownInput0() throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        //委托给socket通道
        javaChannel().shutdownInput();
    } else {//否则,委托给通道关联的socket
        javaChannel().socket().shutdownInput();
    }
}


从上面可以看出,关闭数据流,就是从事件循环反注册,即事件循环取消选择key,
然后如果jdk大于1.7 则委托socket通道关闭输入流,否则委托通道内关联的socket。


再来看关闭输出流,与关闭输入流思路一致,就不多说了:

@Override
public ChannelFuture shutdownOutput() {
    return shutdownOutput(newPromise());
}

@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
    Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose();
    if (closeExecutor != null) {
        closeExecutor.execute(new Runnable() {
            @Override
            public void run() {
                shutdownOutput0(promise);
            }
        });
    } else {
        EventLoop loop = eventLoop();
        if (loop.inEventLoop()) {
            shutdownOutput0(promise);
        } else {
            loop.execute(new Runnable() {
                @Override
                public void run() {
                    shutdownOutput0(promise);
                }
            });
        }
    }
    return promise;
}

private void shutdownOutput0(final ChannelPromise promise) {
    try {
        shutdownOutput0();
        promise.setSuccess();
    } catch (Throwable t) {
        promise.setFailure(t);
    }
}

private void shutdownOutput0() throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().shutdownOutput();
    } else {
        javaChannel().socket().shutdownOutput();
    }
}

再来看关闭通道,

@Override
public ChannelFuture shutdown() {
    return shutdown(newPromise());
}

@Override
public ChannelFuture shutdown(final ChannelPromise promise) {
    Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose();
    if (closeExecutor != null) {
        closeExecutor.execute(new Runnable() {
            @Override
            public void run() {
                shutdown0(promise);
            }
        });
    } else {
        EventLoop loop = eventLoop();
        if (loop.inEventLoop()) {
            shutdown0(promise);
        } else {
            loop.execute(new Runnable() {
                @Override
                public void run() {
                    shutdown0(promise);
                }
            });
        }
    }
    return promise;
}
private void shutdown0(final ChannelPromise promise) {
        Throwable cause = null;
        try {
            shutdownOutput0();
        } catch (Throwable t) {
            cause = t;
        }
        try {
            shutdownInput0();
        } catch (Throwable t) {
            if (cause == null) {
                promise.setFailure(t);
            } else {
                logger.debug("Exception suppressed because a previous exception occurred.", t);
                promise.setFailure(cause);
            }
            return;
        }
        if (cause == null) {
            promise.setSuccess();
        } else {
            promise.setFailure(cause);
        }
}

从上面可以看出关闭通道实际为关闭通道输入流和输出流。


再来开断开连接:

@Override
protected void doDisconnect() throws Exception {
    doClose();
}

@Override
protected void doClose() throws Exception {
    super.doClose();
    javaChannel().close();
}


从上面可以看出,断开连接实际为close通道。

再来看其他方法,下面这些方法,瞄一眼就行,没有太多要说的:
@Override
public ServerSocketChannel parent() {
    return (ServerSocketChannel) super.parent();
}

@Override
public SocketChannelConfig config() {
    return config;
}

@Override
protected SocketChannel javaChannel() {
    return (SocketChannel) super.javaChannel();
}

@Override
public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}

@Override
public boolean isOutputShutdown() {
    return javaChannel().socket().isOutputShutdown() || !isActive();
}

@Override
public boolean isInputShutdown() {
    return javaChannel().socket().isInputShutdown() || !isActive();
}

@Override
public boolean isShutdown() {
    Socket socket = javaChannel().socket();
    return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive();
}

@Override
public InetSocketAddress localAddress() {
    return (InetSocketAddress) super.localAddress();
}

@Override
public InetSocketAddress remoteAddress() {
    return (InetSocketAddress) super.remoteAddress();
}





总结:
nio socket通道初始化,主要是创建socket通道,初始化socket通道配置NioSocketChannelConfig。地址绑定操作,如果jdk大于1.7 则socket通道直接绑定地址,否则委托通道内关联的socket。连接操作,直接委托给内部的socket通道连接操作。socket通道读取操作,实际委托给socket通道的read操作,从Socket通道读取数据,写到当前buf。写字节buf,实际委托给socket通道的写操作,从当前buf读取数据,写socket通道中。socket通道写文件region,委托给文件Region的转移数据操作transferTo,从文件Region读取数据,写到通道中。
写通道Outbound缓存区,首先从Outbound缓存区获取刷新链上的写请求对应的字节buf,然后委托给socket通道的写操作,发送数据,发送成功后,从刷新链上移除已经发送的写请求。关闭数据流,就是从事件循环反注册,即事件循环取消选择key,然后如果jdk大于1.7 则委托socket通道关闭输入流,否则委托通道内关联的socket。关闭输出流与关闭输入流思路一致。关闭通道实际为关闭通道输入流和输出流。断开连接实际为close通道。




附:
//ChannelOutboundBuffer
public final class ChannelOutboundBuffer {
    // Assuming a 64-bit JVM:
    //  - 16 bytes object header
    //  - 8 reference fields
    //  - 2 long fields
    //  - 2 int fields
    //  - 1 boolean field
    //  - padding
    //Entry buf 头部数据size
    static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
            SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
    //通道Outbound buf 线程本地Buf,存放刷新链表中的写请求消息
    private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
        @Override
        protected ByteBuffer[] initialValue() throws Exception {
            return new ByteBuffer[1024];
        }
    };
   //buf 关联通道
    private final Channel channel;

    // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    //
    // The Entry that is the first in the linked-list structure that was flushed
    private Entry flushedEntry;刷新写请求链的链头
    // The Entry which is the first unflushed in the linked-list structure
    private Entry unflushedEntry;//未刷新的写请求链的链头
    // The Entry which represents the tail of the buffer
    private Entry tailEntry;
    // The number of flushed entries that are not written yet
    private int flushed;//刷新Entry链上待发送的写请求数

    private int nioBufferCount;//当前待发送的消息buf数量
    private long nioBufferSize;//当前待发送的所有消息buf的字节数

    private boolean inFail;//是否刷新失败
    //通道待发送的字节数
    private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
    @SuppressWarnings("UnusedDeclaration")
    private volatile long totalPendingSize;
    private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
    @SuppressWarnings("UnusedDeclaration")
    private volatile int unwritable;//通道写状态
   //触发通道ChannelWritabilityChanged事件任务线程
    private volatile Runnable fireChannelWritabilityChangedTask;

    ChannelOutboundBuffer(AbstractChannel channel) {
        this.channel = channel;
    }
}

再来看将刷新链上的写请求消息,添加到nio buffer数组中:
/** 
 * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only. 
 * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned 
 * array and the total number of readable bytes of the NIO buffers respectively. 
 将刷新链中的写请求对象消息放到nio buf数组中。#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度 
 和可读字节数 
 *  
 * Note that the returned array is reused and thus should not escape 
 * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}. 
 * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example. 
 返回的nio buf将会被 NioSocketChannel#doWrite方法重用 
 *  
 
 */  
public ByteBuffer[] nioBuffers() {  
    long nioBufferSize = 0;//nio buf数组中的字节数  
    int nioBufferCount = 0;//nio buf数组长度  
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();  
    //获取通道Outbound缓存区线程本地的niobuf数组  
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);  
    Entry entry = flushedEntry;  
    //遍历刷新链,链上的写请求Entry的消息必须为ByteBuf  
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {  
        if (!entry.cancelled) {  
        //在写请求没有取消的情况下,获取写请求消息buf,及buf的读索引,和可读字节数  
            ByteBuf buf = (ByteBuf) entry.msg;  
            final int readerIndex = buf.readerIndex();  
            final int readableBytes = buf.writerIndex() - readerIndex;  
  
            if (readableBytes > 0) {  
                if (Integer.MAX_VALUE - readableBytes < nioBufferSize) {  
		    //如果消息buf可读字节数+nioBufferSize大于整数的最大值,则跳出循环  
                    // If the nioBufferSize + readableBytes will overflow an Integer we stop populate the  
                    // ByteBuffer array. This is done as bsd/osx don't allow to write more bytes then  
                    // Integer.MAX_VALUE with one writev(...) call and so will return 'EINVAL', which will  
                    // raise an IOException. On Linux it may work depending on the  
                    // architecture and kernel but to be safe we also enforce the limit here.  
                    // This said writing more the Integer.MAX_VALUE is not a good idea anyway.  
                    //  
                    // See also:  
                    // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2  
                    // - http://linux.die.net/man/2/writev  
                    break;  
                }  
               //更新buf的size  
                nioBufferSize += readableBytes;  
                int count = entry.count;  
                if (count == -1) {  
                    //noinspection ConstantValueVariableUse  
                    entry.count = count =  buf.nioBufferCount();  
                }  
                //需要buf的数量  
                int neededSpace = nioBufferCount + count;  
                //如果buf需求数量大于当前nio buf数组  
                if (neededSpace > nioBuffers.length) {  
                     //则扩容nio数组为原来的两倍,  
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);  
                    //更新nio buf数组  
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);  
                }  
                if (count == 1) {  
                   //如果需要的buf数量为1,则获取写请求的buf  
                    ByteBuffer nioBuf = entry.buf;  
                    if (nioBuf == null) {  
                        // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a  
                        // derived buffer  
			//如果buf为空,则创建一个buf实例  
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);  
                    }  
		    //将消息buf,添加到nio buf数组中  
                    nioBuffers[nioBufferCount ++] = nioBuf;  
                } else {  
		    //否则获取写请求的buf数组  
                    ByteBuffer[] nioBufs = entry.bufs;  
                    if (nioBufs == null) {  
                        // cached ByteBuffers as they may be expensive to create in terms  
                        // of Object allocation  
			//分配buf数组  
                        entry.bufs = nioBufs = buf.nioBuffers();  
                    }  
                    //添加写请求buf数组到通道Outbound缓存区的nio buf数组中  
                    nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);  
                }  
            }  
        }  
        entry = entry.next;  
    }  
    //更新当前nio buffer 计数器和字节数  
    this.nioBufferCount = nioBufferCount;  
    this.nioBufferSize = nioBufferSize;  
  
    return nioBuffers;  
}  

1.
//则扩容nio数组  
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {  
    int newCapacity = array.length;  
    do {  
        // double capacity until it is big enough  
        // See https://github.com/netty/netty/issues/1890  
    //则扩容nio数组为原来的两倍  
        newCapacity <<= 1;  
  
        if (newCapacity < 0) {  
            throw new IllegalStateException();  
        }  
  
    } while (neededSpace > newCapacity);  
    ByteBuffer[] newArray = new ByteBuffer[newCapacity];  
    //拷贝原始中size buf到新的的buf数组中  
    System.arraycopy(array, 0, newArray, 0, size);  
  
    return newArray;  
}  


2.
//添加写请求buf数组到通道Outbound缓存区的nio buf数组中  
private static int fillBufferArray(ByteBuffer[] nioBufs, ByteBuffer[] nioBuffers, int nioBufferCount) {  
    //遍历添加的buf数组,添加到缓存区的nio buf数组中  
    for (ByteBuffer nioBuf: nioBufs) {  
        if (nioBuf == null) {  
            break;  
        }  
        nioBuffers[nioBufferCount ++] = nioBuf;  
    }  
    return nioBufferCount;  
}  


从上面可以看出:将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,
主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,
这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。

再来看移除移动字节数据

 /** 
  * Removes the fully written entries and update the reader index of the partially written entry. 
  * This operation assumes all messages in this buffer is {@link ByteBuf}. 
  
  从刷新写请求链表,移除writtenBytes个字节数 
  */  
 public void removeBytes(long writtenBytes) {  
     for (;;) {  
         Object msg = current();//获取当前写请求消息  
         if (!(msg instanceof ByteBuf)) {  
         //写请求非ByteBuf实例,且writtenBytes为0  
             assert writtenBytes == 0;  
             break;  
         }  
   
         final ByteBuf buf = (ByteBuf) msg;  
     //获取消息buf的读指针  
         final int readerIndex = buf.readerIndex();  
     //获取buf中可读的字节数  
         final int readableBytes = buf.writerIndex() - readerIndex;  
         //如果可读字节数小于,需要移除的字节数  
         if (readableBytes <= writtenBytes) {  
             if (writtenBytes != 0) {  
             //则更新写请求任务进度  
                 progress(readableBytes);  
         //更新移除字节数  
                 writtenBytes -= readableBytes;  
             }  
         //移除链头写请求消息  
             remove();  
         } else { // readableBytes > writtenBytes  
             if (writtenBytes != 0) {  
            //如果可读字节数大于需要移除的字节数,则移动消息buf的读索引到readerIndex + (int) writtenBytes位置  
                 buf.readerIndex(readerIndex + (int) writtenBytes);  
         //则更新写请求任务进度  
                 progress(writtenBytes);  
             }  
             break;  
         }  
     }  
     //最后清除nio buffer  
     clearNioBuffers();  
 }  

 /** 
  * Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written. 
  返回当前需要发送的消息 
  */  
 public Object current() {  
     Entry entry = flushedEntry;  
     if (entry == null) {  
         return null;  
     }  
   
     return entry.msg;  
 }  


 /** 
  * Notify the {@link ChannelPromise} of the current message about writing progress. 
  获取通道刷新任务的进度 
  */  
 public void progress(long amount) {  
     Entry e = flushedEntry;  
     assert e != null;  
     ChannelPromise p = e.promise;  
     if (p instanceof ChannelProgressivePromise) {  
         long progress = e.progress + amount;  
         e.progress = progress;  
         ((ChannelProgressivePromise) p).tryProgress(progress, e.total);  
     }  
 }  
 
 /** 
  * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no 
  * flushed message exists at the time this method is called it will return {@code false} to signal that no more 
  * messages are ready to be handled. 
  移除当前消息,并标记通道异步任务为成功,并返回true。如果没有刷新消息存在,则返回false,表示没有消息需要处理 
  */  
 public boolean remove() {  
     Entry e = flushedEntry;  
     if (e == null) {  
          //刷新消息链为空,则清除NioBuffer  
         clearNioBuffers();  
         return false;  
     }  
     Object msg = e.msg;  
   
     ChannelPromise promise = e.promise;  
     int size = e.pendingSize;  
     //移除写请求Entry  
     removeEntry(e);  
   
     if (!e.cancelled) {  
         // only release message, notify and decrement if it was not canceled before.  
     //写请求没有取消,则释放消息,更新任务结果,更新当前通道待发送字节数和可写状态,并触发相应的事件  
         ReferenceCountUtil.safeRelease(msg);  
         safeSuccess(promise);  
         decrementPendingOutboundBytes(size, false, true);  
     }  
     //取消,则回收  
     // recycle the entry  
     e.recycle();  
   
     return true;  
 }  


移除操作有几点要看:
1.
if (e == null) {  
     //刷新消息链为空,则清除NioBuffer  
    clearNioBuffers();  
    return false;  
}  



// Clear all ByteBuffer from the array so these can be GC'ed.  
// See https://github.com/netty/netty/issues/3837  
private void clearNioBuffers() {  
    int count = nioBufferCount;  
    if (count > 0) {  
        //重置nio buf计数器,填充线程本地nio buf数组为空。  
        nioBufferCount = 0;  
        Arrays.fill(NIO_BUFFERS.get(), 0, count, null);  
    }  
}  


2.

//移除写请求Entry  
removeEntry(e);  

private void removeEntry(Entry e) {  
    if (-- flushed == 0) {//刷新链为空  
        // processed everything  
        flushedEntry = null;  
        if (e == tailEntry) {//链尾  
            tailEntry = null;  
            unflushedEntry = null;  
        }  
    } else {  
       //否则,刷新链头往后移一位  
        flushedEntry = e.next;  
    }  
}  

从上面可以看出移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,
更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件。

从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,
如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,
继续从刷新链中的写请求消息中移除writtenBytes个字节数。

//FileRegion
/**
 * A region of a file that is sent via a {@link Channel} which supports
 * [url=http://en.wikipedia.org/wiki/Zero-copy]zero-copy file transfer[/url].
 *
 * <h3>Upgrade your JDK / JRE</h3>
 *
 * {@link FileChannel#transferTo(long, long, WritableByteChannel)} has at least
 * four known bugs in the old versions of Sun JDK and perhaps its derived ones.
 * Please upgrade your JDK to 1.6.0_18 or later version if you are going to use
 * zero-copy file transfer.
 * [list]
 * <li>[url=http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988]5103988[/url]
 *   - FileChannel.transferTo() should return -1 for EAGAIN instead throws IOException</li>
 * <li>[url=http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6253145]6253145[/url]
 *   - FileChannel.transferTo() on Linux fails when going beyond 2GB boundary</li>
 * <li>[url=http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6427312]6427312[/url]
 *   - FileChannel.transferTo() throws IOException "system call interrupted"</li>
 * <li>[url=http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6524172]6470086[/url]
 *   - FileChannel.transferTo(2147483647, 1, channel) causes "Value too large" exception</li>
 * [/list]
 *
 * <h3>Check your operating system and JDK / JRE</h3>
 *
 * If your operating system (or JDK / JRE) does not support zero-copy file
 * transfer, sending a file with {@link FileRegion} might fail or yield worse
 * performance.  For example, sending a large file doesn't work well in Windows.
 *
 * <h3>Not all transports support it</h3>
 */
public interface FileRegion extends ReferenceCounted {

    /**
     * Returns the offset in the file where the transfer began.
     */
    long position();

    /**
     * Returns the bytes which was transfered already.
     *
     * @deprecated Use {@link #transferred()} instead.
     */
    @Deprecated
    long transfered();

    /**
     * Returns the bytes which was transfered already.
     */
    long transferred();

    /**
     * Returns the number of bytes to transfer.
     */
    long count();

    /**
     * Transfers the content of this file region to the specified channel.
     *转移当前文件Region内容到通道
     * @param target    the destination of the transfer
     * @param position  the relative offset of the file where the transfer
     *                  begins from.  For example, <tt>0</tt> will make the
     *                  transfer start from {@link #position()}th byte and
     *                  <tt>{@link #count()} - 1</tt> will make the last
     *                  byte of the region transferred.
     */
    long transferTo(WritableByteChannel target, long position) throws IOException;

    @Override
    FileRegion retain();

    @Override
    FileRegion retain(int increment);

    @Override
    FileRegion touch();

    @Override
    FileRegion touch(Object hint);
}



0
0
分享到:
评论

相关推荐

    NIO netty开发

    netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty...

    netty nio 技术文档

    netty nio 封装java nio用于网络分布式数据传输。

    Java高并发编程代码(Netty NIO 实例)

    Java高并发编程代码(Netty NIO 实例)

    Netty nio protocolbuf视频课程

    包含了Netty,NIO AIO,Mina知识的详解以及netty结合spring protocolbuf的源码

    基于netty的nio使用demo源码

    基于netty的nio使用demo源码

    netty服务器解析16进制数据

    netty服务器解析16进制数据

    Java视频教程 Java游戏服务器端开发 Netty NIO AIO Mina视频教程

    jaca视频教程 jaca游戏服务器端开发 Netty NIO AIO Mina视频教程 课程目录: 一、Netty快速入门教程 01、第一课NIO 02、第二课netty服务端 03、第三课netty客户端 04、第四课netty线程模型源码分析(一) 05、...

    netty解析报文,解决粘包拆包

    注:下载前请查看本人博客文章,看是否...里面包含模拟TCP客户端发送报文工具,硬件厂商提供的协议,服务端(springboot+netty)解析报文源码,源码里整合了redis,不需要可自行删除,如有需要客户端代码,可联系我。

    netty服务器son解析

    netty服务器son解析

    Java_NIO框架Netty教程

    资源名称:Java_NIO框架Netty教程资源截图: 资源太大,传百度网盘了,链接在附件中,有需要的同学自取。

    netty示例NIO示例

    myeclipse开发通信示例,框架netty,代码本人写的,而且已测试通过,先运行NettyService,再运行NettyClient即可看到效果。nio示例也有,原理一样,运行先后顺序同netty.

    netty源码解析视频

    netty源码解析视频教程,深入浅出netty源码视频

    netty源码解析视频.txt

    mu ke 网 netty源码解析视频,你值得下载

    Netty源码解析.pdf

    netty源码解析PDF,网络编程

    Netty全套PDF

    Netty 5 全套PDF Netty 5用户指南 -Netty_in_Action(第五版-目录修正版)

    jvm、nio、netty优化使用.txt

    Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了TCP和UDP套接字服务器等网络编程。 “快速简便”并不意味着最终的应用程序将遭受可维护性或性能...

    Java Netty 版 Jt808协议解析工程

    Java Netty版完全符合JT808部标文档的开发规范。直接可以下载使用,不信拉倒。

    netty权威文档

    如何全面系统地掌握Netty,进行Netty NIO开发、Netty编解码开发、Netty多协议开发?如何通过对Netty源码的学习获得更深入地知识?掌握了Netty后,如何将其应用到实际架构中?Netty工程师的就业前景和可涉足的行业是怎样...

    java基于netty NIO的简单聊天室的实现

    主要介绍了java基于netty NIO的简单聊天室的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    t-io是基于aio(nio2)的网络编程框架和netty属于同类

    t-io是基于aio(nio2)的网络编程框架,和netty属于同类,但t-io更注重开发一线工程师的感受,提供了大量和业务相关的API。基于t-io来开发IM、TCP私有协议、RPC、游戏服务器端、推送服务、实时监控、物联网、UDP、...

Global site tag (gtag.js) - Google Analytics