Worker

On the child process side all the fun starts from worker.WorkerBase.main() method. After fork() child process:

  • closes parent event loop, disconnects signal handlers from it;

  • creates new event loop

  • calls worker.WorkerBase.on_start() callbacks

  • calls worker.WorkerBase.main()

  • on main() exit, calls worker.WorkerBase.on_shutdown() callbacks

  • cleanups event loop and exits child process.

Quickstart

  1. Inherit from worker.WorkerBase and implement main() coroutine or method. It should be a forever loop with periodic WorkerBase.is_running() check.

  2. Add initialization callbacks with worker.WorkerBase.on_start()

Warning

Don’t initialize anything related to asyncio before on_start callbacks, parent event loop will be used and after closing it child process will just do nothing.

  1. Cleanup callbacks should be added with worker.WorkerBase.on_shutdown().

  2. Don’t forget to re-initialize logging, if necessary.

Child termination

If got SIGTERM signal, worker.WorkerBase.stop() is called to mark worker as not running. After that worker waits for main() and executes shutdown sequence.

Warning

It is on a developer to periodically check and gracefully stop main() loop if worker.WorkerBase.is_running() returns False.

If got SIGINT signal, worker cancels main() if it is a coroutine. If not, worker just executes shutdown sequence:

  • schedules worker.WorkerBase.on_shutdown() callbacks;

  • schedules asyncio.AbstractEventLoop.stop();

  • closes event loop and exists child process.

Example

(see sockets.py)

import asyncio
import os
import socket

from aiohttp import web

from aioworkerpool import master, worker


def open_socket():
    # open socket and prepare it to be handled in worker processes
    s = socket.socket()
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    s.bind(('127.0.0.1', 8080))
    return s


class WorkerHandler(worker.WorkerBase):
    def __init__(self, worker_id, loop):
        super().__init__(worker_id, loop)
        # add custom initialization on worker startup
        self.on_start(self.init_web_app)
        self.server = self.app = self.handler = None

    async def pong(self, _):
        # aiohttp view handler
        return web.Response(text='pong from #%s %s!\n' %
                                 (self.id, os.getpid()))

    async def init_web_app(self):
        # initialize aiohttp.web.Application instance with new event
        # loop only after child process creation
        self.app = web.Application(loop=self.loop)
        self.app.router.add_route('GET', '/', self.pong)
        # make handler for app
        self.handler = self.app.make_handler()
        # start async tcp server
        self.server = await self.loop.create_server(
            self.handler, sock=sock)
        # call app startup callbacks
        await self.app.startup()

    async def shutdown_web_app(self):
        # close tcp server
        self.server.close()
        # wait for closing
        await self.server.wait_closed()
        # call app shutdown callbacks
        await self.app.shutdown()
        # cleanup connections
        await self.handler.finish_connections(1.0)
        # call app cleanup callbacks
        await self.app.cleanup()

    async def main(self):
        # aiohttp application is already completely initialized and
        # receiving requests, so main loop just checks periodically
        # that worker is still running.
        while self.is_running():
            await asyncio.sleep(1)


# open socket in master process to share it with workeres
sock = open_socket()

supervisor = master.Supervisor(
    worker_factory=WorkerHandler,
    # add opened socket to preserved file descriptor list
    preserve_fds=[sock.fileno()])

supervisor.main()

Module reference

class aioworkerpool.worker.WorkerBase(worker_id: int, loop: asyncio.events.AbstractEventLoop)

Abstract class for worker implementation.

property id

Logical worker id.

interrupt()asyncio.futures.Future

SIGINT signal handler.

Interrupts main(), cleanups event loop and exits child process.

is_running()

Returns worker state, running or terminating.

Returns

True if worker is in running state

property loop

Current event loop instance.

abstract main()

Worker infinite loop.

Must be implemented in descendant classes. If main() is a coroutine, called with loop.run_until_completed. If not, when main() is called, event loop is not running.

on_shutdown(callback: Union[Callable[], None], Callable[], Awaitable[None]]])

Appends a callback to a shutdown callback list.

on_start(callback: Union[Callable[], None], Callable[], Awaitable[None]]])

Appends a callback to a startup callback list.

start()asyncio.futures.Future

Worker entry point.

Runs on_start callbacks, starts main() infinite loop, waits for main() exit, then executes on_shutdown callbacks.

:returns future that happens when event loop is stopped.

stop()

Mark worker as stopping.

terminate()

SIGTERM signal handler.

Marks worker as stopping, waits for main() exit, cleanups event loop and exits child process.