☰ Оглавление

Простейший prefork-сервер

Это самый большой пример, и чтобы придать ему хоть какую-то обозримость, я допустил в нём несколько существенных небрежностей-упрощений реализации. Очень рекомендую походить по ссылкам, которые привожу, там есть более качественные реализации отдельных приёмов, используемых тут.

Давайте чуть-чуть поясню дизайн.

Как работает prefork-сервер

Строго говоря, существует не одна схема организации prefork-сервера. Даже мультиплексирование соединения можно делать в разных местах, в зависимости от задачи и протокола. Здесь приведена схема очень похожая на то, как работает HTTP-сервер Apache.

Первым делом, мы создаём слушающий сокет.

Потом мы порождаем потомков и, естественно, все они получают доступ к этому слушающему сокету.

При порождении потомка, мы организуем с ним анонимное UNIX-соединение. Через него потомок и сервер обмениваются командами.

Мастер-процесс, — общий родитель, — начинает ждать событий на слушающем сокете (входящих соединений) и на сокетах дочерних процессов (команды).

Тут реализуется асинхронная схема работы.

Когда приходит входящее соединение, мастер находит незанятого потомка и даёт ему команду принять соединение.

Потомок-обработчик, получив команду, принимает соединение и отправляет серверу команду «принято», сервер начинает ждать новых подключений, а потомок спокойно работает с клиентом.

Когда работа с клиентом закончена, потомок закрывает соединение с клиентом и отдаёт мастеру команду «я готов принять новые соединения».

На самом деле работа немного сложнее, потомок может ответить разными командами, сообщениями об ошибках, исчерпании ресурсов, сообщениями о выработанных лимитах… Мастер должен отслеживать ситуацию «нет свободных обработчиков»; большинство серверов на этот случай имеют определённую логику типа: порождать потомков до определённого предела, потом принимать соединение и сразу сбрасывать самим мастером. В этом примере всего этого нет.

Но это не единственный недостаток этого примера.

Неточности и небрежности

Не обрабатываем исключения. То есть, совсем никакие. Это, конечно безобразие. В других моих примерах обрабатываются хотя бы базовые исключения. Хотя и этого мало, см. введение.

Синхронные сокеты. Имеет место ужасная смесь синхронных и асинхронных операций. Это неприемлемо. Как должен работать асинхронный сервер, смотри в отдельном примере. Здесь, для простоты и краткости, я не стал делать всё честно.

Самое ужасное место, это там, где мастер ждёт от обработчика ответ, о приёме соединения в работу. На самом деле, в этом месте надо удалить из кандидатов на чтение слушающий сокет и продолжить крутить main loop. А когда процесс-обработчик ответит — надо вернуть слушающий сокет в обработку. Причём, пока слушающий сокет «не слушает», надо внимательно отслеживать тайм ауты. В моём примере асинхронного сервера есть примитивная работа с тайм аутами. Можно посмотреть там.

Не отслеживание умираний потомков. Чтобы её прикрутить, надо, как минимум, обработать исключения errno.EINTR и сигнал signal.SIGCHLD.

Минимальные примеры этих обработчиков можно найти в моём примере простейшего форкающегося сервера.

Но можно пойти и другим путём: использовать стандартные средства типа multiprocessing. У вас действительно есть достаточно веские причины не делать этого?

Не аккуратное чтение из сокета. При общении между мастером и обработчиками, мы используем однобайтные команды. Это, в какой-то степени, извиняет нашу небрежность. Но в реальной жизни чтения и записи хорошо бы делать, учитывая, что данные могут приходить частями, и используя буферы. Как это сделано в других моих примерах и даже (тоже небрежно) в handle этого примера.

Не оптимальный поиск свободного потомка. Очевидно лишние системные вызовы. И множество других алгоритмических несовершенств.

На самом деле, в реальной жизни производительностью часто можно пожертвовать в пользу читаемости. В любом случае, оптимизацией кода надо заниматься только после того, как он стал устойчиво работать. Этот код оптимизировать рано.

