`
Donald_Draper
  • 浏览: 945773 次
社区版块
存档分类
最新评论

Tomcat的Connector(Protocol,CoyoteAdapter,AprEndpoint)初始化及请求处理过程

阅读更多
Tomcat的Server初始化及启动过程:http://donald-draper.iteye.com/blog/2327060
Tomcat的connector:http://hill007299.iteye.com/blog/1757198
阻塞队列--LinkedBlockingQueue:http://www.cnblogs.com/linjiqin/p/5128048.html
Socket处理(2) - AprEndpoint:http://blog.sina.com.cn/s/blog_53b51a980100qejl.html
在Server的启动过程中,我们看到了Connector的初始化与启动,当时没有深入分析,
今天我们来看一下:
/**
 * Excerpt of StandardService showing how a service initializes and starts
 * its connectors. The catch clauses of the try blocks are not shown in this
 * excerpt.
 */
public class StandardService extends LifecycleMBeanBase implements Service {
    private static final String info =
        "org.apache.catalina.core.StandardService/1.0";
    private String name = null;
    private static final StringManager sm =
        StringManager.getManager(Constants.Package);
    private Server server = null;
    // Connectors owned by this service; iterated only while holding connectorsLock.
    protected Connector connectors[] = new Connector[0];
    private final Object connectorsLock = new Object();
    protected ArrayList<Executor> executors = new ArrayList<Executor>();
    protected Container container = null;
    private ClassLoader parentClassLoader = null;

    /**
     * Initializes every connector in {@code connectors} while holding
     * {@code connectorsLock}.
     */
    protected void initInternal() throws LifecycleException {
        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                try {
                    // Initialize the connector
                    connector.init();
                } 
            }
        }
    }
    /**
     * Starts every connector whose state is not FAILED while holding
     * {@code connectorsLock}.
     */
    protected void startInternal() throws LifecycleException {
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                try {
                    if (connector.getState() != LifecycleState.FAILED) {
                        // Start the connector
                        connector.start();
                    }
                } 
            }
        }
    }
}

//Connector
/**
 * Excerpt of Connector: instantiates the protocol handler in its constructor
 * and, on initialization, wires a CoyoteAdapter into that handler before
 * initializing it. The catch clauses of the try blocks are not shown in this
 * excerpt.
 */
public class Connector extends LifecycleMBeanBase  {
   // Associated Service
   protected Service service = null;
   /** The port number on which we listen for requests.*/
    protected int port = -1;
    protected String proxyName = null;
    protected int proxyPort = 0;
    /** The redirect port for non-SSL to SSL redirects. */
    protected int redirectPort = 443;
    protected String scheme = "http";
    // Maximum number of request parameters (GET or POST) allowed
    protected int maxParameterCount = 10000;
    // Maximum size used for parameter parsing; initialized to 2 MB
    protected int maxPostSize = 2 * 1024 * 1024;
    protected int maxSavePostSize = 4 * 1024;
    // Methods whose request body is parsed
    protected String parseBodyMethods = "POST";
    // Class name of the protocol handler
    protected String protocolHandlerClassName ="org.apache.coyote.http11.Http11Protocol";
    /**Coyote protocol handler.*/
    protected ProtocolHandler protocolHandler = null;
    /** Coyote adapter.*/
    protected Adapter adapter = null;
    // Mapper (author's note: HTTP, Servlet API)
    protected Mapper mapper = new Mapper();
    // MapperListener
    protected MapperListener mapperListener = new MapperListener(mapper, this);
    protected String URIEncoding = null;
    protected boolean useBodyEncodingForURI = false;

 // Constructor: records the protocol, then reflectively instantiates the
 // class named by protocolHandlerClassName as the protocol handler.
 public Connector(String protocol) {
        setProtocol(protocol);
        // Instantiate protocol handler
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            this.protocolHandler = (ProtocolHandler) clazz.newInstance();
        } 
    }
   // Initialization
    protected void initInternal() throws LifecycleException {
        super.initInternal();
        // Create the request-processing adapter for this connector
        adapter = new CoyoteAdapter(this);
        // Hand the adapter to the protocol handler
        protocolHandler.setAdapter(adapter);
        // Make sure parseBodyMethodsSet has a default
        if( null == parseBodyMethodsSet ) {
            setParseBodyMethods(getParseBodyMethods());
        }
        // Fail initialization when the handler needs APR but APR is unavailable
        if (protocolHandler.isAprRequired() &&
                !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerNoApr",
                            getProtocolHandlerClassName()));
        }
        try {
            // Initialize the protocol handler
            protocolHandler.init();
        } 
        // Initialize mapper listener
        // (author's note: this registers it with JMX and actually ends up in
        // LifecycleMBeanBase.initInternal, which is not shown here)
        mapperListener.init();
    }
}

//Http11Protocol
/**
 * Abstract the protocol implementation, including threading, etc.
 * Processor is single threaded and specific to stream-based protocols,
 * will not fit Jk protocols like JNI.
 *
 * <p>This handler uses a {@code JIoEndpoint} as its endpoint and installs an
 * {@code Http11ConnectionHandler} on it.
 */
public class Http11Protocol extends AbstractHttp11JsseProtocol<Socket> {

    private int disableKeepAlivePercentage = 75;

    /** Connection handler installed on the endpoint by the constructor. */
    protected Http11ConnectionHandler cHandler;

    /**
     * Creates the endpoint, attaches the connection handler to it and applies
     * the default linger, timeout and TCP no-delay settings.
     */
    public Http11Protocol() {
        // Keep a typed reference so the handler can be set without a cast.
        final JIoEndpoint ioEndpoint = new JIoEndpoint();
        this.endpoint = ioEndpoint;

        this.cHandler = new Http11ConnectionHandler(this);
        ioEndpoint.setHandler(this.cHandler);

        // Connection defaults taken from Constants.
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }
}

查看Http11Protocol,它自身没有定义init方法,再查看其父类
AbstractHttp11JsseProtocol:
/**
 * HTTP/1.1 protocol base class that resolves an {@code SSLImplementation}
 * before delegating initialization to its superclass.
 */
public abstract class AbstractHttp11JsseProtocol<S>
        extends AbstractHttp11Protocol<S> {

    /** SSL implementation resolved during {@link #init()}; null until then. */
    protected SSLImplementation sslImplementation = null;

    // ------------------------------------------------------- Lifecycle methods

    /**
     * Resolves the SSL implementation named by {@code sslImplementationName}
     * and then runs the superclass initialization.
     *
     * @throws Exception if resolving the implementation or the superclass
     *                   initialization fails
     */
    @Override
    public void init() throws Exception {
        // The SSL implementation must be in place before the end point is
        // initialized, so resolve it first.
        final SSLImplementation resolved =
                SSLImplementation.getInstance(sslImplementationName);
        this.sslImplementation = resolved;
        super.init();
    }
}

//SSLImplementation
/**
 * Excerpt of SSLImplementation: static factory methods that resolve an SSL
 * implementation by class name. The catch clauses (and any fallback after
 * them) are not shown in this excerpt.
 */
public abstract class SSLImplementation {
    // Implementation class names tried by the no-argument getInstance()
    private static final String[] implementations = { JSSEImplementationClass };
    /**
     * Returns the SSL implementation for the given class name. A null name
     * delegates to {@link #getInstance()}; the JSSE class name is instantiated
     * directly with {@code new}; any other name is loaded reflectively.
     */
    public static SSLImplementation getInstance(String className)
            throws ClassNotFoundException {
        if (className == null)
            return getInstance();

        try {
            // Workaround for the J2SE 1.4.x classloading problem (under
            // Solaris).
            // Class.forName(..) fails without creating class using new.
            // This is an ugly workaround.
            if (JSSEImplementationClass.equals(className)) {
                return new org.apache.tomcat.util.net.jsse.JSSEImplementation();
            }
            Class<?> clazz = Class.forName(className);
            return (SSLImplementation) clazz.newInstance();
        }
    }
     /**
      * Tries each entry of {@code implementations} in order and returns the
      * first one that can be instantiated.
      */
     public static SSLImplementation getInstance() throws ClassNotFoundException {
        for (int i = 0; i < implementations.length; i++) {
            try {
                SSLImplementation impl = getInstance(implementations[i]);
                return impl;
            } 
        }
    }
    }

分析AbstractHttp11JsseProtocol的初始化:首先获取SSLImplementation实例,
然后调用父类的init方法。
查看AbstractHttp11Protocol,它没有定义init方法:
/**
 * Excerpt of AbstractHttp11Protocol; only the protocol-name accessor is shown
 * and no init method is defined here.
 */
public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
    /** Returns the fixed protocol name {@code "Http"}. */
    @Override
    protected String getProtocolName() {
        return "Http";
    }
}

查看AbstractProtocol
/**
 * Excerpt of AbstractProtocol: registers MBeans with JMX and then initializes
 * and starts the endpoint. The catch clauses of the try blocks are not shown,
 * so the braces in this excerpt are not balanced.
 */
public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {
    //Name of MBean for the Global Request Processor.
    protected ObjectName rgOname = null;
    /** Name of MBean for the ThreadPool.*/
    protected ObjectName tpOname = null;
    /**
     * Unique ID for this connector. Only used if the connector is configured
     * to use a random port as the port will change if stop(), start() is
     * called.
     */
    private int nameIndex = 0;
    /**
     * Endpoint that provides low-level network I/O - must be matched to the
     * ProtocolHandler implementation (ProtocolHandler using BIO, requires BIO
     * Endpoint etc.).
     */
    protected AbstractEndpoint<S> endpoint = null;
    /**
     * The adapter provides the link between the ProtocolHandler and the
     * connector.
     */
    protected Adapter adapter;
    // ------------------------------------------------------- Lifecycle methods

    /*
     * NOTE: There is no maintenance of state or checking for valid transitions
     * within this class. It is expected that the connector will maintain state
     * and prevent invalid state transitions.
     */

    /**
     * Registers this handler, the endpoint (as the ThreadPool MBean) and the
     * global request processor with JMX, names the endpoint, and finally
     * initializes the endpoint.
     */
    @Override
    public void init() throws Exception {
        // Register this component with JMX when it has no object name yet
        if (oname == null) {
            // Component not pre-registered so register it
            oname = createObjectName();
            if (oname != null) {
                Registry.getRegistry(null, null).registerComponent(this, oname,
                    null);
            }
        }
        // Register the ThreadPool MBean (the endpoint) and the
        // GlobalRequestProcessor MBean with JMX
        if (this.domain != null) {
            try {
                tpOname = new ObjectName(domain + ":" +
                        "type=ThreadPool,name=" + getName());
                Registry.getRegistry(null, null).registerComponent(endpoint,
                        tpOname, null);
               } 
            }
            rgOname=new ObjectName(domain +
                    ":type=GlobalRequestProcessor,name=" + getName());
            Registry.getRegistry(null, null).registerComponent(
                    getHandler().getGlobal(), rgOname, null );
        }
        // The endpoint name is getName() without its first and last character
        // (presumably surrounding quotes - TODO confirm against getName()).
        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));
        try {
            // Initialize the network I/O endpoint
            endpoint.init();
        }
    }
    /** Starts the protocol handler by starting its endpoint. */
    @Override
    public void start() throws Exception {
        try {
            // Start the endpoint
            endpoint.start();
        } 
    }
}

AbstractProtocol初始化最重要的一步是网络IO端点(endpoint)的初始化,下面
我们来看AbstractEndpoint的初始化:
/**
 * Base class for endpoints. It tracks whether the endpoint has been bound and
 * drives the {@link #bind()} / {@link #startInternal()} hooks that concrete
 * endpoints implement.
 */
public abstract class AbstractEndpoint<S> {

    public static interface Handler {
        /**
         * Different types of socket states to react upon.
         */
        public enum SocketState {
            // TODO Add a new state to the AsyncStateMachine and remove
            //      ASYNC_END (if possible)
            OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING_TOMCAT,
            UPGRADING, UPGRADED
        }
    }

    /** Records whether, and in which lifecycle phase, the endpoint was bound. */
    protected enum BindState {
        UNBOUND, BOUND_ON_INIT, BOUND_ON_START
    }

    /** Runnable that accepts requests; it exposes its current state. */
    public abstract static class Acceptor implements Runnable {
        public enum AcceptorState {
            NEW, RUNNING, PAUSED, ENDED
        }

        protected volatile AcceptorState state = AcceptorState.NEW;

        public final AcceptorState getState() {
            return state;
        }
    }

    /**
     * Threads used to accept new connections and pass them to worker threads.
     */
    protected Acceptor[] acceptors;

    /**
     * External Executor based thread pool.
     */
    private Executor executor = null;

    /**
     * Server socket port.
     */
    private int port;

    /**
     * Address for the server socket.
     */
    private InetAddress address;

    /**
     * Controls when the Endpoint binds the port. <code>true</code>, the default
     * binds the port on {@link #init()} and unbinds it on {@link #destroy()}.
     * If set to <code>false</code> the port is bound on {@link #start()} and
     * unbound on {@link #stop()}.
     */
    private boolean bindOnInit = true;
    private BindState bindState = BindState.UNBOUND;

    /**
     * Keepalive timeout, if not set the soTimeout is used.
     */
    private Integer keepAliveTimeout = null;

    /**
     * SSL engine.
     */
    private boolean SSLEnabled = false;

    /**
     * Socket timeout.
     */
    public int getSoTimeout() {
        return socketProperties.getSoTimeout();
    }

    /**
     * The default is true - the created threads will be
     *  in daemon mode. If set to false, the control thread
     *  will not be daemon - and will keep the process alive.
     */
    private boolean daemon = true;

    /**
     * Priority of the worker threads.
     */
    protected int threadPriority = Thread.NORM_PRIORITY;

    /*
     * NOTE: There is no maintenance of state or checking for valid transitions
     * within this class other than ensuring that bind/unbind are called in the
     * right place. It is expected that the calling code will maintain state and
     * prevent invalid state transitions.
     */

    // Hooks supplied by concrete endpoint implementations.
    public abstract void bind() throws Exception;
    public abstract void unbind() throws Exception;
    public abstract void startInternal() throws Exception;
    public abstract void stopInternal() throws Exception;

    /**
     * Initializes the endpoint. When {@code bindOnInit} is set (it is
     * initialized to true) the endpoint is bound here and the bind state
     * becomes {@code BOUND_ON_INIT}.
     */
    public final void init() throws Exception {
        testServerCipherSuitesOrderSupport();
        if (!bindOnInit) {
            // Binding is deferred to start().
            return;
        }
        bind();
        bindState = BindState.BOUND_ON_INIT;
    }

    /**
     * Starts the endpoint, binding it first when it is still unbound, and then
     * delegates to the subclass through {@link #startInternal()}.
     */
    public final void start() throws Exception {
        final boolean stillUnbound = (bindState == BindState.UNBOUND);
        if (stillUnbound) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }
}

AbstractEndpoint有JIoEndpoint、NioEndpoint、AprEndpoint三个子类实现。
在server.xml中有这么一段:
<!-- A "Connector" represents an endpoint by which requests are received
         and responses are returned. Documentation at :
         Java HTTP Connector: /docs/config/http.html (blocking & non-blocking)
         Java AJP  Connector: /docs/config/ajp.html
         APR (HTTP/AJP) Connector: /docs/apr.html
         Define a non-SSL HTTP/1.1 Connector on port 8080
    -->
    <Connector port="8080" protocol="HTTP/1.1" 
               connectionTimeout="20000" 
               redirectPort="8443" />

JIoEndpoint,基于java bio实现,特点是每建立一个连接分配一个线程,读数据阻塞。
NioEndpoint,使用java nio实现,使用反应器模式,线程和连接解绑,多路复用。
AprEndpoint,使用Apache Portable Runtime实现,直接调用native方法,有更高的效率,但是实现依赖具体平台。
调试追踪Tomcat7.0.70发现如下图:



这里我们来看一下AprEndpoint
/**
 * APR tailored thread pool, providing the following services:
 * <ul>
 * <li>Socket acceptor thread</li>
 * <li>Socket poller thread</li>
 * <li>Sendfile thread</li>
 * <li>Worker threads pool</li>
 * </ul>
 */
public class AprEndpoint extends AbstractEndpoint<Long> {
    protected static final Set<String> SSL_PROTO_ALL = new HashSet<String>();
    static {
        /* Default used if SSLProtocol is not configured, also
           used if SSLProtocol="All" */
        SSL_PROTO_ALL.add(Constants.SSL_PROTO_TLSv1);
        SSL_PROTO_ALL.add(Constants.SSL_PROTO_TLSv1_1);
        SSL_PROTO_ALL.add(Constants.SSL_PROTO_TLSv1_2);
    }
    /**
     * Root APR memory pool.
     */
    protected long rootPool = 0;


    /**
     * Server socket "pointer".
     */
    protected long serverSock = 0;


    /**
     * APR memory pool for the server socket.
     */
    protected long serverSockPool = 0;


    /**
     * SSL context.
     */
    protected long sslContext = 0;
    private final Map<Long,AprSocketWrapper> connections =
            new ConcurrentHashMap<Long, AprSocketWrapper>();
    // ------------------------------------------------------------ Constructor

    /** Creates the endpoint and sets maxConnections to 8 * 1024. */
    public AprEndpoint() {
        // Need to override the default for maxConnections to align it with what
        // was pollerSize (before the two were merged)
        setMaxConnections(8 * 1024);
    }
    /** SocketWrapper for a Long-typed socket that also carries poller flags. */
    private static class AprSocketWrapper extends SocketWrapper<Long> {

        // This field should only be used by Poller#run()
        private int pollerFlags = 0;

        /** Wraps the given socket value. */
        public AprSocketWrapper(Long socket) {
            super(socket);
        }
    }
    /**
     * Handling of accepted sockets.
     */
    protected Handler handler = null;
     /**
     * Poll interval, in microseconds. The smaller the value, the more CPU the poller
     * will use, but the more responsive to activity it will be.
     */
    protected int pollTime = 2000;
    /**
     * Allow comet request handling.
     */
    protected boolean useComet = true;
    /**
     * The socket poller.
     */
    protected Poller poller = null;
    /**
     * SSL protocols.
     */
    protected String SSLProtocol = "all";
    /**
     * SSL password (if a cert is encrypted, and no password has been provided, a callback
     * will ask for a password).
     */
    protected String SSLPassword = null;
    /**
     * SSL certificate file.
     */
    protected String SSLCertificateFile = null;
    /**
     * SSL certificate key file.
     */
    protected String SSLCertificateKeyFile = null;
     /**
     * SSL CA certificate path.
     */
    protected String SSLCACertificatePath = null;
     /**
     * SSL CA certificate file.
     */
    protected String SSLCACertificateFile = null;
   // Initializes the endpoint; all of the key work happens here: creates the
   // APR pools and the server socket, binds it and starts listening, and, when
   // SSL is enabled, builds and configures the SSL context.
   // The catch clauses of the try blocks are not shown in this excerpt.
    @Override
    public void bind() throws Exception {
    // Create the root APR memory pool
    try {
          // Create the APR memory pool (author's note: this is a native call)
          rootPool = Pool.create(0);
     }
     // Create the pool for the server socket
        serverSockPool = Pool.create(rootPool);
        // Create the APR address that will be bound
        String addressStr = null;
        if (getAddress() != null) {
            addressStr = getAddress().getHostAddress();
        }
        // Choose the address family: APR_INET unless IPv6 is available and
        // either no address is set (on non-BSD, non-Windows systems) or the
        // address contains a ':'.
        int family = Socket.APR_INET;
        if (Library.APR_HAVE_IPV6) {
            if (addressStr == null) {
                if (!OS.IS_BSD && !OS.IS_WIN32 && !OS.IS_WIN64)
                    family = Socket.APR_UNSPEC;
            } else if (addressStr.indexOf(':') >= 0) {
                family = Socket.APR_UNSPEC;
            }
         }
        long inetAddress = Address.info(addressStr, family,
                getPort(), 0, rootPool);
        // Create the APR server socket
        serverSock = Socket.create(Address.getInfo(inetAddress).family,
                Socket.SOCK_STREAM,
                Socket.APR_PROTO_TCP, rootPool);
        if (OS.IS_UNIX) {
            Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
        }
        // Deal with the firewalls that tend to drop the inactive sockets
        Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1);
        // Bind the server socket to the address
        // NOTE(review): ret is assigned by bind/listen but never checked in
        // this excerpt - confirm against the full source.
        int ret = Socket.bind(serverSock, inetAddress);
        // Start listening on the server socket
        ret = Socket.listen(serverSock, getBacklog());
        if (OS.IS_WIN32 || OS.IS_WIN64) {
            // On Windows set the reuseaddr flag after the bind/listen
            Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
        }
        // Sendfile usage on systems which don't support it cause major problems
        if (useSendfile && !Library.APR_HAS_SENDFILE) {
            useSendfile = false;
        }
        // Initialize thread count default for acceptor
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        // Delay accepting of new connections until data is available
        // Only Linux kernels 2.4 + have that implemented
        // on other platforms this call is noop and will return APR_ENOTIMPL.
        if (deferAccept) {
            if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1) == Status.APR_ENOTIMPL) {
                deferAccept = false;
            }
        }

        // If SSL is enabled, set up the SSL context
        if (isSSLEnabled()) {
            if (SSLCertificateFile == null) {
                // This is required
                throw new Exception(sm.getString("endpoint.apr.noSslCertFile"));
            }
            // SSL protocol
            int value = SSL.SSL_PROTOCOL_NONE;
            if (SSLProtocol == null || SSLProtocol.length() == 0) {
                value = SSL.SSL_PROTOCOL_ALL;
            } else {

                Set<String> protocols = new HashSet<String>();

                // List of protocol names, separated by "+" or "-".
                // Semantics is adding ("+") or removing ("-") from left
                // to right, starting with an empty protocol set.
                // Tokens are individual protocol names or "all" for a
                // default set of supported protocols.

                // Split using a positive lookahead to keep the separator in
                // the capture so we can check which case it is.
                for (String protocol : SSLProtocol.split("(?=[-+])")) {
                    String trimmed = protocol.trim();
                    // Ignore token which only consists of prefix character
                    if (trimmed.length() > 1) {
                        if (trimmed.charAt(0) == '-') {
                            trimmed = trimmed.substring(1).trim();
                            if (trimmed.equalsIgnoreCase(Constants.SSL_PROTO_ALL)) {
                                protocols.removeAll(SSL_PROTO_ALL);
                            } else {
                                protocols.remove(trimmed);
                            }
                        } else {
                            if (trimmed.charAt(0) == '+') {
                                trimmed = trimmed.substring(1).trim();
                            }
                            if (trimmed.equalsIgnoreCase(Constants.SSL_PROTO_ALL)) {
                                protocols.addAll(SSL_PROTO_ALL);
                            } else {
                                protocols.add(trimmed);
                            }
                        }
                    }
                }
                // Translate the collected protocol names into the SSL bit mask
                for (String protocol : protocols) {
                    if (Constants.SSL_PROTO_SSLv2.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_SSLV2;
                    } else if (Constants.SSL_PROTO_SSLv3.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_SSLV3;
                    } else if (Constants.SSL_PROTO_TLSv1.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_TLSV1;
                    } else if (Constants.SSL_PROTO_TLSv1_1.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_TLSV1_1;
                    } else if (Constants.SSL_PROTO_TLSv1_2.equalsIgnoreCase(protocol)) {
                        value |= SSL.SSL_PROTOCOL_TLSV1_2;
                    } else {
                        // Protocol not recognized, fail to start as it is safer than
                        // continuing with the default which might enable more than the
                        // is required
                        throw new Exception(sm.getString(
                                "endpoint.apr.invalidSslProtocol", SSLProtocol));
                    }
                }
            }

            // Create SSL Context
            try {
                sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER);
            }
            // Allow unsafe legacy renegotiation only when requested and supported
            if (SSLInsecureRenegotiation) {
                boolean legacyRenegSupported = false;
                try {
                    legacyRenegSupported = SSL.hasOp(SSL.SSL_OP_ALLOW_UNSAFE_LEGACY_RENEGOTIATION);
                    if (legacyRenegSupported)
                        SSLContext.setOptions(sslContext, SSL.SSL_OP_ALLOW_UNSAFE_LEGACY_RENEGOTIATION);
                } 
            }

            // Set cipher order: client (default) or server
            if (SSLHonorCipherOrder) {
                boolean orderCiphersSupported = false;
                try {
                    orderCiphersSupported = SSL.hasOp(SSL.SSL_OP_CIPHER_SERVER_PREFERENCE);
                    if (orderCiphersSupported)
                        SSLContext.setOptions(sslContext, SSL.SSL_OP_CIPHER_SERVER_PREFERENCE);
                } 
            }
            // Disable compression if requested
            if (SSLDisableCompression) {
                boolean disableCompressionSupported = false;
                try {
                    disableCompressionSupported = SSL.hasOp(SSL.SSL_OP_NO_COMPRESSION);
                    if (disableCompressionSupported)
                        SSLContext.setOptions(sslContext, SSL.SSL_OP_NO_COMPRESSION);
                } 
            }

            // List the ciphers that the client is permitted to negotiate
            SSLContext.setCipherSuite(sslContext, SSLCipherSuite);
            // Load Server key and certificate
            SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA);
            // Set certificate chain file
            SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false);
            // Support Client Certificates
            SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath);
            // Set revocation
            SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath);
            // Client certificate verification
            value = SSL.SSL_CVERIFY_NONE;
            if ("optional".equalsIgnoreCase(SSLVerifyClient)) {
                value = SSL.SSL_CVERIFY_OPTIONAL;
            } else if ("require".equalsIgnoreCase(SSLVerifyClient)) {
                value = SSL.SSL_CVERIFY_REQUIRE;
            } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {
                value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;
            }
            SSLContext.setVerify(sslContext, value, SSLVerifyDepth);
            // For now, sendfile is not supported with SSL
            useSendfile = false;
        }
    }
    /**
     * Start the APR endpoint, creating acceptor, poller and sendfile threads.
     * An async-timeout thread is started as well. Calling this while the
     * endpoint is already running does nothing.
     */
    @Override
    public void startInternal() throws Exception {
        // Already running: nothing to start.
        if (running) {
            return;
        }
        running = true;
        paused = false;

        // Create the worker executor when none has been supplied.
        if (getExecutor() == null) {
            createExecutor();
        }

        // Set up the connection latch before any thread is started.
        initializeConnectionLatch();

        // Poller thread.
        poller = new Poller();
        poller.init();
        final Thread pollerWorker = new Thread(poller, getName() + "-Poller");
        pollerWorker.setPriority(threadPriority);
        pollerWorker.setDaemon(true);
        pollerWorker.start();

        // Sendfile thread, only when sendfile is in use.
        if (useSendfile) {
            sendfile = new Sendfile();
            sendfile.init();
            final Thread sendfileWorker =
                    new Thread(sendfile, getName() + "-Sendfile");
            sendfileWorker.setPriority(threadPriority);
            sendfileWorker.setDaemon(true);
            sendfileWorker.start();
        }

        // Acceptor threads.
        startAcceptorThreads();

        // Async timeout thread.
        asyncTimeout = new AsyncTimeout();
        final Thread asyncTimeoutWorker =
                new Thread(asyncTimeout, getName() + "-AsyncTimeout");
        asyncTimeoutWorker.setPriority(threadPriority);
        asyncTimeoutWorker.setDaemon(true);
        asyncTimeoutWorker.start();
    }

查看AbstractEndpoint
    //创建线程执行器
   
/**
 * Creates the internal worker executor: a ThreadPoolExecutor sized by the
 * min-spare/max thread settings with a 60 second keep-alive, fed by a
 * TaskQueue that is linked back to the pool.
 */
public void createExecutor() {
    // Mark the executor as internally created.
    internalExecutor = true;

    // Work queue first, then the thread factory, then the pool itself.
    final TaskQueue workQueue = new TaskQueue();
    final TaskThreadFactory threadFactory =
            new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            getMinSpareThreads(), getMaxThreads(),
            60, TimeUnit.SECONDS,
            workQueue, threadFactory);
    executor = pool;

    // Give the queue a reference to the pool it serves.
    workQueue.setParent(pool);
}

//查看TaskQueue
/**
 * A task queue specifically designed to run with a thread pool executor.
 * The task queue is optimised to properly utilize threads within
 * a thread pool executor. If you use a normal queue, the executor will spawn threads
 * when there are idle threads and you won't be able to force items onto the queue itself.
 */
TaskQueue继承了LinkedBlockingQueue
/**
 * LinkedBlockingQueue of Runnables that consults its parent thread pool when
 * deciding whether to accept a task or to refuse it.
 */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    /** Pool this queue feeds; while null the queue behaves like its superclass. */
    private ThreadPoolExecutor parent = null;

    // No need for volatile: the few times this is changed and read happen in
    // a single thread (the one that stopped a context and fired listeners).
    private Integer forcedRemainingCapacity = null;

    /**
     * Offers a task to the queue. The task is refused (returns false) only
     * when a parent pool is set, the pool is below its maximum size, and the
     * submitted count is not below the current pool size; in every other case
     * the task is handed to the superclass queue.
     */
    @Override
    public boolean offer(Runnable o) {
        if (parent != null) {
            // At maximum pool size the task can only be queued.
            final boolean poolAtMax =
                    parent.getPoolSize() == parent.getMaximumPoolSize();
            if (!poolAtMax) {
                // Fewer submitted tasks than threads: queue it.
                final boolean hasIdleThreads =
                        parent.getSubmittedCount() < (parent.getPoolSize());
                if (!hasIdleThreads
                        && parent.getPoolSize() < parent.getMaximumPoolSize()) {
                    // Refuse the task so that the pool can grow.
                    return false;
                }
            }
        }
        return super.offer(o);
    }

    /**
     * Retrieves and removes the head of the queue, waiting up to the given
     * timeout. When the wait times out and a parent pool is set, the pool is
     * asked to stop the current thread if needed.
     */
    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        final Runnable next = super.poll(timeout, unit);
        if (next == null && parent != null) {
            // The poll timed out; this is an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return next;
    }

    /**
     * Takes the head of the queue, blocking until one is available. When the
     * parent pool reports that the current thread should be stopped, a timed
     * poll using the pool's keep-alive time is used instead, so this may
     * return null on timeout.
     */
    @Override
    public Runnable take() throws InterruptedException {
        if (parent == null || !parent.currentThreadShouldBeStopped()) {
            return super.take();
        }
        // Yes, this may return null (in case of timeout), which normally does
        // not occur with take(), but the ThreadPoolExecutor implementation
        // allows this.
        final long keepAliveMillis = parent.getKeepAliveTime(TimeUnit.MILLISECONDS);
        return poll(keepAliveMillis, TimeUnit.MILLISECONDS);
    }
}

//查看TaskThreadFactory
/**
 * Simple task thread factory to use to create threads for an executor
 * implementation. Threads are named with a prefix followed by a counter that
 * starts at 1.
 */
public class TaskThreadFactory implements ThreadFactory {

    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemon;
    private final int threadPriority;

    /**
     * @param namePrefix prefix prepended to the thread counter to form names
     * @param daemon     daemon flag applied to every created thread
     * @param priority   priority applied to every created thread
     */
    public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
        // Use the security manager's thread group when one is installed,
        // otherwise the group of the creating thread.
        final SecurityManager securityManager = System.getSecurityManager();
        if (securityManager == null) {
            group = Thread.currentThread().getThreadGroup();
        } else {
            group = securityManager.getThreadGroup();
        }
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.threadPriority = priority;
    }

    /** Creates a TaskThread for {@code r} with the next sequential name. */
    @Override
    public Thread newThread(Runnable r) {
        final String threadName = namePrefix + threadNumber.getAndIncrement();
        final TaskThread created = new TaskThread(group, r, threadName);
        created.setDaemon(daemon);
        created.setPriority(threadPriority);
        return created;
    }
}

查看ThreadPoolExecutor
ThreadPoolExecutor继承了java.util.concurrent.ThreadPoolExecutor
// Subclass of java.util.concurrent.ThreadPoolExecutor; the body is omitted in this excerpt.
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {}
查看Poller线程的初始化和启动,
Poller类主要负责poll传入的socket连接(提供add(),由其他线程传入),如果发现有事件,交给AjpConnectionHandler处理。
// Start poller thread: create and initialize the Poller, then run it on a
// daemon thread named "<endpoint name>-Poller" at the configured priority.
poller = new Poller();
poller.init();
Thread pollerThread = new Thread(poller, getName() + "-Poller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();

Poller是AprEndpoint的内部类
// ------------------------------------------------------ Poller Inner Class
   public class Poller implements Runnable {
        /** Pointers to the pollers.*/
        protected long[] pollers = null;
        /** Actual poller size. */
        protected int actualPollerSize = 0;
        /** Amount of spots left in the poller. */
        protected int[] pollerSpace = null;
        /** Amount of low level pollers in use by this poller.*/
        protected int pollerCount;
        /** Timeout value for the poll call. */
        protected int pollerTime;
        /**
         * Variable poller timeout that adjusts depending on how many poll sets
         * are in use so that the total poll time across all poll sets remains
         * equal to pollTime.
         */
        private int nextPollerTime;
        /** Root pool. */
        protected long pool = 0;
        /** Socket descriptors.*/
        protected long[] desc;
        /** List of sockets to be added to the poller.*/
        protected SocketList addList = null; 
        /** List of sockets to be closed.*/
        private SocketList closeList = null;
        /**
         * Structure used for storing timeouts.
         */
        protected SocketTimeouts timeouts = null;
        /**
         * Last run of maintain. Maintain will run approximately once every one
         * second (may be slightly longer between runs).
         */
        protected long lastMaintain = System.currentTimeMillis();
        /**
         * The number of connections currently inside this Poller. The correct
         * operation of the Poller depends on this figure being correct. If it
         * is not, it is possible that the Poller will enter a wait loop where
         * it waits for the next connection to be added to the Poller before it
         * calls poll when it should still be polling existing connections.
         * Although not necessary at the time of writing this comment, it has
         * been implemented as an AtomicInteger to ensure that it remains
         * thread-safe.
         */
        private AtomicInteger connectionCount = new AtomicInteger(0);
        public int getConnectionCount() { return connectionCount.get(); }
        private volatile boolean pollerRunning = true;
        /**
         * Create the poller. With some versions of APR, the maximum poller size
         * will be 62 (recompiling APR is necessary to remove this limitation).
         * <p>
         * Steps: create the memory pool, pick the size of a single poll set
         * (capped at 1024 on Windows), allocate the first poll set with
         * fallbacks to 1024 and then 62 entries if allocation returns 0, then
         * allocate the remaining poll sets and the bookkeeping structures
         * (free-slot counters, descriptor buffer, add/close lists, timeouts).
         */
        protected void init() {
            pool = Pool.create(serverSockPool);
            // Single poller by default
            int defaultPollerSize = getMaxConnections();
            if ((OS.IS_WIN32 || OS.IS_WIN64) && (defaultPollerSize > 1024)) {
                // The maximum per poller to get reasonable performance is 1024
                // Adjust poller size so that it won't reach the limit. This is
                // a limitation of XP / Server 2003 that has been fixed in
                // Vista / Server 2008 onwards.
                actualPollerSize = 1024;
            } else {
                actualPollerSize = defaultPollerSize;
            }

            timeouts = new SocketTimeouts(defaultPollerSize);

            // At the moment, setting the timeout is useless, but it could get
            // used again as the normal poller could be faster using maintain.
            // It might not be worth bothering though.
            long pollset = allocatePoller(actualPollerSize, pool, -1);
            if (pollset == 0 && actualPollerSize > 1024) {
                // First fallback: retry with 1024 entries.
                actualPollerSize = 1024;
                pollset = allocatePoller(actualPollerSize, pool, -1);
            }
            if (pollset == 0) {
                // Last fallback: the 62-entry limit mentioned above.
                actualPollerSize = 62;
                pollset = allocatePoller(actualPollerSize, pool, -1);
            }

            // Integer division yields 0 when the fallback poller size is larger
            // than the configured connection limit. Keep at least one poll set
            // so that the division below cannot fail with ArithmeticException
            // and pollers[0] is a valid slot for the set allocated above.
            pollerCount = Math.max(1, defaultPollerSize / actualPollerSize);
            pollerTime = pollTime / pollerCount;
            nextPollerTime = pollerTime;

            pollers = new long[pollerCount];
            pollers[0] = pollset;
            for (int i = 1; i < pollerCount; i++) {
                pollers[i] = allocatePoller(actualPollerSize, pool, -1);
            }
            // Every poll set starts with all of its slots free.
            pollerSpace = new int[pollerCount];
            for (int i = 0; i < pollerCount; i++) {
                pollerSpace[i] = actualPollerSize;
            }
            // Two longs per descriptor (run() reads them as pairs at
            // index n*2 and n*2+1).
            desc = new long[actualPollerSize * 2];
            connectionCount.set(0);
            addList = new SocketList(defaultPollerSize);
            closeList = new SocketList(defaultPollerSize);
        }
         /**
         * The background thread that listens for incoming TCP/IP connections
         * and hands them off to an appropriate processor.
         * <p>
         * NOTE: this is an abridged excerpt. The poll call that fills
         * {@code desc} and sets {@code rv} is not part of the lines shown, and
         * the braces below do not balance.
         */
	 // Listens for events on polled connections and hands them to the matching handler.
        @Override
        public void run() {
		// Poll for the specified interval
                    for (int i = 0; i < pollers.length; i++) {
		         // desc holds pairs: desc[n*2] is the returned event mask and
		         // desc[n*2+1] the socket, which is the key into connections.
		         for (int n = 0; n < rv; n++) {
                                long timeout = timeouts.remove(desc[n*2+1]);
                                AprSocketWrapper wrapper = connections.get(
                                        Long.valueOf(desc[n*2+1]));
                                if (getLog().isDebugEnabled()) {
                                    log.debug(sm.getString(
                                            "endpoint.debug.pollerProcess",
                                            Long.valueOf(desc[n*2+1]),
                                            Long.valueOf(desc[n*2])));
                                }
                                // Clear the events that were just returned from the wrapper's flags.
                                wrapper.pollerFlags = wrapper.pollerFlags & ~((int) desc[n*2]);
                                // Check for failed sockets and hand this socket off to a worker
				// Dispatch the event for processing.
                                if (wrapper.isComet()) {
                                    // Event processes either a read or a write depending on what the poller returns
                                    // APR_POLLHUP / APR_POLLERR / APR_POLLNVAL: process as ERROR.
				    if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
                                            || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
                                            || ((desc[n*2] & Poll.APR_POLLNVAL) == Poll.APR_POLLNVAL)) {
                                        if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) {
                                            // Close socket and clear pool
                                            closeSocket(desc[n*2+1]);
                                        }
					// APR_POLLIN: re-add the socket if flags remain, then process as OPEN_READ.
                                    } else if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) {
                                        if (wrapper.pollerFlags != 0) {
                                            add(desc[n*2+1], 1, wrapper.pollerFlags);
                                        }
					// OPEN_READ
                                        if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) {
                                            // Close socket and clear pool
                                            closeSocket(desc[n*2+1]);
                                        }
					// APR_POLLOUT: re-add the socket if flags remain, then process as OPEN_WRITE.
                                    } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
                                        if (wrapper.pollerFlags != 0) {
                                            add(desc[n*2+1], 1, wrapper.pollerFlags);
                                        }
                                        if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) {
                                            // Close socket and clear pool
                                            closeSocket(desc[n*2+1]);
                                        }
                                    } else {
                                        // Unknown event
                                        getLog().warn(sm.getString(
                                                "endpoint.apr.pollUnknownEvent",
                                                Long.valueOf(desc[n*2])));
                                        if (!processSocket(desc[n*2+1], SocketStatus.ERROR)) {
                                            // Close socket and clear pool
                                            closeSocket(desc[n*2+1]);
                                        }
                                    }
                                }
		    }
		    // Wake up any threads waiting on this Poller's monitor.
		    synchronized (this) {
                this.notifyAll();
            }
        }
    /**
     * Process given socket. Called in non-comet mode, typically keep alive
     * or upgraded protocol.
     * <p>
     * Looks up the wrapper registered for the socket and, if it is still
     * present, submits a new SocketProcessor for it to the executor. A warning
     * is logged when no executor is available.
     * <p>
     * NOTE: abridged excerpt; the catch clauses of the try block are not
     * shown. In the lines shown every path returns {@code true}.
     *
     * @param socket the socket to process, used as the key into connections
     * @param status the event to process for that socket
     * @return {@code true} in all paths visible in this excerpt
     */
    // Handles a socket event.
    public boolean processSocket(long socket, SocketStatus status) {
        try {
            Executor executor = getExecutor();
            if (executor == null) {
                log.warn(sm.getString("endpoint.warn.noExector",
                        Long.valueOf(socket), null));
            } else {
                SocketWrapper<Long> wrapper =
                        connections.get(Long.valueOf(socket));
                // Make sure connection hasn't been closed
                if (wrapper != null) {
		    // Create the SocketProcessor task and hand it to the executor.
                    executor.execute(new SocketProcessor(wrapper, status));
                }
            }
        } 
        return true;
    }
    }

来看SocketProcessor
// -------------------------------------------- SocketProcessor Inner Class
    /**
     * Task that processes one socket event on an external Executor thread
     * pool; the equivalent of the Worker.
     */
    protected class SocketProcessor implements Runnable {

        private final SocketWrapper<Long> socket;
        private final SocketStatus status;

        /**
         * @param socket wrapper of the socket to process
         * @param status event to process; must not be {@code null}
         * @throws NullPointerException if {@code status} is {@code null}
         */
        public SocketProcessor(SocketWrapper<Long> socket,
                SocketStatus status) {
            if (status == null) {
                // Should never happen
                throw new NullPointerException();
            }
            this.socket = socket;
            this.status = status;
        }

        /**
         * Runs the processing while holding the appropriate lock.
         */
        @Override
        public void run() {
            // Upgraded connections need to allow multiple threads to access the
            // connection at the same time to enable blocking IO to be used when
            // Servlet 3.1 NIO has been configured. Writes on an upgraded
            // connection therefore use the dedicated write lock; everything
            // else locks on the wrapper itself.
            boolean upgradedWrite =
                    socket.isUpgraded() && SocketStatus.OPEN_WRITE == status;
            Object lock = upgradedWrite ? socket.getWriteThreadLock() : socket;
            synchronized (lock) {
                doRun();
            }
        }

        /**
         * Passes the socket to the handler and acts on the state it returns.
         */
        private void doRun() {
            if (socket.getSocket() == null) {
                // Closed in another thread
                return;
            }

            // Let the handler process the request from this socket.
            SocketState outcome = handler.process(socket, status);

            if (outcome == Handler.SocketState.CLOSED) {
                // Close socket and pool
                closeSocket(socket.getSocket().longValue());
                socket.socket = null;
                return;
            }

            if (outcome == Handler.SocketState.LONG) {
                socket.access();
                if (socket.async) {
                    waitingRequests.add(socket);
                }
                return;
            }

            if (outcome == Handler.SocketState.ASYNC_END) {
                socket.access();
                // Schedule a follow-up OPEN_READ pass for the same socket.
                SocketProcessor followUp = new SocketProcessor(socket,
                        SocketStatus.OPEN_READ);
                getExecutor().execute(followUp);
            }
        }
    }

根据handler.process(socket, status),我们去查看handler是如何处理请求的。在Http11Protocol的
构造方法中可以看到,handler就是Http11ConnectionHandler:
// Constructor: creates the endpoint and the connection handler, registers the
// handler on the endpoint, then applies the default linger, timeout and
// TCP no-delay settings.
// NOTE(review): this excerpt creates a JIoEndpoint, while the createProcessor
// excerpt later in the article casts proto.endpoint to AprEndpoint. Confirm
// which protocol class each excerpt was taken from.
public Http11Protocol() {
        // Create the endpoint that handles requests: a JIoEndpoint.
        endpoint = new JIoEndpoint();
        cHandler = new Http11ConnectionHandler(this);
	// Register the connection handler on the endpoint.
        ((JIoEndpoint) endpoint).setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

查看Http11ConnectionHandler,它是Http11Protocol的内部类
// -----------------------------------  Http11ConnectionHandler Inner Class
    // Connection handler: a static class that extends AbstractConnectionHandler
    // (typed for Socket / Http11Processor) and implements Handler. The class
    // body is omitted in this excerpt.
    protected static class Http11ConnectionHandler
            extends AbstractConnectionHandler<Socket, Http11Processor> implements Handler {
	 }

而Http11ConnectionHandler没有process(socket, status),这个方法定义在Handler中,是一个抽象方法。
查看AbstractConnectionHandler,为AbstractProtocol的内部类
// ------------------------------------------- Connection handler base class
    
    protected abstract static class AbstractConnectionHandler<S,P extends Processor<S>>
            implements AbstractEndpoint.Handler {
	/**
	 * Processes an event for the given socket by delegating to the
	 * Processor associated with it.
	 * <p>
	 * An existing processor is looked up for the socket; if there is none,
	 * one is taken from the recycled processors or created through
	 * {@link #createProcessor()}. The event is then dispatched according to
	 * the status and the processor's mode (close-now, disconnect, async,
	 * comet, or plain request processing).
	 * <p>
	 * NOTE: abridged excerpt. The remaining dispatch branches, the end of
	 * the do loop and the rest of the method are not shown, so the braces
	 * below do not balance.
	 *
	 * @param wrapper wrapper of the socket to process; may be {@code null}
	 * @param status  the event to process
	 * @return {@code SocketState.CLOSED} when there is nothing to do; the
	 *         other return paths are outside this excerpt
	 */
	@SuppressWarnings("deprecation")
	// Old HTTP upgrade method has been deprecated
        public SocketState process(SocketWrapper<S> wrapper,SocketStatus status) {
            if (wrapper == null) {
                // Nothing to do. Socket has been closed.
                return SocketState.CLOSED;
            }
            S socket = wrapper.getSocket();
            if (socket == null) {
                // Nothing to do. Socket has been closed.
                return SocketState.CLOSED;
            }
            Processor<S> processor = connections.get(socket);
            if (status == SocketStatus.DISCONNECT && processor == null) {
                // Nothing to do. Endpoint requested a close and there is no
                // longer a processor associated with this socket.
                return SocketState.CLOSED;
            }
            wrapper.setAsync(false);
            ContainerThreadMarker.markAsContainerThread();
            try {
                if (processor == null) {
                    // Reuse a recycled processor when one is available.
                    processor = recycledProcessors.poll();
                }
                if (processor == null) {
                    // Otherwise create a new processor.
                    processor = createProcessor();
                }
                // Initialise SSL; implemented by subclasses.
                initSsl(wrapper, processor);
                SocketState state = SocketState.CLOSED;
                do {
                    if (status == SocketStatus.CLOSE_NOW) {
                        processor.errorDispatch();
                        state = SocketState.CLOSED;
                    } else if (status == SocketStatus.DISCONNECT &&
                            !processor.isComet()) {
                        // Do nothing here, just wait for it to get recycled
                        // Don't do this for Comet we need to generate an end
                        // event (see BZ 54022)
                    } else if (processor.isAsync() || state == SocketState.ASYNC_END) {
                        state = processor.asyncDispatch(status);
                        if (state == SocketState.OPEN) {
                            // release() won't get called so in case this request
                            // takes a long time to process, remove the socket from
                            // the waiting requests now else the async timeout will
                            // fire
                            getProtocol().endpoint.removeWaitingRequest(wrapper);
                            // There may be pipe-lined data to read. If the data
                            // isn't processed now, execution will exit this
                            // loop and call release() which will recycle the
                            // processor (and input buffer) deleting any
                            // pipe-lined data. To avoid this, process it now.
                            state = processor.process(wrapper);
                        }
                    } else if (processor.isComet()) {
                        state = processor.event(status);
                    } else {
                        // Plain request: only reached when no branch above
                        // matched. Without the "else" this block would run
                        // after every branch and overwrite the state set there.
                        state = processor.process(wrapper);
                    }
        }
	// Hooks to be implemented by subclasses: creating a processor, initialising
	// SSL for a socket/processor pair, and the longPoll and release callbacks.
	 protected abstract P createProcessor();
        protected abstract void initSsl(SocketWrapper<S> socket,
                Processor<S> processor);
        protected abstract void longPoll(SocketWrapper<S> socket,
                Processor<S> processor);
        protected abstract void release(SocketWrapper<S> socket,
                Processor<S> processor, boolean socketClosing,
                boolean addToPoller);
}
查看Http11ConnectionHandler,createProcessor
	 @Override
        /**
         * Builds a new Http11AprProcessor, copies the protocol's settings onto
         * it and registers it.
         *
         * @return the configured and registered processor
         */
        protected Http11AprProcessor createProcessor() {
            Http11AprProcessor aprProcessor = new Http11AprProcessor(
                    proto.getMaxHttpHeaderSize(),
                    (AprEndpoint)proto.endpoint,
                    proto.getMaxTrailerSize(),
                    proto.getAllowedTrailerHeadersAsSet(),
                    proto.getMaxExtensionSize(),
                    proto.getMaxSwallowSize());

            // Attach the protocol's adapter. The Connector constructor wires
            // it with these two statements:
            //   adapter = new CoyoteAdapter(this);
            //   protocolHandler.setAdapter(adapter);
            aprProcessor.setAdapter(proto.adapter);

            // Keep-alive and upload timeouts.
            aprProcessor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
            aprProcessor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
            aprProcessor.setConnectionUploadTimeout(
                    proto.getConnectionUploadTimeout());
            aprProcessor.setDisableUploadTimeout(proto.getDisableUploadTimeout());

            // Compression settings.
            aprProcessor.setCompressionMinSize(proto.getCompressionMinSize());
            aprProcessor.setCompression(proto.getCompression());
            aprProcessor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
            aprProcessor.setCompressableMimeTypes(proto.getCompressableMimeTypes());

            // Remaining settings copied from the protocol.
            aprProcessor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
            aprProcessor.setSocketBuffer(proto.getSocketBuffer());
            aprProcessor.setMaxSavePostSize(proto.getMaxSavePostSize());
            aprProcessor.setServer(proto.getServer());
            aprProcessor.setClientCertProvider(proto.getClientCertProvider());

            register(aprProcessor);
            return aprProcessor;
        }

查看Http11AprProcessor
// Processor class that extends AbstractHttp11Processor with Long as the socket
// type parameter; the class body is omitted in this excerpt.
public class Http11AprProcessor extends AbstractHttp11Processor<Long> {}

无处理函数process(socket, status)
查看AbstractHttp11Processor
public abstract class AbstractHttp11Processor<S> extends AbstractProcessor<S> {
/**
     * Process pipelined HTTP requests using the specified input and output
     * streams.
     * <p>
     * For each request in the loop: the request headers are parsed (errors
     * are answered with status 400, a paused endpoint with 503), the request
     * is prepared (errors are answered with status 500), and finally the
     * request is handed to the adapter's service method.
     * <p>
     * NOTE: abridged excerpt. The catch/finally parts of several try blocks,
     * the end of the loop and the return statement are not shown, so the
     * braces below do not balance.
     *
     * @param socketWrapper Socket from which the HTTP requests will be read
     *               and the HTTP responses will be written.
     * @return the resulting socket state (the return statement is outside this excerpt)
     * @throws IOException error during an I/O operation
     */
     // Processes HTTP requests.
    @Override
    public SocketState process(SocketWrapper<S> socketWrapper)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Setting up the I/O
        setSocketWrapper(socketWrapper);
        getInputBuffer().init(socketWrapper, endpoint);
        getOutputBuffer().init(socketWrapper, endpoint);
	 while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
                upgradeInbound == null &&
                httpUpgradeHandler == null && !endpoint.isPaused()) {

            // Parsing the request header
            try {
                setRequestLineReadTimeout();
	       // Endpoint paused: answer with 503.
               if (endpoint.isPaused()) {
                    // 503 - Service unavailable
                    response.setStatus(503);
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                    // Currently only NIO will ever return false here
                    if (!getInputBuffer().parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!disableUploadTimeout) {
                        setSocketTimeout(connectionUploadTimeout);
                    }
                }
		catch (Throwable t) {
                // 400 - Bad Request
                response.setStatus(400);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
	    if (!getErrorState().isError()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
		    // Prepare the request (see prepareRequest()).
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }
           // 503, 400, 500: these are the familiar HTTP response status codes.
	   // Process the request in the adapter
	   // The adapter handles the request/response pair.
            if (!getErrorState().isError()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    // Key step: the call to the adapter's service method. Per the
                    // createProcessor excerpt earlier in the article, the adapter
		    // is a CoyoteAdapter.
                    adapter.service(request, response);
	    }
	 /**
         * After reading the request headers, we have to setup the request filters.
         * <p>
         * The part shown here handles an absolute request URI (one that starts
         * with "http"): the scheme and authority are removed from the request
         * URI and the authority is stored as the value of the "host" header.
         * NOTE: abridged excerpt; only this URI handling is shown.
         */
         protected void prepareRequest() {
		MimeHeaders headers = request.getMimeHeaders();
		   // Check for a full URI (including protocol://host:port/)
                 ByteChunk uriBC = request.requestURI().getByteChunk();
		 if (uriBC.startsWithIgnoreCase("http", 0)) {
		    // Locate "://". NOTE(review): the meaning of the extra indexOf
		    // arguments (0, 3, 4) is not visible here; confirm against ByteChunk.
		    int pos = uriBC.indexOf("://", 0, 3, 4);
		    int uriBCStart = uriBC.getStart();
		    int slashPos = -1;
		    if (pos != -1) {
			byte[] uriB = uriBC.getBytes();
			// First '/' after "://", i.e. the start of the path, if any.
			slashPos = uriBC.indexOf('/', pos + 3);
			if (slashPos == -1) {
			    // No path: the authority runs to the end of the URI.
			    slashPos = uriBC.getLength();
			    // Set URI as "/"
			    // (a one-byte slice at pos + 1; presumably the first
			    // '/' of "://" if pos is the offset of the ':')
			    request.requestURI().setBytes
				(uriB, uriBCStart + pos + 1, 1);
			} else {
			    // Keep only the part from the path's '/' to the end.
			    request.requestURI().setBytes
				(uriB, uriBCStart + slashPos,
				 uriBC.getLength() - slashPos);
			}
			// The bytes between "://" and the path become the "host" header value.
			MessageBytes hostMB = headers.setValue("host");
			hostMB.setBytes(uriB, uriBCStart + pos + 3,
					slashPos - pos - 3);
		    }

        }
    }
}

从AbstractHttp11Processor的process(SocketWrapper<S> socketWrapper)可以看出,
Http处理器首先解析HTTP请求头,再通过prepareRequest()设置过滤器、处理编码等问题,然后交给Adapter去处理。
来看CoyoteAdapter
public class CoyoteAdapter implements Adapter {
     /**
     * Construct a new CoyoteProcessor associated with the specified connector.
     * Each adapter is associated with exactly one connector.
     *
     * @param connector CoyoteConnector that owns this processor
     */
    public CoyoteAdapter(Connector connector) {
        super();
        this.connector = connector;
    }
    private Connector connector = null;
     /**
     * Encoder for the Location URL in HTTP redirects.
     */
    protected static URLEncoder urlEncoder;

    // Create the encoder and register the characters it treats as safe.
    static {
        urlEncoder = new URLEncoder();
        for (char safeCharacter : new char[] {'-', '_', '.', '*', '/'}) {
            urlEncoder.addSafeCharacter(safeCharacter);
        }
    }
    /**
     * Service method.
     */
    // Looks up the Request/Response objects stored as notes on the Coyote
    // request/response, runs postParseRequest and, when that succeeds, invokes
    // the first valve of the container's pipeline.
    // NOTE: abridged excerpt. The declaration of postParseSuccess and the
    // catch/finally of the try block are not shown, so the braces below do not
    // balance.
    @Override
    public void service(org.apache.coyote.Request req,
                        org.apache.coyote.Response res)
        throws Exception {
	Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);
	try {
            // Parse and set Catalina and configuration specific
            // request parameters
            req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName());
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                //check valves if we support async
                request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
		// This is the call the walkthrough has been looking for: the request
		// enters the container's pipeline here (per the article, it is
		// eventually handled by a servlet).
                connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
	}
}

前文中我们分析过,每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrapperValve链,StandardWrapperValve是处理
Request的Servlet包装类,那么,此时应该理解这句的含义了吧!
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);


从上面可以看出,Poller调用本地方法创建socket监听线程,当监听到连接时,创建一个SocketProcessor(即socket处理线程),委托给Http11ConnectionHandler(Http11ConnectionHandler是Http11Protocol的内部类)处理,
Http11ConnectionHandler继承AbstractConnectionHandler,AbstractConnectionHandler处理请求时先获取processor,processor由Http11ConnectionHandler创建,实际为Http11AprProcessor;Http11AprProcessor继承AbstractHttp11Processor,并在process(SocketWrapper<S> socketWrapper)中处理请求:在这个方法中先处理请求头部,再将实际业务逻辑处理委托给CoyoteAdapter的service方法,service方法通过反射调用对应的servlet的service方法来处理逻辑。

在这里,简单总结一下,每个Connector关联一个Http11Protocol,Http11Protocol关联一个Http11ConnectionHandler,同时Http11Protocol关联一个Endpoint和Adapter,这个Http11ConnectionHandler实际要注入到Endpoint中,每个handler管理创建Http11AprProcessor,Http11AprProcessor关联一个Adapter(从Http11Protocol获取),
而Adapter关联一个Connector,Endpoint处理Socket的请求实际通过Http11AprProcessor,Http11AprProcessor委托给Adapter,Adapter将请求委托给Servlet。

上面一部分的堆栈有点深,现在退出来,回到AprEndpoint的startInternal,看
startAcceptorThreads();这句话。
这个方法在AbstractEndpoint中:
/**
 * Creates and starts the acceptor threads. One acceptor is created per
 * configured acceptor thread; each runs on its own thread named
 * {@code getName() + "-Acceptor-" + index}, using the configured acceptor
 * thread priority and daemon flag.
 */
protected final void startAcceptorThreads() {
    final int acceptorCount = getAcceptorThreadCount();
    acceptors = new Acceptor[acceptorCount];

    int index = 0;
    while (index < acceptorCount) {
        // Create the acceptor through the subclass hook.
        acceptors[index] = createAcceptor();

        final String acceptorName = getName() + "-Acceptor-" + index;
        acceptors[index].setThreadName(acceptorName);

        Thread acceptorThread = new Thread(acceptors[index], acceptorName);
        acceptorThread.setPriority(getAcceptorThreadPriority());
        acceptorThread.setDaemon(getDaemon());
        acceptorThread.start();

        index++;
    }
}
    /**
     * Hook to allow Endpoints to provide a specific Acceptor implementation.
     * Implemented by subclasses.
     *
     * @return the Acceptor to run on an acceptor thread
     */
 protected abstract Acceptor createAcceptor();

// Abstract static nested class of AbstractEndpoint (per the article): base class
// for acceptor runnables. It tracks the acceptor's state and the name of the
// thread it runs on.
public abstract static class Acceptor implements Runnable {

    /** States an acceptor can be in. */
    public enum AcceptorState {
        NEW,
        RUNNING,
        PAUSED,
        ENDED
    }

    /** Name of the thread running this acceptor; unset until assigned. */
    private String threadName;

    /** Current state; starts as {@code NEW}. */
    protected volatile AcceptorState state = AcceptorState.NEW;

    /**
     * @return the current state of this acceptor
     */
    public final AcceptorState getState() {
        return state;
    }

    /**
     * @return the thread name assigned to this acceptor, or {@code null} if none was set
     */
    protected final String getThreadName() {
        return threadName;
    }

    /**
     * @param threadName the name of the thread that runs this acceptor
     */
    protected final void setThreadName(final String threadName) {
        this.threadName = threadName;
    }
}

查看AprEndpoint的createAcceptor方法
  // Supplies the endpoint-specific acceptor: a new instance of the Acceptor
  // class visible in this scope.
  @Override
    protected AbstractEndpoint.Acceptor createAcceptor() {
        return new Acceptor();
    }

总结Connector
每个connector关联一个Service,每个Service有一个Engine,每个Engine有多个Host,每个
Host有一个Pipeline(请求处理链),而Pipeline中是StandardWrapperValve链,StandardWrapperValve是处理Request的Servlet包装类;同时每个Connector关联一个Http11Protocol,Http11Protocol关联一个Http11ConnectionHandler,同时Http11Protocol关联一个Endpoint和Adapter,这个Http11ConnectionHandler实际要注入到Endpoint中,每个handler管理创建Http11AprProcessor,Http11AprProcessor关联一个Adapter(从Http11Protocol获取),而Adapter关联一个Connector,Endpoint处理Socket的请求实际通过Http11AprProcessor,Http11AprProcessor委托给Adapter,Adapter将请求委托给Servlet。Connector初始化最主要的是,初始化Http11Protocol,实际初始化Endpoint,EndPoint初始化bind,SOCKET地址,并监听请求及初始化SSL,Endpoint启动,即初始化Poller(处理socket请求)。

下面一篇文章我们来看看JIoEndpoint。
//ContainerThreadMarker
/**
 * Per-thread marker that records whether the current thread has been flagged
 * as a container thread.
 */
public class ContainerThreadMarker {

    /** Thread-local flag; it has no value until the thread marks itself. */
    private static final ThreadLocal<Boolean> marker = new ThreadLocal<Boolean>();

    /**
     * @return {@code true} only if the calling thread has previously called
     *         {@link #markAsContainerThread()}
     */
    public static boolean isContainerThread() {
        // An unset (null) flag counts as "not a container thread".
        return Boolean.TRUE.equals(marker.get());
    }

    /**
     * Flags the calling thread as a container thread.
     */
    public static void markAsContainerThread() {
        marker.set(Boolean.TRUE);
    }
}

//Pool
/**
 * Static native method declarations for working with pools: creating,
 * clearing and destroying them, querying parent/ancestor relationships,
 * registering cleanups, allocating memory and attaching user data.
 * <p>
 * Pools are identified by {@code long} values. Every method here is
 * {@code native}, so the behaviour described below comes from the method
 * comments, not from code visible in this class.
 */
public class Pool {

    /**
     * Create a new pool.
     * @param parent The parent pool.  If this is 0, the new pool is a root
     * pool.  If it is non-zero, the new pool will inherit all
     * of its parent pool's attributes, except the apr_pool_t will
     * be a sub-pool.
     * @return The pool we have just created.
    */
    public static native long create(long parent);

    /**
     * Clear all memory in the pool and run all the cleanups. This also destroys all
     * subpools.
     * This does not actually free the memory, it just allows the pool
     * to re-use this memory for the next allocation.
     * @param pool The pool to clear
     */
    public static native void clear(long pool);

    /**
     * Destroy the pool. This takes similar action as apr_pool_clear() and then
     * frees all the memory.
     * This will actually free the memory
     * @param pool The pool to destroy
     */
    public static native void destroy(long pool);

    /**
     * Get the parent pool of the specified pool.
     * @param pool The pool for retrieving the parent pool.
     * @return The parent of the given pool.
     */
    public static native long parentGet(long pool);

    /**
     * Determine if pool a is an ancestor of pool b
     * @param a The pool to search
     * @param b The pool to search for
     * @return True if a is an ancestor of b, NULL is considered an ancestor
     * of all pools.
     */
    public static native boolean isAncestor(long a, long b);


    /*
     * Cleanup
     *
     * Cleanups are performed in the reverse order they were registered.  That is:
     * Last In, First Out.  A cleanup function can safely allocate memory from
     * the pool that is being cleaned up. It can also safely register additional
     * cleanups which will be run LIFO, directly after the current cleanup
     * terminates.  Cleanups have to take caution in calling functions that
     * create subpools. Subpools, created during cleanup will NOT automatically
     * be cleaned up.  In other words, cleanups are to clean up after themselves.
     */

    /**
     * Register a function to be called when a pool is cleared or destroyed
     * @param pool The pool register the cleanup with
     * @param o The object to call when the pool is cleared
     *                      or destroyed
     * @return The cleanup handler.
     */
    public static native long cleanupRegister(long pool, Object o);

    /**
     * Remove a previously registered cleanup function
     * @param pool The pool remove the cleanup from
     * @param data The cleanup handler to remove from cleanup
     */
    public static native void cleanupKill(long pool, long data);

    /**
     * Register a process to be killed when a pool dies.
     * @param a The pool to use to define the processes lifetime
     * @param proc The process to register
     * @param how How to kill the process, one of:
     * <PRE>
     * APR_KILL_NEVER         -- process is never sent any signals
     * APR_KILL_ALWAYS        -- process is sent SIGKILL on apr_pool_t cleanup
     * APR_KILL_AFTER_TIMEOUT -- SIGTERM, wait 3 seconds, SIGKILL
     * APR_JUST_WAIT          -- wait forever for the process to complete
     * APR_KILL_ONLY_ONCE     -- send SIGTERM and then wait
     * </PRE>
     */
    public static native void noteSubprocess(long a, long proc, int how);

    /**
     * Allocate a block of memory from a pool
     * @param p The pool to allocate from
     * @param size The amount of memory to allocate
     * @return The ByteBuffer with allocated memory
     */
    public static native ByteBuffer alloc(long p, int size);

    /**
     * Allocate a block of memory from a pool and set all of the memory to 0
     * @param p The pool to allocate from
     * @param size The amount of memory to allocate
     * @return The ByteBuffer with allocated memory
     */
    public static native ByteBuffer calloc(long p, int size);

    /*
     * User data management
     */

    /**
     * Set the data associated with the current pool
     * <br><b>Warning :</b>
     * The data to be attached to the pool should have a life span
     * at least as long as the pool it is being attached to.
     * Object attached to the pool will be globally referenced
     * until the pool is cleared or dataSet is called with the null data.
     * @param pool The current pool
     * @param key The key to use for association
     * @param data The user data associated with the pool.
     * @return APR Status code.
     */
     public static native int dataSet(long pool, String key, Object data);

    /**
     * Return the data associated with the current pool.
     * @param pool The current pool.
     * @param key The key for the data to retrieve
     * @return The data associated with the pool for the given key.
     */
     public static native Object dataGet(long pool, String key);

    /**
     * Run all of the child_cleanups, so that any unnecessary files are
     * closed because we are about to exec a new program
     */
    public static native void cleanupForExec();

}


//SocketWrapper
public class SocketWrapper<E> {

    /** The wrapped socket; replaced by {@code reset}. */
    protected volatile E socket;

    // lastAccess and timeout are volatile because the thread doing I/O and
    // setting the timeout is not the thread that checks for the timeout.
    protected volatile long lastAccess = System.currentTimeMillis();
    protected volatile long timeout = -1;

    protected boolean error = false;
    protected long lastRegistered = 0;
    protected volatile int keepAliveLeft = 100;
    private boolean comet = false;
    protected boolean async = false;
    protected boolean keptAlive = false;
    private boolean upgraded = false;
    private boolean secure = false;

    /*
     * Blocking / non-blocking flag for the case where that mode is set at the
     * socket level. Callers are responsible for using this field in a
     * thread-safe way through the read/write locks exposed below.
     */
    private volatile boolean blockingStatus = true;
    private final Lock blockingStatusReadLock;
    private final WriteLock blockingStatusWriteLock;

    /*
     * During normal servlet processing a single thread at a time may use the
     * socket, and a lock on the socket guards both reads and writes. With
     * HTTP upgrade one reader thread and one writer thread may use the socket
     * concurrently: the lock on the socket then guards reads, and this object
     * guards writes.
     */
    private final Object writeThreadLock = new Object();

    /**
     * Wraps the given socket and creates the read/write lock pair that guards
     * the blocking status.
     *
     * @param socket the socket to wrap
     */
    public SocketWrapper(E socket) {
        final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        this.blockingStatusReadLock = rwLock.readLock();
        this.blockingStatusWriteLock = rwLock.writeLock();
        this.socket = socket;
    }

    /** @return the wrapped socket */
    public E getSocket() {
        return this.socket;
    }

    /** @return the comet flag */
    public boolean isComet() {
        return this.comet;
    }

    /** @param comet new value of the comet flag */
    public void setComet(boolean comet) {
        this.comet = comet;
    }

    /** @return the async flag */
    public boolean isAsync() {
        return this.async;
    }

    /** @param async new value of the async flag */
    public void setAsync(boolean async) {
        this.async = async;
    }

    /** @return the upgraded flag */
    public boolean isUpgraded() {
        return this.upgraded;
    }

    /** @param upgraded new value of the upgraded flag */
    public void setUpgraded(boolean upgraded) {
        this.upgraded = upgraded;
    }

    /** @return the secure flag */
    public boolean isSecure() {
        return this.secure;
    }

    /** @param secure new value of the secure flag */
    public void setSecure(boolean secure) {
        this.secure = secure;
    }

    /** @return the last recorded access time */
    public long getLastAccess() {
        return this.lastAccess;
    }

    /**
     * Records the current time as the last access time, unless the wrapper is
     * in async mode. Async timeouts are measured from startAsync() to
     * complete() / dispatch(), so individual reads and writes must not move
     * the last access time (which drives the timeout) while async is active.
     */
    public void access() {
        if (isAsync()) {
            return;
        }
        access(System.currentTimeMillis());
    }

    /**
     * Records the given value as the last access time.
     *
     * @param access the access time to store
     */
    public void access(long access) {
        this.lastAccess = access;
    }

    /** @param timeout the timeout value to store */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /** @return the stored timeout value */
    public long getTimeout() {
        return this.timeout;
    }

    /** @return the error flag */
    public boolean getError() {
        return this.error;
    }

    /** @param error new value of the error flag */
    public void setError(boolean error) {
        this.error = error;
    }

    /** @param keepAliveLeft new value of the keep-alive counter */
    public void setKeepAliveLeft(int keepAliveLeft) {
        this.keepAliveLeft = keepAliveLeft;
    }

    /**
     * Lowers the keep-alive counter by one.
     *
     * @return the counter value after the decrement
     */
    public int decrementKeepAlive() {
        final int remaining = this.keepAliveLeft - 1;
        this.keepAliveLeft = remaining;
        return remaining;
    }

    /** @return the kept-alive flag */
    public boolean isKeptAlive() {
        return this.keptAlive;
    }

    /** @param keptAlive new value of the kept-alive flag */
    public void setKeptAlive(boolean keptAlive) {
        this.keptAlive = keptAlive;
    }

    /** @return the blocking status flag */
    public boolean getBlockingStatus() {
        return this.blockingStatus;
    }

    /** @param blockingStatus new value of the blocking status flag */
    public void setBlockingStatus(boolean blockingStatus) {
        this.blockingStatus = blockingStatus;
    }

    /** @return the read lock guarding the blocking status */
    public Lock getBlockingStatusReadLock() {
        return this.blockingStatusReadLock;
    }

    /** @return the write lock guarding the blocking status */
    public WriteLock getBlockingStatusWriteLock() {
        return this.blockingStatusWriteLock;
    }

    /** @return the object used as the write-thread lock */
    public Object getWriteThreadLock() {
        return this.writeThreadLock;
    }

    /**
     * Re-initialises this wrapper for the given socket: the async, comet,
     * error and upgraded flags are cleared, the blocking status is set back
     * to {@code true}, the keep-alive counter is set back to 100 and the last
     * access time is set to the current time. Fields not assigned here keep
     * their current values.
     *
     * @param socket  the socket to wrap from now on
     * @param timeout the timeout value to store
     */
    public void reset(E socket, long timeout) {
        this.async = false;
        this.blockingStatus = true;
        this.comet = false;
        this.error = false;
        this.keepAliveLeft = 100;
        this.lastAccess = System.currentTimeMillis();
        this.socket = socket;
        this.timeout = timeout;
        this.upgraded = false;
    }

    /**
     * Overridden for debug purposes. The format of the returned text is not
     * guaranteed and may change significantly between point releases.
     * <p>
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder(super.toString());
        text.append(":").append(this.socket);
        return text.toString();
    }
}
  • 大小: 26.3 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics