17 Sep, 2023
I've written a couple of examples of simple (and therefore fast) HTTP servers. Here are some attempts at the same thing, but using the low-level selectors module directly, without a library like asyncio
or gevent
.
I imagined this would result in a faster, more stable server, but in practice the implementations always seemed slower, and I haven't been able to get a server that reliably works with both wrk
and ab
under load (unlike with the other approaches). So I don't know what's going on!
Here is the code though in case I ever want to pick it up and try again.
First a simple version that registers a socket for reading and writing at the same time:
import selectors
import socket
from collections import namedtuple
import os
import sys
import multiprocessing
import time
# Per-registration selector payload: a single callback invoked when the
# registered file object becomes ready. (The typename previously read
# 'collection' — a typo — and the field spec was a bare string posing as
# a tuple; both fixed.)
connection = namedtuple('connection', ['callback'])
def run_worker(num, sock):
    """Event loop for one worker process.

    Registers the shared listening socket with a selector and serves each
    accepted connection by discarding the request bytes and replying with a
    fixed 200 response whose body is this worker's PID.

    Fixes over the original sketch:
    * the listening socket is made non-blocking, so a worker that loses the
      accept() race against a sibling process gets BlockingIOError instead
      of stalling its whole event loop inside accept();
    * transient per-connection socket errors tear the connection down
      instead of the exception escaping and killing the worker's loop;
    * the response advertises ``Connection: close`` since we always close
      (HTTP/1.1 otherwise defaults to a persistent connection).
    """
    print('Worker', num)
    sel = selectors.DefaultSelector()

    def cleanup(conn):
        # Best-effort teardown; the connection may already be unregistered.
        try:
            sel.unregister(conn)
        except KeyError:
            pass
        conn.close()

    def accept(sock, mask):
        # All workers select() on the same listening socket, so several can
        # wake for a single incoming connection; only one accept() succeeds.
        try:
            conn, addr = sock.accept()
        except BlockingIOError:
            return  # a sibling worker accepted this connection first
        conn.setblocking(False)
        sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, connection(read_and_write))

    def read_and_write(conn, mask):
        if mask & selectors.EVENT_READ:
            if not read(conn, mask):
                return  # connection already torn down by read()
        if mask & selectors.EVENT_WRITE:
            write(conn, mask)

    def write(conn, mask):
        body = str(os.getpid()).encode('ascii')
        response = (b'HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: '
                    + str(len(body)).encode('ascii') + b'\r\n\r\n' + body)
        try:
            # NOTE(review): send() may accept fewer bytes than the full
            # response; a robust server would buffer the remainder the way
            # the second example does.
            print(num, 'writing', conn.send(response))
        except (BlockingIOError, BrokenPipeError, ConnectionResetError):
            pass
        cleanup(conn)

    def read(conn, mask):
        # Request bytes are read and discarded. Returns False once the
        # connection has been closed, True if it is still usable.
        try:
            data = conn.recv(1000)
        except (BlockingIOError, ConnectionResetError):
            cleanup(conn)
            return False
        if not data:  # peer closed its end
            cleanup(conn)
            return False
        return True

    sock.setblocking(False)
    sel.register(sock, selectors.EVENT_READ, connection(accept))
    while True:
        events = sel.select()
        for key, mask in events:
            callback = key.data.callback
            callback(key.fileobj, mask)
if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # The original also set ``socket.AI_PASSIVE`` as a SOL_SOCKET option;
    # AI_PASSIVE is a getaddrinfo() flag, not a socket option (its numeric
    # value aliases an unrelated option), so that call has been dropped.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('localhost', 12349))
    sock.listen(100)
    num_workers = 12
    workers = []
    try:
        # Fork the workers; each inherits the listening socket and runs its
        # own selector-based event loop.
        for i in range(num_workers):
            worker = multiprocessing.Process(target=run_worker, args=(i + 1, sock))
            worker.daemon = True
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print('Shutting down worker processes ...')
        for worker in workers:
            worker.terminate()
    except Exception as e:
        print('Error:', e)
    finally:
        # Close the listening socket on every exit path.
        sock.close()