Код

#!/usr/bin/python


import time
import errno
import os
import select
import socket
import logging


logger = logging.getLogger('main')


BIND_ADDRESS = ('localhost', 8999)
BACKLOG = 5
CHILDNUM = 3


childrens_pull = []


class ChildController:

    def __init__(self, pipe):
        self.is_free = True
        self.pipe = pipe

    def __repr__(self):
        return '<%s is_free=%s>' % (
            self.__class__.__name__,
            self.is_free)


def handle(sock):
    # обработчик, работающий в процессе-потомке
    logger.info('Start to process request')
    # получаем все данные до перевода строки
    # (это не очень честный подход, может сожрать сразу несколько строк,
    # если они придут одним пакетом; но для наших целей -- сойдёт)
    in_buffer = b''
    while not in_buffer.endswith(b'\n'):
        in_buffer += sock.recv(1024)
    logger.info('In buffer = ' + repr(in_buffer))
    # изображаем долгую обработку
    time.sleep(5)
    # получаем результат [не используйте eval!]
    try:
        result = str(eval(in_buffer, {}, {}))
    except Exception as e:
        result = repr(e)
    out_buffer = result.encode('utf-8') + b'\r\n'
    logger.info('Out buffer = ' + repr(out_buffer))
    # отправляем
    sock.sendall(out_buffer)
    logger.info('Done.')


def create_child(listen_sock):
    # создаём пару связанных анонимных сокетов
    child_pipe, parent_pipe = socket.socketpair()
    # порождаем потомка
    pid = os.fork()
    if pid == 0:
        # это выполняется в дочернем процессе
        child_pipe.close()
        # запускаем бесконечный цикл обработки запросов;
        # очень важно, что этот цикл именно бесконечный,
        # потомок не должен выходить из create_child,
        # иначе начнётся безудержное размножение и катаклизм
        while True:
            # блокирующее чтение из сокета, соединяющего
            # потомка с родителем
            command = parent_pipe.recv(1)
            # мы получили единственную возможную команду
            logger.info('Child get command=%s' % repr(command))
            # получаем соединение
            connection, (client_ip, clinet_port) = listen_sock.accept()
            logger.info('Accept connection %s:%d' % (client_ip, clinet_port))
            logger.info('Child send "begin"')
            parent_pipe.send(b'B')
            # отправляем соединение на обработку
            handle(connection)
            # всё аккуратненько закрываем
            connection.close()
            # отправляем родителю информацию, что мы освободились
            logger.info('Child send "free"')
            parent_pipe.send(b'F')
    # это выполняется в родительском процессе
    logger.info('Starting child with PID: %s' % pid)
    childrens_pull.append(ChildController(child_pipe))
    # закрываем ненужные дескрипторы
    parent_pipe.close()
    return child_pipe


def prepare_childs_and_serve_forever():
    # открываем сокет
    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # re-use the port
    listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # неблокирующий режим
    #listen_sock.setblocking(0)
    listen_sock.bind(BIND_ADDRESS)
    listen_sock.listen(BACKLOG)
    logger.info('Listning no %s:%d...' % BIND_ADDRESS)
    # создаём потомков
    for i in range(CHILDNUM):
        create_child(listen_sock)
    # массив сокетов-кандидатов на чтение
    to_read = [listen_sock] + [c.pipe.fileno() for c in childrens_pull]
    while True:
        readables, writables, exceptions = select.select(to_read, [], [])
        if listen_sock in readables:
            logger.info('Listning socket is readable')
            # кажется, у нас имеется входящее соединение
            # ищем свободного потомка
            for c in childrens_pull:
                if c.is_free:
                    # передаём свободному потому команду принять подключение
                    logger.info('Send command "accept connection" to child')
                    c.pipe.send(b'A')
                    # ждём подтверждения от потомка, что он
                    # взял в обработку соединение
                    command = c.pipe.recv(1)
                    # строго говоря, тут можно бы и ошибки принимать,
                    # но у нас снова будет только одна команда
                    # "принято в обработку"
                    logger.info(
                        'Parent get command %s from child. Mark free.' %
                        repr(command))
                    # отмечаем потомка, как занятого (хотя, строго говоря, это ещё не факт)
                    c.is_free = False
                    break
                else:
                    logger.info('Child not free')
            else:
                raise Exception('No more childrens.')
        for c in childrens_pull:
            if c.pipe.fileno() in readables:
                # мы получили команду от потомка
                command = c.pipe.recv(1)
                if command != b'F':
                    raise Exception(repr(command))
                logger.info(
                    'Parent get command %s from child. Mark free.' %
                    repr(command))
                # бывает только одна команда: "я освободился"
                c.is_free = True


