Home Blog CV Projects Patterns Notes Book Colophon Search

Fast ASGI Server

28 Sep, 2023

This is currently named pyvicorn_server which is a bit silly, I'll find something better.

It is basically a subset of ASGI and HTTP that might be useful for as a starting point for either development use, or for the sort of code that might end up on something like Lambda anyway, so without needing to support more advanced features of HTTP. The server part is only 237 lines long.

What amazes me is that this server does 400,000 requests per second on my Linux laptop when using pypy3 with 8 workers when doing 128 concurrent requests. Which shows that if you only build what you need, things can be quite efficient.

PYTHONPATH="${PYTHONPATH}:${PWD}" pypy3 pyvicorn_server/__init__.py localhost:8000 8 app:application
wrk -t 8 -d 10 -c 128 'http://localhost:8000/hello?a=1&a=2&b=3'

I've written some tests that use curl, wrk and ab which cover HTTP 1.1 and 1.0. In the test configuration it does about 60,000 requests per second with the 1 worker and lower concurrency.

app.py:

async def application(scope, receive, send):
    assert scope["path"] == "/hello", scope["path"]
    assert scope["query_string"] == b"a=1&a=2&b=3", scope["query_string"]
    msg = """\
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Hello</title>
  </head>
  <body>
    <h1>Hello</h1>
  </body>
</html>
""".encode(
        "utf8"
    )
    body = await receive()
    if scope["method"] == "POST":
        assert (
            body["body"] == b"hi"
        ), f'Expected body to be b"hi" but got, {body["body"]}'
        assert body["type"] == "http.request"
        assert body["more_body"] == False
    await send(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [
                (b"Content-Type", b"text/html"),
                (b"Content-Length", str(len(msg)).encode("ascii")),
            ],
        }
    )
    await send(
        {
            "type": "http.response.body",
            "body": msg,
        }
    )

pyvicorn_server/__init__.py:

import asyncio
import multiprocessing
import os
import socket
import sys
import time
from wsgiref.handlers import format_date_time

DEV = LOGGING = False
if LOGGING:
    info = print
log = error = print


def create_socket(address):
    try:
        host, port = address.split(":")
        port = int(port)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
    except OSError as e:
        error("Failed to create TCP Socket:", e)
        sys.exit(1)
    except ValueError:
        # https://pymotw.com/2/socket/uds.html
        path = os.path.normpath(os.path.abspath(address))
        try:
            os.unlink(path)
        except OSError:
            if os.path.exists(address):
                raise
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.bind(path)
        os.chmod(path, 666)
    return sock


class BadRequest(Exception):
    pass


async def headers(reader):
    found = []
    counter = 0
    size = 0
    max_headers_length = 1024 * 1024
    while counter < 1000 and size < max_headers_length:
        line = await reader.readline()
        if LOGGING:
            info(line)
        if line == b"\r\n":
            # End of headers
            return found
        if len(line) < 4:
            raise BadRequest(f"Invalid header: {line}")
        size += len(line)
        counter += 1
        first = line.find(b":")
        if first < 0:
            raise BadRequest("Invalid header line")
        key = line[:first].lower().strip()
        value = line[first + 1 :].strip()
        found.append((key, value))
    raise BadRequest(
        f"Could not find end of headers after reading 1000 lines or {max_headers_length} bytes."
    )


