Home Blog CV Projects Patterns Notes Book Colophon Search

Fast Python HTTP Server

31 Jan, 2022

Here's an HTTP server that will give > 300k requests a second with Python 3 on a laptop, and > 700k requests a second with PyPy3.

I've made it fast by doing only the minimum HTTP processing necessary, while making full use of the machine by running as many worker processes as the CPU has hardware threads, all serving the same listening socket.

This might not be the type of server you'd put directly on the web, but it should be ideal behind something like NGINX, which does the more serious processing for you anyway.

'''
Server
++++++

Direct
======

curl -v http://localhost:8000/ok
curl -v http://localhost:8000/not-found
wrk -t 8 -d 10 -c 128 http://localhost:8000/ok

Testing auth, including via NGINX
=================================

See https://jimmyg.org/blog/2022/nginx-auth/index.html

curl -v --http1.0 http://localhost:8000/auth
curl -v --http1.0 http://localhost/private/private.txt?auth=true
curl -v --http1.0 http://localhost/private/private.txt?auth=false
wrk -t 8 -d 10 -c 128 http://localhost:8000/ok
'''

import socket
import sys
import os
import asyncio
import multiprocessing


DEV = LOGGING = False

# Logging helpers. `log` and `error` always print. `info` and `debug` are
# defined on every path so a call never fails with NameError, whatever
# LOGGING is set to; hot-path debug calls are additionally wrapped in
# `if LOGGING:` so they cost nothing when logging is off.
if LOGGING:
    info = print
    # NOTE(review): debug stays a no-op even with LOGGING on; set it to
    # print to see the per-request debug output.
    debug = lambda *x: None
else:
    info = debug = lambda *x: None
log = error = print
    

def create_socket(address):
    """Create and bind the listening socket described by *address*.

    ``'host:port'`` (with an integer port) gives an IPv4 TCP socket with
    SO_REUSEADDR set. Any other address is treated as a filesystem path for
    a UNIX domain socket: a leftover file at that path is removed first, and
    the new socket file is made read/write for user, group and other
    (0o666) so processes running as other users can connect to it.

    The socket is bound; this function does not call listen().

    If creating or binding the TCP socket fails, the error is printed and
    the process exits with status 1. UNIX-socket errors propagate as OSError.
    """
    try:
        host, port = address.split(':')
        port = int(port)
    except ValueError:
        # Not "host:port" -> treat the address as a UNIX domain socket path.
        # https://pymotw.com/2/socket/uds.html
        path = os.path.normpath(os.path.abspath(address))
        try:
            os.unlink(path)  # remove a socket file left by a previous run
        except OSError:
            if os.path.exists(path):
                raise
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.bind(path)
            # Octal 0o666 = rw-rw-rw-. (Decimal 666 would be 0o1232.)
            os.chmod(path, 0o666)
        except OSError:
            sock.close()
            raise
        return sock

    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
    except OSError as e:
        if sock is not None:
            sock.close()
        error('Failed to create TCP Socket:', e)
        sys.exit(1)
    return sock


class BadRequest(Exception):
    """Signals a request the server cannot parse.

    handle_connection() answers it with a 400 response and then closes
    the connection.
    """


async def headers(reader, headers_to_return):
    """Consume a request's header lines and return the values of selected headers.

    ``reader`` must provide an awaitable ``readline()`` (handle_connection
    passes the asyncio stream reader). Lines are read up to and including
    the blank line that ends the header section, so the reader is left
    positioned just after the headers.

    ``headers_to_return`` is a sequence of lowercase header names as
    ``bytes`` (this is checked only when DEV is true). Names in the request
    are matched case-insensitively and must match in full. Values have
    surrounding whitespace stripped; a header sent more than once has its
    values joined with ``b', '``.

    Returns the value (or None if absent) when exactly one name is
    requested, otherwise a list of values/None in the same order as
    ``headers_to_return``.

    Raises BadRequest if the stream ends, or a line shorter than 4 bytes
    arrives, before the terminating blank line.
    """
    if DEV:
        for header in headers_to_return:
            assert isinstance(header, bytes) and header.lower() == header, f'Expected lowercase header name as a type \'bytes\', not {header}'
    found = [None] * len(headers_to_return)
    while True:
        line = await reader.readline()
        # The blank line ends the header section. A bare LF is accepted as
        # well as CRLF, a leniency RFC 9112 permits for recipients.
        if line == b'\r\n' or line == b'\n':
            if len(headers_to_return) == 1:
                # Just return an item, not a list, so the caller doesn't need index access
                return found[0]
            return found
        if len(line) < 4:
            raise BadRequest(f'Invalid header: {line}')
        field_name, colon, value = line.partition(b':')
        if not colon:
            # Not a "name: value" field line; nothing can be extracted from it.
            continue
        # Compare the whole field name (spaces before the colon tolerated),
        # so e.g. "Accept-Encoding" is never taken for a requested "accept".
        field_name = field_name.rstrip(b' ').lower()
        if field_name not in headers_to_return:
            continue
        value = value.strip()
        for index, wanted in enumerate(headers_to_return):
            if wanted != field_name:
                continue
            if found[index] is None:
                found[index] = value
            else:
                # https://stackoverflow.com/questions/4371328/are-duplicate-http-response-headers-acceptable
                found[index] += b', ' + value


