Это самый большой пример, и чтобы придать ему хоть какую-то обозримость, я допустил в нём несколько существенных небрежностей-упрощений реализации. Очень рекомендую походить по ссылкам, которые привожу, там есть более качественные реализации отдельных приёмов, используемых тут.
Давайте чуть-чуть поясню дизайн.
Строго говоря, существует не одна схема организации prefork-сервера. Даже мультиплексирование соединения можно делать в разных местах, в зависимости от задачи и протокола. Здесь приведена схема очень похожая на то, как работает HTTP-сервер Apache.
Первым делом, мы создаём слушающий сокет.
Потом мы порождаем потомков и, естественно, все они получают доступ к этому слушающему сокету.
При порождении потомка, мы организуем с ним анонимное UNIX-соединение. Через него потомок и сервер обмениваются командами.
Мастер-процесс, — общий родитель, — начинает ждать событий на слушающем сокете (входящих соединений) и на сокетах дочерних процессов (команды).
Тут реализуется асинхронная схема работы.
Когда приходит входящее соединение, мастер находит незанятого потомка и даёт ему команду принять соединение.
Потомок-обработчик, получив команду, принимает соединение и отправляет серверу команду «принято», сервер начинает ждать новых подключений, а потомок спокойно работает с клиентом.
Когда работа с клиентом закончена, потомок закрывает соединение с клиентом и отдаёт мастеру команду «я готов принять новые соединения».
На самом деле работа немного сложнее, потомок может ответить разными командами, сообщениями об ошибках, исчерпании ресурсов, сообщениями о выработанных лимитах… Мастер должен отслеживать ситуацию «нет свободных обработчиков»; большинство серверов на этот случай имеют определённую логику типа: порождать потомков до определённого предела, потом принимать соединение и сразу сбрасывать самим мастером. В этом примере всего этого нет.
Но это не единственный недостаток этого примера.
Не обрабатываем исключения. То есть, совсем никакие. Это, конечно безобразие. В других моих примерах обрабатываются хотя бы базовые исключения. Хотя и этого мало, см. введение.
Синхронные сокеты. Имеет место ужасная смесь синхронных и асинхронных операций. Это неприемлемо. Как должен работать асинхронный сервер, смотри в отдельном примере. Здесь, для простоты и краткости, я не стал делать всё честно.
Самое ужасное место, это там, где мастер ждёт от обработчика ответ, о приёме соединения в работу. На самом деле, в этом месте надо удалить из кандидатов на чтение слушающий сокет и продолжить крутить main loop. А когда процесс-обработчик ответит — надо вернуть слушающий сокет в обработку. Причём, пока слушающий сокет «не слушает», надо внимательно отслеживать тайм ауты. В моём примере асинхронного сервера есть примитивная работа с тайм аутами. Можно посмотреть там.
Не отслеживание умираний потомков. Чтобы её прикрутить,
надо, как минимум, обработать исключения errno.EINTR
и
сигнал signal.SIGCHLD
.
Минимальные примеры этих обработчиков можно найти в моём примере простейшего форкающегося сервера.
Но можно пойти и другим путём: использовать стандартные средства
типа multiprocessing
. У вас действительно есть достаточно
веские причины не делать этого?
Не аккуратное чтение из сокета.
При общении между мастером и обработчиками, мы используем однобайтные
команды. Это, в какой-то степени, извиняет нашу небрежность. Но
в реальной жизни чтения и записи хорошо бы делать, учитывая,
что данные могут приходить частями, и используя буферы. Как это сделано
в других моих примерах и даже (тоже небрежно) в handle
этого примера.
Не оптимальный поиск свободного потомка. Очевидно лишние системные вызовы. И множество других алгоритмических несовершенств.
На самом деле, в реальной жизни производительностью часто можно пожертвовать в пользу читаемости. В любом случае, оптимизацией кода надо заниматься только после того, как он стал устойчиво работать. Этот код оптимизировать рано.
#!/usr/bin/python
import time
import errno
import os
import select
import socket
import logging
logger = logging.getLogger('main')
BIND_ADDRESS = ('localhost', 8999)
BACKLOG = 5
CHILDNUM = 3
childrens_pull = []
class ChildController:
def __init__(self, pipe):
self.is_free = True
self.pipe = pipe
def __repr__(self):
return '<%s is_free=%s>' % (
self.__class__.__name__,
self.is_free)
def handle(sock):
# обработчик, работающий в процессе-потомке
logger.info('Start to process request')
# получаем все данные до перевода строки
# (это не очень честный подход, может сожрать сразу несколько строк,
# если они придут одним пакетом; но для наших целей -- сойдёт)
in_buffer = b''
while not in_buffer.endswith(b'\n'):
in_buffer += sock.recv(1024)
logger.info('In buffer = ' + repr(in_buffer))
# изображаем долгую обработку
time.sleep(5)
# получаем результат [не используйте eval!]
try:
result = str(eval(in_buffer, {}, {}))
except Exception as e:
result = repr(e)
out_buffer = result.encode('utf-8') + b'\r\n'
logger.info('Out buffer = ' + repr(out_buffer))
# отправляем
sock.sendall(out_buffer)
logger.info('Done.')
def create_child(listen_sock):
# создаём пару связанных анонимных сокетов
child_pipe, parent_pipe = socket.socketpair()
# порождаем потомка
pid = os.fork()
if pid == 0:
# это выполняется в дочернем процессе
child_pipe.close()
# запускаем бесконечный цикл обработки запросов;
# очень важно, что этот цикл именно бесконечный,
# потомок не должен выходить из create_child,
# иначе начнётся безудержное размножение и катаклизм
while True:
# блокирующее чтение из сокета, соединяющего
# потомка с родителем
command = parent_pipe.recv(1)
# мы получили единственную возможную команду
logger.info('Child get command=%s' % repr(command))
# получаем соединение
connection, (client_ip, clinet_port) = listen_sock.accept()
logger.info('Accept connection %s:%d' % (client_ip, clinet_port))
logger.info('Child send "begin"')
parent_pipe.send(b'B')
# отправляем соединение на обработку
handle(connection)
# всё аккуратненько закрываем
connection.close()
# отправляем родителю информацию, что мы освободились
logger.info('Child send "free"')
parent_pipe.send(b'F')
# это выполняется в родительском процессе
logger.info('Starting child with PID: %s' % pid)
childrens_pull.append(ChildController(child_pipe))
# закрываем ненужные дескрипторы
parent_pipe.close()
return child_pipe
def prepare_childs_and_serve_forever():
# открываем сокет
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# re-use the port
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# неблокирующий режим
#listen_sock.setblocking(0)
listen_sock.bind(BIND_ADDRESS)
listen_sock.listen(BACKLOG)
logger.info('Listning no %s:%d...' % BIND_ADDRESS)
# создаём потомков
for i in range(CHILDNUM):
create_child(listen_sock)
# массив сокетов-кандидатов на чтение
to_read = [listen_sock] + [c.pipe.fileno() for c in childrens_pull]
while True:
readables, writables, exceptions = select.select(to_read, [], [])
if listen_sock in readables:
logger.info('Listning socket is readable')
# кажется, у нас имеется входящее соединение
# ищем свободного потомка
for c in childrens_pull:
if c.is_free:
# передаём свободному потому команду принять подключение
logger.info('Send command "accept connection" to child')
c.pipe.send(b'A')
# ждём подтверждения от потомка, что он
# взял в обработку соединение
command = c.pipe.recv(1)
# строго говоря, тут можно бы и ошибки принимать,
# но у нас снова будет только одна команда
# "принято в обработку"
logger.info(
'Parent get command %s from child. Mark free.' %
repr(command))
# отмечаем потомка, как занятого (хотя, строго говоря, это ещё не факт)
c.is_free = False
break
else:
logger.info('Child not free')
else:
raise Exception('No more childrens.')
for c in childrens_pull:
if c.pipe.fileno() in readables:
# мы получили команду от потомка
command = c.pipe.recv(1)
if command != b'F':
raise Exception(repr(command))
logger.info(
'Parent get command %s from child. Mark free.' %
repr(command))
# бывает только одна команда: "я освободился"
c.is_free = True
def main():
# настраиваем логгинг
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] [%(process)s] %(message)s',
'%H:%M:%S'
)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info('Run')
prepare_childs_and_serve_forever()
if __name__ == '__main__':
main()
12:53:58 [INFO] [2994] Run
12:53:58 [INFO] [2994] Listning no localhost:8999...
12:53:58 [INFO] [2994] Starting child with PID: 2995
12:53:58 [INFO] [2994] Starting child with PID: 2996
12:53:58 [INFO] [2994] Starting child with PID: 2997
12:54:01 [INFO] [2994] Listning socket is readable
12:54:01 [INFO] [2994] Send command "accept connection" to child
12:54:01 [INFO] [2995] Child get command=b'A'
12:54:01 [INFO] [2995] Accept connection 127.0.0.1:46200
12:54:01 [INFO] [2995] Child send "begin"
12:54:01 [INFO] [2995] Start to process request
12:54:01 [INFO] [2994] Parent get command b'B' from child. Mark free.
12:54:01 [INFO] [2995] In buffer = b'2+2000\n'
12:54:03 [INFO] [2994] Listning socket is readable
12:54:03 [INFO] [2994] Child not free
12:54:03 [INFO] [2994] Send command "accept connection" to child
12:54:03 [INFO] [2996] Child get command=b'A'
12:54:03 [INFO] [2996] Accept connection 127.0.0.1:46203
12:54:03 [INFO] [2996] Child send "begin"
12:54:03 [INFO] [2996] Start to process request
12:54:03 [INFO] [2994] Parent get command b'B' from child. Mark free.
12:54:03 [INFO] [2996] In buffer = b'2+2\n'
12:54:06 [INFO] [2995] Out buffer = b'2002\r\n'
12:54:06 [INFO] [2995] Done.
12:54:06 [INFO] [2995] Child send "free"
12:54:06 [INFO] [2994] Parent get command b'F' from child. Mark free.
12:54:08 [INFO] [2996] Out buffer = b'4\r\n'
12:54:08 [INFO] [2996] Done.
12:54:08 [INFO] [2996] Child send "free"
12:54:08 [INFO] [2994] Parent get command b'F' from child. Mark free.
При рассмотрении perfork-серверов нельзя не упомянуть о принципиально ином, возможном, походе. Если вы про него не знаете, то код может показаться абсолютно непостижимым.
Итак. Есть такая вещь, как CMSG (control messages).
Суть в том, что с помощью вызовов sendmsg(2) и recvmsg(2) и чёрной магии, можно через сокет передать файловый дескриптор. Этот дескриптор не только приедет, но и сам открытый файл или сокет появятся у процесса-получателя сообщения.
Эту технологию часто используют при написании prefork-серверов.
Как ни странно, это работает довольно давно и не только на Linux. Но в широкой известности эта возможность пока не получила. Во многом ещё и от того, что она плохо поддержана в языках высокого уровня. Например, в Python она появилась только в версии 3.3. В менее прогрессивных языках, типа Perl, есть только сторонние модули и версии у них колеблются вблизи 0.0.3.