8 Apr, 2025
Here's a fast multithreaded, multiprocess HTTP server (for behind NGINX only). Gets 77k req/sec on 1 core and around 380k on multiple cores on my machine. Not far off the asyncio versions, but much simpler without having to use async
or await
keywords.
import socket
import threading
import argparse
import multiprocessing
import os
DEV = LOGGING = False
if LOGGING:
info = print
debug = lambda *x: None
log = error = print
class BadRequest(Exception):
pass
def headers(reader, headers_to_return):
if DEV:
for header in headers_to_return:
assert isinstance(header, bytes) and header.lower() == header, (
f"Expected lowercase header name as a type 'bytes', not {header}"
)
found = [None for _ in headers_to_return]
while True:
line = reader.readline()
if line == b'\r\n':
# Just return a single item if only one header was requested.
if len(headers_to_return) == 1:
return found[0]
return found
if len(line) < 4:
raise BadRequest(f"Invalid header: {line}")
for name_index, name in enumerate(headers_to_return):
match = True
index = 0
for i, c in enumerate(name):
if LOGGING:
debug("Header characters", line, name, c, line[i], line[i] + 32)
if not (c == line[i] or c == line[i] + 32):
match = False
break
else:
index = i
if match and len(line) > 2:
while index < len(line) - 3:
c = line[index + 1]
if LOGGING:
debug("Header finding ':'", line[index+1], index, c)
if c == 32:
index += 1
elif c == 58: # colon:
index += 1
break
else:
raise Exception(line[index:])
if found[name_index] is None:
found[name_index] = line[index+1:].strip()
else:
# Concatenate duplicate header values per RFC recommendations.
found[name_index] += b', ' + line[index+1:].strip()
def handle_client(client_socket, address):
reader = client_socket.makefile(mode='rb')
try:
while True:
line = reader.readline()
if not line:
break
try:
method, path, version = line.strip().split(b' ')
except Exception as e:
raise BadRequest(str(e))
if path == b'/ok':
connection = headers(reader, [b'connection'])
response = b'HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK'
else:
connection = headers(reader, [b'connection'])
response = b'HTTP/1.1 404 Not Found\r\nContent-Length: 9\r\n\r\nNot Found'
if LOGGING:
debug(f"Response: {response!r}")
client_socket.sendall(response)
if connection:
connection_lower = connection.lower()
# Ensure both operands are bytes.
if connection_lower == b'close' or (version == b'HTTP/1.0' and connection_lower != b'keep-alive'):
break
except ConnectionResetError:
# The connection has been closed prematurely.
pass
except BadRequest as e:
error(e)
response = b'HTTP/1.1 400 Bad Request\r\nConnection: close\r\nContent-Length: 11\r\n\r\nBad Request'
try:
client_socket.sendall(response)
except Exception:
pass
except Exception as e:
error(e)
response = b'HTTP/1.1 500 Error\r\nConnection: close\r\nContent-Length: 5\r\n\r\nError'
try:
client_socket.sendall(response)
except Exception:
pass
finally:
try:
reader.close()
except Exception:
pass
client_socket.close()
def serve(host, port):
"""Set up the server socket and process incoming HTTP connections."""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow the socket to reuse the address.
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Allow the socket to reuse the port if supported by the OS.
if hasattr(socket, 'SO_REUSEPORT'):
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
server_socket.bind((host, port))
server_socket.listen(5)
print(f"Starting echo server on {host}:{port} (Worker PID: {os.getpid()})")
try:
while True:
client_socket, addr = server_socket.accept()
# Start a new thread for handling the client.
threading.Thread(target=handle_client, args=(client_socket, addr), daemon=True).start()
except KeyboardInterrupt:
print(f"Shutting down server (Worker PID: {os.getpid()})")
finally:
server_socket.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="HTTP echo server with multi-worker support.")
parser.add_argument(
"--workers", type=int, default=1,
help="Number of worker processes to use (default: 1)"
)
args = parser.parse_args()
host = '127.0.0.1'
port = 16000
if args.workers <= 1:
# Single-process mode.
serve(host, port)
else:
# Use multiprocessing to run multiple workers.
processes = []
for i in range(args.workers):
p = multiprocessing.Process(target=serve, args=(host, port))
p.start()
processes.append(p)
try:
for p in processes:
p.join()
except KeyboardInterrupt:
print("Shutting down server (parent process)")
for p in processes:
p.terminate()
To use the same code as a CGI script you can do something like this:
from fastmp import handle_client
import os
import sys
import io
class FakeSocket:
"""
A simple socket-like object that wraps a BytesIO stream for reading and writes to sys.stdout.buffer.
"""
def __init__(self, request_bytes):
self._rfile = io.BytesIO(request_bytes)
def makefile(self, mode='rb'):
if mode == 'rb':
return self._rfile
raise ValueError("Unsupported mode: " + mode)
def sendall(self, data):
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
def close(self):
pass
request_line = f"{os.environ.get('REQUEST_METHOD', 'GET')} {os.environ.get('SCRIPT_NAME', '')}{os.environ.get('PATH_INFO', '')} {os.environ.get('SERVER_PROTOCOL', 'HTTP/1.1')}\r\n"
headers_str = ""
if 'CONTENT_TYPE' in os.environ:
headers_str += f"content-type: {os.environ['CONTENT_TYPE']}\r\n"
if 'CONTENT_LENGTH' in os.environ:
headers_str += f"content-length: {os.environ['CONTENT_LENGTH']}\r\n"
# Add additional headers from the HTTP_ environment variables.
for key, value in os.environ.items():
if key.startswith("HTTP_"):
header_name = key[5:].replace('_', '-').lower()
headers_str += f"{header_name}: {value}\r\n"
headers_str += "\r\n"
# Read any request body (if applicable)
body = sys.stdin.buffer.read() if os.environ.get("CONTENT_LENGTH", "0") != "0" else b""
# Combine into full request bytes.
request_bytes = (request_line + headers_str).encode('utf-8') + body
# Create a FakeSocket with the request bytes.
fake_socket = FakeSocket(request_bytes)
# Call handle_client directly with a dummy address.
handle_client(fake_socket)
You can test it like this:
$ PATH_INFO=/ok python3 cgi.py
HTTP/1.1 200 OK
Content-Length: 2
OK
Be the first to comment.
Copyright James Gardner 1996-2020 All Rights Reserved. Admin.