def handle_connection(application):
    # https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
    async def handle_connection_run(reader, writer):
        try:
            while True:
                line = await reader.readline()
                if LOGGING:
                    info(line)
                if not line:
                    writer.close()
                    break
                try:
                    raw_method, raw_path, raw_http_version = line.strip().split(b" ")
                except Exception as e:
                    raise BadRequest(str(e))
                scope = {
                    "type": "http",
                    "asgi": {"version": "3.0", "spec_version": "2.3"},
                    "http_version": raw_http_version.decode("utf8")[5:],
                    "method": raw_method.decode("utf8").upper(),
                    "raw_path": raw_path,
                    "headers": await headers(reader),
                }
                pos = scope["raw_path"].find(b"?")
                if pos == 0:
                    raise BadRequest(
                        "Can't have a ? as the first character, should be /?"
                    )
                if pos > 0:
                    scope["path"] = scope["raw_path"][:pos].decode("utf8")
                    scope["query_string"] = scope["raw_path"][pos + 1 :]
                    # raise Exception((scope['path'], scope['query_string']))

                async def receive():
                    body = b""
                    for k, v in scope["headers"]:
                        if k == b"content-length":
                            body = await reader.read(int(v.decode("ascii")))
                    return {
                        "type": "http.request",
                        "body": body,
                        "more_body": False,
                    }

                started = [False]
                connection = [None]
                to_send = [b""]
                should_close = [False]

                for k, v in scope["headers"]:
                    if k.lower().strip() == b"connection":
                        connection[0] = v.lower()

                async def send(event):
                    if event["type"] == "http.response.start":
                        response = b"HTTP/1.1 200 OK\r\n"
                        started[0] = True
                        for k, v in event["headers"]:
                            response += k + b": " + v + b"\r\n"
                        if scope["http_version"] == "1.0":
                            if connection[0] == b"keep-alive":
                                response += b"Connection: Keep-Alive\r\n"
                            else:
                                should_close[0] = True
                        elif connection[0] == b"close":
                            response += b"Connection: Close\r\n"
                            should_close[0] = True
                        response += (
                            b"Date: "
                            + (format_date_time(time.time()).encode("utf8"))
                            + b"\r\n"
                        )
                        response += b"Server: pyvicorn\r\n"
                        response += b"\r\n"
                        if LOGGING:
                            info(response)
                        to_send[0] += response
                    elif event["type"] == "http.response.body":
                        if not started[0]:
                            raise Exception(
                                "Expected the http.responsie.start event to be sent first"
                            )
                        body = event.get("body")
                        if body:
                            if LOGGING:
                                info(body)
                            to_send[0] += body
                    else:
                        raise Exception("Unknown event type")

                await application(scope, receive, send)
                writer.write(to_send[0])
                if should_close[0]:
                    if LOGGING:
                        info("Close the connection")
                    writer.close()
                    break
        except ConnectionResetError:
            # The connection is already closed, nothing to do
            if LOGGING:
                debug("Connection reset")
        except BadRequest as e:
            error(e)
            response = b"HTTP/1.1 400 Bad Request\r\nConnection: close\r\nContent-Length: 11\r\n\r\nBad Request"
            writer.write(response)
            writer.close()
        # https://stackoverflow.com/questions/7160983/catching-all-exceptions-in-python
        except Exception as e:
            error(type(e), e)
            sys.stdout.flush()
            response = b"HTTP/1.1 500 Error\r\nConnection: close\r\nContent-Length: 5\r\n\r\nError"
            writer.write(response)
            writer.close()

    return handle_connection_run


# https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
async def serve(num, sock, application):
    if sock.family == socket.AF_UNIX:
        family = "UNIX domain socket"
        server = await asyncio.start_unix_server(
            handle_connection(application), sock=sock
        )
    else:
        family = "TCP socket"
        server = await asyncio.start_server(handle_connection(application), sock=sock)
    log(f"Serving worker {num} (pid {os.getpid()}) on {family} {sock.getsockname()}")
    sys.stdout.flush()
    async with server:
        await server.serve_forever()


def run_worker(num, sock, application):
    asyncio.run(serve(num, sock, application))


if __name__ == "__main__":
    import importlib

    sock = create_socket(sys.argv[1])
    num_workers = int(sys.argv[2])
    module_name, application_name = sys.argv[3].split(":")
    module = importlib.import_module(module_name)
    application = getattr(module, application_name)
    workers = []
    try:
        # Run all but one in separate processes
        for i in range(num_workers - 1):
            worker = multiprocessing.Process(
                target=run_worker, args=(i + 2, sock, application)
            )
            worker.daemon = True
            worker.start()
            workers.append(worker)
        # Run one worker in this process
        run_worker(1, sock, application)
    except KeyboardInterrupt:
        log(f"Shutting down worker processes ...")
        for worker in workers:
            worker.terminate()
        for worker in workers:
            worker.join()
    except Exception as e:
        error(f"Error in worker {num}.", e)
    log("Closing the socket")
    sock.close()
    log("Finished.")

tests/__init__.py (Set curl_version at the top to match your curl version):

import os
import signal
import subprocess
import sys
import time
import urllib.request
from wsgiref.handlers import format_date_time

curl_version = "7.88.1"  # 8.1.2

try:
    from selenium import webdriver
    from selenium.webdriver.common.by import By

    use_selenium = True
except ImportError:
    use_selenium = False

msg = """\
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Hello</title>
  </head>
  <body>
    <h1>Hello</h1>
  </body>
</html>
""".encode(
    "utf8"
)


def compare(expected, actual, before, after):
    assert after - before < 1, (before, after)
    for i, line in enumerate(actual):
        if line.lower().startswith("< date: "):
            line_before = line[:8] + format_date_time(before) + "\r"
            line_after = line[:8] + format_date_time(after) + "\r"
            assert actual[i] in (line_before, line_after), (
                line_before,
                actual[i],
                line_after,
            )
        else:
            assert expected[i] == actual[i], (expected[i], actual[i])
    assert len(expected) == len(actual)


def test_curl_uvicorn(port):
    return test_curl_(port, mode="uvicorn")