And now a more sophisticated one that changes whether to listen for read or write events based on the incoming HTTP request. This one does the listening after the fork; surprisingly, that doesn't seem to make any difference:
import selectors
import socket
import multiprocessing
# Chunk size (256 bytes) used for each recv() and for each send() slice.
NUM_BYTES = 2**8
def run_worker(num, sock):
    """Event loop for one worker process.

    Each worker calls listen() on the shared (already bound) socket and
    serves minimal HTTP responses, honouring keep-alive based on the
    request's ``Connection`` header.

    Fixes over the original sketch:
    * ``write`` no longer tears the connection down while response bytes are
      still buffered — the old logic consulted the close flag before the
      buffer had drained, truncating any response that needed more than one
      send() when ``close`` was True;
    * once the buffer drains, the close flag is honoured instead of being
      unconditionally reset to False (which made ``Connection: Close``
      responses keep the connection open forever);
    * on the keep-alive path the flag is reset to ``None`` — its initial
      state, which ``read`` checks with ``is True`` — rather than ``False``;
    * the listening socket is non-blocking, so losing the accept() race to a
      sibling worker cannot stall this worker inside accept().
    """
    print('Worker', num)
    sock.listen()
    sock.setblocking(False)
    sel = selectors.DefaultSelector()

    def accept(sock, mask, data):
        try:
            conn, addr = sock.accept()
        except BlockingIOError:
            return  # another worker accepted this connection first
        conn.setblocking(False)
        # The data format is: callback, [read_buffer, write_buffer, close]
        # where close is whether the connection should be closed after write.
        sel.register(conn, selectors.EVENT_READ, (read, [b'', b'', None]))

    def cleanup(conn, e=None):
        # Unregister and close; optionally report the error that caused it.
        sel.unregister(conn)
        conn.close()
        if e:
            print('ERROR', e)

    def write(conn, mask, data):
        if not data[1]:
            return cleanup(conn)
        written = conn.send(data[1][:NUM_BYTES])
        if written == 0:
            return cleanup(conn)
        data[1] = data[1][written:]
        if data[1]:
            # Bytes still buffered: stay registered for EVENT_WRITE and wait
            # for the socket to become writable again.
            return
        # Whole response sent: close, or go back to reading the next request.
        if data[2]:
            cleanup(conn)
        else:
            data[2] = None  # reset the close flag for the next request
            sel.modify(conn, selectors.EVENT_READ, (read, data))

    def handle_headers(headers):
        # Parse the request line and the Connection header. Returns
        # (close, method, path, version); close defaults to True and is only
        # switched off by an explicit ``Connection: keep-alive``.
        close = True
        method, path, version = headers[0].split(b' ')
        for line in headers[1:]:
            parts = line.split(b':')
            key = parts[0].decode('ascii').lower().strip()
            value = (b':'.join(parts[1:])).decode('ascii').lower().strip()
            if key == 'connection':
                if value == 'close':
                    close = True
                elif value == 'keep-alive':
                    close = False
                break
        return close, method, path, version

    def read(conn, mask, data):
        if data[2] is True:
            return cleanup(conn, Exception('Reading from a connection that is supposed to be closed'))
        received = conn.recv(NUM_BYTES)
        if received == b'':
            return cleanup(conn)  # peer closed its end
        data[0] += received
        location = data[0].find(b'\r\n\r\n')
        if location >= 0:
            headers = data[0][:location].split(b'\r\n')
            close, method, path, version = handle_headers(headers)
            data[1] += version + b' 200 OK\r\n'
            if not close:
                data[1] += b'Connection: Keep-Alive\r\n'
            else:
                data[1] += b'Connection: Close\r\n'
            data[0] = data[0][location + 4:]
            data[1] += b'Content-Type: text/plain\r\nContent-Length: 2\r\n\r\nok'
            data[2] = close
            sel.modify(conn, selectors.EVENT_WRITE, (write, data))
            if data[0]:
                raise Exception('Pipelining not supported')

    sel.register(sock, selectors.EVENT_READ, (accept, None))
    while True:
        events = sel.select()
        for key, mask in events:
            callback = key.data[0]
            try:
                callback(key.fileobj, mask, key.data[1])
            except Exception as e:
                # Log and keep going: a per-connection failure must not take
                # down the whole worker.
                print(e)
if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('localhost', 8081))
    # listen() is deliberately called in each worker, after the fork.
    num_workers = 8
    workers = []
    try:
        for i in range(num_workers):
            worker = multiprocessing.Process(target=run_worker, args=(i + 1, sock))
            worker.daemon = True
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print('Shutting down worker processes ...')
        for worker in workers:
            worker.terminate()
    except Exception as e:
        print('Error:', e)
        raise
    finally:
        # Close the listening socket even on the re-raise path (the original
        # skipped close() whenever the generic handler re-raised).
        sock.close()
As I say, both these approaches failed, so I wouldn't recommend them.
See also:
Be the first to comment.
Copyright James Gardner 1996-2020 All Rights Reserved. Admin.