def main():
    # настраиваем логгинг
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        '%(asctime)s [%(levelname)s] [%(process)s] %(message)s',
        '%H:%M:%S'
    )
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.info('Run')
    prepare_childs_and_serve_forever()


if __name__ == '__main__':
    main()

Протокол работы

12:53:58 [INFO] [2994] Run
12:53:58 [INFO] [2994] Listning no localhost:8999...
12:53:58 [INFO] [2994] Starting child with PID: 2995
12:53:58 [INFO] [2994] Starting child with PID: 2996
12:53:58 [INFO] [2994] Starting child with PID: 2997
12:54:01 [INFO] [2994] Listning socket is readable
12:54:01 [INFO] [2994] Send command "accept connection" to child
12:54:01 [INFO] [2995] Child get command=b'A'
12:54:01 [INFO] [2995] Accept connection 127.0.0.1:46200
12:54:01 [INFO] [2995] Child send "begin"
12:54:01 [INFO] [2995] Start to process request
12:54:01 [INFO] [2994] Parent get command b'B' from child. Mark free.
12:54:01 [INFO] [2995] In buffer = b'2+2000\n'
12:54:03 [INFO] [2994] Listning socket is readable
12:54:03 [INFO] [2994] Child not free
12:54:03 [INFO] [2994] Send command "accept connection" to child
12:54:03 [INFO] [2996] Child get command=b'A'
12:54:03 [INFO] [2996] Accept connection 127.0.0.1:46203
12:54:03 [INFO] [2996] Child send "begin"
12:54:03 [INFO] [2996] Start to process request
12:54:03 [INFO] [2994] Parent get command b'B' from child. Mark free.
12:54:03 [INFO] [2996] In buffer = b'2+2\n'
12:54:06 [INFO] [2995] Out buffer = b'2002\r\n'
12:54:06 [INFO] [2995] Done.
12:54:06 [INFO] [2995] Child send "free"
12:54:06 [INFO] [2994] Parent get command b'F' from child. Mark free.
12:54:08 [INFO] [2996] Out buffer = b'4\r\n'
12:54:08 [INFO] [2996] Done.
12:54:08 [INFO] [2996] Child send "free"
12:54:08 [INFO] [2994] Parent get command b'F' from child. Mark free.

Appendix

При рассмотрении perfork-серверов нельзя не упомянуть о принципиально ином, возможном, походе. Если вы про него не знаете, то код может показаться абсолютно непостижимым.

Итак. Есть такая вещь, как CMSG (control messages).

Суть в том, что с помощью вызовов sendmsg(2) и recvmsg(2) и чёрной магии, можно через сокет передать файловый дескриптор. Этот дескриптор не только приедет, но и сам открытый файл или сокет появятся у процесса-получателя сообщения.

Эту технологию часто используют при написании prefork-серверов.

Как ни странно, это работает довольно давно и не только на Linux. Но в широкой известности эта возможность пока не получила. Во многом ещё и от того, что она плохо поддержана в языках высокого уровня. Например, в Python она появилась только в версии 3.3. В менее прогрессивных языках, типа Perl, есть только сторонние модули и версии у них колеблются вблизи 0.0.3.