首页 > 学院 > 开发设计 > 正文

tornado源码分析之http和tcp层

2019-11-14 17:28:20
字体:
来源:转载
供稿:网友

转载请注明: TheViper http://www.VEVb.com/TheViper

将tornado改成rails的风格形式,并可以设置隐藏参数中分析了tornado的路由机制,这里不再分析。

本屌才疏学浅,所以分析主要是以tornado的执行流程为线索,配合源码的注释和网上的一些资料来说明。基于tornado 3.2.2

tornado的设计模型

可以看到,tornado分为四层:最底层的EVENT层处理IO事件;TCP层实现了TCP服务器,负责数据传输;HTTP/HTTPS层基于HTTP协议实现了HTTP服务器和客户端;最上层为WEB框架,包含了处理器、模板数据库连接、认证、本地化等等WEB框架需要具备的功能。

 更确切点,tornado服务器的工作流程是这样的。

首先按照socket(创建套接字)->bind(绑定)->listen(监听)的顺序创建listen socket监听客户端,并将每个listen socket的fd注册到IOLoop的单例实例中;当listen socket可读时回调_handle_events处理客户端请求;在与客户端通信的过程中使用IOStream封装了读、写缓冲区,实现与客户端的异步读写。

下面开始源码部分,个人喜欢按照执行流程的顺序看代码。

 具体执行流程

启动文件

        application = web.Application([            (r"/", MainPageHandler),        ])        http_server = httpserver.HTTPServer(application)        http_server.listen(8080)        ioloop.IOLoop.instance().start()

 这是tornado最简单的一种启动方式,当然还有其他方式,请参考文档。

 首先是实例化了web.Application这个类。

里面这几个方法值得注意,__init__(),listen(),add_handlers(),__call__()。

__init__()初始化Application类,一般将处理器直接传入,它会调用 add_handlers 添加这些处理器,初始化还包括 transforms (分块、压缩等)、UI模块、静态文件处理器的初始化。 add_handlers 方法负责添加URI和处理器的映射。

当监听到请求时,tornado通过调用 Application 实例触发 __call__.具体参见另一篇将tornado改成rails的风格形式,并可以设置隐藏参数

而listen()只是对HTTPServer 中的 listen 的封装。

    def listen(self, port, address="", **kwargs):        from tornado.httpserver import HTTPServer        server = HTTPServer(self, **kwargs)        server.listen(port, address)

 

HTTPServer类。

 在httpserver.py里面。

    A non-blocking, single-threaded HTTP server.    A server is defined by a request callback that takes an HTTPRequest    instance as an argument and writes a valid HTTP response with    `HTTPRequest.write`. `HTTPRequest.finish` finishes the request (but does    not necessarily close the connection in the case of HTTP/1.1 keep-alive    requests).

HTTPServer是一个无阻塞、单线程HTTP服务器。支持HTTP/1.1协议keep-alive连接,但不支持chunked encoding。服务器支持’X-Real-IP’和’X-Scheme’头以及SSL传输,支持多进程为prefork模式实现。代码很简单。

class HTTPServer(TCPServer):        def __init__(self, request_callback, no_keep_alive=False, io_loop=None,                 xheaders=False, ssl_options=None, protocol=None, **kwargs):        self.request_callback = request_callback        self.no_keep_alive = no_keep_alive        self.xheaders = xheaders        self.protocol = protocol        TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,                           **kwargs)    def handle_stream(self, stream, address):        HTTPConnection(stream, address, self.request_callback,                       self.no_keep_alive, self.xheaders, self.protocol)

 里面的self.request_callback是上面的Application实例。

TCPServer类,在tcpserver.py里面