def test_curl(port):
    return test_curl_(port, mode="normal")


def test_curl_post(port):
    return test_curl_(port, mode="normal", post="hi")


def test_curl_post_uvicorn(port):
    return test_curl_(port, mode="uvicorn", post="hi")


def test_curl_(port, mode="normal", post=None):
    before = time.time()
    cmd = ["curl"]
    if post is not None:
        cmd += ["-d", post, "-H", "Content-Type: text/plain"]
    cmd += ["-s", "-v", f"http://localhost:{port}/hello?a=1&a=2&b=3"]
    # print(cmd)
    curl = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = curl.communicate()
    assert stdout == msg, stdout
    assert curl.wait() == 0
    after = time.time()
    # print(stdout.decode("utf8"))
    # print(stderr.decode("utf8"))

    lines = [
        "*   Trying 127.0.0.1:" + str(port) + "...",
        "* Connected to localhost (127.0.0.1) port " + str(port) + " (#0)",
    ]
    if post is not None:
        lines += ["> POST /hello?a=1&a=2&b=3 HTTP/1.1\r"]
    else:
        lines += ["> GET /hello?a=1&a=2&b=3 HTTP/1.1\r"]
    lines += [
        "> Host: localhost:" + str(port) + "\r",
        "> User-Agent: curl/" + curl_version + "\r",
        "> Accept: */*\r",
    ]
    if post is not None:
        lines += [
            "> Content-Type: text/plain\r",
            "> Content-Length: " + str(len(post)) + "\r",
            "> \r",
            "} [" + str(len(post)) + " bytes data]",
        ]
    else:
        lines += [
            "> \r",
        ]
    lines += [
        "< HTTP/1.1 200 OK\r",
    ]
    if mode == "normal":
        lines += [
            "< Content-Type: text/html\r",
            "< Content-Length: " + str(len(msg)) + "\r",
            "< Date: " + format_date_time(time.time()) + "\r",
            "< Server: pyvicorn\r",
        ]
    else:
        lines += [
            "< date: " + format_date_time(time.time()) + "\r",
            "< server: uvicorn\r",
            "< Content-Type: text/html\r",
            "< Content-Length: " + str(len(msg)) + "\r",
        ]
    lines += [
        "< \r",
        "{ [" + str(len(msg)) + " bytes data]",
        "* Connection #0 to host localhost left intact",
        "",
    ]
    compare(lines, stderr.decode("utf8").split("\n"), before, after)
    return True


