28 Sep, 2023
This is currently named pyvicorn_server, which is a bit silly; I'll find something better.
It is basically a subset of ASGI and HTTP that might be useful as a starting point, either for development use or for the sort of code that might end up on something like Lambda anyway, so it doesn't need to support the more advanced features of HTTP. The server part is only 237 lines long.
What amazes me is that this server does 400,000 requests per second on my Linux laptop when using pypy3 with 8 workers and 128 concurrent connections. That shows that if you only build what you need, things can be quite efficient.
PYTHONPATH="${PYTHONPATH}:${PWD}" pypy3 pyvicorn_server/__init__.py localhost:8000 8 app:application
wrk -t 8 -d 10 -c 128 'http://localhost:8000/hello?a=1&a=2&b=3'
I've written some tests that use curl, wrk and ab, which cover HTTP 1.1 and 1.0. In the test configuration it does about 60,000 requests per second with 1 worker and lower concurrency.
app.py:
# The page served for every request. It is encoded once at import time
# instead of on every call.
HELLO_PAGE = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Hello</title>
</head>
<body>
<h1>Hello</h1>
</body>
</html>
""".encode(
    "utf8"
)


async def application(scope, receive, send):
    """Minimal ASGI test app: answer ``/hello`` with ``HELLO_PAGE``.

    The app uses ``assert`` rather than validation because its purpose is to
    check that the server builds the ASGI scope and request event correctly:

    * the path must be ``/hello`` and the query string ``a=1&a=2&b=3``;
    * the first ``receive()`` must yield a single ``http.request`` event
      (``more_body`` absent or false);
    * a POST body must be exactly ``b"hi"``.

    A failed assertion propagates out of the app to the server.
    """
    assert scope["path"] == "/hello", scope["path"]
    assert scope["query_string"] == b"a=1&a=2&b=3", scope["query_string"]
    request = await receive()
    assert request["type"] == "http.request", request["type"]
    # ASGI makes "body" and "more_body" optional (defaults b"" and False).
    assert not request.get("more_body", False), request.get("more_body")
    if scope["method"] == "POST":
        received = request.get("body", b"")
        assert received == b"hi", f'Expected body to be b"hi" but got {received!r}'
    await send(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [
                (b"Content-Type", b"text/html"),
                (b"Content-Length", str(len(HELLO_PAGE)).encode("ascii")),
            ],
        }
    )
    await send(
        {
            "type": "http.response.body",
            "body": HELLO_PAGE,
        }
    )
pyvicorn_server/__init__.py:
import asyncio
import multiprocessing
import os
import socket
import sys
import time
from wsgiref.handlers import format_date_time
# Set LOGGING = True to print every request/response line (slow).
# DEV is not referenced by the code below.
DEV = LOGGING = False


def _discard(*args, **kwargs):
    """Logging sink used for the verbose hooks while LOGGING is off."""


# info/debug are verbose tracing hooks. Call sites also guard them with
# ``if LOGGING:``; defining them unconditionally means an unguarded call can
# never raise NameError.
info = debug = print if LOGGING else _discard
# log/error are always printed.
log = error = print
def create_socket(address):
    """Create and bind (but do not listen on) the server socket.

    ``address`` is either ``HOST:PORT`` for TCP, ``[IPV6]:PORT`` for TCP over
    IPv6, or any other string, which is treated as a filesystem path for a
    UNIX domain socket.

    For TCP, ``SO_REUSEADDR`` is set. If creating or binding the socket fails,
    the error is printed and the process exits with status 1.

    For UNIX sockets, a stale socket file at the path is removed first, and
    the new file is made readable/writable by everyone (mode 0o666).

    :returns: the bound ``socket.socket``.
    """
    host, sep, port_text = address.rpartition(":")
    if sep and port_text.isdigit():
        family = socket.AF_INET
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]
            family = socket.AF_INET6
        sock = None
        try:
            sock = socket.socket(family, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, int(port_text)))
        except OSError as e:
            if sock is not None:
                sock.close()
            error("Failed to create TCP Socket:", e)
            sys.exit(1)
        return sock

    # https://pymotw.com/2/socket/uds.html
    path = os.path.normpath(os.path.abspath(address))
    try:
        os.unlink(path)
    except OSError:
        # Nothing to remove is fine; anything else that still exists is a real problem.
        if os.path.exists(path):
            raise
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(path)
        # Octal: rw for owner, group and others (decimal 666 would set the sticky bit).
        os.chmod(path, 0o666)
    except OSError:
        sock.close()
        raise
    return sock
class BadRequest(Exception):
    """The client sent a request this server cannot parse.

    The connection handler answers it with ``400 Bad Request`` and closes the
    connection.
    """
async def headers(reader, max_headers=1000, max_headers_length=1024 * 1024):
    """Read a request's header section from ``reader``.

    Reads lines until the empty line that ends the header section. Returns
    the fields as a list of ``(name, value)`` byte-string pairs in the order
    received. Names are lower-cased, values are stripped of surrounding
    whitespace, and repeated fields stay as separate entries.

    Both ``\\r\\n`` and a bare ``\\n`` are accepted as line terminators, as
    RFC 9112 section 2.2 permits.

    :param reader: the connection's ``asyncio.StreamReader``, positioned just
        after the request line.
    :param max_headers: maximum number of header lines accepted.
    :param max_headers_length: maximum total bytes of header lines accepted.
    :raises BadRequest: on a malformed or over-long line, obsolete line
        folding, whitespace before the colon, the stream ending before the
        empty line, or when either limit is reached.
    """
    found = []
    size = 0
    while len(found) < max_headers and size < max_headers_length:
        try:
            line = await reader.readline()
        except ValueError as e:
            # StreamReader.readline raises ValueError when a line exceeds its buffer limit.
            raise BadRequest(f"Header line too long: {e}")
        if LOGGING:
            info(line)
        if line in (b"\r\n", b"\n"):
            # End of headers
            return found
        if not line.endswith(b"\n"):
            raise BadRequest("Connection closed before the end of the headers")
        if line[:1] in (b" ", b"\t"):
            # obs-fold continuation lines must be rejected (RFC 9112 section 5.2).
            raise BadRequest(f"Invalid header (line folding): {line!r}")
        size += len(line)
        name, sep, value = line.partition(b":")
        # Whitespace between name and colon must be rejected (RFC 9112 section 5.1).
        if not sep or not name or name[-1:] in (b" ", b"\t"):
            raise BadRequest(f"Invalid header: {line!r}")
        found.append((name.lower(), value.strip()))
    raise BadRequest(
        f"Could not find end of headers after reading {max_headers} lines or {max_headers_length} bytes."
    )
def handle_connection(application):
    """Wrap an ASGI ``application`` as an asyncio stream-server callback.

    The returned ``handle_connection_run(reader, writer)`` serves requests on
    one connection until the client closes it, a response says to close, or
    an error occurs. Only a subset of HTTP/1.0 and HTTP/1.1 is implemented:

    * one request at a time per connection, in order;
    * request bodies framed by ``Content-Length`` only. A request carrying
      ``Transfer-Encoding`` is rejected, because its body could not be
      delimited and would otherwise be misread as the next request;
    * the whole response is buffered in memory and written after the
      application returns;
    * HTTP/1.1 connections stay open unless the client sends
      ``Connection: close``. HTTP/1.0 connections close unless the client
      sends ``Connection: keep-alive``.

    A :class:`BadRequest` gets ``400 Bad Request`` and any other exception
    gets ``500 Error``. Either way the connection is then closed.
    """
    from http import HTTPStatus
    from urllib.parse import unquote

    # Pre-rendered status lines, e.g. 200 -> b"HTTP/1.1 200 OK\r\n".
    status_lines = {
        status.value: b"HTTP/1.1 %d %s\r\n" % (status.value, status.phrase.encode("ascii"))
        for status in HTTPStatus
    }

    def parse_request_line(line):
        """Split a request line into ``(method, raw_path, http_version)``."""
        try:
            raw_method, raw_path, raw_http_version = line.strip().split(b" ")
        except ValueError as e:
            raise BadRequest(str(e))
        if not raw_http_version.startswith(b"HTTP/"):
            raise BadRequest(f"Invalid HTTP version: {raw_http_version!r}")
        try:
            method = raw_method.decode("ascii").upper()
            http_version = raw_http_version[5:].decode("ascii")
        except UnicodeDecodeError as e:
            raise BadRequest(str(e))
        return method, raw_path, http_version

    def split_target(raw_path):
        """Return ``(path, query_string)``: the percent-decoded path and the raw query bytes."""
        if raw_path.startswith(b"?"):
            raise BadRequest("Can't have a ? as the first character, should be /?")
        raw_target, _, query_string = raw_path.partition(b"?")
        try:
            # ASGI "path" is percent-decoded; "raw_path" keeps the original bytes.
            path = unquote(raw_target.decode("utf8"))
        except UnicodeDecodeError as e:
            raise BadRequest(str(e))
        return path, query_string

    def request_body_length(request_headers):
        """Return the declared request body length (0 when there is no Content-Length)."""
        lengths = set()
        for name, value in request_headers:
            if name == b"transfer-encoding":
                raise BadRequest("Transfer-Encoding is not supported")
            if name == b"content-length":
                lengths.add(value)
        if not lengths:
            return 0
        if len(lengths) > 1:
            raise BadRequest("Conflicting Content-Length headers")
        (value,) = lengths
        # isdigit() also rejects negative numbers, signs and whitespace.
        if not value.isdigit():
            raise BadRequest(f"Invalid Content-Length: {value!r}")
        return int(value)

    async def run_request(reader, request_line):
        """Serve one request and return ``(response_bytes, should_close)``."""
        method, raw_path, http_version = parse_request_line(request_line)
        path, query_string = split_target(raw_path)
        request_headers = await headers(reader)
        body_length = request_body_length(request_headers)
        connection = None
        for name, value in request_headers:
            if name == b"connection":
                connection = value.lower()
        scope = {
            "type": "http",
            "asgi": {"version": "3.0", "spec_version": "2.3"},
            "http_version": http_version,
            "method": method,
            "raw_path": raw_path,
            "path": path,
            "query_string": query_string,
            "headers": request_headers,
        }

        body = None  # the request body once read from the stream
        body_delivered = False  # whether receive() has handed the body to the app
        started = False  # whether http.response.start has been sent
        should_close = False
        response = []  # buffered response chunks
        response_complete = asyncio.Event()

        async def read_body():
            """Read the request body from the stream exactly once and cache it."""
            nonlocal body
            if body is None:
                try:
                    # read(n) may return fewer than n bytes; readexactly does not.
                    body = await reader.readexactly(body_length)
                except asyncio.IncompleteReadError as e:
                    raise BadRequest(
                        f"Expected a {body_length} byte body but got {len(e.partial)} bytes"
                    )
            return body

        async def receive():
            nonlocal body_delivered
            if not body_delivered:
                body_delivered = True
                return {
                    "type": "http.request",
                    "body": await read_body(),
                    "more_body": False,
                }
            # The body has already been delivered. Wait until the response is
            # complete and then report a disconnect, instead of reading the
            # next request's bytes as a body.
            await response_complete.wait()
            return {"type": "http.disconnect"}

        async def send(event):
            nonlocal started, should_close
            event_type = event["type"]
            if event_type == "http.response.start":
                if started:
                    raise Exception("http.response.start was sent more than once")
                started = True
                status = event["status"]
                response.append(status_lines.get(status) or b"HTTP/1.1 %d \r\n" % status)
                for name, value in event.get("headers", ()):
                    response.append(name + b": " + value + b"\r\n")
                if http_version == "1.0":
                    if connection == b"keep-alive":
                        response.append(b"Connection: Keep-Alive\r\n")
                    else:
                        should_close = True
                elif connection == b"close":
                    response.append(b"Connection: Close\r\n")
                    should_close = True
                response.append(
                    b"Date: " + format_date_time(time.time()).encode("utf8") + b"\r\n"
                )
                response.append(b"Server: pyvicorn\r\n\r\n")
                if LOGGING:
                    info(b"".join(response))
            elif event_type == "http.response.body":
                if not started:
                    raise Exception(
                        "Expected the http.response.start event to be sent first"
                    )
                chunk = event.get("body")
                # A response to HEAD carries headers only (RFC 9110 section 9.3.2).
                if chunk and method != "HEAD":
                    if LOGGING:
                        info(chunk)
                    response.append(chunk)
                if not event.get("more_body", False):
                    response_complete.set()
            else:
                raise Exception("Unknown event type")

        await application(scope, receive, send)
        if not started:
            raise Exception("The application returned without sending http.response.start")
        # Consume any body the application left unread, so those bytes are not
        # parsed as the next request line.
        await read_body()
        response_complete.set()
        return b"".join(response), should_close

    # https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
    async def handle_connection_run(reader, writer):
        try:
            while True:
                try:
                    line = await reader.readline()
                except ValueError as e:
                    # StreamReader.readline raises ValueError when a line exceeds its buffer limit.
                    raise BadRequest(str(e))
                if LOGGING:
                    info(line)
                if not line:
                    # The client closed the connection between requests.
                    break
                response, should_close = await run_request(reader, line)
                writer.write(response)
                # Apply back-pressure so a pipelining client cannot grow the write buffer without bound.
                await writer.drain()
                if should_close:
                    if LOGGING:
                        info("Close the connection")
                    break
        except (ConnectionResetError, BrokenPipeError):
            # The connection is already closed, nothing to do
            if LOGGING:
                info("Connection reset")
        except BadRequest as e:
            error(e)
            writer.write(
                b"HTTP/1.1 400 Bad Request\r\nConnection: close\r\nContent-Length: 11\r\n\r\nBad Request"
            )
        # https://stackoverflow.com/questions/7160983/catching-all-exceptions-in-python
        except Exception as e:
            error(type(e), e)
            sys.stdout.flush()
            writer.write(
                b"HTTP/1.1 500 Error\r\nConnection: close\r\nContent-Length: 5\r\n\r\nError"
            )
        finally:
            writer.close()

    return handle_connection_run
# https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
async def serve(num, sock, application):
    """Serve ``application`` on the already-bound ``sock`` until cancelled.

    Uses a UNIX-domain stream server when ``sock`` is ``AF_UNIX`` and a TCP
    stream server otherwise. Prints a "Serving worker ..." line (flushed
    immediately) once the server is listening.

    :param num: worker number, used only in the startup message.
    """
    on_connect = handle_connection(application)
    is_unix = sock.family == socket.AF_UNIX
    if is_unix:
        server = await asyncio.start_unix_server(on_connect, sock=sock)
    else:
        server = await asyncio.start_server(on_connect, sock=sock)
    family = "UNIX domain socket" if is_unix else "TCP socket"
    log(f"Serving worker {num} (pid {os.getpid()}) on {family} {sock.getsockname()}")
    sys.stdout.flush()
    async with server:
        await server.serve_forever()
def run_worker(num, sock, application):
    """Blocking entry point for one worker: run ``serve`` in a fresh event loop."""
    main_coroutine = serve(num, sock, application)
    asyncio.run(main_coroutine)
if __name__ == "__main__":
    # Usage: __init__.py ADDRESS NUM_WORKERS MODULE:APPLICATION
    import importlib

    if len(sys.argv) != 4:
        error(f"Usage: {sys.argv[0]} HOST:PORT|SOCKET_PATH NUM_WORKERS MODULE:APPLICATION")
        sys.exit(2)
    sock = create_socket(sys.argv[1])
    num_workers = int(sys.argv[2])
    module_name, application_name = sys.argv[3].split(":")
    module = importlib.import_module(module_name)
    application = getattr(module, application_name)
    workers = []
    exit_code = 0
    try:
        # Run all but one in separate (daemon) processes that share the same
        # bound socket; they are numbered 2..N.
        for i in range(num_workers - 1):
            worker = multiprocessing.Process(
                target=run_worker, args=(i + 2, sock, application)
            )
            worker.daemon = True
            worker.start()
            workers.append(worker)
        # Run worker 1 in this process
        run_worker(1, sock, application)
    except KeyboardInterrupt:
        log("Shutting down worker processes ...")
    except Exception as e:
        error("Server error:", e)
        exit_code = 1
    finally:
        # Stop child workers and release the socket however we got here.
        for worker in workers:
            worker.terminate()
        for worker in workers:
            worker.join()
        log("Closing the socket")
        sock.close()
        log("Finished.")
    sys.exit(exit_code)
tests/__init__.py (set curl_version at the top to match your curl version):
import os
import signal
import subprocess
import sys
import time
import urllib.request
from wsgiref.handlers import format_date_time
# The version curl reports in its User-Agent header. Set it to match the
# locally installed curl, or the -v trace comparisons will fail.
curl_version = "7.88.1"  # 8.1.2

# Selenium is optional; without it test_browser returns False.
use_selenium = False
try:
    from selenium import webdriver
    from selenium.webdriver.common.by import By
except ImportError:
    pass
else:
    use_selenium = True

# The page app.application serves; response bodies are compared to it exactly.
msg = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Hello</title>
</head>
<body>
<h1>Hello</h1>
</body>
</html>
""".encode("utf8")
def compare(expected, actual, before, after):
    """Assert that a ``curl -v`` trace matches the expected lines.

    ``expected`` and ``actual`` are lists of lines split on newlines, so
    header lines keep their trailing carriage return. Lines must match
    exactly, with one exception: an actual ``Date`` response header (any
    case) only needs a timestamp formatted from ``before`` or ``after``.
    These are the wall-clock times taken just before and after the request.

    :raises AssertionError: if the request took a second or more, the line
        counts differ, a line differs, or a date falls outside the window.
    """
    assert after - before < 1, (before, after)
    # Check lengths up front so a short list fails with a readable message
    # instead of an IndexError part-way through the loop.
    assert len(expected) == len(actual), (expected, actual)
    for want, got in zip(expected, actual):
        if got.lower().startswith("< date: "):
            line_before = got[:8] + format_date_time(before) + "\r"
            line_after = got[:8] + format_date_time(after) + "\r"
            assert got in (line_before, line_after), (line_before, got, line_after)
        else:
            assert want == got, (want, got)