class TCPServer(object):        def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None):        self.io_loop = io_loop        self.ssl_options = ssl_options        self._sockets = {}  # fd -> socket object,保存socket        self._pending_sockets = []        self._started = False        self.max_buffer_size = max_buffer_size        if self.ssl_options is not None and isinstance(self.ssl_options, dict):            ...    def listen(self, port, address=""):        sockets = bind_sockets(port, address=address)        self.add_sockets(sockets)    def add_sockets(self, sockets):        if self.io_loop is None:            self.io_loop = IOLoop.current()        for sock in sockets:            self._sockets[sock.fileno()] = sock            add_accept_handler(sock, self._handle_connection,                               io_loop=self.io_loop)    def _handle_connection(self, connection, address):        if self.ssl_options is not None:            assert ssl, "Python 2.6+ and OpenSSL required for SSL"            try:                connection = ssl_wrap_socket(connection,                                             self.ssl_options,                                             server_side=True,                                             do_handshake_on_connect=False)            except ssl.SSLError as err:                if err.args[0] == ssl.SSL_ERROR_EOF:                    return connection.close()                else:                    raise            except socket.error as err:                # If the connection is closed immediately after it is created                # (as in a port scan), we can get one of several errors.                # wrap_socket makes an internal call to getpeername,                # which may return either EINVAL (Mac OS X) or ENOTCONN                # (linux).  If it returns ENOTCONN, this error is                # silently swallowed by the ssl module, so we need to                # catch another error later on (AttributeError in                # SSLIOStream._do_ssl_handshake).                # To test this behavior, try nmap with the -sT flag.                # https://github.com/tornadoweb/tornado/pull/750                if err.args[0] in (errno.ECONNABORTED, errno.EINVAL):                    return connection.close()                else:                    raise        try:            if self.ssl_options is not None:                stream = SSLIOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)            else:                stream = IOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)            self.handle_stream(stream, address)        except Exception:            app_log.error("Error in connection callback", exc_info=True)

这里省略了用其他方式启动tornado时,在tcpserver.py中执行的代码。

到__init__()时,httpserver=httpserver.HTTPServer(application)就执行完毕,得到一个HTTPServer对象。

接着是http_server.listen(8080),在上面TCPServer类里面。

bind_sockets()在netutil.py里面,用来创建socket实例的,返回一个含有socket实例的列表。

def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128, flags=None):    sockets = []    if address == "":        address = None    if not socket.has_ipv6 and family == socket.AF_UNSPEC:        # Python can be compiled with --disable-ipv6, which causes        # Operations on AF_INET6 sockets to fail, but does not        # automatically exclude those results from getaddrinfo        # results.        # http://bugs.python.org/issue16208        family = socket.AF_INET    if flags is None:        flags = socket.AI_PASSIVE    for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,                                      0, flags)):        af, socktype, proto, canonname, sockaddr = res        try:            sock = socket.socket(af, socktype, proto)        except socket.error as e:            if e.args[0] == errno.EAFNOSUPPORT:                continue            raise        set_close_exec(sock.fileno())        if os.name != 'nt':            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)        if af == socket.AF_INET6:            # On linux, ipv6 sockets accept ipv4 too by default,            # but this makes it impossible to bind to both            # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,            # separate sockets *must* be used to listen for both ipv4            # and ipv6.  For consistency, always disable ipv4 on our            # ipv6 sockets and use a separate ipv4 socket when needed.            #            # Python 2.x on windows doesn't have IPPROTO_IPV6.            if hasattr(socket, "IPPROTO_IPV6"):                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)        sock.setblocking(0)        sock.bind(sockaddr)        sock.listen(backlog)        sockets.append(sock)    return sockets

bind_sockets在启动监听端口过程中调用,getaddrinfo返回服务器的所有网卡信息, 每块网卡上都要创建监听客户端的请求并返回创建的sockets。创建socket过程中绑定地址和端口,同时设置了fcntl.FD_CLOEXEC(创建子进程时关闭打开的socket)和socket.SO_REUSEADDR(保证某一socket关闭后立即释放端口,实现端口复用)标志位。sock.listen(backlog=128)默认设定等待被处理的连接最大个数为128。

 对于 TCP 编程的总结就是:创建一个监听 socket,然后把它绑定到端口和地址上并开始监听,然后不停 accept。上面的bind_sockets()只做了绑定和监听。

