Добавим к нашему предыдущему примеру немного rock-n-roll-а: взаимодействие между нитями. Это уже похоже на решение, подходящее для неплохого чата.
Здесь каждый подключившийся, получает ответ не только на свой вопрос, но информацию об уже решённых задачах.
Telnet сеанс выглядит так:
$ telnet localhost 8999
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
6*6
1+1=2, 2*2=1+1=24
36
Connection closed by foreign host.
Однако напомню, что эти примеры не предназначены для продкшена,
a eval
— зло.
#!/usr/bin/python
import errno
import threading
import socket
import logging
import time
logger = logging.getLogger('main')
BIND_ADDRESS = ('localhost', 8999)
BACKLOG = 5
class ResultsStorage:
def __init__(self):
self.__lock = threading.Lock()
self.__stor = []
def append(self, i, o):
e = (i + b'=' + o).translate(None, b' \t\r\n')
with self.__lock:
self.__stor.append(e)
if len(self.__stor) > 5:
self.__stor = self.__stor[-5:]
def get(self):
with self.__lock:
return b', '.join(self.__stor) + b'\r\n'
results_storage = ResultsStorage()
def handle(sock, clinet_ip, client_port):
# обработчик, работающий в процессе-потомке
logger.info('Start to process request from %s:%d' % (clinet_ip, client_port))
# получаем все данные до перевода строки
# (это не очень честный подход, может сожрать сразу несколько строк,
# если они придут одним какетом; но для наших целей -- сойдёт)
in_buffer = b''
while not in_buffer.endswith(b'\n'):
in_buffer += sock.recv(1024)
logger.info('In buffer = ' + repr(in_buffer))
# изображаем долгую обработку
time.sleep(5)
# получаем результат
try:
result = str(eval(in_buffer, {}, {}))
except Exception as e:
result = repr(e)
out_buffer = results_storage.get() + result.encode('utf-8') + b'\r\n'
results_storage.append(in_buffer, out_buffer)
logger.info('Out buffer = ' + repr(out_buffer))
# отправляем
sock.sendall(out_buffer)
# в отличии от fork-варианта, здесь процесс не завершается
# автоматического закрытия сокета не произойдёт;
# поэтому закрываем сокет руками
sock.close()
logger.info('Done.')
def serve_forever():
# создаём слушающий сокет
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# re-use port
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(BIND_ADDRESS)
sock.listen(BACKLOG)
# слушаем и при получении нового входящего соединения,
# порождаем нить, которая будет его обрабатывать
logger.info('Listning no %s:%d...' % BIND_ADDRESS)
while True:
try:
connection, (client_ip, clinet_port) = sock.accept()
except IOError as e:
if e.errno == errno.EINTR:
continue
raise
# запускаем нить
thread = threading.Thread(
target=handle,
args=(connection, client_ip, clinet_port)
)
thread.daemon = True
thread.start()
def main():
# настраиваем логгинг
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] [%(thread)s] %(message)s',
'%H:%M:%S'
)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info('Run')
# запускаем сервер
serve_forever()
main()
Здесь всё очень похоже на
пример с невзаимодействующими нитями.
Всё взаимодействие происходит через объект
ResultsStorage
. Обратите внимание, что он не только
следит за тем, чтобы массив не переполнялся, но и за блокировками
общих данных.
11:09:52 [INFO] [139711435970304] Run
11:09:52 [INFO] [139711435970304] Listning no localhost:8999...
11:09:55 [INFO] [139711397947136] Start to process request from 127.0.0.1:59689
11:09:57 [INFO] [139711397947136] In buffer = b'1+1\r\n'
11:10:02 [INFO] [139711397947136] Out buffer = b'\r\n2\r\n'
11:10:02 [INFO] [139711397947136] Done.
11:10:04 [INFO] [139711397947136] Start to process request from 127.0.0.1:59693
11:10:06 [INFO] [139711397947136] In buffer = b'2*2\r\n'
11:10:08 [INFO] [139711387367168] Start to process request from 127.0.0.1:59695
11:10:10 [INFO] [139711387367168] In buffer = b'6*6\r\n'
11:10:11 [INFO] [139711397947136] Out buffer = b'1+1=2\r\n4\r\n'
11:10:11 [INFO] [139711397947136] Done.
11:10:15 [INFO] [139711387367168] Out buffer = b'1+1=2, 2*2=1+1=24\r\n36\r\n'
11:10:15 [INFO] [139711387367168] Done.