def test_curl_uvicorn(port):
    """GET /hello with curl against uvicorn."""
    return test_curl_(port=port, mode="uvicorn")


def test_curl(port):
    """GET /hello with curl against pyvicorn."""
    return test_curl_(port=port, mode="normal")


def test_curl_post(port):
    """POST ``hi`` to /hello with curl against pyvicorn."""
    return test_curl_(port=port, mode="normal", post="hi")


def test_curl_post_uvicorn(port):
    """POST ``hi`` to /hello with curl against uvicorn."""
    return test_curl_(port=port, mode="uvicorn", post="hi")
def test_curl_(port, mode="normal", post=None):
    """Request /hello with ``curl -v`` and compare the body and trace.

    :param mode: ``"normal"`` expects pyvicorn's response headers; any other
        value expects uvicorn's header order and lower-case names.
    :param post: when not None, sent as a ``text/plain`` POST body.
    :returns: True when every check passes.
    """
    started_at = time.time()
    command = ["curl"]
    if post is not None:
        command.extend(["-d", post, "-H", "Content-Type: text/plain"])
    command.extend(["-s", "-v", f"http://localhost:{port}/hello?a=1&a=2&b=3"])
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    assert result.stdout == msg, result.stdout
    assert result.returncode == 0
    finished_at = time.time()

    verb = "GET" if post is None else "POST"
    expected = [
        f"* Trying 127.0.0.1:{port}...",
        f"* Connected to localhost (127.0.0.1) port {port} (#0)",
        f"> {verb} /hello?a=1&a=2&b=3 HTTP/1.1\r",
        f"> Host: localhost:{port}\r",
        f"> User-Agent: curl/{curl_version}\r",
        "> Accept: */*\r",
    ]
    if post is None:
        expected.append("> \r")
    else:
        expected.extend(
            [
                "> Content-Type: text/plain\r",
                f"> Content-Length: {len(post)}\r",
                "> \r",
                f"}} [{len(post)} bytes data]",
            ]
        )
    expected.append("< HTTP/1.1 200 OK\r")
    length_line = f"< Content-Length: {len(msg)}\r"
    if mode != "normal":
        expected.extend(
            [
                f"< date: {format_date_time(time.time())}\r",
                "< server: uvicorn\r",
                "< Content-Type: text/html\r",
                length_line,
            ]
        )
    else:
        expected.extend(
            [
                "< Content-Type: text/html\r",
                length_line,
                f"< Date: {format_date_time(time.time())}\r",
                "< Server: pyvicorn\r",
            ]
        )
    expected.extend(
        [
            "< \r",
            f"{{ [{len(msg)} bytes data]",
            "* Connection #0 to host localhost left intact",
            "",
        ]
    )
    compare(expected, result.stderr.decode("utf8").split("\n"), started_at, finished_at)
    return True
