多连接的客户端 / 服务器程序

文章目录

下面两节中,我们将使用 selectors 模块中的 selector 对象来创建一个可以同时处理多个请求的客户端和服务端

多连接的服务端

首先,我们来看眼多连接服务端程序的代码,multiconn-server.py。这是开始建立监听 socket 部分

import selectors
sel = selectors.DefaultSelector()
# ...
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print('listening on', (host, port))
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)

这个程序和之前打印程序服务端最大的不同是使用了 lsock.setblocking(False) 配置 socket 为非阻塞模式,这个 socket 的调用将不在是阻塞的。当它和 sel.select() 一起使用的时候(下面会提到),我们就可以等待 socket 就绪事件,然后执行读写操作

sel.register() 使用 sel.select() 为你感兴趣的事件注册 socket 监控,对于监听 socket,我们希望使用 selectors.EVENT_READ 读取到事件

data 用来存储任何你 socket 中想存的数据,当 select() 返回的时候它也会返回。我们将使用 data 来跟踪 socket 上发送或者接收的东西

下面就是事件循环:

import selectors
sel = selectors.DefaultSelector()

# ...

while True:
events = sel.select(timeout=None)
for key, mask in events:
if key.data is None:
accept_wrapper(key.fileobj)
else:
service_connection(key, mask)

sel.select(timeout=None) 调用会阻塞直到 socket I/O 就绪。它返回一个 (key, events) 元组,每个 socket 都有一个。key 就是一个包含 fileobj 属性的具名元组。key.fileobj 是一个 socket 对象,mask 表示一个操作就绪的事件掩码

如果 key.data 为空,我们就可以知道它来自于监听 socket,我们需要调用 accept() 方法来授受连接请求。我们将使用一个 accept() 包装函数来获取新的 socket 对象并注册到 selector 上,我们马上就会看到

如果 key.data 不为空,我们就可以知道它是一个被接受的客户端 socket,我们需要为它服务,接着 service_connection() 会传入 keymask 参数并调用,这包含了所有我们需要在 socket 上操作的东西

让我们一起来看看 accept_wrapper() 方法做了什么:

def accept_wrapper(sock):
conn, addr = sock.accept() # Should be ready to read
print('accepted connection from', addr)
conn.setblocking(False)
data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
events = selectors.EVENT_READ | selectors.EVENT_WRITE
sel.register(conn, events, data=data)

由于监听 socket 被注册到了 selectors.EVENT_READ 上,它现在就能被读取,我们调用 sock.accept() 后立即再立即调 conn.setblocking(False) 来让 socket 进入非阻塞模式

请记住,这是这个版本服务器程序的主要目标,因为我们不希望它被阻塞。如果被阻塞,那么整个服务器在返回前都处于挂起状态。这意味着其它 socket 处于等待状态,这是一种 非常严重的 谁都不想见到的服务被挂起的状态

接着我们使用了 types.SimpleNamespace 类创建了一个对象用来保存我们想要的 socket 和数据,由于我们得知道客户端连接什么时候可以写入或者读取,下面两个事件都会被用到:

events = selectors.EVENT_READ | selectors.EVENT_WRITE

事件掩码、socket 和数据对象都会被传入 sel.register()

现在让我们来看下,当客户端 socket 就绪的时候连接请求是如何使用 service_connection() 来处理的

def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) # Should be ready to read
if recv_data:
data.outb += recv_data
else:
print('closing connection to', data.addr)
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
print('echoing', repr(data.outb), 'to', data.addr)
sent = sock.send(data.outb) # Should be ready to write
data.outb = data.outb[sent:]

这就是多连接服务端的核心部分,key 就是从调用 select() 方法返回的一个具名元组,它包含了 socket 对象「fileobj」和数据对象。mask 包含了就绪的事件

如果 socket 就绪而且可以被读取,mask & selectors.EVENT_READ 就为真,sock.recv() 会被调用。所有读取到的数据都会被追加到 data.outb 里面。随后被发送出去

注意 else: 语句,如果没有收到任何数据:

if recv_data:
data.outb += recv_data
else:
print('closing connection to', data.addr)
sel.unregister(sock)
sock.close()

这表示客户端关闭了它的 socket 连接,这时服务端也应该关闭自己的连接。不过别忘了先调用 sel.unregister() 来撤销 select() 的监控

