Home Blog CV Projects Patterns Notes Book Colophon Search

Python Async Pipes

25 Jun, 2024

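Here's an example that does about 700,000 round trips a second on my laptop. `main.py` starts `upper2.py` as a subprocess, writes newline-delimited JSON messages to its stdin, and reads the upper-cased text back from its stdout: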

main.py:

import asyncio
import json
import sys
import time


async def benchmark_subprocess(num=10_000_000, script='upper2.py'):
    """Measure request/response round trips through a child process's pipes.

    Starts ``script`` with the current interpreter in unbuffered mode
    (``-u``), writes ``num`` newline-terminated JSON messages to its stdin,
    reads ``num`` newline-terminated replies from its stdout, checks each
    reply, and prints the achieved round trips per second.

    Sending and receiving run concurrently via ``asyncio.gather`` so that
    neither direction's pipe can fill up and stall the other.

    Args:
        num: Number of messages to send (and replies to expect).
        script: Path of the child script, resolved relative to the current
            working directory.

    Raises:
        AssertionError: if a reply is not the upper-cased message (checked
            with ``assert``, so skipped under ``python -O``).
        asyncio.IncompleteReadError: if the child closes stdout early.
    """
    print(sys.executable, '-u', script)
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-u', script,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    message = {"message": "hello"}
    request = (json.dumps(message) + '\n').encode('utf8')
    # The default child (upper2.py) upper-cases the whole serialized line,
    # keys included, so the reply to {"message": "hello"} parses as
    # {"MESSAGE": "HELLO"}.
    expected = json.loads(json.dumps(message).upper())
    start_time = time.perf_counter()

    async def send():
        for _ in range(num):
            proc.stdin.write(request)
            await proc.stdin.drain()

    async def receive():
        for _ in range(num):
            line = await proc.stdout.readuntil(b'\n')
            response = json.loads(line.decode())
            assert response == expected, repr(response)

    try:
        await asyncio.gather(send(), receive())
        print(f"Round-trips per second {num/(time.perf_counter() - start_time):.0f}")
    finally:
        # Single cleanup path for success, error and cancellation alike;
        # cancellation is allowed to propagate to the caller.
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass  # the child exited between the check and the signal
        await proc.wait()

async def main():
    """Entry coroutine: run the subprocess round-trip benchmark once."""
    return await benchmark_subprocess()


if __name__ == '__main__':
    # Ctrl-C stops the benchmark; exit without printing a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

upper2.py:

import asyncio
import sys

async def upper(data):
    """Return ``data`` upper-cased with a trailing newline appended."""
    shouted = data.upper()
    return shouted + '\n'

async def process_data():
    """Upper-case stdin to stdout, one line at a time, until EOF.

    Wraps ``sys.stdin`` and ``sys.stdout`` in asyncio streams. For each
    newline-terminated line read from stdin, writes ``upper(line)`` to
    stdout and awaits ``drain()`` after every write for flow control.
    Returns when stdin reaches EOF.

    Lines are read whole (``readline``) rather than in fixed-size chunks.
    This way each input line produces exactly one output line, and a
    multi-byte UTF-8 character is never split across two ``decode()`` calls.

    Raises:
        ValueError: if a single line exceeds the StreamReader's buffer limit.
    """
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    reader_protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)

    # FlowControlMixin supplies the pause/resume bookkeeping that
    # StreamWriter.drain() waits on.
    transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(transport, writer_protocol, None, loop)

    while True:
        line = await reader.readline()
        if not line:
            break  # EOF on stdin
        text = line.decode()
        # upper() appends its own '\n'; drop the incoming one so each input
        # line yields exactly one output line.
        if text.endswith('\n'):
            text = text[:-1]
        upper_data = await upper(text)
        writer.write(upper_data.encode())
        await writer.drain()

async def main():
    """Entry coroutine: run the stdin-to-stdout upper-casing filter."""
    return await process_data()


if __name__ == '__main__':
    # Ctrl-C stops the filter; exit without printing a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

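A more direct version, saved as `pipes5.py`, gives better throughput. It forks a child process and sends newline-terminated messages one way through a raw `os.pipe()` with no JSON encoding. It therefore measures one-way messages per second rather than round trips: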

import os
import asyncio
import time

# How many newline-terminated messages one benchmark run pushes through the pipe.
NUM_MESSAGES = 10 ** 6

async def send_messages(writer, num_messages):
    """Write ``num_messages`` lines of the form ``Message <i>`` to ``writer``.

    Awaits ``drain()`` after every write for flow control, then signals
    end-of-stream with ``write_eof()`` and closes the writer. ``close()`` is
    not followed by ``wait_closed()``, so buffered data may still be pending
    in the transport when this coroutine returns.
    """
    for index in range(num_messages):
        payload = f"Message {index}\n".encode()
        writer.write(payload)
        await writer.drain()
    writer.write_eof()
    writer.close()

async def receive_messages(reader, num_messages):
    """Consume ``num_messages`` newline-terminated lines from ``reader``.

    The line contents are discarded; only the count matters. Returns the
    number of lines read, or 0 if ``num_messages`` is not positive. Raises
    ``asyncio.IncompleteReadError`` if the stream ends before enough lines
    arrive.
    """
    count = 0
    while count < num_messages:
        await reader.readuntil(b'\n')  # payload unused; we only count lines
        count += 1
    return count

def receiver_process(r_fd, num_messages):
    """Read ``num_messages`` lines from the pipe read end ``r_fd``.

    Runs its own event loop with ``asyncio.run`` and wraps ``r_fd`` in an
    asyncio StreamReader. It then consumes the messages with
    ``receive_messages``. The read transport, and with it the wrapped pipe
    file, is closed when reading finishes or fails.

    Args:
        r_fd: OS file descriptor of the pipe's read end; ownership passes to
            this function.
        num_messages: Number of newline-terminated messages to read.
    """
    async def run_receiver():
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader()
        reader_transport, _ = await loop.connect_read_pipe(
            lambda: asyncio.StreamReaderProtocol(reader), os.fdopen(r_fd, 'rb')
        )
        try:
            await receive_messages(reader, num_messages)
        finally:
            # Close explicitly so the pipe is released deterministically
            # instead of by the transport's finalizer (which also warns
            # about an unclosed transport).
            reader_transport.close()

    asyncio.run(run_receiver())

async def main():
    """Benchmark one-way message throughput over an ``os.pipe()``.

    Forks the process. The child consumes ``NUM_MESSAGES`` lines from the
    pipe via ``receiver_process`` and exits. The parent writes them with
    ``send_messages``, waits for the child, and prints the elapsed time and
    messages per second. POSIX-only, since it relies on ``os.fork``.

    Raises:
        RuntimeError: if the receiver child does not exit with status 0.
    """
    # One pipe: bytes written to w_fd can be read from r_fd.
    r_fd, w_fd = os.pipe()

    pid = os.fork()

    if pid == 0:
        # Child process: receiver. It must never return into the parent's
        # code path below, so it always leaves through os._exit, with a
        # failure status if receiving raised so the parent can tell.
        exit_code = 1
        try:
            # Close the child's copy of the write end; a pipe only reports
            # EOF once every write end is closed.
            os.close(w_fd)
            receiver_process(r_fd, NUM_MESSAGES)
            exit_code = 0
        except BaseException:
            # Local imports: only needed on this failure path.
            import sys
            import traceback
            traceback.print_exc()
            sys.stderr.flush()  # os._exit does not flush Python buffers
        finally:
            os._exit(exit_code)
    else:
        # Parent process: sender.
        os.close(r_fd)  # the parent only writes

        loop = asyncio.get_running_loop()
        # Wrap the write end in an asyncio StreamWriter.
        writer_transport, writer_protocol = await loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin, os.fdopen(w_fd, 'wb')
        )
        writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, loop)

        # perf_counter is monotonic and high-resolution, unlike time.time.
        start_time = time.perf_counter()

        await send_messages(writer, NUM_MESSAGES)

        # Wait for the child in a worker thread instead of blocking the
        # event loop with os.waitpid. drain() only waits once the transport
        # buffer passes its high-water mark, so when send_messages returns,
        # the tail of the data may still be queued in the transport. The
        # loop must keep running to flush it; otherwise the child waits for
        # that data while we wait for the child.
        _, status = await loop.run_in_executor(None, os.waitpid, pid, 0)

        end_time = time.perf_counter()

        if status != 0:
            raise RuntimeError(
                f"receiver process {pid} failed (wait status {status})")

        time_taken = end_time - start_time
        messages_per_second = NUM_MESSAGES / time_taken
        print(f"Time taken to send and receive {NUM_MESSAGES} messages: {time_taken:.2f} seconds")
        print(f"Messages per second: {messages_per_second:.2f}")

# Run the benchmark only when executed as a script, so importing this
# module does not fork and start a benchmark as a side effect.
if __name__ == '__main__':
    asyncio.run(main())

Running:

$ python3 pipes5.py 
Time taken to send and receive 1000000 messages: 0.77 seconds
Messages per second: 1302611.65

Comments

Be the first to comment.

Add Comment





Copyright James Gardner 1996-2020 All Rights Reserved. Admin.