This article continues with the HttpProcessor class, which implements the org.apache.catalina.Lifecycle and java.lang.Runnable interfaces.
We start with its constructor:
/**
 * The HttpConnector with which this processor is associated.
 */
private HttpConnector connector = null;

/**
 * The HTTP request object we will pass to our associated container.
 */
private HttpRequestImpl request = null;

/**
 * The HTTP response object we will pass to our associated container.
 */
private HttpResponseImpl response = null;

/**
 * Construct a new HttpProcessor associated with the specified connector.
 *
 * @param connector HttpConnector that owns this processor
 * @param id Identifier of this HttpProcessor (unique per connector)
 */
public HttpProcessor(HttpConnector connector, int id) {
    super();
    this.connector = connector;
    this.debug = connector.getDebug();
    this.id = id;
    this.proxyName = connector.getProxyName();
    this.proxyPort = connector.getProxyPort();
    this.request = (HttpRequestImpl) connector.createRequest();
    this.response = (HttpResponseImpl) connector.createResponse();
    this.serverPort = connector.getPort();
    this.threadName =
        "HttpProcessor[" + connector.getPort() + "][" + id + "]";
}
The constructor initializes the member fields, among them connector, request, and response (each declared with an initial value of null), using values obtained from the owning connector.
Calling start() launches the processor thread:
/**
 * Start the background thread we will use for request processing.
 *
 * @exception LifecycleException if a fatal startup error occurs
 */
public void start() throws LifecycleException {
    if (started)
        throw new LifecycleException
            (sm.getString("httpProcessor.alreadyStarted"));
    lifecycle.fireLifecycleEvent(START_EVENT, null);
    started = true;
    threadStart();
}
start() in turn calls threadStart():
/**
 * Start the background processing thread.
 */
private void threadStart() {
    log(sm.getString("httpProcessor.starting"));
    thread = new Thread(this, threadName);
    thread.setDaemon(true);
    thread.start();
    if (debug >= 1)
        log(" Background thread has been started");
}
This starts the processor thread as a daemon thread. Its run() method is shown below:
/**
 * The background thread that listens for incoming TCP/IP connections and
 * hands them off to an appropriate processor.
 */
public void run() {
    // Process requests until we receive a shutdown signal
    while (!stopped) {
        // Wait for the next socket to be assigned
        Socket socket = await();
        if (socket == null)
            continue;
        // Process the request from this socket
        try {
            process(socket);
        } catch (Throwable t) {
            log("process.invoke", t);
        }
        // Finish up this request
        connector.recycle(this);
    }
    // Tell threadStop() we have shut ourselves down successfully
    synchronized (threadSync) {
        threadSync.notifyAll();
    }
}
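The key question is how await() and assign() are synchronized with each other. This can be understood as the classic producer-consumer model: the connector thread produces sockets through assign(), and the processor thread consumes them through await().
Let's look at assign() first: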
/**
 * Process an incoming TCP/IP connection on the specified socket.  Any
 * exception that occurs during processing must be logged and swallowed.
 * <b>NOTE</b>:  This method is called from our Connector's thread.  We
 * must assign it to our own thread so that multiple simultaneous
 * requests can be handled.
 *
 * @param socket TCP socket to process
 */
synchronized void assign(Socket socket) {
    // Wait for the Processor to get the previous Socket
    while (available) {
        try {
            wait();
        } catch (InterruptedException e) {
        }
    }
    // Store the newly available Socket and notify our thread
    this.socket = socket;
    available = true;
    notifyAll();
    if ((debug >= 1) && (socket != null))
        log(" An incoming request is being assigned");
}
The member field available defaults to false (boolean available = false).
Here is await():
/**
 * Await a newly assigned Socket from our Connector, or <code>null</code>
 * if we are supposed to shut down.
 */
private synchronized Socket await() {
    // Wait for the Connector to provide a new Socket
    while (!available) {
        try {
            wait();
        } catch (InterruptedException e) {
        }
    }
    // Notify the Connector that we have received this Socket
    Socket socket = this.socket;
    available = false;
    notifyAll();
    if ((debug >= 1) && (socket != null))
        log("  The incoming request has been awaited");
    return (socket);
}
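Clearly the two methods communicate with each other through wait() and notifyAll(). Once await() has called notifyAll() and returned the socket, assign() can go on to accept the next request socket.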
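The handoff between assign() and await() can be reproduced outside Tomcat with a single-slot mailbox. The following is a minimal sketch, not Tomcat code: the class names SocketSlot and SocketSlotDemo are hypothetical, strings stand in for sockets, and unlike the original methods it propagates InterruptedException instead of swallowing it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-slot handoff modelled on assign()/await(); not Tomcat code.
class SocketSlot<T> {
    private T item;
    private boolean available = false; // true while an item waits to be taken

    // Producer side: block until the previous item has been taken.
    synchronized void assign(T newItem) throws InterruptedException {
        while (available) {
            wait();
        }
        item = newItem;
        available = true;
        notifyAll(); // wake the consumer blocked in await()
    }

    // Consumer side: block until an item has been assigned.
    synchronized T await() throws InterruptedException {
        while (!available) {
            wait();
        }
        T taken = item;    // copy to a local so the slot can be refilled
        available = false;
        notifyAll();       // wake a producer blocked in assign()
        return taken;
    }
}

class SocketSlotDemo {
    // Hands three fake "sockets" from this thread to a consumer thread.
    static List<String> runDemo() {
        SocketSlot<String> slot = new SocketSlot<>();
        List<String> received = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    received.add(slot.await());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < 3; i++) {
                slot.assign("socket-" + i);
            }
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [socket-0, socket-1, socket-2]
    }
}
```

Both methods re-check the flag in a while loop rather than an if, because notifyAll() wakes every waiting thread and a woken thread must confirm that the condition it waited for really holds.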
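await() copies the socket into a local variable so that the member field (Socket socket = null) is not tied up and is free to receive the next socket. The returned socket is handled by the processor thread, which brings us back to run().
The statement Socket socket = await() in run() also uses a local variable, for the same reason.
After obtaining the socket, run() calls process() to handle it. When processing finishes, connector.recycle(this) pushes the processor object back onto the stack. Finally, once the processor has been told to stop, the loop exits and threadSync.notifyAll() signals that the thread can be stopped.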
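Pushing the processor back onto a stack after each request is a small object-pool pattern. The sketch below illustrates the idea under stated assumptions: Worker and WorkerPool are hypothetical names, and the policy of returning null once the limit is reached is chosen for the example, not taken from the HttpConnector source.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a processor; only carries an id.
class Worker {
    final int id;

    Worker(int id) {
        this.id = id;
    }
}

// Hypothetical stack-based pool illustrating the recycle idea; not Tomcat code.
class WorkerPool {
    private final Deque<Worker> idle = new ArrayDeque<>();
    private final int max;
    private int created = 0;

    WorkerPool(int max) {
        this.max = max;
    }

    // Reuse an idle worker if one exists, otherwise create one up to the limit.
    synchronized Worker borrow() {
        if (!idle.isEmpty()) {
            return idle.pop();
        }
        if (created < max) {
            return new Worker(created++);
        }
        return null; // pool exhausted (assumed policy for this sketch)
    }

    // Called by a worker when it has finished a request.
    synchronized void recycle(Worker worker) {
        idle.push(worker);
    }
}
```

With a pool of size 1, a second borrow() returns null until the first worker has been recycled, after which the same instance is handed out again.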
The key part is the process() method, which is rather long:
/**
 * Process an incoming HTTP request on the Socket that has been assigned
 * to this Processor.  Any exceptions that occur during processing must be
 * swallowed and dealt with.
 *
 * @param socket The socket on which we are connected to the client
 */
private void process(Socket socket) {
    boolean ok = true;
    boolean finishResponse = true;
    SocketInputStream input = null;
    OutputStream output = null;

    // Construct and initialize the objects we will need
    try {
        input = new SocketInputStream(socket.getInputStream(),
                                      connector.getBufferSize());
    } catch (Exception e) {
        log("process.create", e);
        ok = false;
    }

    keepAlive = true;

    while (!stopped && ok && keepAlive) {

        finishResponse = true;

        try {
            request.setStream(input);
            request.setResponse(response);
            output = socket.getOutputStream();
            response.setStream(output);
            response.setRequest(request);
            ((HttpServletResponse) response.getResponse()).setHeader
                ("Server", SERVER_INFO);
        } catch (Exception e) {
            log("process.create", e);
            ok = false;
        }

        // Parse the incoming request
        try {
            if (ok) {
                parseConnection(socket);
                parseRequest(input, output);
                if (!request.getRequest().getProtocol()
                    .startsWith("HTTP/0"))
                    parseHeaders(input);
                if (http11) {
                    // Sending a request acknowledge back to the client if
                    // requested.
                    ackRequest(output);
                    // If the protocol is HTTP/1.1, chunking is allowed.
                    if (connector.isChunkingAllowed())
                        response.setAllowChunking(true);
                }
            }
        } catch (EOFException e) {
            // It's very likely to be a socket disconnect on either the
            // client or the server
            ok = false;
            finishResponse = false;
        } catch (ServletException e) {
            ok = false;
            try {
                ((HttpServletResponse) response.getResponse())
                    .sendError(HttpServletResponse.SC_BAD_REQUEST);
            } catch (Exception f) {
                ;
            }
        } catch (InterruptedIOException e) {
            if (debug > 1) {
                try {
                    log("process.parse", e);
                    ((HttpServletResponse) response.getResponse())
                        .sendError(HttpServletResponse.SC_BAD_REQUEST);
                } catch (Exception f) {
                    ;
                }
            }
            ok = false;
        } catch (Exception e) {
            try {
                log("process.parse", e);
                ((HttpServletResponse) response.getResponse()).sendError
                    (HttpServletResponse.SC_BAD_REQUEST);
            } catch (Exception f) {
                ;
            }
            ok = false;
        }

        // Ask our Container to process this request
        try {
            ((HttpServletResponse) response).setHeader
                ("Date", FastHttpDateFormat.getCurrentDate());
            if (ok) {
                connector.getContainer().invoke(request, response);
            }
        } catch (ServletException e) {
            log("process.invoke", e);
            try {
                ((HttpServletResponse) response.getResponse()).sendError
                    (HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            } catch (Exception f) {
                ;
            }
            ok = false;
        } catch (InterruptedIOException e) {
            ok = false;
        } catch (Throwable e) {
            log("process.invoke", e);
            try {
                ((HttpServletResponse) response.getResponse()).sendError
                    (HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            } catch (Exception f) {
                ;
            }
            ok = false;
        }

        // Finish up the handling of the request
        if (finishResponse) {
            try {
                response.finishResponse();
            } catch (IOException e) {
                ok = false;
            } catch (Throwable e) {
                log("process.invoke", e);
                ok = false;
            }
            try {
                request.finishRequest();
            } catch (IOException e) {
                ok = false;
            } catch (Throwable e) {
                log("process.invoke", e);
                ok = false;
            }
            try {
                if (output != null)
                    output.flush();
            } catch (IOException e) {
                ok = false;
            }
        }

        // We have to check if the connection closure has been requested
        // by the application or the response stream (in case of HTTP/1.0
        // and keep-alive).
        if ("close".equals(response.getHeader("Connection"))) {
            keepAlive = false;
        }

        // End of request processing
        status = Constants.PROCESSOR_IDLE;

        // Recycling the request and the response objects
        request.recycle();
        response.recycle();

    }

    try {
        shutdownInput(input);
        socket.close();
    } catch (IOException e) {
        ;
    } catch (Throwable e) {
        log("process.invoke", e);
    }
    socket = null;
}
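In the method above, the request and response objects are first initialized from the input and output streams of the socket parameter. The input stream is then parsed to populate the request and response objects, after which the container's void invoke(Request request, Response response) method is called to do the actual processing.
Next, the request and response objects are cleaned up and recycled.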
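Note: keepAlive indicates whether the connection is persistent. The loop ends as soon as the response carries a Connection: close header.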
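As background on the keepAlive flag, the sketch below shows the general HTTP rule for deciding whether a connection stays open. It is an illustration under assumptions, not the logic of process(), which only checks the response's Connection header for "close". The class name KeepAlivePolicy is hypothetical, and for simplicity the header is treated as a single token.

```java
import java.util.Locale;

// Hypothetical helper illustrating general HTTP persistence rules; not Tomcat code.
class KeepAlivePolicy {
    // protocol: "HTTP/1.0" or "HTTP/1.1"; connectionHeader: header value or null.
    static boolean isPersistent(String protocol, String connectionHeader) {
        String token = connectionHeader == null
                ? ""
                : connectionHeader.trim().toLowerCase(Locale.ROOT);
        if ("close".equals(token)) {
            return false; // an explicit close always ends the connection
        }
        if ("HTTP/1.1".equals(protocol)) {
            return true;  // HTTP/1.1 connections are persistent by default
        }
        // HTTP/1.0 is persistent only when keep-alive is requested explicitly
        return "HTTP/1.0".equals(protocol) && "keep-alive".equals(token);
    }
}
```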
Finally, shutdownInput(input) checks whether the input stream still has unread bytes and skips them, and the socket is closed.
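Skipping unread bytes before closing can be sketched as follows. This is an assumption about what such a helper might look like, not the body of shutdownInput(); the class name InputDrainer and the method skipPending are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical helper that discards bytes already buffered on a stream; not Tomcat code.
class InputDrainer {
    // Returns the number of bytes skipped (0 if nothing was pending).
    static long skipPending(InputStream in) {
        try {
            int pending = in.available(); // bytes readable without blocking
            return pending > 0 ? in.skip(pending) : 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, after reading 2 bytes from a 5-byte ByteArrayInputStream, skipPending returns 3 and the next read() reports end of stream.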
Below is a simple container that implements the org.apache.catalina.Container interface. The key code is as follows:
public class SimpleContainer implements Container {

    public static final String WEB_ROOT =
        System.getProperty("user.dir") + File.separator + "webroot";

    public SimpleContainer() {
    }

    public Loader getLoader() {
        return null;
    }

    public void setLoader(Loader loader) {
    }

    public void invoke(Request request, Response response)
        throws IOException, ServletException {

        String servletName = ((HttpServletRequest) request).getRequestURI();
        servletName = servletName.substring(servletName.lastIndexOf("/") + 1);
        URLClassLoader loader = null;
        try {
            URL[] urls = new URL[1];
            URLStreamHandler streamHandler = null;
            File classPath = new File(WEB_ROOT);
            String repository = (new URL("file", null,
                classPath.getCanonicalPath() + File.separator)).toString();
            urls[0] = new URL(null, repository, streamHandler);
            loader = new URLClassLoader(urls);
        } catch (IOException e) {
            System.out.println(e.toString());
        }
        Class myClass = null;
        try {
            myClass = loader.loadClass(servletName);
        } catch (ClassNotFoundException e) {
            System.out.println(e.toString());
        }

        Servlet servlet = null;

        try {
            servlet = (Servlet) myClass.newInstance();
            servlet.service((HttpServletRequest) request,
                            (HttpServletResponse) response);
        } catch (Exception e) {
            System.out.println(e.toString());
        } catch (Throwable e) {
            System.out.println(e.toString());
        }
    }
}
Its invoke() method is similar to the process() method of the ServletProcessor class from the earlier article, so it is not analyzed further here.
---------------------------------------------------------------------------
This How Tomcat Works series is my own original work.
When reposting, please credit the source: 博客园, 刺猬的温驯
My email: chenying998179 # 163.com (replace # with @)