当 socket 就绪而且可以被读取的时候,对于正常的 socket 应该一直是这种状态,任何接收并被 data.outb 存储的数据都将使用 sock.send() 方法打印出来。发送出去的字节随后就会被从缓冲中删除

data.outb = data.outb[sent:]

多连接的客户端

现在让我们一起来看看多连接的客户端程序,multiconn-client.py,它和服务端很相似,不一样的是它没有监听连接请求,它以调用 start_connections() 开始初始化连接:

messages = [b'Message 1 from client.', b'Message 2 from client.']


def start_connections(host, port, num_conns):
server_addr = (host, port)
for i in range(0, num_conns):
connid = i + 1
print('starting connection', connid, 'to', server_addr)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
sock.connect_ex(server_addr)
events = selectors.EVENT_READ | selectors.EVENT_WRITE
data = types.SimpleNamespace(connid=connid,
msg_total=sum(len(m) for m in messages),
recv_total=0,
messages=list(messages),
outb=b'')
sel.register(sock, events, data=data)

num_conns 参数是从命令行读取的,表示为服务器建立多少个链接。就像服务端程序一样,每个 socket 都设置成了非阻塞模式

由于 connect() 方法会立即触发一个 BlockingIOError 异常,所以我们使用 connect_ex() 方法取代它。connect_ex() 会返回一个错误指示 errno.EINPROGRESS,不像 connect() 方法直接在进程中返回异常。一旦连接结束,socket 就可以进行读写并且通过 select() 方法返回

socket 建立完成后,我们将使用 types.SimpleNamespace 类创建想会传送的数据。由于每个连接请求都会调用 socket.send(),发送到服务端的消息得使用 list(messages) 方法转换成列表结构。所有你想了解的东西,包括客户端将要发送的、已发送的、已接收的消息以及消息的总字节数都存储在 data 对象中

让我们再来看看 service_connection()。基本上和服务端一样:

def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) # Should be ready to read
if recv_data:
print('received', repr(recv_data), 'from connection', data.connid)
data.recv_total += len(recv_data)
if not recv_data or data.recv_total == data.msg_total:
print('closing connection', data.connid)
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if not data.outb and data.messages:
data.outb = data.messages.pop(0)
if data.outb:
print('sending', repr(data.outb), 'to connection', data.connid)
sent = sock.send(data.outb) # Should be ready to write
data.outb = data.outb[sent:]

有一个不同的地方,客户端会跟踪从服务器接收的字节数,根据结果来决定是否关闭 socket 连接,服务端检测到客户端关闭则会同样的关闭服务端的连接

运行多连接的客户端和服务端程序

现在让我们把 multiconn-server.pymulticonn-client.py 两个程序跑起来。他们都使用了命令行参数,如果不指定参数可以看到参数调用的方法:

服务端程序,传入主机和端口号

$ ./multiconn-server.py
usage: ./multiconn-server.py <host> <port>

客户端程序,传入启动服务端程序时同样的主机和端口号以及连接数量

$ ./multiconn-client.py
usage: ./multiconn-client.py <host> <port> <num_connections>

下面就是服务端程序运行起来在 65432 端口上监听回环地址的输出:

$ ./multiconn-server.py 127.0.0.1 65432
listening on ('127.0.0.1', 65432)
accepted connection from ('127.0.0.1', 61354)
accepted connection from ('127.0.0.1', 61355)
echoing b'Message 1 from client.Message 2 from client.' to ('127.0.0.1', 61354)
echoing b'Message 1 from client.Message 2 from client.' to ('127.0.0.1', 61355)
closing connection to ('127.0.0.1', 61354)
closing connection to ('127.0.0.1', 61355)

下面是客户端,它创建了两个连接请求到上面的服务端:

$ ./multiconn-client.py 127.0.0.1 65432 2
starting connection 1 to ('127.0.0.1', 65432)
starting connection 2 to ('127.0.0.1', 65432)
sending b'Message 1 from client.' to connection 1
sending b'Message 2 from client.' to connection 1
sending b'Message 1 from client.' to connection 2
sending b'Message 2 from client.' to connection 2
received b'Message 1 from client.Message 2 from client.' from connection 1
closing connection 1
received b'Message 1 from client.Message 2 from client.' from connection 2
closing connection 2