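This article continues the analysis of the HttpProcessor class, which implements the org.apache.catalina.Lifecycle and java.lang.Runnable interfaces.
We start with its member variables and constructor: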
    /**
     * The HttpConnector with which this processor is associated.
     */
    private HttpConnector connector = null;

    /**
     * The HTTP request object we will pass to our associated container.
     */
    private HttpRequestImpl request = null;

    /**
     * The HTTP response object we will pass to our associated container.
     */
    private HttpResponseImpl response = null;

    /**
     * Construct a new HttpProcessor associated with the specified connector.
     *
     * @param connector HttpConnector that owns this processor
     * @param id Identifier of this HttpProcessor (unique per connector)
     */
    public HttpProcessor(HttpConnector connector, int id) {
        super();
        this.connector = connector;
        this.debug = connector.getDebug();
        this.id = id;
        this.proxyName = connector.getProxyName();
        this.proxyPort = connector.getProxyPort();
        this.request = (HttpRequestImpl) connector.createRequest();
        this.response = (HttpResponseImpl) connector.createResponse();
        this.serverPort = connector.getPort();
        this.threadName =
            "HttpProcessor[" + connector.getPort() + "][" + id + "]";
    }
The constructor initializes the member variables (connector, request, response and the others), taking most of their values from the connector.
Calling start() launches the processor thread:
    /**
     * Start the background thread we will use for request processing.
     *
     * @exception LifecycleException if a fatal startup error occurs
     */
    public void start() throws LifecycleException {
        if (started)
            throw new LifecycleException
                (sm.getString("httpProcessor.alreadyStarted"));
        lifecycle.fireLifecycleEvent(START_EVENT, null);
        started = true;
        threadStart();
    }
start() in turn calls threadStart():
    /**
     * Start the background processing thread.
     */
    private void threadStart() {
        log(sm.getString("httpProcessor.starting"));
        thread = new Thread(this, threadName);
        thread.setDaemon(true);
        thread.start();
        if (debug >= 1)
            log(" Background thread has been started");
    }
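The start-up pattern in threadStart() can be tried outside Tomcat with a minimal sketch like the one below. The class DaemonStartSketch, its latch, and the awaitRun() helper are illustrative assumptions and not part of Catalina; only the Thread calls mirror the code above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; not Catalina code.
class DaemonStartSketch implements Runnable {
    // Released once run() has executed on the background thread.
    private final CountDownLatch ran = new CountDownLatch(1);

    // Mirrors threadStart(): wrap this Runnable in a named daemon thread.
    Thread threadStart(String threadName) {
        Thread thread = new Thread(this, threadName);
        thread.setDaemon(true); // a daemon thread does not keep the JVM alive
        thread.start();
        return thread;
    }

    @Override
    public void run() {
        ran.countDown();
    }

    // Waits up to the given time for run() to execute; false on timeout or interrupt.
    boolean awaitRun(long millis) {
        try {
            return ran.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the thread is a daemon, the JVM can exit even if a processor is still blocked waiting for work, which is presumably why the original marks it that way.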
threadStart() thus starts the processor thread as a daemon thread. Its run() method is shown below:
    /**
     * The background thread that listens for incoming TCP/IP connections and
     * hands them off to an appropriate processor.
     */
    public void run() {
        // Process requests until we receive a shutdown signal
        while (!stopped) {
            // Wait for the next socket to be assigned
            Socket socket = await();
            if (socket == null)
                continue;
            // Process the request from this socket
            try {
                process(socket);
            } catch (Throwable t) {
                log("process.invoke", t);
            }
            // Finish up this request
            connector.recycle(this);
        }
        // Tell threadStop() we have shut ourselves down successfully
        synchronized (threadSync) {
            threadSync.notifyAll();
        }
    }
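The key question is how await() and assign() synchronize with each other. This is the classic producer-consumer model: the connector thread produces sockets and the processor thread consumes them.
First, the assign() method: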
    /**
     * Process an incoming TCP/IP connection on the specified socket. Any
     * exception that occurs during processing must be logged and swallowed.
     * <b>NOTE</b>: This method is called from our Connector's thread. We
     * must assign it to our own thread so that multiple simultaneous
     * requests can be handled.
     *
     * @param socket TCP socket to process
     */
    synchronized void assign(Socket socket) {
        // Wait for the Processor to get the previous Socket
        while (available) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        // Store the newly available Socket and notify our thread
        this.socket = socket;
        available = true;
        notifyAll();
        if ((debug >= 1) && (socket != null))
            log(" An incoming request is being assigned");
    }
The member variable available defaults to false (boolean available = false).
Next, the await() method:
    /**
     * Await a newly assigned Socket from our Connector, or <code>null</code>
     * if we are supposed to shut down.
     */
    private synchronized Socket await() {
        // Wait for the Connector to provide a new Socket
        while (!available) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        // Notify the Connector that we have received this Socket
        Socket socket = this.socket;
        available = false;
        notifyAll();
        if ((debug >= 1) && (socket != null))
            log(" The incoming request has been awaited");
        return (socket);
    }
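Clearly, assign() and await() communicate with each other through wait() and notifyAll(). Once await() has called notifyAll() and returned the socket, assign() can accept the next socket.
await() copies the socket into a local variable so that the member variable (Socket socket = null) is not tied up and can receive the next incoming socket. The returned socket is then handled by the processor thread, which brings us back to run().
The statement Socket socket = await() in run() also uses a local variable, for the same reason.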
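The handshake described above can be reduced to a single-slot handoff. The following is a minimal sketch under stated assumptions: HandoffSlot and its demo() helper are hypothetical names, a String stands in for the Socket, and interrupts are restored after the wait loop rather than swallowed as in the original.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch of the assign()/await() handshake.
class HandoffSlot {
    private String item;               // plays the role of the socket member variable
    private boolean available = false; // true while an item waits to be picked up

    // Producer side (the connector thread in the article).
    synchronized void assign(String newItem) {
        boolean interrupted = false;
        while (available) {            // the previous item has not been taken yet
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        item = newItem;
        available = true;
        notifyAll();                   // wake the consumer
        if (interrupted) Thread.currentThread().interrupt();
    }

    // Consumer side (the processor thread in the article).
    synchronized String await() {
        boolean interrupted = false;
        while (!available) {           // nothing has been assigned yet
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        String taken = item;           // the local copy frees the slot for the next item
        available = false;
        notifyAll();                   // wake the producer
        if (interrupted) Thread.currentThread().interrupt();
        return taken;
    }

    // Runs the caller as producer and one consumer thread; returns items in consumption order.
    static List<String> demo(String... items) {
        HandoffSlot slot = new HandoffSlot();
        List<String> consumed = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < items.length; i++) {
                consumed.add(slot.await());
            }
        });
        consumer.start();
        for (String s : items) {
            slot.assign(s);
        }
        try {
            consumer.join();           // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

Since the slot holds at most one item, the producer blocks until the consumer has taken the previous one, so items arrive in the order they were assigned.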
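Once it has the socket, run() calls process() to handle it, and when processing finishes it pushes the processor object back onto the stack via connector.recycle(this). Finally, when the processor thread is told to stop, the loop exits and notifyAll() on threadSync signals that the thread has shut itself down.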
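The idea of pushing a processor back onto a stack for reuse can be illustrated as follows. This is a sketch only: the connector's real pooling code is not shown in this article, so the class ProcessorPoolSketch, the acquire() method, and the use of String identifiers in place of HttpProcessor objects are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a stack-based pool; not the HttpConnector implementation.
class ProcessorPoolSketch {
    private final Deque<String> idle = new ArrayDeque<>(); // stack of idle processors
    private final int maxProcessors;
    private int created = 0;

    ProcessorPoolSketch(int maxProcessors) {
        this.maxProcessors = maxProcessors;
    }

    // Returns an idle processor, creates one if under the limit, or null if exhausted.
    synchronized String acquire() {
        if (!idle.isEmpty()) {
            return idle.pop();
        }
        if (created < maxProcessors) {
            return "processor-" + created++;
        }
        return null;
    }

    // Called when a processor has finished a request, like connector.recycle(this).
    synchronized void recycle(String processor) {
        idle.push(processor);
    }
}
```

With a limit of one, a second acquire() returns null until the first processor has been recycled.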
The core of the work is the process() method, which is fairly long:
    /**
     * Process an incoming HTTP request on the Socket that has been assigned
     * to this Processor. Any exceptions that occur during processing must be
     * swallowed and dealt with.
     *
     * @param socket The socket on which we are connected to the client
     */
    private void process(Socket socket) {
        boolean ok = true;
        boolean finishResponse = true;
        SocketInputStream input = null;
        OutputStream output = null;

        // Construct and initialize the objects we will need
        try {
            input = new SocketInputStream(socket.getInputStream(),
                                          connector.getBufferSize());
        } catch (Exception e) {
            log("process.create", e);
            ok = false;
        }

        keepAlive = true;

        while (!stopped && ok && keepAlive) {
            finishResponse = true;
            try {
                request.setStream(input);
                request.setResponse(response);
                output = socket.getOutputStream();
                response.setStream(output);
                response.setRequest(request);
                ((HttpServletResponse) response.getResponse()).setHeader
                    ("Server", SERVER_INFO);
            } catch (Exception e) {
                log("process.create", e);
                ok = false;
            }

            // Parse the incoming request
            try {
                if (ok) {
                    parseConnection(socket);
                    parseRequest(input, output);
                    if (!request.getRequest().getProtocol()
                        .startsWith("HTTP/0"))
                        parseHeaders(input);
                    if (http11) {
                        // Sending a request acknowledge back to the client if
                        // requested.
                        ackRequest(output);
                        // If the protocol is HTTP/1.1, chunking is allowed.
                        if (connector.isChunkingAllowed())
                            response.setAllowChunking(true);
                    }
                }
            } catch (EOFException e) {
                // It's very likely to be a socket disconnect on either the
                // client or the server
                ok = false;
                finishResponse = false;
            } catch (ServletException e) {
                ok = false;
                try {
                    ((HttpServletResponse) response.getResponse())
                        .sendError(HttpServletResponse.SC_BAD_REQUEST);
                } catch (Exception f) {
                    ;
                }
            } catch (InterruptedIOException e) {
                if (debug > 1) {
                    try {
                        log("process.parse", e);
                        ((HttpServletResponse) response.getResponse())
                            .sendError(HttpServletResponse.SC_BAD_REQUEST);
                    } catch (Exception f) {
                        ;
                    }
                }
                ok = false;
            } catch (Exception e) {
                try {
                    log("process.parse", e);
                    ((HttpServletResponse) response.getResponse()).sendError
                        (HttpServletResponse.SC_BAD_REQUEST);
                } catch (Exception f) {
                    ;
                }
                ok = false;
            }

            // Ask our Container to process this request
            try {
                ((HttpServletResponse) response).setHeader
                    ("Date", FastHttpDateFormat.getCurrentDate());
                if (ok) {
                    connector.getContainer().invoke(request, response);
                }
            } catch (ServletException e) {
                log("process.invoke", e);
                try {
                    ((HttpServletResponse) response.getResponse()).sendError
                        (HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                } catch (Exception f) {
                    ;
                }
                ok = false;
            } catch (InterruptedIOException e) {
                ok = false;
            } catch (Throwable e) {
                log("process.invoke", e);
                try {
                    ((HttpServletResponse) response.getResponse()).sendError
                        (HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                } catch (Exception f) {
                    ;
                }
                ok = false;
            }

            // Finish up the handling of the request
            if (finishResponse) {
                try {
                    response.finishResponse();
                } catch (IOException e) {
                    ok = false;
                } catch (Throwable e) {
                    log("process.invoke", e);
                    ok = false;
                }
                try {
                    request.finishRequest();
                } catch (IOException e) {
                    ok = false;
                } catch (Throwable e) {
                    log("process.invoke", e);
                    ok = false;
                }
                try {
                    if (output != null)
                        output.flush();
                } catch (IOException e) {
                    ok = false;
                }
            }

            // We have to check if the connection closure has been requested
            // by the application or the response stream (in case of HTTP/1.0
            // and keep-alive).
            if ("close".equals(response.getHeader("Connection"))) {
                keepAlive = false;
            }

            // End of request processing
            status = Constants.PROCESSOR_IDLE;

            // Recycling the request and the response objects
            request.recycle();
            response.recycle();
        }

        try {
            shutdownInput(input);
            socket.close();
        } catch (IOException e) {
            ;
        } catch (Throwable e) {
            log("process.invoke", e);
        }
        socket = null;
    }
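This method first wires the socket's input and output streams into the request and response objects, then parses the input stream to populate the request and response, and then calls the container's void invoke(Request request, Response response) method to do the actual processing.
After that, it cleans up and recycles the request and response objects.
Note: keepAlive indicates whether the connection is persistent.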
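The effect of keepAlive on the request loop can be sketched in isolation. In the hypothetical KeepAliveSketch below, each list element stands for the Connection header value of one response on the same connection (null meaning the header is not set). The helper requestsServed() is an illustration, not Tomcat code, and models only the "close" check from process().

```java
import java.util.List;

// Hypothetical sketch of the keep-alive loop condition; not Catalina code.
class KeepAliveSketch {
    // Returns how many requests are served on one connection before it is closed.
    static int requestsServed(List<String> connectionHeaders) {
        boolean keepAlive = true;
        int served = 0;
        for (int i = 0; keepAlive && i < connectionHeaders.size(); i++) {
            served++; // one request/response cycle handled on this connection
            // Same test as in process(): a "close" response header ends the loop.
            if ("close".equals(connectionHeaders.get(i))) {
                keepAlive = false;
            }
        }
        return served;
    }
}
```

The response carrying "close" is still served; only the requests that would have followed it on the same socket are dropped.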
Finally, it checks whether the input stream still has unread bytes, skips them, and closes the socket.
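The body of shutdownInput() is not listed in this article, so the following is only a sketch of the behavior just described, assuming it relies on InputStream.available() and InputStream.skip(). The class DrainSketch and the method skipUnread() are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of skipping unread input before closing a socket.
class DrainSketch {
    // Skips the bytes that are immediately available; returns how many were skipped.
    static long skipUnread(InputStream input) {
        try {
            int available = input.available();
            if (available > 0) {
                return input.skip(available);
            }
        } catch (IOException e) {
            // Best effort: the stream is about to be closed anyway.
        }
        return 0;
    }
}
```

Note that available() only reports bytes that can be read without blocking, so this discards what has already arrived rather than everything the client may still send.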
Below is a simple container that implements the org.apache.catalina.Container interface. The key code is:
    public class SimpleContainer implements Container {

        public static final String WEB_ROOT =
            System.getProperty("user.dir") + File.separator + "webroot";

        public SimpleContainer() {
        }

        public Loader getLoader() {
            return null;
        }

        public void setLoader(Loader loader) {
        }

        public void invoke(Request request, Response response)
            throws IOException, ServletException {
            String servletName = ((HttpServletRequest) request).getRequestURI();
            servletName = servletName.substring(servletName.lastIndexOf("/") + 1);
            URLClassLoader loader = null;
            try {
                URL[] urls = new URL[1];
                URLStreamHandler streamHandler = null;
                File classPath = new File(WEB_ROOT);
                String repository = (new URL("file", null,
                    classPath.getCanonicalPath() + File.separator)).toString();
                urls[0] = new URL(null, repository, streamHandler);
                loader = new URLClassLoader(urls);
            } catch (IOException e) {
                System.out.println(e.toString());
            }
            Class myClass = null;
            try {
                myClass = loader.loadClass(servletName);
            } catch (ClassNotFoundException e) {
                System.out.println(e.toString());
            }
            Servlet servlet = null;
            try {
                servlet = (Servlet) myClass.newInstance();
                servlet.service((HttpServletRequest) request,
                                (HttpServletResponse) response);
            } catch (Exception e) {
                System.out.println(e.toString());
            } catch (Throwable e) {
                System.out.println(e.toString());
            }
        }
    }
This method is similar to the process() method of the ServletProcessor class from the earlier article, so it is not analyzed again here.
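For readers who want to experiment with the two steps of invoke() in isolation (deriving the class name from the request URI, then loading it from a directory), here is a minimal sketch. ServletLoadSketch, servletNameFromUri() and loadFrom() are hypothetical names. The sketch builds the repository URL with File.toURI().toURL() instead of the URL constructors used in SimpleContainer, and it returns null rather than continuing when loading fails.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical sketch; a simplified variant of the loading steps in SimpleContainer.invoke().
class ServletLoadSketch {
    // Takes the part after the last '/', e.g. "/servlet/HelloServlet" gives "HelloServlet".
    static String servletNameFromUri(String uri) {
        return uri.substring(uri.lastIndexOf('/') + 1);
    }

    // Loads className from the given directory; returns null if it cannot be loaded.
    static Class<?> loadFrom(File directory, String className) {
        try {
            // toURI() appends a trailing slash for an existing directory, which
            // URLClassLoader needs in order to treat the URL as a class directory.
            URL repository = directory.toURI().toURL();
            // The loader is left open so that dependent classes can be loaded later.
            URLClassLoader loader = new URLClassLoader(new URL[] { repository });
            return loader.loadClass(className);
        } catch (MalformedURLException | ClassNotFoundException e) {
            System.out.println(e.toString());
            return null;
        }
    }
}
```

Returning null on failure lets the caller decide how to respond, instead of calling a method on a null class reference in the next step.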
---------------------------------------------------------------------------
This How Tomcat Works series is my original work.
When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯
My email: chenying998179 # 163.com (replace # with @)

