Home Blog CV Projects Patterns Notes Book Colophon Search

Asyncio Signals

15 Jul, 2024

The asyncio event loop lets you register only one handler per signal, and that handler has to be a plain function, not an async function.

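Here's a class that lets you register several async handlers for a set of signals, runs them all concurrently the first time one of those signals arrives, and lets you wait for them to finish: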

import asyncio
import inspect
import signal

class FirstSignalHandler:
    """Run a set of async handlers once, on the first of several signals.

    ``loop.add_signal_handler`` takes one plain callback per signal. This
    class installs such a callback for every signal in *signals* and uses it
    to schedule all registered coroutine functions concurrently the first
    time any of those signals arrives. Later signals only print a notice.

    Args:
        loop: Event loop on which the signal callbacks are installed and the
            handler task is scheduled.
        signals: Iterable of signal numbers to listen for.
    """

    def __init__(self, loop, signals):
        self._loop = loop
        self._handlers = []
        self._signals = signals
        self._finished = asyncio.Event()
        self._run = False
        # The event loop keeps only weak references to tasks, so hold strong
        # ones here until each task completes.
        self._tasks = set()
        for sig in self._signals:
            # add_signal_handler forwards extra positional args to the
            # callback, so no lambda is needed to bind ``sig``.
            self._loop.add_signal_handler(sig, self._handle_signal, sig)

    def _handle_signal(self, sig):
        """Synchronous signal callback: schedule the async handler run."""
        task = self._loop.create_task(self._run_handlers(sig))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _run_handlers(self, sig):
        """Run every registered handler once; ignore repeat signals."""
        if self._run:
            print('Signal already received')
            return
        self._run = True
        try:
            # return_exceptions=True makes gather wait for *all* handlers
            # even when one of them fails, instead of raising early.
            results = await asyncio.gather(
                *(handler() for handler in self._handlers),
                return_exceptions=True,
            )
            for result in results:
                if isinstance(result, BaseException):
                    self._loop.call_exception_handler({
                        'message': 'Signal handler raised an exception',
                        'exception': result,
                    })
        finally:
            # Always release waiters, otherwise a failing or cancelled
            # handler run would leave wait() blocked forever.
            self._finished.set()

    def register(self, handler):
        """Add a coroutine function to run when the first signal arrives.

        Args:
            handler: Zero-argument ``async def`` function.

        Raises:
            ValueError: If *handler* is not a coroutine function.
        """
        if not inspect.iscoroutinefunction(handler):
            raise ValueError("Handler must be an async function")
        self._handlers.append(handler)

    async def wait(self):
        """Block until the handlers triggered by the first signal are done."""
        await self._finished.wait()

async def example_task(name):
    """Print a heartbeat line once per second until the task is cancelled.

    Cancellation is caught and reported with a final message rather than
    propagated, so the coroutine returns normally after being cancelled.
    """
    interval = 1
    try:
        running = True
        while running:
            print(f"Task {name} is running")
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        print(f"Task {name} is cancelled")

async def async_handler_1():
    """Demo handler: announce the start, pause one second, announce the end."""
    pause_seconds = 1
    print("Async handler 1 called")
    await asyncio.sleep(pause_seconds)
    print("Async handler 1 finished")

async def async_handler_2():
    """Demo handler: announce the start, pause one second, announce the end."""
    pause_seconds = 1
    print("Async handler 2 called")
    await asyncio.sleep(pause_seconds)
    print("Async handler 2 finished")

async def async_handler_3():
    """Demo handler: announce the start, pause one second, announce the end.

    Defined for completeness; main() in this example does not register it.
    """
    pause_seconds = 1
    print("Async handler 3 called")
    await asyncio.sleep(pause_seconds)
    print("Async handler 3 finished")

async def main():
    """Demo: run two background tasks until SIGINT/SIGTERM, then clean up.

    Installs a FirstSignalHandler for SIGINT and SIGTERM, starts two
    example tasks, and registers three async handlers: two demo handlers
    and one that cancels the example tasks. It then waits for the tasks to
    end and for the signal handlers to finish.
    """
    first_signal = FirstSignalHandler(
        asyncio.get_running_loop(),
        [signal.SIGINT, signal.SIGTERM],
    )

    # Background work that the signal handlers will shut down.
    workers = [asyncio.create_task(example_task(label)) for label in ("A", "B")]

    async def cancel():
        for worker in workers:
            worker.cancel()

    # All three run concurrently when the first signal arrives.
    for handler in (async_handler_1, async_handler_2, cancel):
        first_signal.register(handler)

    try:
        await asyncio.wait(workers)
    except asyncio.CancelledError:
        pass
    # The workers ending does not mean the other handlers are done yet.
    await first_signal.wait()
    print('Waited for everything to finish')

# Script entry point: run main() on a fresh event loop when executed directly.
if __name__ == "__main__":
    asyncio.run(main())

Running it and sending Ctrl+C (SIGINT) or SIGTERM twice gives this:

Task A is running
Task B is running
Task A is running
Task B is running
Task A is running
Task B is running
^CAsync handler 1 called
Async handler 2 called
Task A is cancelled
Task B is cancelled
^CSignal already received
Async handler 1 finished
Async handler 2 finished
Waited for everything to finish

Comments

Be the first to comment.

Add Comment





Copyright James Gardner 1996-2020 All Rights Reserved. Admin.