def test_wrk(port):
    wrk = subprocess.Popen(
        [
            "wrk",
            "-t",
            "1",
            "-d",
            "1",
            "-c",
            "10",
            "http://localhost:" + str(port) + "/hello?a=1&a=2&b=3",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = wrk.communicate()
    assert stderr == b""
    assert b"Socket errors" not in stdout
    assert wrk.wait() == 0
    [
        print(line.decode("utf8"))
        for line in stdout.split(b"\n")
        if line.startswith(b"Requests/sec: ")
    ]
    return True


def test_urllib(port):
    request = urllib.request.Request(
        "http://localhost:" + str(port) + "/hello?a=1&a=2&b=3"
    )
    with urllib.request.urlopen(request) as fp:
        response = fp.read()
        assert response == msg
    return True


def test_ab(port):
    ab = subprocess.Popen(
        [
            "ab",
            "-n",
            "1000",
            "-c10",
            "-s1",
            "http://localhost:" + str(port) + "/hello?a=1&a=2&b=3",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = ab.communicate()
    assert b"\nFailed requests:        0\n" in stdout, stdout
    assert b"\nFinished 1000 requests\n" in stderr, stderr
    assert ab.wait() == 0
    return True


def test_ab_keep_alive(port):
    ab = subprocess.Popen(
        [
            "ab",
            "-n",
            "1000",
            "-c10",
            "-s1",
            "-k",
            "http://localhost:" + str(port) + "/",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = ab.communicate()
    assert b"\nFailed requests:        0\n" in stdout, stdout
    assert b"\nFinished 1000 requests\n" in stderr, stderr
    assert ab.wait() == 0
    return True


def test_curl_connection_close(port):
    before = time.time()
    curl = subprocess.Popen(
        [
            "curl",
            "-H",
            "Connection: Close",
            "-s",
            "-v",
            f"http://localhost:{port}/hello?a=1&a=2&b=3",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = curl.communicate()
    assert stdout == msg, stdout
    assert curl.wait() == 0
    after = time.time()
    compare(
        stderr.decode("utf8").split("\n"),
        [
            "*   Trying 127.0.0.1:" + str(port) + "...",
            "* Connected to localhost (127.0.0.1) port " + str(port) + " (#0)",
            "> GET /hello?a=1&a=2&b=3 HTTP/1.1\r",
            "> Host: localhost:" + str(port) + "\r",
            "> User-Agent: curl/" + curl_version + "\r",
            "> Accept: */*\r",
            "> Connection: Close\r",
            "> \r",
            "< HTTP/1.1 200 OK\r",
            "< Content-Type: text/html\r",
            "< Content-Length: " + str(len(msg)) + "\r",
            "< Connection: Close\r",
            "< Date: " + format_date_time(time.time()) + "\r",
            "< Server: pyvicorn\r",
            "< \r",
            "{ [" + str(len(msg)) + " bytes data]",
            "* Closing connection 0",
            "",
        ],
        before,
        after,
    )
    return True


def test_browser(port):
    if use_selenium:
        driver = webdriver.Chrome()
        driver.get(url + "/hello?a=1&a=2&b=3")
        elem = driver.find_element(By.TAG_NAME, "h1")
        assert elem.text == "Hello", elem.text
        driver.close()
        return True
    else:
        return False


def test_application(port):
    server = subprocess.Popen(
        [
            # Attempt to load uvicorn from the virtual environment:
            os.path.join(os.path.dirname(sys.executable), "uvicorn"),
            "--port",
            str(port),
            "app:application",
        ],
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    line = server.stderr.readline()
    while b"Uvicorn running on" not in line:
        line = server.stderr.readline()
    try:
        for t in [
            test_urllib,
            test_curl_post_uvicorn,
            test_curl_uvicorn,
        ]:
            t(port)
            print(".", end="")
            sys.stdout.flush()
    finally:
        server.send_signal(signal.SIGINT)
        server.wait()


def test_server(port):
    if "PYTHONPATH" in os.environ:
        PYTHONPATH = os.environ["PYTHONPATH"] + ":" + os.getcwd()
    else:
        PYTHONPATH = os.getcwd()
    env = os.environ.copy()
    env.update({"PYTHONPATH": PYTHONPATH})
    server = subprocess.Popen(
        [
            sys.executable,
            "-u",
            "pyvicorn_server/__init__.py",
            f"localhost:{port}",
            "1",
            "app:application",
        ],
        stdout=subprocess.PIPE,
        env=env,
    )
    try:
        line = server.stdout.readline()
        assert line.startswith(b"Serving worker 1 "), line
        for t in [
            test_curl_post,
            test_curl,
            test_curl_connection_close,
            test_wrk,
            test_ab,
            test_ab_keep_alive,
            test_urllib,
            test_browser,
        ]:
            if t(port):
                print(".", end="")
            else:
                print("-", end="")
            sys.stdout.flush()

    finally:
        server.send_signal(signal.SIGINT)
        server.wait()


if __name__ == "__main__":
    import math
    import random

    port = port2 = 1
    while port == port2:
        port = 49152 + math.floor((65535 - 49152) * random.random())
        port2 = 49152 + math.floor((65535 - 49152) * random.random())
    url = "http://localhost:" + str(port)
    url2 = "http://localhost:" + str(port2)
    print("Running server tests on " + url + " ...")
    print("Running application tests on " + url2 + " ...")
    try:
        test_server(port=port)
        test_application(port=port2)
    except:
        print()
        print("FAILED")
        raise
    else:
        print()
        print("SUCCESS")

pyproject.toml:

[project]
name = "pyvicorn-server"
version = "0.1.0"
dependencies = [
]

[project.optional-dependencies]
dev = [
  'black==23.9.1',
  'uvicorn==0.23.2',
  'isort==5.12.0',
  'autoflake==2.2.1',
  'selenium==4.13.0',
]

README.md:

# Pyvicorn

Install:

```
python3 -m venv .venv && .venv/bin/pip install '.[dev]'
```

You'll also need `curl`, `ab`, `wrk`, `chrome` and `chromedriver` for the tests:

```sh
apt install -y apache2-utils wrk chromium chromium-driver
```

or:

```sh
brew install wrk
curl -O https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/116.0.5845.96/mac-arm64/chromedriver-mac-arm64.zip
unzip chromedriver-mac-arm64.zip
mv chromedriver-mac-arm64/chromedriver .
rm chromedriver-mac-arm64.zip
rm -r chromedriver-mac-arm64
```

Run:

```sh
PYTHONPATH="${PYTHONPATH}:${PWD}" .venv/bin/python3 pyvicorn_server/__init__.py localhost:8000 8 app:application
```

Format code:

```sh
.venv/bin/isort . && .venv/bin/autoflake -r --in-place --remove-unused-variables --remove-all-unused-imports . && .venv/bin/black .
```

Test:

```sh
.venv/bin/python3 tests/__init__.py
```

Comments

Be the first to comment.

Add Comment





Copyright James Gardner 1996-2020 All Rights Reserved. Admin.