def test_wrk(port):
    """Load-test /hello with wrk for 1 second (1 thread, 10 connections).

    Fails if wrk writes to stderr, reports socket errors or responses that
    were not 2xx/3xx, or exits non-zero. Prints wrk's ``Requests/sec:`` line.
    """
    wrk = subprocess.Popen(
        [
            "wrk",
            "-t",
            "1",
            "-d",
            "1",
            "-c",
            "10",
            "http://localhost:" + str(port) + "/hello?a=1&a=2&b=3",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = wrk.communicate()
    assert stderr == b"", stderr
    assert b"Socket errors" not in stdout, stdout
    # wrk counts error responses as completed requests and only mentions them on this line.
    assert b"Non-2xx or 3xx responses" not in stdout, stdout
    assert wrk.wait() == 0
    for line in stdout.split(b"\n"):
        if line.startswith(b"Requests/sec: "):
            print(line.decode("utf8"))
    return True
def test_urllib(port):
    """Fetch /hello with the standard-library HTTP client and check the body."""
    target = f"http://localhost:{port}/hello?a=1&a=2&b=3"
    with urllib.request.urlopen(urllib.request.Request(target)) as reply:
        payload = reply.read()
    assert payload == msg
    return True
def test_ab(port):
    """Send 1000 requests to /hello with ApacheBench (10 concurrent, no keep-alive).

    Fails on any failed request, any non-2xx response, or a non-zero exit.
    """
    ab = subprocess.Popen(
        [
            "ab",
            "-n",
            "1000",
            "-c10",
            "-s1",
            "http://localhost:" + str(port) + "/hello?a=1&a=2&b=3",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = ab.communicate()
    assert b"\nFailed requests: 0\n" in stdout, stdout
    # ab does not count 4xx/5xx replies as failed; it reports them on a separate line.
    assert b"\nNon-2xx responses:" not in stdout, stdout
    assert b"\nFinished 1000 requests\n" in stderr, stderr
    assert ab.wait() == 0
    return True
def test_ab_keep_alive(port):
    """Send 1000 keep-alive requests to /hello with ApacheBench (``-k``).

    Uses the same URL as the other tests, because the test app rejects any
    other path. Fails on failed or non-2xx requests, if no request was served
    over a kept-alive connection, or on a non-zero exit.
    """
    ab = subprocess.Popen(
        [
            "ab",
            "-n",
            "1000",
            "-c10",
            "-s1",
            "-k",
            "http://localhost:" + str(port) + "/hello?a=1&a=2&b=3",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = ab.communicate()
    assert b"\nFailed requests: 0\n" in stdout, stdout
    assert b"\nNon-2xx responses:" not in stdout, stdout
    assert b"\nFinished 1000 requests\n" in stderr, stderr
    keep_alive = [
        line for line in stdout.split(b"\n") if line.startswith(b"Keep-Alive requests:")
    ]
    assert keep_alive and int(keep_alive[0].split()[-1]) > 0, stdout
    assert ab.wait() == 0
    return True
def test_curl_connection_close(port):
    """GET /hello with ``Connection: Close`` and check pyvicorn honours it.

    The response must echo ``Connection: Close``, and curl must report that
    it closed the connection.
    """
    before = time.time()
    curl = subprocess.Popen(
        [
            "curl",
            "-H",
            "Connection: Close",
            "-s",
            "-v",
            f"http://localhost:{port}/hello?a=1&a=2&b=3",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = curl.communicate()
    assert stdout == msg, stdout
    assert curl.wait() == 0
    after = time.time()
    expected = [
        "* Trying 127.0.0.1:" + str(port) + "...",
        "* Connected to localhost (127.0.0.1) port " + str(port) + " (#0)",
        "> GET /hello?a=1&a=2&b=3 HTTP/1.1\r",
        "> Host: localhost:" + str(port) + "\r",
        "> User-Agent: curl/" + curl_version + "\r",
        "> Accept: */*\r",
        "> Connection: Close\r",
        "> \r",
        "< HTTP/1.1 200 OK\r",
        "< Content-Type: text/html\r",
        "< Content-Length: " + str(len(msg)) + "\r",
        "< Connection: Close\r",
        "< Date: " + format_date_time(time.time()) + "\r",
        "< Server: pyvicorn\r",
        "< \r",
        "{ [" + str(len(msg)) + " bytes data]",
        "* Closing connection 0",
        "",
    ]
    # compare() takes the expected lines first and the observed trace second.
    compare(expected, stderr.decode("utf8").split("\n"), before, after)
    return True
def test_browser(port):
    """Load /hello in Chrome through Selenium and check the ``<h1>`` text.

    Builds the URL from ``port``, so it no longer depends on a global ``url``
    that only exists when this file runs as a script.

    :returns: False without doing anything when selenium is not installed,
        otherwise True once the check passes.
    """
    if not use_selenium:
        return False
    driver = webdriver.Chrome()
    try:
        driver.get(f"http://localhost:{port}/hello?a=1&a=2&b=3")
        elem = driver.find_element(By.TAG_NAME, "h1")
        assert elem.text == "Hello", elem.text
    finally:
        # quit() ends the whole WebDriver session, even when the assertion fails.
        driver.quit()
    return True
def test_application(port):
    """Serve ``app:application`` with uvicorn on ``port`` and run client checks.

    This validates the test app itself against a reference ASGI server.
    uvicorn is looked up next to the running interpreter, i.e. in the same
    virtual environment.

    :raises RuntimeError: if uvicorn exits before printing its startup banner.
    """
    server = subprocess.Popen(
        [
            # Attempt to load uvicorn from the virtual environment:
            os.path.join(os.path.dirname(sys.executable), "uvicorn"),
            "--port",
            str(port),
            "app:application",
        ],
        stderr=subprocess.PIPE,
        # Nothing reads stdout, so don't let it fill an unread pipe.
        stdout=subprocess.DEVNULL,
    )
    try:
        # Wait for the startup banner; EOF means uvicorn exited first.
        while True:
            line = server.stderr.readline()
            if not line:
                raise RuntimeError("uvicorn exited before it started serving")
            if b"Uvicorn running on" in line:
                break
        for t in [
            test_urllib,
            test_curl_post_uvicorn,
            test_curl_uvicorn,
        ]:
            t(port)
            print(".", end="")
            sys.stdout.flush()
    finally:
        server.send_signal(signal.SIGINT)
        server.wait()
def test_server(port):
    """Start pyvicorn (one worker) on ``port`` and run every client check on it.

    The current directory is appended to PYTHONPATH so the server can import
    ``app``. Prints ``.`` for each check that returns True and ``-`` for one
    that returns False (for example the browser check without selenium).
    The server is sent SIGINT and reaped afterwards, even on failure.
    """
    inherited = os.environ.get("PYTHONPATH")
    python_path = os.getcwd() if inherited is None else inherited + ":" + os.getcwd()
    env = {**os.environ, "PYTHONPATH": python_path}
    command = [
        sys.executable,
        "-u",
        "pyvicorn_server/__init__.py",
        f"localhost:{port}",
        "1",
        "app:application",
    ]
    server = subprocess.Popen(command, stdout=subprocess.PIPE, env=env)
    checks = (
        test_curl_post,
        test_curl,
        test_curl_connection_close,
        test_wrk,
        test_ab,
        test_ab_keep_alive,
        test_urllib,
        test_browser,
    )
    try:
        banner = server.stdout.readline()
        assert banner.startswith(b"Serving worker 1 "), banner
        for check in checks:
            print("." if check(port) else "-", end="")
            sys.stdout.flush()
    finally:
        server.send_signal(signal.SIGINT)
        server.wait()
if __name__ == "__main__":
    import socket

    def _free_port():
        """Return a TCP port on 127.0.0.1 that the OS reports as currently unused."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.bind(("127.0.0.1", 0))
            return probe.getsockname()[1]

    # Ask the OS for free ports rather than guessing random ones that may be in use.
    port = _free_port()
    port2 = _free_port()
    while port2 == port:
        port2 = _free_port()
    url = "http://localhost:" + str(port)
    url2 = "http://localhost:" + str(port2)
    print("Running server tests on " + url + " ...")
    print("Running application tests on " + url2 + " ...")
    try:
        test_server(port=port)
        test_application(port=port2)
    except BaseException:
        # Report failure (including Ctrl-C) on a fresh line, then re-raise for the traceback.
        print()
        print("FAILED")
        raise
    else:
        print()
        print("SUCCESS")
pyproject.toml:
[project]
name = "pyvicorn-server"
version = "0.1.0"
dependencies = [
]
[project.optional-dependencies]
dev = [
'black==23.9.1',
'uvicorn==0.23.2',
'isort==5.12.0',
'autoflake==2.2.1',
'selenium==4.13.0',
]
README.md:
# Pyvicorn
Install:
```
python3 -m venv .venv && .venv/bin/pip install '.[dev]'
```
You'll also need `curl`, `ab`, `wrk`, `chrome` and `chromedriver` for the tests:
```sh
apt install -y curl apache2-utils wrk chromium chromium-driver
```
or:
```sh
brew install wrk
curl -O https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/116.0.5845.96/mac-arm64/chromedriver-mac-arm64.zip
unzip chromedriver-mac-arm64.zip
mv chromedriver-mac-arm64/chromedriver .
rm chromedriver-mac-arm64.zip
rm -r chromedriver-mac-arm64
```
Run:
```sh
PYTHONPATH="${PYTHONPATH}:${PWD}" .venv/bin/python3 pyvicorn_server/__init__.py localhost:8000 8 app:application
```
Format code:
```sh
.venv/bin/isort . && .venv/bin/autoflake -r --in-place --remove-unused-variables --remove-all-unused-imports . && .venv/bin/black .
```
Test:
```sh
.venv/bin/python3 tests/__init__.py
```
Be the first to comment.
Copyright James Gardner 1996-2020 All Rights Reserved. Admin.