Home Blog CV Projects Patterns Notes Book Colophon Search

Threaded

8 Apr, 2025

Here's a fast multithreaded, multiprocess HTTP server (intended for use behind NGINX only). It gets 77k req/sec on 1 core and around 380k req/sec on multiple cores on my machine. That's not far off the asyncio versions, but the code is much simpler because it doesn't need the async or await keywords.

import socket
import threading
import argparse
import multiprocessing
import os

# Module-wide switches. DEV turns on the argument assertions in headers();
# LOGGING gates the optional debug() calls sprinkled through the request path.
DEV = False
LOGGING = False

if LOGGING:
    # Only defined when LOGGING is on; every call site is guarded by
    # `if LOGGING:` so the names are never looked up otherwise.
    info = print

    def debug(*x):
        # Deliberate no-op: accepts anything, prints nothing.
        return None

# log and error always go straight to stdout via print.
log = print
error = print

class BadRequest(Exception):
    """Signal that a request line or header line could not be parsed."""

def headers(reader, headers_to_return):
    """Consume the header section from ``reader`` and return selected values.

    Lines are read with ``reader.readline()`` until the blank ``b'\\r\\n'``
    line that ends the header section. Each line is split at its first colon;
    the part before it (whitespace-stripped, lower-cased) is compared for
    exact equality against every requested name.

    Args:
        reader: Binary file-like object positioned just after the request
            line; only ``readline()`` is used.
        headers_to_return: Sequence of lower-case ``bytes`` header names.

    Returns:
        When exactly one name was requested, its value (``bytes``) or ``None``
        if the header was absent. Otherwise a list with one entry per
        requested name, in the same order. Repeated headers are joined with
        ``b', '``.

    Raises:
        BadRequest: If a line shorter than four bytes is read before the
            terminating blank line (this includes hitting EOF).
    """
    if DEV:
        for header in headers_to_return:
            assert isinstance(header, bytes) and header.lower() == header, (
                f"Expected lowercase header name as a type 'bytes', not {header}"
            )
    found = [None] * len(headers_to_return)
    while True:
        line = reader.readline()
        if line == b'\r\n':
            # Just return a single item if only one header was requested.
            if len(headers_to_return) == 1:
                return found[0]
            return found
        if len(line) < 4:
            raise BadRequest(f"Invalid header: {line}")
        # Split on the first colon only, so values containing ':' survive.
        # A line with no colon yields an empty value rather than an error.
        raw_name, _, raw_value = line.partition(b':')
        field_name = raw_name.strip().lower()
        for name_index, name in enumerate(headers_to_return):
            # Exact comparison: a field that merely starts with a requested
            # name (e.g. b'connection-foo') must not be treated as a match.
            if name != field_name:
                continue
            value = raw_value.strip()
            if found[name_index] is None:
                found[name_index] = value
            else:
                # Concatenate duplicate header values per RFC recommendations.
                found[name_index] += b', ' + value

def handle_client(client_socket, address=None):
    """Serve HTTP requests on one connection until it should be closed.

    For each request the request line is parsed, the ``Connection`` header is
    read via ``headers()``, and a fixed response is sent: ``200 OK`` for the
    path ``/ok`` and ``404 Not Found`` for anything else. Request bodies are
    not read.

    The loop ends when the peer closes the connection, when the client sends
    ``Connection: close``, or when the request is HTTP/1.0 without
    ``Connection: keep-alive`` (HTTP/1.0 connections are not persistent by
    default, so a missing header also means "close").

    Args:
        client_socket: Socket-like object providing ``makefile(mode='rb')``,
            ``sendall(bytes)`` and ``close()``.
        address: Peer address. Unused by this function; optional so that
            callers without a real peer address can omit it.

    Error handling: a ``BadRequest`` produces a 400 response, any other
    exception a 500 response (both best-effort); ``ConnectionResetError`` is
    ignored. The reader and the socket are always closed on exit.
    """
    reader = client_socket.makefile(mode='rb')
    try:
        while True:
            line = reader.readline()
            if not line:
                # EOF: the peer closed the connection.
                break
            try:
                _method, path, version = line.strip().split(b' ')
            except ValueError as e:
                # Raised by the unpacking when there are not exactly 3 parts.
                raise BadRequest(str(e)) from e
            # The header section must be consumed for every request so the
            # reader is positioned at the next request line afterwards.
            connection = headers(reader, [b'connection'])
            if path == b'/ok':
                response = b'HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK'
            else:
                response = b'HTTP/1.1 404 Not Found\r\nContent-Length: 9\r\n\r\nNot Found'
            if LOGGING:
                debug(f"Response: {response!r}")
            client_socket.sendall(response)
            # Treat an absent header as empty so the HTTP/1.0 default-close
            # rule below still applies when no Connection header was sent.
            connection_lower = (connection or b'').lower()
            if connection_lower == b'close':
                break
            if version == b'HTTP/1.0' and connection_lower != b'keep-alive':
                break
    except ConnectionResetError:
        # The connection has been closed prematurely.
        pass
    except BadRequest as e:
        error(e)
        response = b'HTTP/1.1 400 Bad Request\r\nConnection: close\r\nContent-Length: 11\r\n\r\nBad Request'
        try:
            client_socket.sendall(response)
        except Exception:
            # Best effort only: the peer may already be gone.
            pass
    except Exception as e:
        error(e)
        response = b'HTTP/1.1 500 Error\r\nConnection: close\r\nContent-Length: 5\r\n\r\nError'
        try:
            client_socket.sendall(response)
        except Exception:
            # Best effort only: the peer may already be gone.
            pass
    finally:
        try:
            reader.close()
        except Exception:
            pass
        client_socket.close()