async def handle_auth(reader, version):
    """Handle a request for ``/auth`` and build its response.

    Only HTTP/1.0 is accepted. The remaining request headers are read from
    ``reader``. Access is allowed (200 "Allowed") only when the
    ``X-Original-URI`` header equals ``/private/private.txt?auth=true``;
    otherwise the response is 403 "Denied".

    Returns ``(connection, response)``: the request's Connection header
    value (or None) and the complete raw response bytes.

    Raises BadRequest for any HTTP version other than HTTP/1.0.
    """
    if version != b'HTTP/1.0':
        raise BadRequest(f'Expected HTTP/1.0 for /auth, got {version}')
    wanted = [b'connection', b'x-original-uri']
    connection, original_uri = await headers(reader, wanted)
    allowed = original_uri == b'/private/private.txt?auth=true'
    response = (
        b'HTTP/1.0 200 OK\r\nContent-Length: 7\r\n\r\nAllowed'
        if allowed
        else b'HTTP/1.0 403 Forbidden\r\nContent-Length: 6\r\n\r\nDenied'
    )
    return connection, response


# https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
async def handle_connection(reader, writer):
    """Serve HTTP requests on one client connection until it should close.

    For each request line read from ``reader``, the headers are consumed
    and a fixed response is written to ``writer``:

    * ``/auth``        -> handle_auth() (HTTP/1.0 only)
    * ``/ok``          -> 200 "OK"
    * any other path   -> 404 "Not Found"

    The connection stays open for further requests unless the client sent
    ``Connection: close``, or spoke HTTP/1.0 without ``Connection:
    keep-alive``. A malformed request gets a 400 response and any other
    error a 500. Either way the loop ends, and the writer is closed on
    every exit path.
    """
    try:
        while True:
            line = await reader.readline()
            if not line:
                # End of stream: the client has closed its side.
                break
            try:
                _method, path, version = line.strip().split(b' ')
            except ValueError as e:
                raise BadRequest(str(e)) from e
            if path == b'/auth':
                connection, response = await handle_auth(reader, version)
            elif path == b'/ok':
                connection = await headers(reader, [b'connection'])
                response = b'HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK'
            else:
                connection = await headers(reader, [b'connection'])
                response = b'HTTP/1.1 404 Not Found\r\nContent-Length: 9\r\n\r\nNot Found'
            if LOGGING:
                debug(f"Response: {response!r}")
            writer.write(response)
            # HTTP/1.1 connections persist unless the client asks to close;
            # HTTP/1.0 connections close unless the client asks to keep alive.
            # (Compare against bytes: the header value is bytes.)
            connection = connection.lower() if connection else None
            if connection == b'close' or (
                version == b'HTTP/1.0' and connection != b'keep-alive'
            ):
                if LOGGING:
                    debug("Close the connection")
                break
    except ConnectionResetError:
        # The peer reset the connection; there is nobody left to reply to.
        pass
    except BadRequest as e:
        error(e)
        response = b'HTTP/1.1 400 Bad Request\r\nConnection: close\r\nContent-Length: 11\r\n\r\nBad Request'
        writer.write(response)
    # https://stackoverflow.com/questions/7160983/catching-all-exceptions-in-python
    except Exception as e:
        error(e)
        response = b'HTTP/1.1 500 Error\r\nConnection: close\r\nContent-Length: 5\r\n\r\nError'
        writer.write(response)
    finally:
        # Close on every exit path, including a reset or task cancellation.
        writer.close()