关于套接字,强烈推荐这一篇linux学习笔记:套接字,写的很好。上面bind_sockets()里的些细节在里面都有很详细的说明。

 

然后是tcpserver.py里面,IOLoop先不用管它,先只用知道它是单例的,通过IOLoop.current()获取。

self.add_sockets(sockets),遍历sockets,保存到self._sockets.

add_accept_handler(sock, self._handle_connection,io_loop=self.io_loop)。在netutil.py里面.

def add_accept_handler(sock, callback, io_loop=None):    if io_loop is None:        io_loop = IOLoop.current()            def accept_handler(fd, events):        while True:            try:                connection, address = sock.accept()            except socket.error as e:                # EWOULDBLOCK and EAGAIN indicate we have accepted every                # connection that is available.                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):                    return                # ECONNABORTED indicates that there was a connection                # but it was closed while still in the accept queue.                # (observed on FreeBSD).                if e.args[0] == errno.ECONNABORTED:                    continue                raise            callback(connection, address)                io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)

里面定义的accept_handler()负责完成上面没完成的socket的接收。参数callback是tcpserver.py里面的_handle_connection()._handle_connection()在接受客户端的连接处理结束之后会被调用.里面很简单了,抛开ssl的处理,就是创建IOStream对象了。

注意,从这里开始,我们就假设tornado接收到了客户端的连接请求,开始处理请求。在这之前的行为都是在初始化。

    def _handle_connection(self, connection, address):        if self.ssl_options is not None:            assert ssl, "Python 2.6+ and OpenSSL required for SSL"            try:                connection = ssl_wrap_socket(connection,                                             self.ssl_options,                                             server_side=True,                                             do_handshake_on_connect=False)            except ssl.SSLError as err:                if err.args[0] == ssl.SSL_ERROR_EOF:                    return connection.close()                else:                    raise            except socket.error as err:                # If the connection is closed immediately after it is created                # (as in a port scan), we can get one of several errors.                # wrap_socket makes an internal call to getpeername,                # which may return either EINVAL (Mac OS X) or ENOTCONN                # (Linux).  If it returns ENOTCONN, this error is                # silently swallowed by the ssl module, so we need to                # catch another error later on (AttributeError in                # SSLIOStream._do_ssl_handshake).                # To test this behavior, try nmap with the -sT flag.                # https://github.com/tornadoweb/tornado/pull/750                if err.args[0] in (errno.ECONNABORTED, errno.EINVAL):                    return connection.close()                else:                    raise        try:            if self.ssl_options is not None:                stream = SSLIOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)            else:                stream = IOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)            self.handle_stream(stream, address)        except Exception:            app_log.error("Error in connection callback", exc_info=True)

handle_stream()后面会说。

接着是add_accept_handler()中最后的io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ).

这个是向 loloop 对象注册在fd上的read事件和回调函数accept_handler。该回调函数是现成定义的,属于IOLoop层次的回调,每当事件发生时就会调用。回调内容也就是accept得到新socket和客户端地址,然后调用callback向上层传递事件。从上面的分析可知,当read事件发生时,accept_handler被调用,进而callback=_handle_connection被调用。

最后是tcpserver.py中_handle_connection()里面的self.handle_stream(stream, address)。

这个在子类HTTPServer中实现的。

    def handle_stream(self, stream, address):        HTTPConnection(stream, address, self.request_callback,                       self.no_keep_alive, self.xheaders, self.protocol)

创建HTTPConnection对象,它封装了IOStream的一些操作,用来处理http客户端的连接还有http请求的,执行request回调,直到http连接关闭。

