31 Jan, 2022
Here's an HTTP server that handles more than 300k requests per second with Python 3 on a laptop, and more than 700k requests per second with PyPy3.
I've made it fast by doing only the minimum HTTP processing necessary, while making efficient use of the machine by running multiple worker processes so that it can keep all of the CPU's hardware threads busy.
This might not be the kind of server you'd expose directly to the web, but it should be ideal behind something like NGINX, which does the more serious request processing for you anyway.
'''
Server
++++++
Direct
======
curl -v http://localhost:8000/ok
curl -v http://localhost:8000/not-found
wrk -t 8 -d 10 -c 128 http://localhost:8000/ok
Testing auth, including via NGINX
=================================
See https://jimmyg.org/blog/2022/nginx-auth/index.html
curl -v --http1.0 http://localhost:8000/auth
curl -v --http1.0 http://localhost/private/private.txt?auth=true
curl -v --http1.0 http://localhost/private/private.txt?auth=false
wrk -t 8 -d 10 -c 128 http://localhost:8000/ok
'''
import socket
import sys
import os
import asyncio
import multiprocessing
# DEV enables extra argument checks (see headers()); LOGGING enables debug output.
DEV = LOGGING = False
# log/error always print. info/debug print only when LOGGING is enabled;
# otherwise they are no-ops, so debug calls cost almost nothing.
log = error = print
if LOGGING:
    info = debug = print
else:
    info = debug = lambda *x: None
def _create_unix_socket(address):
    """Bind a UNIX domain stream socket at *address*, replacing a stale socket file."""
    # https://pymotw.com/2/socket/uds.html
    path = os.path.abspath(address)
    try:
        os.unlink(path)
    except OSError:
        # Ignore "nothing to remove"; re-raise if something is still there.
        if os.path.exists(path):
            raise
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(path)
    except OSError:
        sock.close()
        raise
    # Octal rw-rw-rw-, presumably so a front-end proxy running as another
    # user can connect. (The previous decimal 666 meant mode 0o1232.)
    os.chmod(path, 0o666)
    return sock


def create_socket(address):
    """Create and bind the socket that every worker process will serve on.

    *address* is either ``HOST:PORT`` for a TCP socket or a filesystem path
    for a UNIX domain socket. Anything that does not split into exactly one
    host and an integer port is treated as a path.

    The returned socket is bound but ``listen()`` is not called here.
    If the TCP socket cannot be created or bound, an error is reported and
    the process exits with status 1.
    """
    try:
        host, port = address.split(':')
        port = int(port)
    except ValueError:
        return _create_unix_socket(address)
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allow quick restarts while old connections sit in TIME_WAIT.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
    except (OSError, OverflowError) as e:
        # OverflowError: port outside 0-65535.
        if sock is not None:
            sock.close()
        error('Failed to create TCP Socket:', e)
        sys.exit(1)
    return sock
class BadRequest(Exception):
    """Signals a malformed HTTP request.

    handle_connection catches it, logs it, replies ``400 Bad Request`` and
    closes the connection.
    """
async def headers(reader, headers_to_return):
    """Read a request's header section and return the requested header values.

    Lines are read from *reader* until the empty CRLF line that ends the
    header section. Header names are matched case-insensitively and as whole
    names against *headers_to_return*. These must be distinct lowercase
    ``bytes`` names (checked only when ``DEV`` is true). Values have
    surrounding whitespace stripped. Repeated headers are joined with
    ``b', '``. A requested header that was not sent yields ``None``.

    Returns the single value when exactly one name was requested, otherwise a
    list of values ordered like *headers_to_return*.

    Raises BadRequest for a line shorter than 4 bytes (including EOF before
    the blank line) or for a header line that contains no ``:``.
    """
    if DEV:
        for header in headers_to_return:
            assert isinstance(header, bytes) and header.lower() == header, f'Expected lowercase header name as a type \'bytes\', not {header}'
    # One dict lookup per header line instead of comparing byte-by-byte
    # against every wanted name.
    slots = {name: slot for slot, name in enumerate(headers_to_return)}
    found = [None] * len(headers_to_return)
    while True:
        line = await reader.readline()
        if line == b'\r\n':
            if len(headers_to_return) == 1:
                # Just return an item, not a list, so that the caller doesn't need index access
                return found[0]
            return found
        if len(line) < 4:
            raise BadRequest(f'Invalid header: {line}')
        name, colon, value = line.partition(b':')
        if not colon:
            raise BadRequest(f'Invalid header: {line}')
        # Whole-name match: a longer name such as b'connection-foo' does not
        # match b'connection'. Spaces/tabs before the colon are tolerated.
        slot = slots.get(name.rstrip(b' \t').lower())
        if slot is None:
            continue
        value = value.strip()
        if found[slot] is None:
            found[slot] = value
        else:
            # https://stackoverflow.com/questions/4371328/are-duplicate-http-response-headers-acceptable
            found[slot] += b', ' + value
