Master

Master process does lots of things.

  • Maintains worker process pool

  • Redirects worker stdout and stderr streams

  • Checks if worker alive

  • Collects worker logs

  • Handles correct pool termination

  • Performs daemonization if necessary

Configuring Supervisor

Worker pool parameters are configured with aioworkerpool.master.Supervisor.

master.Supervisor(
    WorkerHandler,
    loop=asyncio.get_event_loop(),
    workers=10,
    check_interval=15,
    **kwargs
)

Parameters:

  • WorkerHandler - worker factory, callable that returns an

    worker.WorkerBase descendant which implements application logics.

  • loop - asyncio event loop instance, default is None

  • workers - number of child processes to start

  • check_interval - pool consistency check period in seconds

  • kwargs - additional master.ChildHandler arguments

Configuring ChildHandler

Child handler interacts with a single child process. It takes care about stdout/stderr redirection, logging and keepalive. Supervisor calls Supervisor.child_factory with these arguments:

worker = self.child_factory(
        worker_id,
        self._loop,
        self._worker_factory,
        **self._kwargs)
  • worker_id - worker id in range from 0 to workers

  • loop - asyncio event loop instance

  • worker_factory - worker factory callable

  • kwargs - extra keyword arguments including:

    • stderr - file-like object to redirect stderr from workers

    • stdout - file-like object to redirect stdout from workers

    • worker_timeout - worker keep-alive timeout in seconds

    • preserve_fds - list of file descriptors to preserve on fork()

Module reference

class aioworkerpool.master.Supervisor(worker_factory: Callable[[int, asyncio.events.AbstractEventLoop], aioworkerpool.worker.WorkerBase], *, loop: Optional[asyncio.events.AbstractEventLoop] = None, workers: Union[int, collections.abc.Iterable] = 2, check_interval: float = 1.0, **kwargs)

Controls worker pool.

Variables

child_factory – callable that instantiates child process handler

add_worker(worker_id: int)

Adds a worker with new id and schedules worker startup.

Raises

ValueError – if worker already exists

child_factory

alias of aioworkerpool.master.ChildHandler

get_daemon_context(pidfile)

Initializes daemon context.

interrupt()asyncio.tasks.Task

SIGINT signal handler.

Interrupts all workers, cleanups event loop and exits process.

:returns task for stopping workers

main(daemonize=False, pidfile='aioworkerpool.pid')

Supervisor entry point.

Starts and serves worker pool.

Parameters
  • daemonize (bool) – option to become a unix daemon

  • pidfile (str) – path to daemon pid file

on_shutdown(callback: Union[Callable[], None], Callable[], Awaitable[None]]])

Appends a callback to a shutdown callback list.

on_start(callback: Union[Callable[], None], Callable[], Awaitable[None]]])

Appends a callback to a startup callback list.

remove_worker(worker_id: int, sig: int = <Signals.SIGTERM: 15>)asyncio.futures.Future

Stops and removes worker with id.

Return type

Future

Parameters
  • worker_id (int) – worker id to remove

  • sig (int) – signal to send to child process

Raises

KeyError – if id does not exist.

Returns

future that happens when child process exits.

start()asyncio.tasks.Task

Initializes event loop and prepares infinite pool check loop.

:returns forever loop task

start_worker(worker_id: int)

Starts a new worker for id.

Parameters

worker_id (int) – logical id of created worker

stop()

Mart supervisor as stopping.

terminate()

SIGTERM signal handler.

Gracefully stops all workers, cleanups event loop and exits process.

class aioworkerpool.master.ChildHandler(worker_id: int, loop: asyncio.events.AbstractEventLoop, worker_factory: Callable[[int, asyncio.events.AbstractEventLoop], aioworkerpool.worker.WorkerBase], *, stdout=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, stderr=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, worker_timeout=15.0, preserve_fds: Iterable[int] = ())

Worker process handler.

child_exists()bool

Checks if child process exists.

cleanup_parent_loop()

Cleanups and stops parent event loop in child process.

Closes file descriptors not listed in preserved_fds.

fork()bool

Creates worker process with fork() method

Also initializes pipes between parent and child processes and installs pickling logging handler for child process

Returns

True for parent process and False for child

init_worker()

Construct worker object with worker factory.

init_worker_logging(logging_pipe: int)

Initializes handler for passing logs from internal logger to parent process through pipe.

Parameters

logging_pipe (int) – number of file descriptor for logging pipe

is_running()bool

Returns worker running state.

is_stale()bool

Checks if child process hang up.

on_child_exit(pid: int, return_code: int)

Handles child process exit.

Stops logging and keepalive tasks.

Parameters
  • pid (int) – pid of exited process

  • return_code (int) – exit code of process

send_and_wait(sig: int = <Signals.SIGKILL: 9>)asyncio.futures.Future

Sends signal to child process and waits for process shutdown.

Return type

Future

Parameters

sig (int) – signal to send to child (TERM, INT, KILL)

Returns

future with exitcode of child process

start()

Starts new child process.