如何在Python中使用Linux epoll

一.简介

在Python2.6中引入了一个访问Linux的epoll的API。本文使用Python3简要地展示一下这个API。

二.阻塞式套接字编程示例

例1是一个简单的Python服务器:它在端口8080上监听HTTP请求消息,并把它打印到控制台,返回给客户端发送一个HTTP响应消息。

    * Line 9: 创建服务器套接字。
    * Line 10: 在11行上允许 bind(),即使另一个程序最近在同一端口上监听。否则程序无法运行,直到一两分钟后,前面的程序使用该端口已经完成。
    * Line 11: 服务器套接字绑定到这台机器上的所有可用的IPv4地址的端口8080上。
    * Line 12: 告诉服务器套接字开始从客户端接受传入的连接。
    * Line 14: 该程序将会在这里等待着,直到一个连接被接受。当发生这种情况时,服务器套接字将在这台机器上创建一个新的套接字用于与客户端交互。这个新的套接字将会被 accept() 调用的 clientconnection 对象来表示。地址对象表示连接的另一端的IP地址和端口号。
    * Lines 15-17: 数据开始在传输直到一个完整的HTTP请求完成。HTTP 协议是在HTTP Made Easy(http://www.jmarshall.com/easy/http/)中有详细的描述。
    * Line 18: 为了验证正确的操作,打印请求到控制台。
    * Line 19: 发送响应到客户端。
    * Lines 20-22: 关闭到客户端的连接以及监听服务器套接字。

官方的 HOWTO (http://docs.python.org/3.0/howto/sockets.html) 对使用 Python 进行套接字编程有着更详细的说明。

Example 1 (All examples use Python 3)

 1  import socket
 2
 3  EOL1 = b'\n\n'
 4  EOL2 = b'\n\r\n'
 5  response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
 6  response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
 7  response += b'Hello, world!'
 8
 9  serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
10  serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
11  serversocket.bind(('0.0.0.0', 8080))
12  serversocket.listen(1)
13
14  connectiontoclient, address = serversocket.accept()
15  request = b''
16  while EOL1 not in request and EOL2 not in request:
17     request += connectiontoclient.recv(1024)
18  print(request.decode())
19  connectiontoclient.send(response)
20  connectiontoclient.close()
21
22  serversocket.close()

例2在15行增加了一个循环用来反复处理客户端连接,并由用户(例如,一个键盘中断)中断。这更清楚地说明服务器套接字绝不会被用来与客户端的数据交换。相反,它接受从客户端的连接,然后在服务器计算机上创建一个新的套接字,用于与客户端交互。

在第23-24行上的 finally 语句,确保监听的服务器套接字总是关闭,即使在发生异常的时候。

Example 2

 1  import socket
 2
 3  EOL1 = b'\n\n'
 4  EOL2 = b'\n\r\n'
 5  response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
 6  response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
 7  response += b'Hello, world!'
 8
 9  serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
10  serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
11  serversocket.bind(('0.0.0.0', 8080))
12  serversocket.listen(1)
13
14  try:
15     while True:
16        connectiontoclient, address = serversocket.accept()
17        request = b''
18        while EOL1 not in request and EOL2 not in request:
19            request += connectiontoclient.recv(1024)
20        print('-'*40 + '\n' + request.decode()[:-2])
21        connectiontoclient.send(response)
22        connectiontoclient.close()
23  finally:
24     serversocket.close()

三.异步套接字和Linux epoll的好处

在例2中展示的套接字服务器是阻塞式的,因为程序在等待直到一个事件的发生。在16行的 accept() 调用阻塞着直到一个客户端的连接被接受。19行的 recv() 调用阻塞着直到接收完客户端的数据(或者甚至没有接收到任何数据)。21行的 send() 调用阻塞着直到所有返回给客户端的数据在Linux上已排队准备传输。

当一个程序使用阻塞式套接字,它通常使用一个线程(甚至是专用的进程)与这些套接字进行通信。主程序的线程包含了监听服务器的套接字,它将接受客户端的连接。它将接受所有的客户端连接,并且把新建的套接字分配给一个独立的线程,这些独立的线程将会与客户端进行交互。因为每一个线程单独与一个客户端进行通信,任何堵塞不影响其他线程执行各自的任务。

这种使用多线程的阻塞式的套接字让代码简单明了,但是带来很多问题。首先它很难确保线程之间共享资源。另外在单CPU的机器上这种风格的程序效率很低。

C10K问题(http://www.kegel.com/c10k.html)讨论处理多并发的套接字的替代方法,比如异步套接字。这些套接字不会等着事件的发生。相反,程序执行异步套接字的行动,并立即通知该操作是否成功或失败。这个信息允许程序决定如何进行。由于异步套接字是非阻塞的,因此没有多个线程执行的必要。所有的工作可能在一个线程中进行。这种单线程的方式有着许多的挑战性,但是会是许多程序的一个选择。它也可以与多线程进行结合:一个服务器的网络组件可以用于异步套接字使用一个单独的线程,其它线程可以用来访问其他阻塞资源,例如数据库。

Linux有多种机制来管理异步套接字,python中常用的有select,poll以及epoll API。epoll和poll是优于select,因为Python程序没有必要检查每个套接字感兴趣的事件。相反,它可以依靠操作系统,告诉它哪个套接字可能有这些事件。同样,epoll是优于poll的,因为它不需要操作系统每次检查套接字所有感兴趣的事件,这是需要Python程序来查询的。

四.使用Epoll的异步套接字编程示例

使用epoll的程序通常有如下的流程:
1.创建一个epoll对象;
2.通知epoll对象在特定的套接字上监测特定的事件;
3.询问的epoll对象自上次查询后哪个套接字可能发生特定的事件;
4.在这些套接字上执行一些操作;
5.通知epoll对象修改监测的套接字或者/以及事件的列表;
6.重复步骤3到5直至结束;
7.销毁epoll对象。

例3复制了例2的功能,只是改成了异步套接字。程序比较复杂因为单个线程与多个客户端相互通信。


    * Line 1: select模块包含了 epoll 功能。
    * Line 13: 因为默认情况下套接字使用阻塞式,这里有必要使用非阻塞式(异步式)模式。
    * Line 15: 创建epoll对象。
    * Line 16: 在服务器套接字上注册读事件。一个读事件可能发生在服务器套接字接收一个套接字连接的任何时候。
    * Line 19: 连接字典映射文件描述符到它们相应的网络连接对象。
    * Line 21: 查询的epoll的对象,以找出是否可能发生的任何事件。参数“1”表示,我们愿意等待长达一秒钟的事件发生。如果任何感兴趣的事件发生之前,这个查询将立即返回与这些事件的列表。
    * Line 22: 事件将会以(fileno,event code)元组序列形式返回。fileno是一个文件描述符的象征,并且总是整数。
    * Line 23: 如果一个读事件在服务器套接字上发生,一个新的套接字连接可能会被创建。
    * Line 25: 设置一个新的套接字为非阻塞模式。
    * Line 26: 为一个新的套接字注册一个读事件(EPOLLIN)。
    * Line 31: 如果一个读事件发生的话,将会读取来自客户端的发送的新的数据。
    * Line 33: 一旦接收到完整的请求,注销读事件并且注册写事件。当可能会发送响应数据返回给客户端的时候,写事件可能会发生。
    * Line 34: 打印完整的请求。
    * Line 35: 如果在客户端的套接字上发生写入事件,它能够接受新的数据发送到客户端。
    * Lines 36-38: 发送响应数据,直到完全把响应数据交付给操作系统传输。
    * Line 39: 一旦已发送完整的响应,禁用进一步读取或写入事件。
    * Line 40: 如果一个连接明显地关闭,套接字关闭是可选的。本示例使用了套接字关闭是为了触发客户端主动关闭。
    * Line 41: HUP (hang-up) 事件表明了客户端的套接字连接已经关闭,所以套接字也会关闭。
    * Line 42: 在这个套接字上注销事件。
    * Line 43: 关闭套接字连接。
    * Lines 46-48: 打开套接字连接不需要被关闭,因为Python程序终止时将它们关闭。


Example 3

 1  import socket, select
 2
 3  EOL1 = b'\n\n'
 4  EOL2 = b'\n\r\n'
 5  response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
 6  response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
 7  response += b'Hello, world!'
 8
 9  serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
10  serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
11  serversocket.bind(('0.0.0.0', 8080))
12  serversocket.listen(1)
13  serversocket.setblocking(0)
14
15  epoll = select.epoll()
16  epoll.register(serversocket.fileno(), select.EPOLLIN)
17
18  try:
19     connections = {}; requests = {}; responses = {}
20     while True:
21        events = epoll.poll(1)
22        for fileno, event in events:
23           if fileno == serversocket.fileno():
24              connection, address = serversocket.accept()
25              connection.setblocking(0)
26              epoll.register(connection.fileno(), select.EPOLLIN)
27              connections[connection.fileno()] = connection
28              requests[connection.fileno()] = b''
29              responses[connection.fileno()] = response
30           elif event & select.EPOLLIN:
31              requests[fileno] += connections[fileno].recv(1024)
32              if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
33                 epoll.modify(fileno, select.EPOLLOUT)
34                 print('-'*40 + '\n' + requests[fileno].decode()[:-2])
35           elif event & select.EPOLLOUT:
36              byteswritten = connections[fileno].send(responses[fileno])
37              responses[fileno] = responses[fileno][byteswritten:]
38              if len(responses[fileno]) == 0:
39                 epoll.modify(fileno, 0)
40                 connections[fileno].shutdown(socket.SHUT_RDWR)
41           elif event & select.EPOLLHUP:
42              epoll.unregister(fileno)
43              connections[fileno].close()
44              del connections[fileno]
45  finally:
46     epoll.unregister(serversocket.fileno())
47     epoll.close()
48     serversocket.close()





LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.

ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。

Example 4

 1  import socket, select
 2
 3  EOL1 = b'\n\n'
 4  EOL2 = b'\n\r\n'
 5  response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
 6  response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
 7  response += b'Hello, world!'
 8
 9  serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
10  serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
11  serversocket.bind(('0.0.0.0', 8080))
12  serversocket.listen(1)
13  serversocket.setblocking(0)
14
15  epoll = select.epoll()
16  epoll.register(serversocket.fileno(), select.EPOLLIN | select.EPOLLET)
17
18  try:
19     connections = {}; requests = {}; responses = {}
20     while True:
21        events = epoll.poll(1)
22        for fileno, event in events:
23           if fileno == serversocket.fileno():
24              try:
25                 while True:
26                    connection, address = serversocket.accept()
27                    connection.setblocking(0)
28                    epoll.register(connection.fileno(), select.EPOLLIN | select.EPOLLET)
29                    connections[connection.fileno()] = connection
30                    requests[connection.fileno()] = b''
31                    responses[connection.fileno()] = response
32              except socket.error:
33                 pass
34           elif event & select.EPOLLIN:
35              try:
36                 while True:
37                    requests[fileno] += connections[fileno].recv(1024)
38              except socket.error:
39                 pass
40              if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
41                 epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET)
42                 print('-'*40 + '\n' + requests[fileno].decode()[:-2])
43           elif event & select.EPOLLOUT:
44              try:
45                 while len(responses[fileno]) > 0:
46                    byteswritten = connections[fileno].send(responses[fileno])
47                    responses[fileno] = responses[fileno][byteswritten:]
48              except socket.error:
49                 pass
50              if len(responses[fileno]) == 0:
51                 epoll.modify(fileno, select.EPOLLET)
52                 connections[fileno].shutdown(socket.SHUT_RDWR)
53           elif event & select.EPOLLHUP:
54              epoll.unregister(fileno)
55              connections[fileno].close()
56              del connections[fileno]
57  finally:
58     epoll.unregister(serversocket.fileno())
59     epoll.close()
60     serversocket.close()