async def handle_auth(reader, version):
    """Answer a request to the ``/auth`` endpoint used for NGINX auth checks.

    Only HTTP/1.0 requests are accepted; any other *version* raises
    BadRequest. Access is allowed (200) only when the ``X-Original-URI``
    header is exactly ``/private/private.txt?auth=true``. Anything else,
    including a missing header, is denied (403).

    Returns ``(connection, response)``: the raw Connection header value (or
    None) and the complete response bytes.
    """
    if version == b'HTTP/1.0':
        wanted = [b'connection', b'x-original-uri']
        connection, original_uri = await headers(reader, wanted)
        if original_uri != b'/private/private.txt?auth=true':
            return connection, b'HTTP/1.0 403 Forbidden\r\nContent-Length: 6\r\n\r\nDenied'
        return connection, b'HTTP/1.0 200 OK\r\nContent-Length: 7\r\n\r\nAllowed'
    raise BadRequest(f'Expected HTTP/1.0 for /auth, got {version}')
# https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
async def handle_connection(reader, writer):
    """Serve HTTP requests arriving on one client connection until it ends.

    Requests are handled one after another on the same connection. Only the
    request line and the headers each route needs are parsed; request bodies
    are not read.

    Routes:
    - ``/auth`` is handled by handle_auth.
    - ``/ok`` returns 200.
    - Any other path returns 404.

    A malformed request gets a 400 and an unexpected error a 500, after which
    the connection is closed. The writer is closed on every exit path.
    """
    try:
        while True:
            line = await reader.readline()
            if not line:
                # EOF: the client closed its side of the connection.
                break
            try:
                _method, path, version = line.strip().split(b' ')
            except ValueError as e:
                # The request line was not exactly "METHOD PATH VERSION".
                raise BadRequest(str(e)) from e
            if path == b'/auth':
                connection, response = await handle_auth(reader, version)
            elif path == b'/ok':
                connection = await headers(reader, [b'connection'])
                response = b'HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK'
            else:
                connection = await headers(reader, [b'connection'])
                response = b'HTTP/1.1 404 Not Found\r\nContent-Length: 9\r\n\r\nNot Found'
            if LOGGING:
                debug(f"Response: {response!r}")
            writer.write(response)
            if _should_close(version, connection):
                if LOGGING:
                    debug("Close the connection")
                break
    except ConnectionResetError:
        # The peer reset the connection; there is nobody left to answer.
        pass
    except BadRequest as e:
        error(e)
        response = b'HTTP/1.1 400 Bad Request\r\nConnection: close\r\nContent-Length: 11\r\n\r\nBad Request'
        writer.write(response)
    # https://stackoverflow.com/questions/7160983/catching-all-exceptions-in-python
    except Exception as e:
        error(e)
        response = b'HTTP/1.1 500 Error\r\nConnection: close\r\nContent-Length: 5\r\n\r\nError'
        writer.write(response)
    finally:
        # Buffered response bytes are flushed by the transport before it closes.
        writer.close()


def _should_close(version, connection):
    """Return True if the connection must be closed after the current response.

    *version* is the request's HTTP version (bytes). *connection* is the raw
    Connection header value, or None if the client sent none. HTTP/1.1
    connections persist unless ``close`` is requested. HTTP/1.0 connections
    close unless ``keep-alive`` is requested (RFC 7230, section 6.3).
    """
    tokens = ()
    if connection:
        # The header is a comma-separated list of case-insensitive options.
        tokens = [token.strip() for token in connection.lower().split(b',')]
    if b'close' in tokens:
        return True
    # Compare bytes with bytes; comparing with the str 'keep-alive' was never equal.
    return version == b'HTTP/1.0' and b'keep-alive' not in tokens
