Home Blog CV Projects Patterns Notes Book Colophon Search

Selectors for an HTTP Server

17 Sep, 2023

I've written a couple of examples of simple (and therefore fast) HTTP servers. Here are some attempts at the same thing but using the low level selectors directly without a library like asyncio or gevent.

I imagined this would result in a faster more stable server, but actually the implementations always seemed slower, and I haven't been able to get a server that reliably works with both wrk and ab under load (unlike with the other approaches). So I don't know what's going on!

Here is the code though in case I ever want to pick it up and try again.

First a simple version that registers a socket for reading and writing at the same time:

import selectors
import socket
from collections import namedtuple
import os
import sys
import multiprocessing  
import time
  
connection = namedtuple('collection', ('callback'))


def run_worker(num, sock):
    print('Worker', num)
    sel = selectors.DefaultSelector()
    
    def accept(sock, mask):
        conn, addr = sock.accept()  # Should be ready
        conn.setblocking(False)
        sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, connection(read_and_write))
    
    def read_and_write(conn, mask):
        if mask & selectors.EVENT_READ:
            read(conn, mask)
        if mask & selectors.EVENT_WRITE:
            write(conn, mask)
    
    def write(conn, mask):
        pid = os.getpid()
        print(num, 'writing', conn.send(b'HTTP/1.1 200 OK\r\nContent-Length: ' + str(len(str(pid))).encode('ascii') + b'\r\n\r\n' + str(pid).encode('ascii')))
        # print('closing', conn)
        sel.unregister(conn)
        conn.close()
    
    def read(conn, mask):
        data = conn.recv(1000)  # Should be ready
        #print('got', repr(data), 'to', conn)

    # sock.setblocking(False)
    sel.register(sock, selectors.EVENT_READ, connection(accept))
    while True:
        events = sel.select()
        for key, mask in events:
            callback = key.data.callback
            callback(key.fileobj, mask)
    
if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.AI_PASSIVE, 1)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('localhost', 12349))
    sock.listen(100)
    
    num_workers = 12
    workers = []
    try:
        for i in range(num_workers):
            worker = multiprocessing.Process(target=run_worker, args=(i + 1, sock))
            worker.daemon = True
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print(f'Shutting down worker processes ...')
        for worker in workers:
            worker.terminate()
    except Exception as e:
        print(f'Error:', e)
    sock.close()

And now a more sophisticated one that changes whether to listen for read or write events based on the incoming HTTP request. This one does the listening after the fork, it doesn't seem to make a difference which surprised me:

import selectors
import socket
import multiprocessing  
  

NUM_BYTES = 2**8

def run_worker(num, sock):
    print('Worker', num)
    sock.listen()
    sel = selectors.DefaultSelector()
    
    def accept(sock, mask, data):
        conn, addr = sock.accept()  # Should be ready
        conn.setblocking(False)
        # The data format is: callback, [read_buffer, write_buffer, close]
        # Where close is whether the connection should be closed after write
        sel.register(conn, selectors.EVENT_READ, (read, [b'', b'', None]))
    
    def cleanup(conn, e=None):
        sel.unregister(conn)
        conn.close()
        if e:
            print('ERROR', e)

    def write(conn, mask, data):
        if not data[1]:
            return cleanup(conn)
        written = conn.send(data[1][:NUM_BYTES])
        if written == 0:
            return cleanup(conn)
        data[1] = data[1][written:]
        # Once all the data has gone:
        if not data[1]:
            data[2] = False
            if data[2]:
                cleanup(conn)
            else:
                sel.modify(conn, selectors.EVENT_READ, (read, data))

    def handle_headers(headers):
        close = True
        method, path, version = headers[0].split(b' ')
        for line in headers[1:]:
            parts = line.split(b':')
            key = parts[0].decode('ascii').lower().strip()
            value = (b':'.join(parts[1:])).decode('ascii').lower().strip()
            if key == 'connection':
                if value == 'close':
                    close = True
                elif value == 'keep-alive':
                    close = False
                break
        return close, method, path, version
    
    def read(conn, mask, data):
        if data[2] is True:
            return cleanup(conn, Exception('Reading from a connection that is supposed to be closed'))
        received = conn.recv(NUM_BYTES)
        if received == b'':
            return cleanup(conn)
        data[0] += received
        location = data[0].find(b'\r\n\r\n')
        if location >= 0:
            headers = data[0][:location].split(b'\r\n')
            close, method, path, version = handle_headers(headers)
            data[1] += version + b' 200 OK\r\n'
            if not close:
                data[1] += b'Connection: Keep-Alive\r\n'
            else:
                data[1] += b'Connection: Close\r\n'
            data[0] = data[0][location+4:]
            data[1] += b'Content-Type: text/plain\r\nContent-Length: 2\r\n\r\nok'
            data[2] = close
            sel.modify(conn, selectors.EVENT_WRITE, (write, data))
            if data[0]:
                raise Exception('Pipelineing not supported')

    sel.register(sock, selectors.EVENT_READ, (accept, None))
    while True:
        events = sel.select()
        for key, mask in events:
            callback = key.data[0]
            try:
                callback(key.fileobj, mask, key.data[1])
            except Exception as e:
                print(e)
                # But continue anyway for now
    
if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('localhost', 8081))
    
    num_workers = 8
    workers = []
    try:
        for i in range(num_workers):
            worker = multiprocessing.Process(target=run_worker, args=(i + 1, sock))
            worker.daemon = True
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print('Shutting down worker processes ...')
        for worker in workers:
            worker.terminate()
    except Exception as e:
        print(f'Error:', e)
        raise
    sock.close()

As I say, both these approaches failed, so I wouldn't recommend them.

See also:

Comments

Be the first to comment.

Add Comment





Copyright James Gardner 1996-2020 All Rights Reserved. Admin.