25 Jun, 2024
Here's an example that does about 700,000 JSON round trips a second on my laptop, between a parent process and a child process talking over stdin/stdout pipes:
main.py:
import asyncio
import json
import sys
import time
async def benchmark_subprocess(num=10_000_000, script='upper2.py'):
    """Measure JSON-line round trips per second through a child process.

    Starts ``script`` under the current interpreter with unbuffered output.
    It then concurrently writes ``num`` newline-terminated JSON requests to
    the child's stdin and reads ``num`` newline-terminated replies from its
    stdout. Each reply must equal the request line upper-cased and decoded
    as JSON, which is what an upper-casing echo such as upper2.py produces.

    Args:
        num: number of request/reply round trips to perform.
        script: path of the child script to run.

    Returns:
        The measured round trips per second (also printed).

    Raises:
        AssertionError: if a reply does not match the expected value
            (the check is skipped under ``python -O``).
    """
    cmd = (sys.executable, '-u', script)
    print(*cmd)
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    message = {"message": "hello"}
    line = json.dumps(message) + '\n'
    data = line.encode('utf8')
    # The child upper-cases the raw request line, keys included, so the reply
    # is {"MESSAGE": "HELLO"}; derive the expectation the same way.
    expected = json.loads(line.upper())

    async def send():
        for _ in range(num):
            proc.stdin.write(data)
            await proc.stdin.drain()
        # EOF lets the child leave its read loop and exit on its own.
        proc.stdin.close()

    async def receive():
        for _ in range(num):
            reply = await proc.stdout.readuntil(b'\n')
            response = json.loads(reply.decode())
            assert response == expected, repr(response)

    tasks = [asyncio.ensure_future(send()), asyncio.ensure_future(receive())]
    start_time = time.perf_counter()
    try:
        await asyncio.gather(*tasks)
        rate = num / (time.perf_counter() - start_time)
        print(f"Round-trips per second {rate:.0f}")
        await proc.wait()
        return rate
    finally:
        # Runs on success, failure and cancellation alike. A cancellation is
        # not swallowed here; it propagates once the cleanup is done.
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass  # the child exited by itself in the meantime
            await proc.wait()
async def main():
    """Entry coroutine: run the subprocess round-trip benchmark once."""
    benchmark = benchmark_subprocess()
    await benchmark
if __name__ == '__main__':
    # Script entry point. Ctrl-C ends the run quietly instead of printing a
    # KeyboardInterrupt traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
upper2.py:
import asyncio
import sys
async def upper(data):
    """Return *data* upper-cased with a newline appended."""
    shouted = data.upper()
    return shouted + '\n'
async def process_data():
    """Upper-case newline-delimited text from stdin and echo it to stdout.

    Wraps the process's stdin and stdout pipes in asyncio streams. For each
    line read, it strips the trailing newline and passes the decoded text to
    ``upper``, which appends one newline again. It then writes the result and
    waits for the writer to drain. Returns when stdin reaches EOF.

    ``readline`` raises ``ValueError`` for a line longer than the reader's
    buffer limit (64 KiB by default).
    """
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    reader_protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)
    transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(transport, writer_protocol, None, loop)
    while True:
        # Work in whole lines. Fixed-size reads split messages (and multi-byte
        # UTF-8 sequences) at arbitrary points, and upper() would then append
        # a newline to every fragment, breaking the line protocol.
        line = await reader.readline()
        if not line:
            break  # EOF
        upper_data = await upper(line.rstrip(b'\n').decode())
        writer.write(upper_data.encode())
        await writer.drain()
async def main():
    """Entry coroutine: serve upper-casing requests until stdin is exhausted."""
    service = process_data()
    await service
if __name__ == '__main__':
    # Script entry point. An interrupt from the terminal stops the echo
    # service without dumping a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
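A more direct version, saved as pipes5.py, gives better throughput. It forks a child that reads from an os.pipe() and measures one-way messages rather than round trips, so the numbers are not directly comparable: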
import os
import asyncio
import time
NUM_MESSAGES = 10 ** 6  # one million messages per benchmark run
async def send_messages(writer, num_messages):
    """Write *num_messages* numbered lines to *writer*, then close it.

    Each line has the form ``Message <index>`` followed by a newline.
    drain() is awaited after every write so the writer's flow control can
    pause this loop when the transport buffer fills up.
    """
    for index in range(num_messages):
        line = f"Message {index}\n"
        writer.write(line.encode())
        await writer.drain()
    # Half-close first so the reader sees end of stream, then release the writer.
    writer.write_eof()
    writer.close()
async def receive_messages(reader, num_messages):
    """Consume newline-terminated messages from *reader* until *num_messages* have arrived.

    Message contents are discarded; only the count matters. An
    ``asyncio.IncompleteReadError`` from ``readuntil`` (EOF before a full
    line) propagates to the caller.

    Returns:
        The number of messages read.
    """
    received = 0
    read_line = reader.readuntil
    while received < num_messages:
        await read_line(b'\n')
        received += 1
    return received
def receiver_process(r_fd, num_messages):
    """Read ``num_messages`` newline-terminated messages from pipe fd ``r_fd``.

    Runs its own event loop via ``asyncio.run``, so it must be called where
    no loop is running in the current thread (here: the forked child).
    Takes ownership of ``r_fd``: the pipe transport wrapping it is closed
    once receiving finishes, whether or not it succeeded.

    Raises:
        asyncio.IncompleteReadError: if EOF arrives before ``num_messages``
            complete lines have been read.
    """
    async def run_receiver():
        reader = asyncio.StreamReader()
        transport, _ = await asyncio.get_running_loop().connect_read_pipe(
            lambda: asyncio.StreamReaderProtocol(reader), os.fdopen(r_fd, 'rb')
        )
        try:
            await receive_messages(reader, num_messages)
        finally:
            # Release the pipe instead of leaving an open transport behind
            # when asyncio.run() tears the loop down.
            transport.close()

    asyncio.run(run_receiver())
async def main():
    """Benchmark one-way message throughput through an os.pipe() to a forked child.

    The parent writes NUM_MESSAGES newline-terminated messages into the pipe
    with send_messages(). The forked child reads them with receiver_process()
    and exits. The timed interval runs from the first send until the child
    has been reaped, and the resulting rate is printed.

    Raises:
        RuntimeError: if the child exits with a non-zero wait status.
    """
    loop = asyncio.get_running_loop()
    # Create a pair of connected pipe file descriptors.
    r_fd, w_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: receiver. Leave via os._exit() on every path, so the
        # child never returns into the parent's asyncio.run() and the code
        # that follows it, even when receiving fails.
        exit_code = 1
        try:
            os.close(w_fd)  # Close the write end in the child process
            receiver_process(r_fd, NUM_MESSAGES)
            exit_code = 0
        except BaseException:
            import sys
            import traceback
            traceback.print_exc()
            sys.stderr.flush()  # os._exit() does not flush stdio buffers
        finally:
            os._exit(exit_code)

    # Parent process: sender.
    os.close(r_fd)  # Close the read end in the parent process
    # Wrap the write file descriptor in an asyncio stream writer.
    writer_transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, os.fdopen(w_fd, 'wb')
    )
    writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, loop)

    start_time = time.perf_counter()
    await send_messages(writer, NUM_MESSAGES)
    # Reap the child from a worker thread. A blocking os.waitpid() here would
    # freeze the event loop. drain() only waits while the buffer is above its
    # high-water mark, so bytes may still be queued in the write transport;
    # with the loop frozen they would never reach the pipe, and the child
    # would wait for them forever.
    _, status = await loop.run_in_executor(None, os.waitpid, pid, 0)
    time_taken = time.perf_counter() - start_time
    if status != 0:
        raise RuntimeError(f"receiver process failed (wait status {status})")

    messages_per_second = NUM_MESSAGES / time_taken
    print(f"Time taken to send and receive {NUM_MESSAGES} messages: {time_taken:.2f} seconds")
    print(f"Messages per second: {messages_per_second:.2f}")
# Run the benchmark only when executed as a script, so that importing this
# module does not fork a child and start sending messages.
if __name__ == '__main__':
    asyncio.run(main())
Running:
$ python3 pipes5.py
Time taken to send and receive 1000000 messages: 0.77 seconds
Messages per second: 1302611.65
Be the first to comment.
Copyright James Gardner 1996-2020 All Rights Reserved. Admin.