Асинхронные сервера выглядят немного сложнее синхронных. Это происходит потому, что для каждого соединения мы должны создать контекст: некую структуру данных, которая будет хранить состояние соединения, информацию о том, что сейчас происходит на этом соединении. Действия же с соединением (чтение, запись…) происходят только тогда, когда соединение к этому готово. Поэтому, ни одно действие не вызывает ожидания.
Ожидания в таком дизайне становятся недопустимы, так как все обработчики будут ждать одного.
Существует множество способов определить, к каким
действиям готово соединение. Здесь мы будем использовать
самый простой и наиболее кроссплатформенный
вызов select
.
Мы будем использовать два вида контекста:
Listner
— обёртка для слушающего сокета,
Handler
— контекст соединения с клиентом.
Для простоты, мы не будем использовать наследование (хотя оно здесь уместно) и большинство действий мы постараемся вынести в основной цикл приложения (main loop). Это тоже, скорее, нестандартный подход. Обычно, стараются разгрузить main loop, максимально вынести из него всю функциональность, но это приводит к тому, что разные части кода, обрабатывающего запрос, попадают в разные объекты и код становится более запутанным. Мы же пишем демонстрационный пример, поэтому поступили по-другому.
#!/usr/bin/python
import os
import errno
import signal
import select
import socket
import logging
import time
logger = logging.getLogger('main')
BIND_ADDRESS = ('localhost', 8999)
BACKLOG = 5
class Handler:
def __init__(self, sock, clinet_ip, client_port):
self.log('Start to process request from %s:%d' % (clinet_ip, client_port))
self.sock = sock
self.ready_to_read = True
self.ready_to_write = False
self.i_am_done = False
self.in_buffer = b''
self.out_buffer = b''
self.wait_until = 1e10 # недостижимое время
def log(self, message):
logger.info('[id=%d] %s' % (id(self), message))
def get_data_from_socket(self):
while True:
chank = self.sock.recv(1024)
self.in_buffer += chank
if len(chank) < 1024:
break
if self.in_buffer.endswith(b'\n'):
self.log('In buffer collected: ' + repr(self.in_buffer))
self.ready_to_read = False
# ждём 5 секунд (изображаем обращение к внешнему процессу)
self.wait_until = time.time() + 5
def time_tick(self):
if time.time() < self.wait_until:
return
# получаем результат
try:
result = str(eval(self.in_buffer, {}, {}))
except Exception as e:
result = repr(e)
self.out_buffer = result.encode('utf-8') + b'\r\n'
self.log('Out buffer ready: ' + repr(self.out_buffer))
# выставляем флаг, что мы готовы отдавать результат
self.ready_to_write = True
def write_data_to_socket(self):
n = self.sock.send(self.out_buffer)
if n == len(self.out_buffer):
# все отправлено
self.log('Done.')
self.i_am_done = True
self.ready_to_write = False
self.sock.close()
else:
# удаляем часть буфера, которая уже отправлена
self.out_buffer = self.out_buffer[n:]
class Listner:
def __init__(self, sock):
self.ready_to_read = True
self.ready_to_write = False
self.i_am_done = False
def time_tick(self):
pass # затычка для совместимости
# глобальная карта соответствия fileno <-> объект_обработчик
socket_map = {}
def serve_forever():
# создаём слушающий сокет
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# re-use port
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(BIND_ADDRESS)
sock.listen(BACKLOG)
# слушаем и при получении нового входящего соединения,
# порождаем объект Handler, который будет его обрабатывать
logger.info('Listning no %s:%d...' % BIND_ADDRESS)
# добавляем наш слушающий сокет в карту
# обернув его в Listner для обеспечения единого интерфейса
# всех объектов-контекстов
socket_map[sock.fileno()] = Listner(sock)
while True:
# собираем все сокеты, для которых
# есть операции, ожидающие чтения или записи
to_read = []
to_write = []
for fileno, obj in socket_map.items():
obj.time_tick()
if obj.ready_to_read:
to_read.append(fileno)
if obj.ready_to_write:
to_write.append(fileno)
# проверяем фактическое состояние сокета и его
# готовность к вводу-выводу
has_data_to_read, waiting_for_writing, errors = select.select(
to_read, to_write, [], 1)
# обрабатываем все чтения
for fileno in has_data_to_read:
obj = socket_map[fileno]
# проверяем тип сокета
if type(obj) is Listner:
# мы получили новое входящее соединение
try:
connection, (client_ip, clinet_port) = sock.accept()
except IOError as e:
if e.errno == errno.EINTR:
continue
raise
# не блокирующий
connection.setblocking(0)
# cоздаём новый объект для асинхронного
# обслуживания соединения
socket_map[connection.fileno()] = Handler(
connection,
client_ip,
clinet_port)
else:
# мы получили данные
obj.get_data_from_socket()
# обрабатываем все записи (тут всё просто, альтернатив нет)
for fileno in waiting_for_writing:
socket_map[fileno].write_data_to_socket()
# удаляем все обработчики, которые завершили свою работу
to_delete = []
for fileno, obj in socket_map.items():
if obj.i_am_done:
to_delete.append(fileno)
for fileno in to_delete:
del socket_map[fileno]
def main():
# настраиваем логгинг
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s',
'%H:%M:%S'
)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info('Run')
# запускаем сервер
serve_forever()
main()
И снова:
никогда не используйте eval
!
Как видите, два объекта обрабатывают запрос «одновременно»
10:37:43 [INFO] Run
10:37:43 [INFO] Listning no localhost:8999...
10:37:49 [INFO] [id=140243061768432] Start to process request from 127.0.0.1:59613
10:37:53 [INFO] [id=140243061768432] In buffer collected: b'10+10\r\n'
10:37:54 [INFO] [id=140243061768824] Start to process request from 127.0.0.1:59615
10:37:56 [INFO] [id=140243061768824] In buffer collected: b'99*99\r\n'
10:37:58 [INFO] [id=140243061768432] Out buffer ready: b'20\r\n'
10:37:58 [INFO] [id=140243061768432] Done.
10:38:01 [INFO] [id=140243061768824] Out buffer ready: b'9801\r\n'
10:38:01 [INFO] [id=140243061768824] Done.
На самом деле, конечно, такой одновременности, как в случае дочерних процессов, или нитей, нет. Под одновременностью понимается то, что они одновременно ждут, при этом не мешая друг другу и не замедляя работу друг друга. В нашем случае, они ждут просто истечения определённого времени, но на практике, они могли бы ждать данных из базы данных, или ответа другого сервера с цепочке прокси-серверов, или чтения с диска… Не редко асинхронные приложения демонстрируют большую производительность, чем многопоточные и многопроцессные аналоги. Это происходит от того, что в асинхронном дизайне нет необходимости переключения контекстов, стеков или иной диспетчеризации. Для операционной системы — это очень простые приложения.