Donald_Draper

Tomcat's JIoEndpoint: Handling HTTP Requests

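Tomcat Connector (Protocol, CoyoteAdapter, AprEndpoint) initialization and request handling:
http://donald-draper.iteye.com/blog/2330139
The previous article walked through how the Connector (Protocol, CoyoteAdapter, AprEndpoint) is initialized and how it handles a request,
using AprEndpoint as the endpoint. This article looks at JIoEndpoint, the blocking-I/O endpoint.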
public class Http11Protocol extends AbstractHttp11JsseProtocol<Socket> {
  protected Http11ConnectionHandler cHandler;
  private int disableKeepAlivePercentage = 75;
  public Http11Protocol() {
        // The request-handling endpoint is a JIoEndpoint
        endpoint = new JIoEndpoint();
        cHandler = new Http11ConnectionHandler(this);
        // Register the connection handler with the endpoint
        ((JIoEndpoint) endpoint).setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }
}

As the Http11Protocol constructor shows, its endpoint is a JIoEndpoint:
/**
 * Handle incoming TCP connections.
 *
 * This class implement a simple server model: one listener thread accepts on a socket and
 * creates a new worker thread for each incoming connection.
 * More advanced Endpoints will reuse the threads, use queues, etc.
 */
public class JIoEndpoint extends AbstractEndpoint<Socket> {
   /**
     * Associated server socket.
     */
    protected ServerSocket serverSocket = null;
    public JIoEndpoint() {
        // Set maxConnections to zero so we can tell if the user has specified
        // their own value on the connector when we reach bind()
        setMaxConnections(0);
        // Reduce the executor timeout for BIO as threads in keep-alive will not
        // terminate when the executor interrupts them.
        setExecutorTerminationTimeoutMillis(0);
    }
    // ------------------------------------------------------------- Properties

    /**
     * Handling of accepted sockets.
     */
    // At runtime this is the Http11ConnectionHandler
    protected Handler handler = null;
     /**
     * Server socket factory.
     */
    protected ServerSocketFactory serverSocketFactory = null;
    // Initializes serverSocketFactory and serverSocket
    @Override
    public void bind() throws Exception {

        // Initialize thread count defaults for acceptor
        if (acceptorThreadCount == 0) {
            acceptorThreadCount = 1;
        }
        // Initialize maxConnections
        if (getMaxConnections() == 0) {
            // User hasn't set a value - use the default
            setMaxConnections(getMaxThreadsExecutor(true));
        }

        if (serverSocketFactory == null) {
            if (isSSLEnabled()) {
                serverSocketFactory =
                    handler.getSslImplementation().getServerSocketFactory(this);
            } else {
                // No SSL: create the default serverSocketFactory
                serverSocketFactory = new DefaultServerSocketFactory(this);
            }
        }
        if (serverSocket == null) {
            try {
                if (getAddress() == null) {
                    serverSocket = serverSocketFactory.createSocket(getPort(),
                            getBacklog());
                } else {
                    // Create the serverSocket bound to the given address and port
                    serverSocket = serverSocketFactory.createSocket(getPort(),
                            getBacklog(), getAddress());
                }
            } catch (BindException orig) {
                String msg;
                if (getAddress() == null)
                    msg = orig.getMessage() + " <null>:" + getPort();
                else
                    msg = orig.getMessage() + " " +
                            getAddress().toString() + ":" + getPort();
                BindException be = new BindException(msg);
                be.initCause(orig);
                throw be;
            }
        }
    }
 }

//DefaultServerSocketFactory
public class DefaultServerSocketFactory implements ServerSocketFactory {
    /**
     *
     * @param endpoint  Unused in this implementation.
     */
    public DefaultServerSocketFactory(AbstractEndpoint<?> endpoint) {
    }
    @Override
    public ServerSocket createSocket (int port) throws IOException {
        return  new ServerSocket (port);
    }
    @Override
    public ServerSocket createSocket (int port, int backlog)
            throws IOException {
        return new ServerSocket (port, backlog);
    }
    // Creates a ServerSocket bound to the given address and port
    @Override
    public ServerSocket createSocket (int port, int backlog,
            InetAddress ifAddress) throws IOException {
        return new ServerSocket (port, backlog, ifAddress);
    }
    @Override
    public Socket acceptSocket(ServerSocket socket) throws IOException {
        return socket.accept();
    }
    @Override
    public void handshake(Socket sock) throws IOException {
        // NOOP
    }
}

As the listing shows, bind() does nothing more than initialize serverSocketFactory and serverSocket.
Next, startInternal():
public class JIoEndpoint extends AbstractEndpoint<Socket> {
  @Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if (getExecutor() == null) {
                // Create the worker thread pool
                createExecutor();
            }
            // Initialize the latch that caps the number of concurrent connections
            initializeConnectionLatch();
            
            startAcceptorThreads();
            // Start async timeout thread
            Thread timeoutThread = new Thread(new AsyncTimeout(),
                    getName() + "-AsyncTimeout");
            timeoutThread.setPriority(threadPriority);
            timeoutThread.setDaemon(true);
            timeoutThread.start();
        }
    }
}

Start with createExecutor(), which is defined in AbstractEndpoint:
// Create the worker thread pool
   
public void createExecutor() {
        internalExecutor = true;
        // Create the task queue
        TaskQueue taskqueue = new TaskQueue();
        // Create the thread factory for the worker threads
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        // Create the thread pool executor
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }

// TaskQueue
/**
 * As task queue specifically designed to run with a thread pool executor.
 * The task queue is optimised to properly utilize threads within
 * a thread pool executor. If you use a normal queue, the executor will spawn threads
 * when there are idle threads and you wont be able to force items unto the queue itself
 *
 */
TaskQueue extends LinkedBlockingQueue:
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private ThreadPoolExecutor parent = null;
    // no need to be volatile, the one times when we change and read it occur in
    // a single thread (the one that did stop a context and fired listeners)
    private Integer forcedRemainingCapacity = null;
    // Inserts the task if the queue should take it (true on success). Returning
    // false instead makes the executor try to start a new thread.
    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }
    // Retrieves and removes the head of the queue, waiting up to the given
    // timeout if necessary; returns null if nothing arrives in time.
    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    // Retrieves and removes the head of the queue, blocking while the queue is
    // empty until a new task is added.
    @Override
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
        }
        return super.take();
    }
 }
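The effect of returning false from offer() is easier to see in isolation. The following is a minimal sketch on top of the plain JDK java.util.concurrent.ThreadPoolExecutor, not Tomcat's implementation: EagerQueue and EagerQueueSketch are hypothetical names, and the submitted-count check of the real TaskQueue is left out. It submits five blocking tasks to a pool with core size 1 and maximum size 4, once with the eager queue and once with an ordinary LinkedBlockingQueue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class EagerQueueSketch {

    /** Hypothetical, stripped-down stand-in for Tomcat's TaskQueue. */
    static final class EagerQueue extends LinkedBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;
        private transient volatile ThreadPoolExecutor parent;

        void setParent(ThreadPoolExecutor parent) {
            this.parent = parent;
        }

        @Override
        public boolean offer(Runnable task) {
            ThreadPoolExecutor p = parent;
            // Refusing the task makes execute() try to start a new worker instead.
            if (p != null && p.getPoolSize() < p.getMaximumPoolSize()) {
                return false;
            }
            // The pool is at its maximum size: queue the task as usual.
            return super.offer(task);
        }
    }

    /** Submits five blocking tasks and returns {pool size, queued tasks}. */
    static int[] run(boolean eager) {
        EagerQueue eagerQueue = new EagerQueue();
        LinkedBlockingQueue<Runnable> queue =
                eager ? eagerQueue : new LinkedBlockingQueue<Runnable>();
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 4, 60, TimeUnit.SECONDS, queue);
        eagerQueue.setParent(pool); // harmless when the plain queue is in use
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] result = { pool.getPoolSize(), queue.size() };
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] eager = run(true);
        int[] plain = run(false);
        System.out.println("eager queue: threads=" + eager[0] + " queued=" + eager[1]);
        System.out.println("plain queue: threads=" + plain[0] + " queued=" + plain[1]);
    }
}
```

With the eager queue the pool should grow to four threads and hold one queued task; with the plain queue it should stay at one thread with four tasks waiting. That difference is why Tomcat supplies its own queue instead of a stock LinkedBlockingQueue.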

// TaskThreadFactory
/**
 * Simple task thread factory to use to create threads for an executor implementation.
 */
public class TaskThreadFactory implements ThreadFactory {
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemon;
    private final int threadPriority;
    public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.threadPriority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(threadPriority);
        return t;
    }
}
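To see what such a factory gives the thread pool, here is a small sketch using only the JDK. NamedThreadFactorySketch is a hypothetical name, it creates plain Thread objects instead of Tomcat's TaskThread, and the prefix used in the usage note is only an example value.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactorySketch implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemon;
    private final int priority;

    NamedThreadFactorySketch(String namePrefix, boolean daemon, int priority) {
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Each thread gets the shared prefix plus a running number.
        Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(priority);
        return t;
    }
}
```

Passing a prefix such as "http-bio-8080-exec-" yields threads named "http-bio-8080-exec-1", "http-bio-8080-exec-2" and so on, which is what makes worker threads easy to spot in a thread dump.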

Next, ThreadPoolExecutor.
Tomcat's ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor:
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {}

Back in JIoEndpoint.startInternal(), the next call is initializeConnectionLatch(),
which is defined in AbstractEndpoint:
protected LimitLatch initializeConnectionLatch() {
        if (maxConnections==-1) return null;
        if (connectionLimitLatch==null) {
            connectionLimitLatch = new LimitLatch(getMaxConnections());
        }
        return connectionLimitLatch;
    }
//LimitLatch
/**
 * Shared latch that allows the latch to be acquired a limited number of times
 * after which all subsequent requests to acquire the latch will be placed in a
 * FIFO queue until one of the shares is returned.
 */
public class LimitLatch {
    private final Sync sync;
    private final AtomicLong count;
    private volatile long limit;
    private volatile boolean released = false;
 /**
     * Instantiates a LimitLatch object with an initial limit.
     * @param limit - maximum number of concurrent acquisitions of this latch
     */
    public LimitLatch(long limit) {
        this.limit = limit;
        this.count = new AtomicLong(0);
        this.sync = new Sync();
    }
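    // Acquire a share. This method and the next one belong to the inner class
    // Sync, which extends AbstractQueuedSynchronizer.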
     @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (!released && newCount > limit) {
                // Limit exceeded
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }
        // Release a share
        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
}
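The listing above is abridged, so here is a self-contained sketch of the same idea built on AbstractQueuedSynchronizer. It is an approximation under stated assumptions, not Tomcat's class: LimitLatchSketch and demo() are hypothetical names, tryCountUp() is a timed variant added only so the demo cannot block forever, and the released flag of the real LimitLatch is omitted.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class LimitLatchSketch {
    private final AtomicLong count = new AtomicLong(0);
    private final long limit;
    private final Sync sync = new Sync();

    private final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (newCount > limit) {
                count.decrementAndGet(); // limit exceeded: undo and make the caller wait
                return -1;
            }
            return 1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            count.decrementAndGet();
            return true; // wake up a waiting acquirer, if any
        }
    }

    LimitLatchSketch(long limit) {
        this.limit = limit;
    }

    /** Blocks while the limit is reached. */
    void countUpOrAwait() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** Hypothetical timed variant used by the demo below. */
    boolean tryCountUp(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    long countDown() {
        sync.releaseShared(0);
        return count.get();
    }

    /** Returns {third acquire succeeded, acquire after one release succeeded}. */
    static boolean[] demo() {
        LimitLatchSketch latch = new LimitLatchSketch(2);
        try {
            latch.countUpOrAwait();
            latch.countUpOrAwait();
            boolean third = latch.tryCountUp(50, TimeUnit.MILLISECONDS);
            latch.countDown();
            boolean afterRelease = latch.tryCountUp(50, TimeUnit.MILLISECONDS);
            return new boolean[] { third, afterRelease };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With a limit of 2, the third acquisition times out and the one after a countDown() succeeds. This is the same pattern the Acceptor relies on: count up before accept(), count down when the connection is closed.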

Back in JIoEndpoint.startInternal(), the next call is startAcceptorThreads(),
which is also defined in AbstractEndpoint:
// Create the Acceptor threads that accept incoming socket connections
  protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];

        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }
    /**
     * Hook to allow Endpoints to provide a specific Acceptor implementation.
     */
    // Implemented by subclasses
    protected abstract Acceptor createAcceptor();

JIoEndpoint implements createAcceptor() as follows:
 
 @Override
    protected AbstractEndpoint.Acceptor createAcceptor() {
        return new Acceptor();
    }

This Acceptor is an inner class of JIoEndpoint:
  
// --------------------------------------------------- Acceptor Inner Class
    /**
     * The background thread that listens for incoming TCP/IP connections and
     * hands them off to an appropriate processor.
     */
    // Listens for incoming TCP/IP connections and hands them to a processor
    protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
        public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();

                    Socket socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        // (blocks until a client connects)
                        socket = serverSocketFactory.acceptSocket(serverSocket);
                    // Successful accept, reset the error delay
                    errorDelay = 0;
                    // Configure the socket
                    if (running && !paused && setSocketOptions(socket)) {
                        // Hand this socket off to an appropriate processor
                        // processSocket() passes the socket on to the executor
                        if (!processSocket(socket)) {
                            countDownConnection();
                            // Close socket right away
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        // Close socket right away
                        closeSocket(socket);
                    }
                } 
            state = AcceptorState.ENDED;
        }
    }

// JIoEndpoint.processSocket(Socket socket)
/**
     * Process a new connection from a new client. Wraps the socket so
     * keep-alive and other attributes can be tracked and then passes the socket
     * to the executor for processing.
     *
     * @param socket    The socket associated with the client.
     *
     * @return          <code>true</code> if the socket is passed to the
     *                  executor, <code>false</code> if something went wrong or
     *                  if the endpoint is shutting down. Returning
     *                  <code>false</code> is an indication to close the socket
     *                  immediately.
     */
    protected boolean processSocket(Socket socket) {
        // Process the request from this socket
        try {
            SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
            wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
            wrapper.setSecure(isSSLEnabled());
            // During shutdown, executor may be null - avoid NPE
            if (!running) {
                return false;
            }
            // Submit a SocketProcessor task for this socket to the executor
            getExecutor().execute(new SocketProcessor(wrapper));
        } 
        return true;
    }
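Taken together, the Acceptor and processSocket() implement the classic blocking-I/O pattern: one thread sits in accept(), and every accepted socket is handed to a thread pool. Below is a minimal, self-contained sketch of that pattern using only the JDK. Everything in it is an assumption made for illustration: the class name BioAcceptorSketch, the fixed pool of four workers, and the one-line echo protocol that stands in for HTTP processing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

final class BioAcceptorSketch implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private volatile boolean running = true;

    BioAcceptorSketch() throws IOException {
        // Port 0 asks the OS for any free port; 50 is the accept backlog.
        serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        Thread acceptor = new Thread(this::acceptLoop, "sketch-Acceptor-0");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    private void acceptLoop() {
        while (running) {
            try {
                Socket socket = serverSocket.accept(); // blocks until a client connects
                try {
                    workers.execute(() -> handle(socket)); // hand off to a worker
                } catch (RejectedExecutionException e) {
                    closeQuietly(socket); // no worker available: close right away
                }
            } catch (IOException e) {
                // accept() fails once the server socket is closed; the loop then exits
            }
        }
    }

    private void handle(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             Writer out = new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8)) {
            out.write("echo: " + in.readLine() + "\n");
            out.flush();
        } catch (IOException e) {
            // the client went away; nothing more to do in a sketch
        }
    }

    private static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException ignored) {
            // ignore
        }
    }

    @Override
    public void close() {
        running = false;
        try {
            serverSocket.close();
        } catch (IOException ignored) {
            // ignore
        }
        workers.shutdownNow();
    }

    /** Starts a server, sends one line over the loopback interface, returns the reply. */
    static String roundTrip(String message) {
        try (BioAcceptorSketch server = new BioAcceptorSketch();
             Socket client = new Socket(InetAddress.getLoopbackAddress(), server.port());
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
            out.write(message + "\n");
            out.flush();
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling BioAcceptorSketch.roundTrip("ping") should return "echo: ping". The sketch leaves out what JIoEndpoint adds on top: the connection latch, socket options, keep-alive bookkeeping and pause handling.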

// SocketProcessor, an inner class of JIoEndpoint
/**
     * This class is the equivalent of the Worker, but will simply use in an
     * external Executor thread pool.
     */
    protected class SocketProcessor implements Runnable {

        protected SocketWrapper<Socket> socket = null;
        protected SocketStatus status = null;
        public SocketProcessor(SocketWrapper<Socket> socket) {
            if (socket==null) throw new NullPointerException();
            this.socket = socket;
        }
        public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
            this(socket);
            this.status = status;
        }
        @Override
        public void run() {
            boolean launch = false;
            synchronized (socket) {
                try {
                    SocketState state = SocketState.OPEN;

                    try {
                        // SSL handshake
                        // (the SSL context itself is initialized in AbstractEndpoint.init())
                        serverSocketFactory.handshake(socket.getSocket());
                    } 
                    if ((state != SocketState.CLOSED)) {
                        if (status == null) {
                            state = handler.process(socket, SocketStatus.OPEN_READ);
                        } else {
                            // A status was supplied and the socket is not closed: process the request
                            state = handler.process(socket,status);
                        }
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket
                        if (log.isTraceEnabled()) {
                            log.trace("Closing socket:"+socket);
                        }
                        countDownConnection();
                        try {
                            socket.getSocket().close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    } else if (state == SocketState.OPEN ||
                            state == SocketState.UPGRADING ||
                            state == SocketState.UPGRADING_TOMCAT  ||
                            state == SocketState.UPGRADED){
                        socket.setKeptAlive(true);
                        socket.access();
                        launch = true;
                    } else if (state == SocketState.LONG) {
                        socket.access();
                        waitingRequests.add(socket);
                    }
                } finally {
                    if (launch) {
                        try {
                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
                        } catch (RejectedExecutionException x) {
                            log.warn("Socket reprocessing request was rejected for:"+socket,x);
                            try {
                                //unable to handle connection at this time
                                handler.process(socket, SocketStatus.DISCONNECT);
                            } finally {
                                countDownConnection();
                            }
                }
            }
            socket = null;
            // Finish up this request
        }
    }

// The key call: hand the socket to the handler
state = handler.process(socket,status);

To find out what handler is, go back to the Http11Protocol constructor:
public Http11Protocol() {
        // The request-handling endpoint is a JIoEndpoint
        endpoint = new JIoEndpoint();
        cHandler = new Http11ConnectionHandler(this);
        // Register the connection handler with the endpoint
        ((JIoEndpoint) endpoint).setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

So the handler is an Http11ConnectionHandler,
which is an inner class of Http11Protocol:
// -----------------------------------  Http11ConnectionHandler Inner Class
    protected static class Http11ConnectionHandler
            extends AbstractConnectionHandler<Socket, Http11Processor> implements Handler {
	 }

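Http11ConnectionHandler does not define process(socket, status) itself. The method is declared in the Handler interface and implemented by the parent class.
That parent, AbstractConnectionHandler, is an inner class of AbstractProtocol: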
// ------------------------------------------- Connection handler base class
    
    protected abstract static class AbstractConnectionHandler<S,P extends Processor<S>>
            implements AbstractEndpoint.Handler {
       // Each socket is associated with one Processor
       protected final Map<S,Processor<S>> connections = new ConcurrentHashMap<S,Processor<S>>();
	@SuppressWarnings("deprecation") 
	// Old HTTP upgrade method has been deprecated
        // Process the HTTP request
        public SocketState process(SocketWrapper<S> wrapper,SocketStatus status) {
            if (wrapper == null) {
                // Nothing to do. Socket has been closed.
                return SocketState.CLOSED;
            }
            S socket = wrapper.getSocket();
            if (socket == null) {
                // Nothing to do. Socket has been closed.
                return SocketState.CLOSED;
            }
            Processor<S> processor = connections.get(socket);
            if (status == SocketStatus.DISCONNECT && processor == null) {
                // Nothing to do. Endpoint requested a close and there is no
                // longer a processor associated with this socket.
                return SocketState.CLOSED;
            }
            wrapper.setAsync(false);
            ContainerThreadMarker.markAsContainerThread();
            try {
                if (processor == null) {
                    processor = recycledProcessors.poll();
                }
                if (processor == null) {
                    // Nothing to reuse: create a new processor
                    processor = createProcessor();
                }
                // Initialize SSL; implemented by subclasses
                initSsl(wrapper, processor);
                SocketState state = SocketState.CLOSED;
                do {
                    if (status == SocketStatus.CLOSE_NOW) {
                        processor.errorDispatch();
                        state = SocketState.CLOSED;
                    } else if (status == SocketStatus.DISCONNECT &&
                            !processor.isComet()) {
                        // Do nothing here, just wait for it to get recycled
                        // Don't do this for Comet we need to generate an end
                        // event (see BZ 54022)
                    } else if (processor.isAsync() || state == SocketState.ASYNC_END) {
                        state = processor.asyncDispatch(status);
                        if (state == SocketState.OPEN) {
                            // release() won't get called so in case this request
                            // takes a long time to process, remove the socket from
                            // the waiting requests now else the async timeout will
                            // fire
                            getProtocol().endpoint.removeWaitingRequest(wrapper);
                            // There may be pipe-lined data to read. If the data
                            // isn't processed now, execution will exit this
                            // loop and call release() which will recycle the
                            // processor (and input buffer) deleting any
                            // pipe-lined data. To avoid this, process it now.
                            state = processor.process(wrapper);
                        }
                    } else if (processor.isComet()) {
                        state = processor.event(status);
                    } else {
                        state = processor.process(wrapper);
                    }
        }
        // Implemented by subclasses
	 protected abstract P createProcessor();
        protected abstract void initSsl(SocketWrapper<S> socket,
                Processor<S> processor);
        protected abstract void longPoll(SocketWrapper<S> socket,
                Processor<S> processor);
        protected abstract void release(SocketWrapper<S> socket,
                Processor<S> processor, boolean socketClosing,
                boolean addToPoller);
}
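The lookup order at the top of process() is: the processor already bound to the socket, then a recycled processor, and only then a new one. It can be sketched on its own. The class below is a hypothetical simplification (ProcessorLookupSketch, park() and the numbered Processor are names invented here), and it ignores SSL setup and the socket state machine entirely.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class ProcessorLookupSketch<S> {

    /** Hypothetical stand-in for a protocol processor. */
    static final class Processor {
        final int id;

        Processor(int id) {
            this.id = id;
        }
    }

    // Sockets that currently have a processor bound to them
    private final Map<S, Processor> connections = new ConcurrentHashMap<>();
    // Idle processors waiting to be reused
    private final Queue<Processor> recycledProcessors = new ConcurrentLinkedQueue<>();
    private final AtomicInteger created = new AtomicInteger();

    /** Same order as in process(): bound processor, recycled one, new one. */
    Processor acquire(S socket) {
        Processor processor = connections.get(socket);
        if (processor == null) {
            processor = recycledProcessors.poll();
        }
        if (processor == null) {
            processor = new Processor(created.incrementAndGet());
        }
        return processor;
    }

    /** Keeps the processor bound to a socket whose request is still in progress. */
    void park(S socket, Processor processor) {
        connections.put(socket, processor);
    }

    /** Unbinds the processor and returns it to the recycle queue. */
    void release(S socket, Processor processor) {
        connections.remove(socket);
        recycledProcessors.offer(processor);
    }

    int createdCount() {
        return created.get();
    }
}
```

A processor that was parked for one socket is returned again for that socket, and after release() it is handed to the next socket that asks, so a steady stream of short requests does not keep allocating new processors.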
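    // createProcessor() of the connection handler. Note that this listing is the
    // APR variant (from Http11AprProtocol); the handler in Http11Protocol builds an
    // Http11Processor around the JIoEndpoint in an analogous way.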
	 @Override
        protected Http11AprProcessor createProcessor() {
            Http11AprProcessor processor = new Http11AprProcessor(
                    proto.getMaxHttpHeaderSize(), (AprEndpoint)proto.endpoint,
                    proto.getMaxTrailerSize(), proto.getAllowedTrailerHeadersAsSet(),
                    proto.getMaxExtensionSize(), proto.getMaxSwallowSize());
            // Attach the adapter to the processor. The adapter is created during
            // Connector initialization by these two statements:
            //   adapter = new CoyoteAdapter(this);
            //   protocolHandler.setAdapter(adapter);
            processor.setAdapter(proto.adapter);
            processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
            processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
            processor.setConnectionUploadTimeout(
                    proto.getConnectionUploadTimeout());
            processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
            processor.setCompressionMinSize(proto.getCompressionMinSize());
            processor.setCompression(proto.getCompression());
            processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
            processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
            processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
            processor.setSocketBuffer(proto.getSocketBuffer());
            processor.setMaxSavePostSize(proto.getMaxSavePostSize());
            processor.setServer(proto.getServer());
            processor.setClientCertProvider(proto.getClientCertProvider());
            register(processor);
            return processor;
        }

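Next, the processor class. Http11AprProcessor is shown here; Http11Processor, which the JIO connector uses, extends the same AbstractHttp11Processor base class with Socket as its type parameter: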
public class Http11AprProcessor extends AbstractHttp11Processor<Long> {}

It does not define the process(...) method itself; that method lives in AbstractHttp11Processor:
public abstract class AbstractHttp11Processor<S> extends AbstractProcessor<S> {
/**
     * Process pipelined HTTP requests using the specified input and output
     * streams.
     * @param socketWrapper Socket from which the HTTP requests will be read
     *               and the HTTP responses will be written.
     * @throws IOException error during an I/O operation
     */
     // Process the HTTP request
    @Override
    public SocketState process(SocketWrapper<S> socketWrapper)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Setting up the I/O
        setSocketWrapper(socketWrapper);
        getInputBuffer().init(socketWrapper, endpoint);
        getOutputBuffer().init(socketWrapper, endpoint);
	 while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
                upgradeInbound == null &&
                httpUpgradeHandler == null && !endpoint.isPaused()) {

            // Parsing the request header
            try {
                setRequestLineReadTimeout();
	       //503
               if (endpoint.isPaused()) {
                    // 503 - Service unavailable
                    response.setStatus(503);
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                    // Currently only NIO will ever return false here
                    if (!getInputBuffer().parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!disableUploadTimeout) {
                        setSocketTimeout(connectionUploadTimeout);
                    }
                }
            } catch (Throwable t) {
                // 400 - Bad Request
                response.setStatus(400);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
	    if (!getErrorState().isError()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    // Pre-process the request
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }
            // 503, 400, 500: these are the familiar HTTP status codes
            // Process the request in the adapter, which takes over the
            // request and response from here
            if (!getErrorState().isError()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    // This is the key step: service(). As noted above, the
                    // adapter is actually a CoyoteAdapter
                    adapter.service(request, response);
	    }
	 /**
         * After reading the request headers, we have to setup the request filters.
         */
         protected void prepareRequest() {
		MimeHeaders headers = request.getMimeHeaders();
		   // Check for a full URI (including protocol://host:port/)
                 ByteChunk uriBC = request.requestURI().getByteChunk();
		 if (uriBC.startsWithIgnoreCase("http", 0)) {
		    int pos = uriBC.indexOf("://", 0, 3, 4);
		    int uriBCStart = uriBC.getStart();
		    int slashPos = -1;
		    if (pos != -1) {
			byte[] uriB = uriBC.getBytes();
			slashPos = uriBC.indexOf('/', pos + 3);
			if (slashPos == -1) {
			    slashPos = uriBC.getLength();
			    // Set URI as "/"
			    request.requestURI().setBytes
				(uriB, uriBCStart + pos + 1, 1);
			} else {
			    request.requestURI().setBytes
				(uriB, uriBCStart + slashPos,
				 uriBC.getLength() - slashPos);
			}
			MessageBytes hostMB = headers.setValue("host");
			hostMB.setBytes(uriB, uriBCStart + pos + 3,
					slashPos - pos - 3);
		    }

        }
    }
}
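The part of prepareRequest() shown above deals with absolute request URIs such as http://host:port/path: the host part is moved into the Host header and only the path is kept as the request URI. The same logic on ordinary strings looks roughly like this. It is a sketch under assumptions: AbsoluteUriSketch and split() are hypothetical names, and the real code works on ByteChunk and MessageBytes, not on String.

```java
final class AbsoluteUriSketch {

    /** Returns {host, path}; host is null when the URI is not absolute. */
    static String[] split(String requestUri) {
        String host = null;
        String path = requestUri;
        // Case-insensitive check for a leading "http", as startsWithIgnoreCase does
        if (requestUri.regionMatches(true, 0, "http", 0, 4)) {
            int pos = requestUri.indexOf("://");
            if (pos != -1) {
                int slashPos = requestUri.indexOf('/', pos + 3);
                if (slashPos == -1) {
                    // No path at all: the URI becomes "/"
                    host = requestUri.substring(pos + 3);
                    path = "/";
                } else {
                    host = requestUri.substring(pos + 3, slashPos);
                    path = requestUri.substring(slashPos);
                }
            }
        }
        return new String[] { host, path };
    }

    public static void main(String[] args) {
        String[] parts = split("http://example.com:8080/app/index.jsp?x=1");
        System.out.println("host=" + parts[0] + " path=" + parts[1]);
    }
}
```

For "http://example.com:8080/app/index.jsp?x=1" this yields the host "example.com:8080" and the path "/app/index.jsp?x=1"; a relative URI such as "/index.html" is returned unchanged with no host.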

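As AbstractHttp11Processor.process(SocketWrapper<S> socketWrapper) shows,
the HTTP processor first parses the request headers, then sets up filters and resolves encodings in prepareRequest(), and finally hands the request to the Adapter.
Now for CoyoteAdapter: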
public class CoyoteAdapter implements Adapter {
     /**
     * Construct a new CoyoteProcessor associated with the specified connector.
     *
     * @param connector CoyoteConnector that owns this processor
     */
    // Each Adapter is tied to one Connector
    public CoyoteAdapter(Connector connector) {
        super();
        this.connector = connector;
    }
    private Connector connector = null;
     /**
     * Encoder for the Location URL in HTTP redirects.
     */
    protected static URLEncoder urlEncoder;
    // Characters that the URL encoder leaves unescaped
    static {
        urlEncoder = new URLEncoder();
        urlEncoder.addSafeCharacter('-');
        urlEncoder.addSafeCharacter('_');
        urlEncoder.addSafeCharacter('.');
        urlEncoder.addSafeCharacter('*');
        urlEncoder.addSafeCharacter('/');
    }
    /**
     * Service method.
     */
    @Override
    public void service(org.apache.coyote.Request req,
                        org.apache.coyote.Response res)
        throws Exception {
	Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);
	try {
            // Parse and set Catalina and configuration specific
            // request parameters
            req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName());
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                //check valves if we support async
                request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
                // This is what we have been looking for: the call that eventually reaches the servlet
                connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
	}
}

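As analyzed in the previous article, each Connector is associated with one Service, each Service has one Engine, and each Engine has multiple Hosts.
Each container (Engine, Host, Context, Wrapper) has its own Pipeline, the request-processing chain. The request is passed down through these pipelines until it reaches StandardWrapperValve, the valve of the Wrapper that wraps the Servlet handling the request.
With that in mind, the meaning of this line should now be clear: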
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
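
To make the chain concrete, here is a minimal sketch of the idea, not Tomcat's actual classes. `PipelineSketch`, its `Valve` interface, and the `named` helper are hypothetical; only the four valve names are borrowed from Tomcat. The sketch assumes each valve does nothing but record its name and pass the request on, whereas the real valves also do mapping, error handling, and the Servlet invocation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a chain of valves; not Tomcat code.
final class PipelineSketch {

    // Simplified stand-in for a valve: it sees the request and may pass it on.
    interface Valve {
        void invoke(List<String> trace);
    }

    // Builds a valve that records its name and then calls the next valve, if any.
    static Valve named(String name, Valve next) {
        return trace -> {
            trace.add(name);
            if (next != null) {
                next.invoke(trace);
            }
        };
    }

    // Wires Engine -> Host -> Context -> Wrapper and sends one request through.
    static List<String> handle() {
        // In Tomcat the last valve is the one that ends up calling the Servlet.
        Valve wrapper = named("StandardWrapperValve", null);
        Valve context = named("StandardContextValve", wrapper);
        Valve host = named("StandardHostValve", context);
        Valve engine = named("StandardEngineValve", host);

        List<String> trace = new ArrayList<>();
        engine.invoke(trace); // plays the role of getPipeline().getFirst().invoke(...)
        return trace;
    }
}
```

Calling `PipelineSketch.handle()` returns the valve names in the order they were visited, which mirrors how one `invoke` on the first valve ends up reaching the Wrapper.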

Back to JIoEndpoint's startInternal, at the point where the async timeout thread is started:
// Start async timeout thread
Thread timeoutThread = new Thread(new AsyncTimeout(),
                    getName() + "-AsyncTimeout");
timeoutThread.setPriority(threadPriority);
timeoutThread.setDaemon(true);
timeoutThread.start();

Now look at AsyncTimeout, an inner class of JIoEndpoint:
/**
     * Async timeout thread
     */
    protected class AsyncTimeout implements Runnable {
        /**
         * The background thread that checks async requests and fires the
         * timeout if there has been no activity.
         */
        @Override
        public void run() {

            // Loop until we receive a shutdown command
            while (running) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore
                }
                long now = System.currentTimeMillis();
                Iterator<SocketWrapper<Socket>> sockets =
                    waitingRequests.iterator();
                while (sockets.hasNext()) {
                    SocketWrapper<Socket> socket = sockets.next();
                    long access = socket.getLastAccess();
                    if (socket.getTimeout() > 0 &&
                            (now-access)>socket.getTimeout()) {
                        // Prevent multiple timeouts
                        socket.setTimeout(-1);
                        // Hand the timed-out socket over for async processing
                        processSocketAsync(socket,SocketStatus.TIMEOUT);
                    }
                }

                // Loop if endpoint is paused
                while (paused && running) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

            }
        }
}
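
The core of the loop above is a single pass over the waiting queue. The following is a minimal sketch of that pass only, under stated assumptions: `TimeoutScanSketch` and `Waiting` are hypothetical names standing in for the endpoint and for `SocketWrapper`, and the current time is passed in as a parameter so the logic can be exercised without sleeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of one AsyncTimeout pass; not Tomcat code.
final class TimeoutScanSketch {

    // Stand-in for SocketWrapper: only the fields the scan needs.
    static final class Waiting {
        final String id;
        final long lastAccess; // timestamp of the last activity, in ms
        long timeout;          // in ms; a value <= 0 means "never times out"

        Waiting(String id, long lastAccess, long timeout) {
            this.id = id;
            this.lastAccess = lastAccess;
            this.timeout = timeout;
        }
    }

    // Returns the entries that have been idle longer than their timeout at time 'now'.
    static List<Waiting> scan(Queue<Waiting> waiting, long now) {
        List<Waiting> expired = new ArrayList<>();
        for (Waiting w : waiting) {
            if (w.timeout > 0 && (now - w.lastAccess) > w.timeout) {
                w.timeout = -1; // same trick as above: never report an entry twice
                expired.add(w);
            }
        }
        return expired;
    }
}
```

Unlike the real code, the sketch only collects the expired entries; in JIoEndpoint each one is passed to processSocketAsync with SocketStatus.TIMEOUT.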

Now look at JIoEndpoint's processSocketAsync method:
/**
     * Process an existing async connection. If processing is required, passes
     * the wrapped socket to an executor for processing.
     *
     * @param socket    The socket associated with the client.
     * @param status    Only OPEN and TIMEOUT are used. The others are used for
     *                  Comet requests that are not supported by the BIO (JIO)
     *                  Connector.
     */
    @Override
    public void processSocketAsync(SocketWrapper<Socket> socket,
            SocketStatus status) {
        try {
            synchronized (socket) {
                // Only proceed if the socket was still in the waiting queue
                if (waitingRequests.remove(socket)) {
                    SocketProcessor proc = new SocketProcessor(socket,status);
                    ClassLoader loader = Thread.currentThread().getContextClassLoader();
                    try {
                        //threads should not be created by the webapp classloader
                        if (Constants.IS_SECURITY_ENABLED) {
                            PrivilegedAction<Void> pa = new PrivilegedSetTccl(
                                    getClass().getClassLoader());
                            AccessController.doPrivileged(pa);
                        } else {
                            Thread.currentThread().setContextClassLoader(
                                    getClass().getClassLoader());
                        }
                        // During shutdown, executor may be null - avoid NPE
                        if (!running) {
                            return;
                        }
                        getExecutor().execute(proc);
                        //TODO gotta catch RejectedExecutionException and properly handle it
                    } finally {
                        if (Constants.IS_SECURITY_ENABLED) {
                            PrivilegedAction<Void> pa = new PrivilegedSetTccl(loader);
                            AccessController.doPrivileged(pa);
                        } else {
                            Thread.currentThread().setContextClassLoader(loader);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
        }
}
//waitingRequests
 protected ConcurrentLinkedQueue<SocketWrapper<Socket>> waitingRequests =
        new ConcurrentLinkedQueue<SocketWrapper<Socket>>();
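
Two details of processSocketAsync are easy to miss: the socket is only processed if it can be removed from the queue (so a timeout and another event cannot both claim it), and the thread context class loader is swapped while the task is submitted and restored afterwards. Here is a minimal sketch of just those two steps. `DispatchSketch` and `dispatch` are hypothetical names, and the security-manager branch of the original is left out.

```java
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical model of the claim-and-submit step; not Tomcat code.
final class DispatchSketch {

    // Returns true if this call claimed the item and handed the task to the executor.
    static <T> boolean dispatch(Queue<T> waiting, T item, Executor executor,
                                ClassLoader containerLoader, Runnable task) {
        synchronized (item) {
            // remove() succeeds for exactly one caller, so the item is processed once
            if (!waiting.remove(item)) {
                return false;
            }
            Thread current = Thread.currentThread();
            ClassLoader saved = current.getContextClassLoader();
            try {
                // Threads the executor creates now should see the container's
                // class loader, not a web application's
                current.setContextClassLoader(containerLoader);
                executor.execute(task);
                return true;
            } finally {
                // Always restore the caller's class loader
                current.setContextClassLoader(saved);
            }
        }
    }
}
```

A second call with the same item returns false without running anything, which is the behaviour that the `waitingRequests.remove(socket)` check provides above.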

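Summary:
When the Connector is initialized, it initializes Http11Protocol, that is, AbstractProtocol, which in turn calls the init method of AbstractEndpoint. That init method calls bind, which is left to the subclass; JIoEndpoint's bind mainly sets up the serverSocketFactory and the serverSocket.
When the Connector is started, it starts Http11Protocol, that is, AbstractProtocol, whose start method calls AbstractEndpoint's start method, which calls startInternal. startInternal is also left to the subclass. JIoEndpoint's startInternal first creates the executor (the thread pool), then initializes the connection latch shared by all requests, then starts the Acceptor threads that listen for and accept TCP/IP connections (Acceptor is an inner class of JIoEndpoint that extends the Acceptor inner class of AbstractEndpoint), and finally starts the AsyncTimeout thread.
After the Acceptor accepts a TCP/IP connection, it creates a SocketProcessor task to handle the socket. SocketProcessor first performs the SSL handshake and then delegates the actual work to Http11ConnectionHandler. In practice, AbstractConnectionHandler's process(SocketWrapper<S> wrapper, SocketStatus status)
hands the request to Http11Processor, which extends AbstractHttp11Processor<Socket>.
AbstractHttp11Processor's process(SocketWrapper<S> socketWrapper) first handles the request headers and then calls the service method of CoyoteAdapter (the Adapter of Http11Protocol). CoyoteAdapter is associated with a Connector, and its service method passes the request into the container pipeline, which eventually calls the Servlet's service method, that is:
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
From the previous article and this one, we can see that JIoEndpoint accepts requests through its Acceptor using plain Java sockets. It is built on Java blocking I/O (BIO): each connection is assigned its own thread, and reads block.
AprEndpoint, by contrast, handles requests through a Poller and is built on the Apache Portable Runtime.
It calls native methods directly, which is more efficient, but the implementation depends on the platform.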
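
The BIO model described above (one acceptor thread, one worker per connection, blocking reads) can be sketched in a few lines of plain Java. This is an illustration of the model only, not Tomcat code: `BioServerSketch`, `roundTrip`, and the one-line echo protocol are all hypothetical, and the sketch skips everything Tomcat adds on top (connection limits, keep-alive, SSL, HTTP parsing).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical thread-per-connection echo server; not Tomcat code.
final class BioServerSketch implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final ExecutorService workers = Executors.newCachedThreadPool();
    private volatile boolean running = true;

    BioServerSketch() {
        try {
            // Port 0 lets the OS pick a free port; listen on loopback only
            serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Thread acceptor = new Thread(this::acceptLoop, "sketch-Acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    private void acceptLoop() {
        while (running) {
            try {
                Socket socket = serverSocket.accept();  // blocks until a client connects
                workers.execute(() -> handle(socket));  // one worker per connection
            } catch (IOException e) {
                // accept() fails once the server socket is closed; the loop then exits
            }
        }
    }

    private void handle(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line = in.readLine();                // blocking read ties up this worker
            out.println("echo: " + line);
        } catch (IOException e) {
            // The client went away; nothing more to do in this sketch
        }
    }

    // Small blocking client used to exercise the server.
    static String roundTrip(int port, String message) {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
            out.println(message);
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        running = false;
        try {
            serverSocket.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            workers.shutdown();
        }
    }
}
```

Because every worker sits in a blocking `readLine()` until its client sends data, the number of connections that can be served at once is bounded by the number of threads, which is the trade-off the summary points out.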