def serve(host, port):
    """Listen on ``host:port`` and handle each connection in its own thread.

    A TCP/IPv4 listening socket is created with ``SO_REUSEADDR`` and, where
    the platform exposes it, ``SO_REUSEPORT``. Every accepted connection is
    handed to ``handle_client`` on a new daemon thread. The loop runs until
    ``KeyboardInterrupt``.

    Args:
        host: Address to bind to.
        port: TCP port to bind to.

    The listening socket is closed on every exit path, including a failure
    in ``setsockopt``/``bind``/``listen`` (exceptions from those still
    propagate to the caller).
    """
    # The context manager closes the socket even when setup below raises,
    # not only once the accept loop has been entered.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Allow the socket to reuse the address.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Allow the socket to reuse the port if supported by the OS.
        if hasattr(socket, 'SO_REUSEPORT'):
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        server_socket.bind((host, port))
        server_socket.listen(5)
        print(f"Starting echo server on {host}:{port} (Worker PID: {os.getpid()})")
        try:
            while True:
                client_socket, addr = server_socket.accept()
                # Start a new thread for handling the client.
                threading.Thread(
                    target=handle_client,
                    args=(client_socket, addr),
                    daemon=True,
                ).start()
        except KeyboardInterrupt:
            print(f"Shutting down server (Worker PID: {os.getpid()})")

if __name__ == '__main__':
    # Command line: a single optional --workers N flag.
    cli = argparse.ArgumentParser(description="HTTP echo server with multi-worker support.")
    cli.add_argument(
        "--workers",
        type=int,
        default=1,
        help="Number of worker processes to use (default: 1)",
    )
    args = cli.parse_args()

    host = '127.0.0.1'
    port = 16000

    if args.workers > 1:
        # Multi-process mode: each worker runs its own serve() loop.
        workers = []
        for _ in range(args.workers):
            worker = multiprocessing.Process(target=serve, args=(host, port))
            worker.start()
            workers.append(worker)
        try:
            # Block until every worker has exited.
            for worker in workers:
                worker.join()
        except KeyboardInterrupt:
            print("Shutting down server (parent process)")
            for worker in workers:
                worker.terminate()
    else:
        # Zero, one or a negative worker count: serve in this process.
        serve(host, port)

To use the same code as a CGI script you can do something like this:

from fastmp import handle_client
import os
import sys
import io
class FakeSocket:
    """
    Minimal stand-in for a socket: reads come from an in-memory copy of the
    request bytes, writes go to sys.stdout.buffer.
    """

    def __init__(self, request_bytes):
        # Wrap the request so it can be consumed with file-style reads.
        self._rfile = io.BytesIO(request_bytes)

    def makefile(self, mode='rb'):
        """Return the request stream; only binary read mode is supported."""
        if mode != 'rb':
            raise ValueError("Unsupported mode: " + mode)
        return self._rfile

    def sendall(self, data):
        """Write ``data`` to standard output and flush it immediately."""
        out = sys.stdout.buffer
        out.write(data)
        out.flush()

    def close(self):
        """Do nothing; there is no underlying connection to release."""
        return None

# Rebuild the HTTP request line from the CGI environment variables.
request_line = (
    f"{os.environ.get('REQUEST_METHOD', 'GET')} "
    f"{os.environ.get('SCRIPT_NAME', '')}{os.environ.get('PATH_INFO', '')} "
    f"{os.environ.get('SERVER_PROTOCOL', 'HTTP/1.1')}\r\n"
)

# Rebuild the header section. CONTENT_TYPE and CONTENT_LENGTH are read from
# their own variables; the remaining headers come from HTTP_* variables.
header_lines = []
if 'CONTENT_TYPE' in os.environ:
    header_lines.append(f"content-type: {os.environ['CONTENT_TYPE']}\r\n")
if 'CONTENT_LENGTH' in os.environ:
    header_lines.append(f"content-length: {os.environ['CONTENT_LENGTH']}\r\n")
# Add additional headers from the HTTP_ environment variables.
for key, value in os.environ.items():
    if key.startswith("HTTP_"):
        header_name = key[5:].replace('_', '-').lower()
        header_lines.append(f"{header_name}: {value}\r\n")
# Blank line terminating the header section.
header_lines.append("\r\n")
headers_str = "".join(header_lines)

# Read any request body (if applicable). Read exactly CONTENT_LENGTH bytes
# rather than until EOF: the server is not required to close stdin, so an
# unbounded read could block. A missing, empty or non-numeric value means
# there is no body.
try:
    content_length = int(os.environ.get("CONTENT_LENGTH") or 0)
except ValueError:
    content_length = 0
body = sys.stdin.buffer.read(content_length) if content_length > 0 else b""

# Combine into full request bytes.
request_bytes = (request_line + headers_str).encode('utf-8') + body

# Create a FakeSocket with the request bytes.
fake_socket = FakeSocket(request_bytes)
# Call handle_client directly with a dummy address (there is no real peer).
handle_client(fake_socket, None)

You can test it like this:

$ PATH_INFO=/ok python3 cgi.py 
HTTP/1.1 200 OK
Content-Length: 2

OK

Comments

Be the first to comment.

Add Comment





Copyright James Gardner 1996-2020 All Rights Reserved. Admin.