class HTTPConnection(object):    def __init__(self, stream, address, request_callback, no_keep_alive=False,                 xheaders=False, protocol=None):        self.stream = stream        self.address = address        # Save the socket's address family now so we know how to        # interpret self.address even after the stream is closed        # and its socket attribute replaced with None.        self.address_family = stream.socket.family        self.request_callback = request_callback        self.no_keep_alive = no_keep_alive        self.xheaders = xheaders        self.protocol = protocol        self._clear_request_state()        # Save stack context here, outside of any request.  This keeps        # contexts from one request from leaking into the next.        self._header_callback = stack_context.wrap(self._on_headers)        self.stream.set_close_callback(self._on_connection_close)        self.stream.read_until(b"/r/n/r/n", self._header_callback)

 self.request_callback是前面的Application实例,stream是IOStream实例.

_clear_request_state():在请求被当做垃圾回收或连接关闭时清空请求状态。

    def _clear_request_state(self):        self._request = None        self._request_finished = False        self._write_callback = None        self._close_callback = None

self._header_callback = stack_context.wrap(self._on_headers),它对多线程执行环境的上下文做了一些维护。这个在低一点的版本里没有,都是直接self.stream.read_until(b"/r/n/r/n", self._on_headers),搞不懂。

 然后是 self.stream.set_close_callback(self._on_connection_close)设置关闭连接时的回调。

    def set_close_callback(self, callback):        self._close_callback = stack_context.wrap(callback)    def _on_connection_close(self):        if self._close_callback is not None:            callback = self._close_callback            self._close_callback = None            callback()        # Delete any unfinished callbacks to break up reference cycles.        self._header_callback = None        self._clear_request_state()

 最后是最重要的self.stream.read_until(b"/r/n/r/n", self._header_callback).顾名思义,就是一直读取知道/r/n/r/n(这一般意味这请求头的结束),然后调用回调函数_on_headers(这是 IOStream 层次的回调)。

这里先不讨论IOstream里的read_until。先说_on_headers.

def _on_headers(self, data):        try:            data = native_str(data.decode('latin1'))            eol = data.find("/r/n")            start_line = data[:eol]            try:                method, uri, version = start_line.split(" ")            except ValueError:                raise _BadRequestException("Malformed HTTP request line")            if not version.startswith("HTTP/"):                raise _BadRequestException("Malformed HTTP version in HTTP Request-Line")            try:                headers = httputil.HTTPHeaders.parse(data[eol:])            except ValueError:                # Probably from split() if there was no ':' in the line                raise _BadRequestException("Malformed HTTP headers")            # HTTPRequest wants an IP, not a full socket address            if self.address_family in (socket.AF_INET, socket.AF_INET6):                remote_ip = self.address[0]            else:                # Unix (or other) socket; fake the remote address                remote_ip = '0.0.0.0'            self._request = HTTPRequest(                connection=self, method=method, uri=uri, version=version,                headers=headers, remote_ip=remote_ip, protocol=self.protocol)            content_length = headers.get("Content-Length")            if content_length:                content_length = int(content_length)                if content_length > self.stream.max_buffer_size:                    raise _BadRequestException("Content-Length too long")                if headers.get("Expect") == "100-continue":                    self.stream.write(b"HTTP/1.1 100 (Continue)/r/n/r/n")                self.stream.read_bytes(content_length, self._on_request_body)                return            self.request_callback(self._request)        except _BadRequestException as e:            gen_log.info("Malformed HTTP request from %r: %s",                         self.address, e)            self.close()            return

 

 data = native_str(data.decode('latin1'))。里面的data是IOStream解析socket接收到的请求内容。类似于,

 然后由此可以解析出method, uri, version.然后进一步解析header.

将解析出来的header信息,传入HTTPRequest(),创建HTTPRequest对象,

content_length = headers.get("Content-Length"),上面这个例子头信息没有Content-Length,所以直接走后面的self.request_callback(self._request)。就回到Application类里面的__call__(self,request)了。

 下面说HTTPRequest,它是客户端请求的代表,它携带了所有和客户端请求的信息。对它的调用实际上是,HTTPRequest在访问传入它的那些请求参数或调用传入它的HTTPConnection里面的方法。

 


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表