# https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
async def serve(num, sock):
    """Run worker *num*'s asyncio stream server on the already-bound *sock*.

    Connections are dispatched to handle_connection. The server runs via
    serve_forever() until the task is cancelled.
    """
    if sock.family != socket.AF_UNIX:
        family = 'TCP socket'
        start = asyncio.start_server
    else:
        family = 'UNIX domain socket'
        start = asyncio.start_unix_server
    server = await start(handle_connection, sock=sock)
    log(f'Serving worker {num} (pid {os.getpid()}) on {family} {sock.getsockname()}')
    async with server:
        await server.serve_forever()
def run_worker(num, sock):
    """Process entry point: run serve() for worker *num* in a new event loop."""
    main_coroutine = serve(num, sock)
    asyncio.run(main_coroutine)
if __name__ == '__main__':
    # Usage: server.py ADDRESS [NUM_WORKERS]
    # ADDRESS is HOST:PORT for TCP or a filesystem path for a UNIX socket.
    if len(sys.argv) < 2:
        error(f'Usage: {sys.argv[0]} HOST:PORT|SOCKET_PATH [NUM_WORKERS]')
        sys.exit(2)
    num_workers = 1
    if len(sys.argv) > 2:
        num_workers = int(sys.argv[2])
    # Bind once in the parent and hand the same socket to every worker.
    sock = create_socket(sys.argv[1])
    workers = []
    try:
        for i in range(num_workers):
            worker = multiprocessing.Process(target=run_worker, args=(i + 1, sock))
            # Daemonic, so the parent tries to terminate workers when it exits.
            worker.daemon = True
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        log('Shutting down worker processes ...')
        for worker in workers:
            worker.terminate()
        # Reap the terminated children so none are left behind as zombies.
        for worker in workers:
            worker.join()
    except Exception as e:
        error('Error managing worker processes:', e)
    log('Closing the socket')
    sock.close()
    log('Finished.')
And a test for the header parser (save the server above as `server.py` so that the import works):
import os
import unittest
import asyncio
from server import headers
debug = lambda *x: None
log = error = print


def run_with_readlines(object_under_test, lines, *args, **kwargs):
    """Run coroutine function *object_under_test* against a fake stream reader.

    The fake reader's ``readline()`` returns each item of *lines* in turn,
    then ``b''`` on every later call, like an ``asyncio.StreamReader`` at
    EOF. The reader is passed as the first argument, followed by *args* and
    *kwargs*. Returns whatever the coroutine returns.
    """
    class TestReader:
        def __init__(self):
            super().__init__()
            self.index = 0

        async def readline(self):
            if self.index >= len(lines):
                return b''
            line = lines[self.index]
            debug(self.index, line)
            self.index += 1
            return line

    # asyncio.run() creates and closes its own loop. Calling
    # get_event_loop() with no running loop is deprecated.
    return asyncio.run(object_under_test(TestReader(), *args, **kwargs))
class HeadersTest(unittest.TestCase):
    """Unit tests for ``server.headers`` driven through a fake reader."""

    def test_headers(self):
        self.assertEqual(run_with_readlines(headers, [b'Connection: close\r\n', b'\r\n'], [b'connection']), b'close')

    def test_header_names_are_case_insensitive(self):
        self.assertEqual(
            run_with_readlines(headers, [b'CONNECTION: Keep-Alive\r\n', b'\r\n'], [b'connection']),
            b'Keep-Alive',
        )

    def test_multiple_headers_follow_requested_order(self):
        lines = [
            b'Host: example.com\r\n',
            b'X-Original-URI: /a?b=c\r\n',
            b'Connection: close\r\n',
            b'\r\n',
        ]
        self.assertEqual(
            run_with_readlines(headers, lines, [b'connection', b'x-original-uri']),
            [b'close', b'/a?b=c'],
        )

    def test_duplicate_headers_are_joined(self):
        lines = [b'Connection: keep-alive\r\n', b'Connection: Upgrade\r\n', b'\r\n']
        self.assertEqual(run_with_readlines(headers, lines, [b'connection']), b'keep-alive, Upgrade')

    def test_missing_header_is_none(self):
        self.assertIsNone(run_with_readlines(headers, [b'Host: x\r\n', b'\r\n'], [b'connection']))

    def test_eof_before_blank_line_is_bad_request(self):
        from server import BadRequest
        with self.assertRaises(BadRequest):
            run_with_readlines(headers, [b'Connection: close\r\n'], [b'connection'])


if __name__ == '__main__':
    unittest.main()
Be the first to comment.
Copyright James Gardner 1996-2020 All Rights Reserved. Admin.