# https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
async def serve(num, sock):
    """Run worker *num*'s asyncio server on the already-bound *sock* forever.

    AF_UNIX sockets are served with asyncio.start_unix_server, everything
    else with asyncio.start_server; each connection is handled by
    handle_connection().
    """
    if sock.family == socket.AF_UNIX:
        start, family = asyncio.start_unix_server, 'UNIX domain socket'
    else:
        start, family = asyncio.start_server, 'TCP socket'
    server = await start(handle_connection, sock=sock)
    log(f'Serving worker {num} (pid {os.getpid()}) on {family} {sock.getsockname()}')
    async with server:
        await server.serve_forever()


def run_worker(num, sock):
    """Worker-process entry point: run serve() for worker *num* on *sock*."""
    main_coroutine = serve(num, sock)
    asyncio.run(main_coroutine)


if __name__ == '__main__':
    # Usage: python server.py ADDRESS [NUM_WORKERS]
    #   ADDRESS is "host:port" for TCP, or a filesystem path for a UNIX socket.
    if len(sys.argv) < 2:
        error(f'Usage: {sys.argv[0]} ADDRESS [NUM_WORKERS]')
        sys.exit(2)
    num_workers = int(sys.argv[2]) if len(sys.argv) > 2 else 1
    # Bind once in the parent; every worker process serves this same socket.
    sock = create_socket(sys.argv[1])
    workers = []
    try:
        for i in range(num_workers):
            worker = multiprocessing.Process(target=run_worker, args=(i + 1, sock))
            # Daemonic, so the parent terminates its workers when it exits.
            worker.daemon = True
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        log('Shutting down worker processes ...')
        for worker in workers:
            worker.terminate()
        # Reap the terminated workers so none are left as zombies.
        for worker in workers:
            worker.join()
    except Exception as e:
        error('Error while starting or waiting for workers.', e)
    log('Closing the socket')
    sock.close()
    log('Finished.')

And a test:

import os
import unittest
import asyncio
from server import headers


def debug(*x):
    """Discard all arguments: debug output is switched off in these tests."""
    return None


log = print
error = print


def run_with_readlines(object_under_test, lines, *args, **kwargs):
    """Run coroutine function *object_under_test* against a fake stream reader.

    The fake reader's ``readline()`` returns the items of *lines* in order,
    then ``b''`` (end of stream) on every later call. The reader is passed
    as the first argument, followed by *args* and **kwargs*. Returns the
    coroutine's result.
    """
    class TestReader:
        def __init__(self):
            self.index = 0

        async def readline(self):
            line = lines[self.index] if self.index < len(lines) else b''
            debug(self.index, line)
            self.index += 1
            return line

    # asyncio.run() creates and closes its own event loop; calling
    # get_event_loop() when no loop is running is deprecated.
    return asyncio.run(object_under_test(TestReader(), *args, **kwargs))


class HeadersTest(unittest.TestCase):
    """Unit tests for server.headers(), fed through run_with_readlines()."""

    def test_headers(self):
        self.assertEqual(run_with_readlines(headers, [b'Connection: close\r\n', b'\r\n'], [b'connection']), b'close')

    def test_missing_header_is_none(self):
        lines = [b'Host: localhost\r\n', b'\r\n']
        self.assertIsNone(run_with_readlines(headers, lines, [b'connection']))

    def test_multiple_names_return_list_in_request_order(self):
        lines = [b'X-Original-URI: /p\r\n', b'Connection: keep-alive\r\n', b'\r\n']
        self.assertEqual(
            run_with_readlines(headers, lines, [b'connection', b'x-original-uri']),
            [b'keep-alive', b'/p'],
        )

    def test_duplicate_headers_are_combined(self):
        lines = [b'Connection: a\r\n', b'connection: b\r\n', b'\r\n']
        self.assertEqual(run_with_readlines(headers, lines, [b'connection']), b'a, b')

    def test_truncated_headers_raise_bad_request(self):
        from server import BadRequest
        # The fake reader returns b'' (end of stream) before the blank line.
        with self.assertRaises(BadRequest):
            run_with_readlines(headers, [b'Connection: close\r\n'], [b'connection'])


if __name__ == '__main__':
    # Discover and run the TestCase classes defined in this module.
    unittest.main(module='__main__')

Comments

Be the first to comment.

Add Comment





Copyright James Gardner 1996-2020 All Rights Reserved. Admin.