Please credit when reposting: TheViper http://www.VEVb.com/TheViper
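Tornado's routing mechanism was already analyzed in my post "Changing tornado into a Rails-style form, with support for hidden parameters", so it is not covered again here.
My knowledge is limited, so this analysis mainly follows tornado's execution flow, explained with the help of the source comments and some material found online. It is based on tornado 3.2.2.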
Tornado's design model
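As can be seen, tornado is divided into four layers: the bottom EVENT layer handles IO events; the TCP layer implements a TCP server and is responsible for data transfer; the HTTP/HTTPS layer implements an HTTP server and client on top of the HTTP protocol; and the top layer is the WEB framework, which contains the handlers, templates, database connections, authentication, localization, and the other features a web framework needs.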
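More precisely, the tornado server works like this.
First, a listen socket for accepting clients is created in the order socket (create the socket) -> bind -> listen, and the fd of each listen socket is registered with the IOLoop singleton. When a listen socket becomes readable, IOLoop invokes the registered accept callback to accept the client. Each client connection is then wrapped in an IOStream, whose _handle_events reacts to IO events on that connection and which wraps the read and write buffers to perform asynchronous reads and writes with the client.
Now for the source code. I personally prefer to read code in the order it executes.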
Execution flow in detail
The startup file
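    from tornado import web, httpserver, ioloop

    class MainPageHandler(web.RequestHandler):
        # minimal handler so the example is runnable
        def get(self):
            self.write("Hello, world")

    application = web.Application([
        (r"/", MainPageHandler),
    ])
    http_server = httpserver.HTTPServer(application)
    http_server.listen(8080)
    ioloop.IOLoop.instance().start()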
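This is the simplest way to start tornado; there are other ways, described in the documentation.
First, the web.Application class is instantiated.
The methods worth noting in it are __init__(), listen(), add_handlers(), and __call__().
__init__() initializes the Application. The handlers are usually passed in directly, and it calls add_handlers to register them; initialization also sets up transforms (chunking, compression, etc.), UI modules, and the static file handler. The add_handlers method adds the mappings from URIs to handlers.
When a request arrives, tornado calls the Application instance, which triggers __call__. For details see my other post "Changing tornado into a Rails-style form, with support for hidden parameters".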
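To make the split between add_handlers and __call__ concrete, here is a minimal sketch. MiniApplication, the dict-based request, and the two handler classes are hypothetical; real tornado uses URLSpec objects, RequestHandler subclasses, and an HTTPRequest. Like tornado's URLSpec, it appends "$" to each pattern so that the whole path must match.

```python
import re


class MiniApplication:
    """Hypothetical, heavily simplified stand-in for web.Application."""

    def __init__(self, handlers=None):
        self.handlers = []  # list of (compiled regex, handler class)
        if handlers:
            self.add_handlers(handlers)

    def add_handlers(self, handlers):
        # Map URI patterns to handler classes; "$" anchors the end of the path.
        for pattern, handler_class in handlers:
            if not pattern.endswith("$"):
                pattern += "$"
            self.handlers.append((re.compile(pattern), handler_class))

    def __call__(self, request):
        # The HTTP layer calls the application object once per parsed request.
        for regex, handler_class in self.handlers:
            match = regex.match(request["path"])
            if match:
                # Regex groups become positional arguments of get(), as in tornado.
                return handler_class().get(*match.groups())
        return "404 Not Found"


class MainPageHandler:
    def get(self):
        return "main page"


class UserHandler:
    def get(self, user_id):
        return "user " + user_id


app = MiniApplication([(r"/", MainPageHandler), (r"/user/(\d+)", UserHandler)])
print(app({"path": "/"}))         # main page
print(app({"path": "/user/42"}))  # user 42
print(app({"path": "/nope"}))     # 404 Not Found
```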
listen(), for its part, is only a thin wrapper around HTTPServer's listen:
    def listen(self, port, address="", **kwargs):
        from tornado.httpserver import HTTPServer
        server = HTTPServer(self, **kwargs)
        server.listen(port, address)
The HTTPServer class, in httpserver.py. Its docstring reads:
A non-blocking, single-threaded HTTP server. A server is defined by a request callback that takes an HTTPRequest instance as an argument and writes a valid HTTP response with `HTTPRequest.write`. `HTTPRequest.finish` finishes the request (but does not necessarily close the connection in the case of HTTP/1.1 keep-alive requests).
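In other words, HTTPServer is a non-blocking, single-threaded HTTP server. It supports HTTP/1.1 keep-alive connections but not chunked encoding. It supports the 'X-Real-IP' and 'X-Scheme' headers as well as SSL transport, and multi-process operation is implemented with the prefork model. The code is very simple: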
    class HTTPServer(TCPServer):
        def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
                     xheaders=False, ssl_options=None, protocol=None, **kwargs):
            self.request_callback = request_callback
            self.no_keep_alive = no_keep_alive
            self.xheaders = xheaders
            self.protocol = protocol
            TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
                               **kwargs)

        def handle_stream(self, stream, address):
            HTTPConnection(stream, address, self.request_callback,
                           self.no_keep_alive, self.xheaders, self.protocol)
Here self.request_callback is the Application instance from above.
The TCPServer class, in tcpserver.py:
    class TCPServer(object):
        def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None):
            self.io_loop = io_loop
            self.ssl_options = ssl_options
            self._sockets = {}  # fd -> socket object; stores the listening sockets
            self._pending_sockets = []
            self._started = False
            self.max_buffer_size = max_buffer_size
            if self.ssl_options is not None and isinstance(self.ssl_options, dict):
                ...

        def listen(self, port, address=""):
            sockets = bind_sockets(port, address=address)
            self.add_sockets(sockets)

        def add_sockets(self, sockets):
            if self.io_loop is None:
                self.io_loop = IOLoop.current()
            for sock in sockets:
                self._sockets[sock.fileno()] = sock
                add_accept_handler(sock, self._handle_connection,
                                   io_loop=self.io_loop)

        def _handle_connection(self, connection, address):
            if self.ssl_options is not None:
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                try:
                    connection = ssl_wrap_socket(connection,
                                                 self.ssl_options,
                                                 server_side=True,
                                                 do_handshake_on_connect=False)
                except ssl.SSLError as err:
                    if err.args[0] == ssl.SSL_ERROR_EOF:
                        return connection.close()
                    else:
                        raise
                except socket.error as err:
                    # If the connection is closed immediately after it is created
                    # (as in a port scan), we can get one of several errors.
                    # wrap_socket makes an internal call to getpeername,
                    # which may return either EINVAL (Mac OS X) or ENOTCONN
                    # (linux).  If it returns ENOTCONN, this error is
                    # silently swallowed by the ssl module, so we need to
                    # catch another error later on (AttributeError in
                    # SSLIOStream._do_ssl_handshake).
                    # To test this behavior, try nmap with the -sT flag.
                    # https://github.com/tornadoweb/tornado/pull/750
                    if err.args[0] in (errno.ECONNABORTED, errno.EINVAL):
                        return connection.close()
                    else:
                        raise
            try:
                if self.ssl_options is not None:
                    stream = SSLIOStream(connection, io_loop=self.io_loop,
                                         max_buffer_size=self.max_buffer_size)
                else:
                    stream = IOStream(connection, io_loop=self.io_loop,
                                      max_buffer_size=self.max_buffer_size)
                self.handle_stream(stream, address)
            except Exception:
                app_log.error("Error in connection callback", exc_info=True)
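The code in tcpserver.py that only runs when tornado is started in other ways has been omitted here.
Once __init__() returns, http_server = httpserver.HTTPServer(application) has finished and we have an HTTPServer object.
Next comes http_server.listen(8080), which is the listen method of the TCPServer class above.
bind_sockets(), in netutil.py, creates the socket instances and returns them as a list: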
    def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128, flags=None):
        sockets = []
        if address == "":
            address = None
        if not socket.has_ipv6 and family == socket.AF_UNSPEC:
            # Python can be compiled with --disable-ipv6, which causes
            # operations on AF_INET6 sockets to fail, but does not
            # automatically exclude those results from getaddrinfo
            # results.
            # http://bugs.python.org/issue16208
            family = socket.AF_INET
        if flags is None:
            flags = socket.AI_PASSIVE
        for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
                                          0, flags)):
            af, socktype, proto, canonname, sockaddr = res
            try:
                sock = socket.socket(af, socktype, proto)
            except socket.error as e:
                if e.args[0] == errno.EAFNOSUPPORT:
                    continue
                raise
            set_close_exec(sock.fileno())
            if os.name != 'nt':
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if af == socket.AF_INET6:
                # On linux, ipv6 sockets accept ipv4 too by default,
                # but this makes it impossible to bind to both
                # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
                # separate sockets *must* be used to listen for both ipv4
                # and ipv6.  For consistency, always disable ipv4 on our
                # ipv6 sockets and use a separate ipv4 socket when needed.
                #
                # Python 2.x on windows doesn't have IPPROTO_IPV6.
                if hasattr(socket, "IPPROTO_IPV6"):
                    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
            sock.setblocking(0)
            sock.bind(sockaddr)
            sock.listen(backlog)
            sockets.append(sock)
        return sockets
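bind_sockets is called while starting to listen on a port. getaddrinfo returns the candidate addresses to bind to (with address=None and the AI_PASSIVE flag, typically the IPv4 and IPv6 wildcard addresses), and a listening socket is created for each one; the list of sockets is returned. Each socket is bound to its address and port, and two flags are set: fcntl.FD_CLOEXEC via set_close_exec (the fd is closed automatically when the process calls exec(), so programs started from it do not inherit the socket) and socket.SO_REUSEADDR (the port can be bound again immediately, even while old connections on it are still in TIME_WAIT, e.g. right after a server restart). sock.listen(backlog), with the default backlog=128, sets the maximum length of the queue of connections waiting to be accepted to 128.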
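The same steps can be sketched with the Python 3 standard library. simple_bind_sockets is a hypothetical, simplified function, not tornado's code: it skips any socket type that cannot be created (tornado only skips EAFNOSUPPORT), and it uses set_inheritable(False), which is already the Python 3 default, in place of tornado's set_close_exec.

```python
import os
import socket


def simple_bind_sockets(port, address=None, backlog=128):
    """Simplified sketch of the idea behind netutil.bind_sockets."""
    sockets = []
    # AI_PASSIVE: with address=None, getaddrinfo yields wildcard addresses
    infos = set(socket.getaddrinfo(address, port, socket.AF_UNSPEC,
                                   socket.SOCK_STREAM, 0, socket.AI_PASSIVE))
    for af, socktype, proto, _canonname, sockaddr in infos:
        try:
            sock = socket.socket(af, socktype, proto)
        except OSError:
            continue  # e.g. the address family is not supported here
        sock.set_inheritable(False)  # close-on-exec
        if os.name != "nt":
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if af == socket.AF_INET6 and hasattr(socket, "IPPROTO_IPV6"):
            # keep IPv6 sockets IPv6-only, as tornado does
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
        sock.setblocking(False)  # accept() must never block the event loop
        sock.bind(sockaddr)
        sock.listen(backlog)
        sockets.append(sock)
    return sockets


if __name__ == "__main__":
    socks = simple_bind_sockets(0, "127.0.0.1")  # port 0: the OS picks a free port
    for s in socks:
        print(s.getsockname())
        s.close()
```

Binding to "127.0.0.1" yields a single IPv4 socket; passing address=None would normally give one socket per address family.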
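To sum up TCP server programming: create a listening socket, bind it to an address and port, start listening, then accept connections over and over. bind_sockets() above does everything up to and including listen; it does not accept.
On sockets, I strongly recommend the article "Linux study notes: sockets"; it is very well written and explains in detail the settings used in bind_sockets() above.
Back in tcpserver.py: don't worry about IOLoop for now. It is enough to know that it is a singleton, obtained here via IOLoop.current() (which returns the current thread's IOLoop, falling back to the global IOLoop.instance()).
self.add_sockets(sockets) iterates over the sockets, stores each one in self._sockets keyed by its fd, and registers an accept handler for it:
add_accept_handler(sock, self._handle_connection, io_loop=self.io_loop), in netutil.py.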
    def add_accept_handler(sock, callback, io_loop=None):
        if io_loop is None:
            io_loop = IOLoop.current()

        def accept_handler(fd, events):
            while True:
                try:
                    connection, address = sock.accept()
                except socket.error as e:
                    # EWOULDBLOCK and EAGAIN indicate we have accepted every
                    # connection that is available.
                    if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                        return
                    # ECONNABORTED indicates that there was a connection
                    # but it was closed while still in the accept queue.
                    # (observed on FreeBSD).
                    if e.args[0] == errno.ECONNABORTED:
                        continue
                    raise
                callback(connection, address)
        io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
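The accept_handler() defined inside takes care of accepting connections, the step that bind_sockets() left undone. The callback parameter is _handle_connection() from tcpserver.py, which is called for each client connection once it has been accepted. Its body is simple: setting the SSL handling aside, it just creates an IOStream object and passes it to handle_stream().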
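The loop in accept_handler drains the whole accept queue each time the listen socket becomes readable, rather than accepting a single connection. A minimal Python 3 sketch of that loop follows; accept_all is a hypothetical name, and the errno checks are expressed through Python 3's BlockingIOError (EWOULDBLOCK/EAGAIN) and ConnectionAbortedError (ECONNABORTED) instead of tornado's socket.error inspection.

```python
import select
import socket


def accept_all(listen_sock, callback):
    """Drain the accept queue of a non-blocking listening socket.

    Calls callback(connection, address) per client and returns the count.
    """
    accepted = 0
    while True:
        try:
            connection, address = listen_sock.accept()
        except BlockingIOError:
            # EWOULDBLOCK/EAGAIN: every pending connection has been accepted
            return accepted
        except ConnectionAbortedError:
            # ECONNABORTED: the client gave up while still in the queue
            continue
        callback(connection, address)
        accepted += 1


if __name__ == "__main__":
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(128)
    server.setblocking(False)
    client = socket.create_connection(server.getsockname())
    select.select([server], [], [], 1.0)  # wait until the listen socket is readable
    conns = []
    print("accepted:", accept_all(server, lambda conn, addr: conns.append(conn)))
    for s in conns + [client, server]:
        s.close()
```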
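Note: from here on, we assume tornado has received a connection request from a client and starts handling it. Everything before this point was initialization.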
    def _handle_connection(self, connection, address):
        if self.ssl_options is not None:
            assert ssl, "Python 2.6+ and OpenSSL required for SSL"
            try:
                connection = ssl_wrap_socket(connection,
                                             self.ssl_options,
                                             server_side=True,
                                             do_handshake_on_connect=False)
            except ssl.SSLError as err:
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    return connection.close()
                else:
                    raise
            except socket.error as err:
                # If the connection is closed immediately after it is created
                # (as in a port scan), we can get one of several errors.
                # wrap_socket makes an internal call to getpeername,
                # which may return either EINVAL (Mac OS X) or ENOTCONN
                # (Linux).  If it returns ENOTCONN, this error is
                # silently swallowed by the ssl module, so we need to
                # catch another error later on (AttributeError in
                # SSLIOStream._do_ssl_handshake).
                # To test this behavior, try nmap with the -sT flag.
                # https://github.com/tornadoweb/tornado/pull/750
                if err.args[0] in (errno.ECONNABORTED, errno.EINVAL):
                    return connection.close()
                else:
                    raise
        try:
            if self.ssl_options is not None:
                stream = SSLIOStream(connection, io_loop=self.io_loop,
                                     max_buffer_size=self.max_buffer_size)
            else:
                stream = IOStream(connection, io_loop=self.io_loop,
                                  max_buffer_size=self.max_buffer_size)
            self.handle_stream(stream, address)
        except Exception:
            app_log.error("Error in connection callback", exc_info=True)
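handle_stream() is covered later.
Next is the last line of add_accept_handler(): io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ).
It registers with the IOLoop object the READ event on the socket's fd, together with the callback accept_handler. This callback is defined on the spot; it is an IOLoop-level callback that is invoked every time the event occurs. What it does is accept the new socket and the client address, then call callback to pass the event up to the next layer. So, from the analysis above, when a READ event occurs, accept_handler is called, which in turn calls callback, i.e. _handle_connection.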
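The add_handler(fd, handler, events) contract can be illustrated with a toy event loop built on Python's selectors module. MiniIOLoop is hypothetical: its READ/WRITE constants come from selectors rather than tornado, and it omits the timeouts and callback queue that tornado's IOLoop.start() also processes. Handlers are called as handler(fd, events), matching the signature of accept_handler above.

```python
import selectors
import socket


class MiniIOLoop:
    """Toy loop with an IOLoop-style add_handler(fd, handler, events)."""

    READ = selectors.EVENT_READ
    WRITE = selectors.EVENT_WRITE

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._handlers = {}  # fd -> handler(fd, events)

    def add_handler(self, fd, handler, events):
        self._handlers[fd] = handler
        self._selector.register(fd, events)

    def remove_handler(self, fd):
        self._handlers.pop(fd, None)
        self._selector.unregister(fd)

    def run_once(self, timeout=None):
        """One poll iteration: wait for ready fds and call their handlers."""
        for key, events in self._selector.select(timeout):
            handler = self._handlers.get(key.fd)
            if handler is not None:  # it may have been removed meanwhile
                handler(key.fd, events)

    def start(self):
        # Keep polling while any handler is registered.
        while self._handlers:
            self.run_once()


if __name__ == "__main__":
    loop = MiniIOLoop()
    a, b = socket.socketpair()

    def on_readable(fd, events):
        print("fd", fd, "readable:", a.recv(16))
        loop.remove_handler(fd)

    loop.add_handler(a.fileno(), on_readable, MiniIOLoop.READ)
    b.sendall(b"ping")
    loop.start()  # returns once the only handler has removed itself
    a.close()
    b.close()
```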
Finally, there is self.handle_stream(stream, address) inside _handle_connection() in tcpserver.py.
It is implemented in the subclass HTTPServer:
    def handle_stream(self, stream, address):
        HTTPConnection(stream, address, self.request_callback,
                       self.no_keep_alive, self.xheaders, self.protocol)
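It creates an HTTPConnection object, which wraps some IOStream operations. HTTPConnection handles the HTTP client connection and its HTTP requests, running the request callback for each request until the HTTP connection closes.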
    class HTTPConnection(object):
        def __init__(self, stream, address, request_callback, no_keep_alive=False,
                     xheaders=False, protocol=None):
            self.stream = stream
            self.address = address
            # Save the socket's address family now so we know how to
            # interpret self.address even after the stream is closed
            # and its socket attribute replaced with None.
            self.address_family = stream.socket.family
            self.request_callback = request_callback
            self.no_keep_alive = no_keep_alive
            self.xheaders = xheaders
            self.protocol = protocol
            self._clear_request_state()
            # Save stack context here, outside of any request.  This keeps
            # contexts from one request from leaking into the next.
            self._header_callback = stack_context.wrap(self._on_headers)
            self.stream.set_close_callback(self._on_connection_close)
            self.stream.read_until(b"\r\n\r\n", self._header_callback)
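self.request_callback is the Application instance from earlier, and stream is the IOStream instance.
_clear_request_state() clears the per-request state. It runs between requests, so that the previous request's handler can be garbage-collected, and when the connection closes, to break up reference cycles.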
    def _clear_request_state(self):
        self._request = None
        self._request_finished = False
        self._write_callback = None
        self._close_callback = None
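self._header_callback = stack_context.wrap(self._on_headers) maintains the execution context across asynchronous callbacks. Despite how it may look, this is not about threads: stack_context.wrap captures the stack context that is active when the callback is created and restores it when the callback runs. Older versions did not have this line and simply called self.stream.read_until(b"\r\n\r\n", self._on_headers). The source comment above gives the reason for the change: wrapping here, outside of any request, keeps contexts from one request from leaking into the next.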
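Tornado 3.2 implements its own stack_context module; the sketch below borrows Python 3.7's contextvars purely as an analogy for "capture the context at wrap time, restore it at call time". The wrap function and the current_request variable are hypothetical, not tornado APIs.

```python
import contextvars

# Hypothetical per-request value, standing in for a request's context
current_request = contextvars.ContextVar("current_request", default=None)


def wrap(fn):
    """Capture the current context now; run fn inside it when called later."""
    captured = contextvars.copy_context()

    def wrapped(*args, **kwargs):
        # copy() so each call starts from the captured state and cannot leak
        return captured.copy().run(fn, *args, **kwargs)

    return wrapped


def on_headers():
    return current_request.get()


callback = wrap(on_headers)       # wrapped outside of any request
current_request.set("request-1")  # later, some request sets its context
print(on_headers())  # request-1: an unwrapped call sees the current context
print(callback())    # None: the wrapped call sees the context captured earlier
```

This mirrors why HTTPConnection wraps _on_headers once in __init__: the callback always runs in the context of connection setup, not in whatever context the previous request left behind.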
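Then self.stream.set_close_callback(self._on_connection_close) registers the callback to run when the connection closes. HTTPConnection's own set_close_callback and _on_connection_close look like this: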
    def set_close_callback(self, callback):
        self._close_callback = stack_context.wrap(callback)

    def _on_connection_close(self):
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        # Delete any unfinished callbacks to break up reference cycles.
        self._header_callback = None
        self._clear_request_state()
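Last comes the most important call, self.stream.read_until(b"\r\n\r\n", self._header_callback). As the name suggests, it keeps reading until it sees \r\n\r\n (which normally marks the end of the request headers) and then calls the callback _on_headers (an IOStream-level callback).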
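The buffering behaviour of read_until can be modeled without sockets. ReadUntilBuffer is a hypothetical toy: its feed() method stands in for IOStream reading from the socket when IOLoop reports the fd readable, and it ignores max_buffer_size. As with read_until, the delimiter is included in the data handed to the callback, and bytes after it stay buffered for the next read.

```python
class ReadUntilBuffer:
    """Toy model of IOStream.read_until: buffer bytes until a delimiter appears."""

    def __init__(self):
        self._buffer = bytearray()
        self._delimiter = None
        self._callback = None

    def read_until(self, delimiter, callback):
        self._delimiter = delimiter
        self._callback = callback
        self._try_read()  # the data may already be buffered

    def feed(self, data):
        # Stand-in for socket.recv() results arriving on a READ event
        self._buffer += data
        self._try_read()

    def _try_read(self):
        if self._callback is None:
            return
        loc = self._buffer.find(self._delimiter)
        if loc == -1:
            return  # not complete yet; wait for more data
        end = loc + len(self._delimiter)
        chunk = bytes(self._buffer[:end])
        del self._buffer[:end]  # keep anything after the delimiter
        callback, self._callback = self._callback, None
        callback(chunk)


received = []
buf = ReadUntilBuffer()
buf.read_until(b"\r\n\r\n", received.append)
buf.feed(b"GET / HTTP/1.1\r\nHost: exa")  # headers split across two reads
buf.feed(b"mple.com\r\n\r\nleftover")
print(received)  # [b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n']
```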
I'll leave IOStream's read_until aside for now and start with _on_headers:
    def _on_headers(self, data):
        try:
            data = native_str(data.decode('latin1'))
            eol = data.find("\r\n")
            start_line = data[:eol]
            try:
                method, uri, version = start_line.split(" ")
            except ValueError:
                raise _BadRequestException("Malformed HTTP request line")
            if not version.startswith("HTTP/"):
                raise _BadRequestException("Malformed HTTP version in HTTP Request-Line")
            try:
                headers = httputil.HTTPHeaders.parse(data[eol:])
            except ValueError:
                # Probably from split() if there was no ':' in the line
                raise _BadRequestException("Malformed HTTP headers")

            # HTTPRequest wants an IP, not a full socket address
            if self.address_family in (socket.AF_INET, socket.AF_INET6):
                remote_ip = self.address[0]
            else:
                # Unix (or other) socket; fake the remote address
                remote_ip = '0.0.0.0'

            self._request = HTTPRequest(
                connection=self, method=method, uri=uri, version=version,
                headers=headers, remote_ip=remote_ip, protocol=self.protocol)

            content_length = headers.get("Content-Length")
            if content_length:
                content_length = int(content_length)
                if content_length > self.stream.max_buffer_size:
                    raise _BadRequestException("Content-Length too long")
                if headers.get("Expect") == "100-continue":
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
                self.stream.read_bytes(content_length, self._on_request_body)
                return

            self.request_callback(self._request)
        except _BadRequestException as e:
            gen_log.info("Malformed HTTP request from %r: %s",
                         self.address, e)
            self.close()
            return
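data = native_str(data.decode('latin1')): here data is the raw request that IOStream read from the socket, namely the request line followed by the header lines and the terminating blank line.
From it, method, uri, and version are parsed out of the request line, and then the headers are parsed.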
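The parsing steps can be reproduced in a few lines. parse_request_head is a hypothetical helper: it uses a plain dict where tornado uses httputil.HTTPHeaders, so it does not handle repeated headers or continuation lines, but it normalizes header names the same way (capitalizing each hyphen-separated word) and raises on the same malformed inputs.

```python
def _normalize_name(name):
    # "content-length" -> "Content-Length"
    return "-".join(word.capitalize() for word in name.strip().split("-"))


def parse_request_head(data):
    """Split raw request bytes into (method, uri, version, headers)."""
    text = data.decode("latin1")
    eol = text.find("\r\n")
    start_line = text[:eol]
    try:
        method, uri, version = start_line.split(" ")
    except ValueError:
        raise ValueError("Malformed HTTP request line") from None
    if not version.startswith("HTTP/"):
        raise ValueError("Malformed HTTP version in HTTP Request-Line")
    headers = {}
    for line in text[eol:].split("\r\n"):
        if not line:
            continue  # leading CRLF and the terminating blank line
        name, sep, value = line.partition(":")  # split at the first colon only
        if not sep:
            raise ValueError("Malformed HTTP headers")
        headers[_normalize_name(name)] = value.strip()
    return method, uri, version, headers


raw = b"GET /index?x=1 HTTP/1.1\r\nHost: localhost:8080\r\naccept: */*\r\n\r\n"
method, uri, version, headers = parse_request_head(raw)
print(method, uri, version)  # GET /index?x=1 HTTP/1.1
print(headers)               # {'Host': 'localhost:8080', 'Accept': '*/*'}
```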
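The parsed header information is passed to HTTPRequest() to create an HTTPRequest object.
Then comes content_length = headers.get("Content-Length"). A request without a body, such as a plain GET, has no Content-Length header, so execution goes straight to self.request_callback(self._request), which brings us back to __call__(self, request) in the Application class. When a Content-Length is present, the body is first read with read_bytes, and _on_request_body then hands the request to the same callback.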
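The branch on Content-Length can be isolated as a pure function. plan_after_headers is hypothetical: instead of doing I/O, it returns the steps _on_headers would take, and the 100 MB limit is an assumed stand-in for self.stream.max_buffer_size. Note that, as in tornado, any non-empty Content-Length string (even "0") takes the read_bytes path.

```python
MAX_BUFFER_SIZE = 100 * 1024 * 1024  # assumed stand-in for stream.max_buffer_size


def plan_after_headers(headers, max_buffer_size=MAX_BUFFER_SIZE):
    """Return what _on_headers does next, as a list of steps."""
    content_length = headers.get("Content-Length")
    if not content_length:
        # No body: hand the request straight to Application.__call__
        return ["request_callback"]
    content_length = int(content_length)
    if content_length > max_buffer_size:
        # tornado raises _BadRequestException and closes the connection
        raise ValueError("Content-Length too long")
    steps = []
    if headers.get("Expect") == "100-continue":
        steps.append("write 100-continue")  # tell the client to send the body
    steps.append(("read_bytes", content_length))  # then _on_request_body
    return steps


print(plan_after_headers({"Host": "localhost"}))
# ['request_callback']
print(plan_after_headers({"Content-Length": "11", "Expect": "100-continue"}))
# ['write 100-continue', ('read_bytes', 11)]
```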
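Next, HTTPRequest. It represents the client's request and carries all the information about it. Calls on it essentially either read the request data passed into it or invoke methods of the HTTPConnection passed into it.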
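A toy version shows this delegation. MiniHTTPRequest and FakeConnection are hypothetical classes: tornado 3.2's HTTPRequest likewise splits uri into path and query with partition("?") and forwards write() and finish() to its connection, but it also handles xheaders, cookies, body arguments, and timing, all omitted here.

```python
class FakeConnection:
    """Stand-in for HTTPConnection that records what is sent."""

    def __init__(self):
        self.written = []
        self.finished = False

    def write(self, chunk):
        self.written.append(chunk)

    def finish(self):
        self.finished = True


class MiniHTTPRequest:
    """Stores the parsed request and delegates I/O to its connection."""

    def __init__(self, method, uri, version, headers, remote_ip, connection):
        self.method = method
        self.uri = uri
        self.path, _, self.query = uri.partition("?")  # path and query string
        self.version = version
        self.headers = headers
        self.remote_ip = remote_ip
        self.connection = connection

    def write(self, chunk):
        if not isinstance(chunk, bytes):
            raise TypeError("chunk must be bytes")
        self.connection.write(chunk)  # forwarded to the connection

    def finish(self):
        self.connection.finish()  # the connection decides on keep-alive


conn = FakeConnection()
req = MiniHTTPRequest("GET", "/user?id=42", "HTTP/1.1", {}, "127.0.0.1", conn)
req.write(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")
req.finish()
print(req.path, req.query, conn.finished)  # /user id=42 True
```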