`
Donald_Draper
  • 浏览: 953138 次
社区版块
存档分类
最新评论

PipeImpl解析

    博客分类:
  • NIO
nio 
阅读更多
ServerSocketChannel定义:http://donald-draper.iteye.com/blog/2369836
ServerSocketChannelImpl解析:http://donald-draper.iteye.com/blog/2370912
SocketChannelImpl 解析一(通道连接,发送数据):http://donald-draper.iteye.com/blog/2372364
SocketChannelImpl 解析二(发送数据后续):http://donald-draper.iteye.com/blog/2372548
SocketChannelImpl 解析三(接收数据):http://donald-draper.iteye.com/blog/2372590
SocketChannelImpl 解析四(关闭通道等) :http://donald-draper.iteye.com/blog/2372717
Pipe定义:http://donald-draper.iteye.com/blog/2373540
引言:
    Pipe中包含一个可写通道SinkChannel和一个可读通道SourceChannel。sink向管道写字节序序列,
source从管道读取字节序列。
我们从Pipe的open方法开始:
public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
}

这里为什么是SelectorProviderImpl,前面已经说过不在说,
//SelectorProviderImpl
 public Pipe openPipe()
        throws IOException
{
    return new PipeImpl(this);
}

下面来看通道的实现,PipeImpl
package sun.nio.ch;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.security.*;
import java.util.Random;

// Referenced classes of package sun.nio.ch:
//            IOUtil, Util, SinkChannelImpl, SourceChannelImpl

class PipeImpl extends Pipe
{
    private java.nio.channels.Pipe.SourceChannel source;//Source通道
    private java.nio.channels.Pipe.SinkChannel sink;//Sink通道
    private static final Random rnd;//
    static 
    {
        //加载net和nio资源库
        Util.load();
        byte abyte0[] = new byte[8];
	//委托IOUtil,获取8个字节序列,static native boolean randomBytes(byte abyte0[]);
        boolean flag = IOUtil.randomBytes(abyte0);
        if(flag)
            rnd = new Random(ByteBuffer.wrap(abyte0).getLong());
        else
            rnd = new Random();
    }
     PipeImpl(SelectorProvider selectorprovider)
        throws IOException
    {
        try
        {
	    //在与当前线程访问控制权限的情况下,执行Initializer,权限动作,执行Initializer的run方法
            AccessController.doPrivileged(new Initializer(selectorprovider));
        }
        catch(PrivilegedActionException privilegedactionexception)
        {
            throw (IOException)privilegedactionexception.getCause();
        }
    }
    //管道初始化Action
    private class Initializer
        implements PrivilegedExceptionAction
    {
        private final SelectorProvider sp;
        static final boolean $assertionsDisabled = !sun/nio/ch/PipeImpl.desiredAssertionStatus();
        final PipeImpl this$0;
        private Initializer(SelectorProvider selectorprovider)
        {
            this$0 = PipeImpl.this;
            super();
            sp = selectorprovider;
        }
        public volatile Object run()
            throws Exception
        {
            return run();
        }
        public Void run()
            throws IOException
        {
            ServerSocketChannel serversocketchannel;//ServerSocket通道,
            SocketChannel socketchannel;//用于source通道
            SocketChannel socketchannel1;//用于Sink通道
            serversocketchannel = null;
            socketchannel = null;
            socketchannel1 = null;
            try
            {
	        //获取本地地址
                InetAddress inetaddress = InetAddress.getByName("127.0.0.1");
                if(!$assertionsDisabled && !inetaddress.isLoopbackAddress())
                    throw new AssertionError();
		//打开一个ServerSocket通道
                serversocketchannel = ServerSocketChannel.open();
		//ServerSocket通道绑定地址
                serversocketchannel.socket().bind(new InetSocketAddress(inetaddress, 0));
                InetSocketAddress inetsocketaddress = new InetSocketAddress(inetaddress, serversocketchannel.socket().getLocalPort());
                //打开一个SocketChannel通道
		socketchannel = SocketChannel.open(inetsocketaddress);
                ByteBuffer bytebuffer = ByteBuffer.allocate(8);
		//获取通道的随机long值
                long l = PipeImpl.rnd.nextLong();
                bytebuffer.putLong(l).flip();
		//向serverSocket通道发送一个long值,即8个字节
                socketchannel.write(bytebuffer);
                do
                {
		    //serverSocket接受连接
                    socketchannel1 = serversocketchannel.accept();
                    bytebuffer.clear();
		    //接受client通道端发送过来的数据
                    socketchannel1.read(bytebuffer);
                    bytebuffer.rewind();
                    if(bytebuffer.getLong() == l)
                        break;
                    socketchannel1.close();
                } while(true);
		//根据client通道,构造SourceChannelImpl
                source = new SourceChannelImpl(sp, socketchannel);
		//根据ServerChannel接受连接产生的SocketChannel通道,构造SinkChannelImpl
                sink = new SinkChannelImpl(sp, socketchannel1);
            }
            catch(IOException ioexception1)
            {
                try
                {
                    if(socketchannel != null)
                        socketchannel.close();
                    if(socketchannel1 != null)
                        socketchannel1.close();
                }
                catch(IOException ioexception2) { }
                IOException ioexception3 = new IOException("Unable to establish loopback connection");
                ioexception3.initCause(ioexception1);
                throw ioexception3;
            }
            try
            {
	        //关闭serverSocketChannle,任务完成(建立一个SocketChannle连接)
                if(serversocketchannel != null)
                    serversocketchannel.close();
            }
            catch(IOException ioexception) { }
            break MISSING_BLOCK_LABEL_277;
            Exception exception;
            exception;
            try
            {
                if(serversocketchannel != null)
                    serversocketchannel.close();
            }
            catch(IOException ioexception4) { }
            throw exception;
            return null;
        }
    }
    //返回source通道
    public java.nio.channels.Pipe.SourceChannel source()
    {
        return source;
    }
    //返回sink通道
    public java.nio.channels.Pipe.SinkChannel sink()
    {
        return sink;
    }
}

从上面可以看出PipeImpl,内部有一个Source通道SourceChannel,Sink通道SinkChannel,一个
随机数rnd(long),还有一个管道初始化Action,初始化时加载net和nio资源库,委托IOUtil产生8个字节,然后根据8个字节生成一个随机数rnd;在构造时,在与当前线程访问控制权限的情况下,执行Initializer,权限动作,执行Initializer的run方法,即通过ServerSocketChannle和SocketChannel建立一个通道连接;首先新建一个ServerSocketChannle和SocketChannel,分别绑定地址SocketChannel向ServerSocetChannel发送随机数rnd,ServerSocetChannel接受SocketChannel连接,产生一个SocketChannel1(server),SocketChannel1接受client(SocketChannel),检验与随机数rnd,相等则建立连接。然后根据SocketChannel1(server),构造Sink通道SinkChannelImpl,根据client(SocketChannel),构造Source通道SourceChannelImpl。
我们先来看SinkChannelImpl
package sun.nio.ch;

import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;

// Referenced classes of package sun.nio.ch:
//            SelChImpl, SelectionKeyImpl, SelectorImpl, SocketChannelImpl, 
//            Util

class SinkChannelImpl extends java.nio.channels.Pipe.SinkChannel
    implements SelChImpl
{
    SocketChannel sc;//关联socket通道
    public FileDescriptor getFD()
    {
        return ((SocketChannelImpl)sc).getFD();
    }

    public int getFDVal()
    {
        return ((SocketChannelImpl)sc).getFDVal();
    }

    SinkChannelImpl(SelectorProvider selectorprovider, SocketChannel socketchannel)
    {
        super(selectorprovider);
        sc = socketchannel;
    }
   //关闭通道
    protected void implCloseSelectableChannel()
        throws IOException
    {
        //通道没有注册到任何选择器
        if(!isRegistered())
            kill();
    }
   //关闭socket通道
    public void kill()
        throws IOException
    {
        sc.close();
    }
    //配置阻塞模式
    protected void implConfigureBlocking(boolean flag)
        throws IOException
    {
        sc.configureBlocking(flag);
    }
    //写字节序列
      public int write(ByteBuffer bytebuffer)
        throws IOException
    {
        return sc.write(bytebuffer);
        AsynchronousCloseException asynchronouscloseexception;
        asynchronouscloseexception;
        close();
        throw asynchronouscloseexception;
    }

    public long write(ByteBuffer abytebuffer[])
        throws IOException
    {
        return sc.write(abytebuffer);
        AsynchronousCloseException asynchronouscloseexception;
        asynchronouscloseexception;
        close();
        throw asynchronouscloseexception;
    }

    public long write(ByteBuffer abytebuffer[], int i, int j)
        throws IOException
    {
        if(i < 0 || j < 0 || i > abytebuffer.length - j)
            throw new IndexOutOfBoundsException();
        return write(Util.subsequence(abytebuffer, i, j));
        AsynchronousCloseException asynchronouscloseexception;
        asynchronouscloseexception;
        close();
        throw asynchronouscloseexception;
    }
    //设置就绪操作事件
    public boolean translateAndSetReadyOps(int i, SelectionKeyImpl selectionkeyimpl)
    {
        return translateReadyOps(i, 0, selectionkeyimpl);
    }
    //更新就绪操作事件
    public boolean translateAndUpdateReadyOps(int i, SelectionKeyImpl selectionkeyimpl)
    {
        return translateReadyOps(i, selectionkeyimpl.nioReadyOps(), selectionkeyimpl);
    }
    public boolean translateReadyOps(int i, int j, SelectionKeyImpl selectionkeyimpl)
    {
        int k = selectionkeyimpl.nioInterestOps();
        int l = selectionkeyimpl.nioReadyOps();
        int i1 = j;
	//就绪事件为读1写4连接8,接受连接事件16,不是这四种事件,则抛出Error
        if((i & 32) != 0)
            throw new Error("POLLNVAL detected");
	 //为8+16,接受连接,并建立连接,设置就绪事件k
        if((i & 24) != 0)
        {
            i1 = k;
            selectionkeyimpl.nioReadyOps(i1);
            return (i1 & ~l) != 0;
        }
        if((i & 4) != 0 && (k & 4) != 0)
            i1 |= 4;//写操作
        selectionkeyimpl.nioReadyOps(i1);
        return (i1 & ~l) != 0;
    }
    //设置兴趣操作事件
    public void translateAndSetInterestOps(int i, SelectionKeyImpl selectionkeyimpl)
    {
        if((i & 4) != 0)
            i = 4;//写事件
        selectionkeyimpl.selector.putEventOps(selectionkeyimpl, i);
    }  
}

从SinkChannelImpl,可以看出内部关联一个socket通道,SinkChannelImpl关闭通道,配置通道阻塞模式,写字节序列到管道都是委托给内部的SocketChannle。
再看SourceChannelImpl
class SourceChannelImpl extends java.nio.channels.Pipe.SourceChannel
    implements SelChImpl
{
    SocketChannel sc;
    public FileDescriptor getFD()
    {
        return ((SocketChannelImpl)sc).getFD();
    }

    public int getFDVal()
    {
        return ((SocketChannelImpl)sc).getFDVal();
    }

    SourceChannelImpl(SelectorProvider selectorprovider, SocketChannel socketchannel)
    {
        super(selectorprovider);
        sc = socketchannel;
    }
   //关闭通道
    protected void implCloseSelectableChannel()
        throws IOException
    {
        //通道没有注册到任何选择器
        if(!isRegistered())
            kill();
    }
    //关闭socket通道
    public void kill()
        throws IOException
    {
        sc.close();
    }
   //配置阻塞模式
    protected void implConfigureBlocking(boolean flag)
        throws IOException
    {
        sc.configureBlocking(flag);
    }
    //读取字节序列
    public int read(ByteBuffer bytebuffer)
        throws IOException
    {
        return sc.read(bytebuffer);
        AsynchronousCloseException asynchronouscloseexception;
        asynchronouscloseexception;
        close();
        throw asynchronouscloseexception;
    }

    public long read(ByteBuffer abytebuffer[], int i, int j)
        throws IOException
    {
        if(i < 0 || j < 0 || i > abytebuffer.length - j)
            throw new IndexOutOfBoundsException();
        return read(Util.subsequence(abytebuffer, i, j));
        AsynchronousCloseException asynchronouscloseexception;
        asynchronouscloseexception;
        close();
        throw asynchronouscloseexception;
    }

    public long read(ByteBuffer abytebuffer[])
        throws IOException
    {
        return sc.read(abytebuffer);
        AsynchronousCloseException asynchronouscloseexception;
        asynchronouscloseexception;
        close();
        throw asynchronouscloseexception;
    }
     //设置就绪操作事件
    public boolean translateAndSetReadyOps(int i, SelectionKeyImpl selectionkeyimpl)
    {
        return translateReadyOps(i, 0, selectionkeyimpl);
    }
    //更新就绪操作事件
     public boolean translateAndUpdateReadyOps(int i, SelectionKeyImpl selectionkeyimpl)
    {
        return translateReadyOps(i, selectionkeyimpl.nioReadyOps(), selectionkeyimpl);
    }
    public boolean translateReadyOps(int i, int j, SelectionKeyImpl selectionkeyimpl)
    {
        int k = selectionkeyimpl.nioInterestOps();
        int l = selectionkeyimpl.nioReadyOps();
        int i1 = j;
	//就绪事件为读1写4连接8,接受连接事件16,不是这四种事件,则抛出Error
        if((i & 32) != 0)
            throw new Error("POLLNVAL detected");
	 //为8+16,接受连接,并建立连接,设置就绪事件k
        if((i & 24) != 0)
        {
            i1 = k;
            selectionkeyimpl.nioReadyOps(i1);
            return (i1 & ~l) != 0;
        }
        if((i & 1) != 0 && (k & 1) != 0)
            i1 |= 1;//读事件
        selectionkeyimpl.nioReadyOps(i1);
        return (i1 & ~l) != 0;
    }
    //设置兴趣操作事件
    public void translateAndSetInterestOps(int i, SelectionKeyImpl selectionkeyimpl)
    {
        if((i & 1) != 0)
            i = 1;//读事件
        selectionkeyimpl.selector.putEventOps(selectionkeyimpl, i);
    }
}

从SourceChannelImpl,可以看出内部关联一个socket通道,SourceChannelImpl关闭通道,配置通道阻塞模式,从管道读取字节序列都是委托给内部的SocketChannle。
总结:
     PipeImpl,内部有一个Source通道SourceChannel,Sink通道SinkChannel,一个随机数rnd(long),还有一个管道初始化Action,初始化时加载net和nio资源库,委托IOUtil产生8个字节,然后根据8个字节生成一个随机数rnd;在构造时,在与当前线程访问控制权限的情况下,执行Initializer,权限动作,执行Initializer的run方法,即通过ServerSocketChannle和SocketChannel建立一个通道连接;首先新建一个ServerSocketChannle和SocketChannel,分别绑定地址SocketChannel向ServerSocetChannel发送随机数rnd,ServerSocetChannel接受SocketChannel连接,产生一个SocketChannel1(server),SocketChannel1接受client(SocketChannel),检验与随机数rnd,相等则建立连接。然后根据SocketChannel1(server),构造Sink通道SinkChannelImpl,根据client(SocketChannel),构造Source通道SourceChannelImpl。
    SinkChannelImpl,内部关联一个socket通道,SinkChannelImpl关闭通道,配置通道阻塞模式,写字节序列到管道都是委托给内部的SocketChannle。
    SourceChannelImpl,内部关联一个socket通道,SourceChannelImpl关闭通道,配置通道阻塞模式,从管道读取字节序列都是委托给内部的SocketChannle。
分享到:
评论

相关推荐

    PipeImpl.rar_Java编程_Unix_Linux_

    Implements java.nio.channels.Pipe for Embedded Linux.

    五子棋wuziq.zip

    五子棋游戏想必大家都非常熟悉,游戏规则十分简单。游戏开始后,玩家在游戏设置中选择人机对战,则系统执黑棋,玩家自己执白棋。双方轮流下一棋,先将横、竖或斜线的5个或5个以上同色棋子连成不间断的一排者为胜。 【项目资源】:包含前端、后端、移动开发、操作系统、人工智能、物联网、信息化管理、数据库、硬件开发、大数据、课程资源、音视频、网站开发等各种技术项目的源码。包括STM32、ESP8266、PHP、QT、Linux、iOS、C++、Java、python、web、C#、EDA、proteus、RTOS等项目的源码。 【技术】 Java、Python、Node.js、Spring Boot、Django、Express、MySQL、PostgreSQL、MongoDB、React、Angular、Vue、Bootstrap、Material-UI、Redis、Docker、Kubernetes

    单片机C语言Proteus仿真实例占空比可调模拟仿真程序

    单片机C语言Proteus仿真实例占空比可调模拟仿真程序提取方式是百度网盘分享地址

    单片机C语言Proteus仿真实例用数码管设计的可调式电子钟

    单片机C语言Proteus仿真实例用数码管设计的可调式电子钟提取方式是百度网盘分享地址

    2023年第16届中国大学生计算机设计大赛附往届获奖作品合集资料

    2023年第16届中国大学生计算机设计大赛附往届获奖作品合集资料提取方式是百度网盘分享地址

    Linux下,C语言实现五子棋程序Linux-Wuziqi.zip

    五子棋游戏想必大家都非常熟悉,游戏规则十分简单。游戏开始后,玩家在游戏设置中选择人机对战,则系统执黑棋,玩家自己执白棋。双方轮流下一棋,先将横、竖或斜线的5个或5个以上同色棋子连成不间断的一排者为胜。 【项目资源】:包含前端、后端、移动开发、操作系统、人工智能、物联网、信息化管理、数据库、硬件开发、大数据、课程资源、音视频、网站开发等各种技术项目的源码。包括STM32、ESP8266、PHP、QT、Linux、iOS、C++、Java、python、web、C#、EDA、proteus、RTOS等项目的源码。 【技术】 Java、Python、Node.js、Spring Boot、Django、Express、MySQL、PostgreSQL、MongoDB、React、Angular、Vue、Bootstrap、Material-UI、Redis、Docker、Kubernetes

    需要系数法负荷计算软件.zip

    需要系数法负荷计算软件

    变压器差动计算软件.zip

    变压器差动计算软件

    单片机C语言Proteus仿真实例数码管动态显示

    单片机C语言Proteus仿真实例数码管动态显示提取方式是百度网盘分享地址

    Python源码-小海龟之螺旋曲线.py

    Python源码-小海龟之螺旋曲线

    Python源码-图形验证码考眼力游戏.py

    Python源码-图形验证码考眼力游戏

    飞机大战项目planegame.zip

    五子棋游戏想必大家都非常熟悉,游戏规则十分简单。游戏开始后,玩家在游戏设置中选择人机对战,则系统执黑棋,玩家自己执白棋。双方轮流下一棋,先将横、竖或斜线的5个或5个以上同色棋子连成不间断的一排者为胜。 【项目资源】:包含前端、后端、移动开发、操作系统、人工智能、物联网、信息化管理、数据库、硬件开发、大数据、课程资源、音视频、网站开发等各种技术项目的源码。包括STM32、ESP8266、PHP、QT、Linux、iOS、C++、Java、python、web、C#、EDA、proteus、RTOS等项目的源码。 【技术】 Java、Python、Node.js、Spring Boot、Django、Express、MySQL、PostgreSQL、MongoDB、React、Angular、Vue、Bootstrap、Material-UI、Redis、Docker、Kubernetes

    架空送电线路导地线弧垂计算小软件.zip

    架空送电线路导地线弧垂计算小软件

    W25Q512数据手册

    W25Q512数据手册。The W25Q512JV (512M-bit) Serial Flash memory provides a storage solution for systems with limited space, pins and power. The 25Q series offers flexibility and performance well beyond ordinary Serial Flash devices. They are ideal for code shadowing to RAM, executing code directly from Dual/Quad SPI (XIP) and storing voice, text and data. The device operates on a single 2.7V to 3.6V power supply with current consumption as low as 1µA for power-down. All devices are offered in space-

    three.js应用篇(五)模型内第一视角漫游

    完整示例代码

    setuptools-1.3.1.zip

    Python库是一组预先编写的代码模块,旨在帮助开发者实现特定的编程任务,无需从零开始编写代码。这些库可以包括各种功能,如数学运算、文件操作、数据分析和网络编程等。Python社区提供了大量的第三方库,如NumPy、Pandas和Requests,极大地丰富了Python的应用领域,从数据科学到Web开发。Python库的丰富性是Python成为最受欢迎的编程语言之一的关键原因之一。这些库不仅为初学者提供了快速入门的途径,而且为经验丰富的开发者提供了强大的工具,以高效率、高质量地完成复杂任务。例如,Matplotlib和Seaborn库在数据可视化领域内非常受欢迎,它们提供了广泛的工具和技术,可以创建高度定制化的图表和图形,帮助数据科学家和分析师在数据探索和结果展示中更有效地传达信息。

    单片机C语言Proteus仿真实例数码管显示4×4键盘矩阵按键

    单片机C语言Proteus仿真实例数码管显示4×4键盘矩阵按键提取方式是百度网盘分享地址

    setuptools-46.3.0.zip

    Python库是一组预先编写的代码模块,旨在帮助开发者实现特定的编程任务,无需从零开始编写代码。这些库可以包括各种功能,如数学运算、文件操作、数据分析和网络编程等。Python社区提供了大量的第三方库,如NumPy、Pandas和Requests,极大地丰富了Python的应用领域,从数据科学到Web开发。Python库的丰富性是Python成为最受欢迎的编程语言之一的关键原因之一。这些库不仅为初学者提供了快速入门的途径,而且为经验丰富的开发者提供了强大的工具,以高效率、高质量地完成复杂任务。例如,Matplotlib和Seaborn库在数据可视化领域内非常受欢迎,它们提供了广泛的工具和技术,可以创建高度定制化的图表和图形,帮助数据科学家和分析师在数据探索和结果展示中更有效地传达信息。

    bloom filter , 递归 , 回溯 , 五子棋 , 迷宫 , 扫雷 , 贪吃蛇 , 消字 , 各种数据结构算法.zip

    五子棋游戏想必大家都非常熟悉,游戏规则十分简单。游戏开始后,玩家在游戏设置中选择人机对战,则系统执黑棋,玩家自己执白棋。双方轮流下一棋,先将横、竖或斜线的5个或5个以上同色棋子连成不间断的一排者为胜。 【项目资源】:包含前端、后端、移动开发、操作系统、人工智能、物联网、信息化管理、数据库、硬件开发、大数据、课程资源、音视频、网站开发等各种技术项目的源码。包括STM32、ESP8266、PHP、QT、Linux、iOS、C++、Java、python、web、C#、EDA、proteus、RTOS等项目的源码。 【技术】 Java、Python、Node.js、Spring Boot、Django、Express、MySQL、PostgreSQL、MongoDB、React、Angular、Vue、Bootstrap、Material-UI、Redis、Docker、Kubernetes

    附件1:证券公司及基金管理公司子公司资产证券化业务管理规定(修订稿).pdf

    附件1:证券公司及基金管理公司子公司资产证券化业务管理规定(修订稿).pdf

Global site tag (gtag.js) - Google Analytics