Master¶
The master process is responsible for the following:
- Maintains the worker process pool
- Redirects worker stdout and stderr streams
- Checks whether workers are alive
- Collects worker logs
- Handles correct pool termination
- Performs daemonization if necessary
Configuring Supervisor¶
Worker pool parameters are configured with aioworkerpool.master.Supervisor:
master.Supervisor(
    WorkerHandler,
    loop=asyncio.get_event_loop(),
    workers=10,
    check_interval=15,
    **kwargs
)
Parameters:
- WorkerHandler - worker factory, a callable that returns a worker.WorkerBase descendant implementing the application logic
- loop - asyncio event loop instance, default is None
- workers - number of child processes to start
- check_interval - pool consistency check period in seconds
- kwargs - additional master.ChildHandler arguments
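The check_interval parameter is easiest to picture as a periodic reconciliation loop. The sketch below is not the library's implementation. It only assumes that each check starts any worker id that is missing from the running set. The names check_pool, running, and start_worker are hypothetical, and the rounds argument exists only so the example terminates.

```python
import asyncio


async def check_pool(worker_ids, running, start_worker, check_interval, rounds):
    """Periodically start every expected worker that is not running.

    Hypothetical helper: `rounds` bounds the loop so the sketch terminates;
    a real supervisor would keep looping until it is asked to stop.
    """
    for _ in range(rounds):
        for worker_id in worker_ids:
            if worker_id not in running:
                start_worker(worker_id)
        await asyncio.sleep(check_interval)


def run_demo():
    running = {0}   # pretend worker 0 is already alive
    started = []    # ids the check loop had to start

    def start_worker(worker_id):
        started.append(worker_id)
        running.add(worker_id)

    asyncio.run(check_pool(range(3), running, start_worker, 0.01, rounds=2))
    return started, running
```

A shorter interval detects missing workers sooner at the cost of more frequent wake-ups in the master process.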
Configuring ChildHandler¶
A child handler interacts with a single child process. It takes care of stdout/stderr redirection, logging and keepalive. Supervisor calls Supervisor.child_factory with these arguments:
worker = self.child_factory(
    worker_id,
    self._loop,
    self._worker_factory,
    **self._kwargs)
- worker_id - worker id in range from 0 to workers
- loop - asyncio event loop instance
- worker_factory - worker factory callable
- kwargs - extra keyword arguments including:
  - stderr - file-like object to redirect stderr from workers
  - stdout - file-like object to redirect stdout from workers
  - worker_timeout - worker keep-alive timeout in seconds
  - preserve_fds - list of file descriptors to preserve on fork()
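The forwarding of extra keyword arguments can be illustrated with stand-in classes. This is a sketch only: SketchSupervisor and SketchChildHandler are hypothetical names that mimic the call shown above without forking anything, and the sketch assumes worker ids run from 0 to workers - 1.

```python
import sys


class SketchChildHandler:
    """Stand-in for master.ChildHandler; it only records its arguments."""

    def __init__(self, worker_id, loop, worker_factory, *,
                 stdout=sys.stdout, stderr=sys.stderr,
                 worker_timeout=15.0, preserve_fds=()):
        self.worker_id = worker_id
        self.loop = loop
        self.worker_factory = worker_factory
        self.stdout = stdout
        self.stderr = stderr
        self.worker_timeout = worker_timeout
        self.preserve_fds = preserve_fds


class SketchSupervisor:
    """Stand-in supervisor that forwards unknown kwargs to child_factory."""

    child_factory = SketchChildHandler

    def __init__(self, worker_factory, *, loop=None, workers=2, **kwargs):
        self._worker_factory = worker_factory
        self._loop = loop
        self._workers = workers
        self._kwargs = kwargs  # everything else goes to the child handler

    def make_children(self):
        # Assumption: ids are 0 .. workers - 1.
        return [
            self.child_factory(worker_id, self._loop,
                               self._worker_factory, **self._kwargs)
            for worker_id in range(self._workers)
        ]


def build_children():
    supervisor = SketchSupervisor(
        lambda worker_id, loop: None,  # placeholder worker factory
        workers=2,
        worker_timeout=30.0,
        preserve_fds=[3],
    )
    return supervisor.make_children()
```

Because child_factory is a class attribute, a subclass can swap in a different handler class without touching the forwarding logic.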
Module reference¶
- class aioworkerpool.master.Supervisor(worker_factory: Callable[[int, asyncio.events.AbstractEventLoop], aioworkerpool.worker.WorkerBase], *, loop: Optional[asyncio.events.AbstractEventLoop] = None, workers: Union[int, collections.abc.Iterable] = 2, check_interval: float = 1.0, **kwargs)¶
  Controls worker pool.
  - Variables
    child_factory – callable that instantiates child process handler
- add_worker(worker_id: int)¶
  Adds a worker with a new id and schedules worker startup.
  - Raises
    ValueError – if the worker already exists
- child_factory¶
  alias of aioworkerpool.master.ChildHandler
- get_daemon_context(pidfile)¶
  Initializes daemon context.
- interrupt() → asyncio.tasks.Task¶
  SIGINT signal handler.
  Interrupts all workers, cleans up the event loop and exits the process.
  - Returns
    task for stopping workers
- main(daemonize=False, pidfile='aioworkerpool.pid')¶
  Supervisor entry point.
  Starts and serves worker pool.
  - Parameters
    daemonize (bool) – option to become a unix daemon
    pidfile (str) – path to daemon pid file
- on_shutdown(callback: Union[Callable[[], None], Callable[[], Awaitable[None]]])¶
  Appends a callback to the shutdown callback list.
- on_start(callback: Union[Callable[[], None], Callable[[], Awaitable[None]]])¶
  Appends a callback to the startup callback list.
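The callback type above accepts both plain functions and coroutine functions. A minimal sketch of how such a mixed callback list could be executed follows. The helper run_callbacks is hypothetical and does not claim to mirror how Supervisor runs its callbacks internally.

```python
import asyncio
import inspect


async def run_callbacks(callbacks):
    """Call each callback in order, awaiting the result when it is awaitable."""
    for callback in callbacks:
        result = callback()
        if inspect.isawaitable(result):
            await result


def run_demo():
    events = []

    def sync_callback():
        events.append("sync")

    async def async_callback():
        await asyncio.sleep(0)  # yield to the event loop once
        events.append("async")

    asyncio.run(run_callbacks([sync_callback, async_callback]))
    return events
```

Checking the return value rather than the function type also covers callables that return an awaitable without being declared with async def.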
- remove_worker(worker_id: int, sig: int = <Signals.SIGTERM: 15>) → asyncio.futures.Future¶
  Stops and removes the worker with the given id.
  - Return type
    Future
  - Parameters
    worker_id (int) – worker id to remove
    sig (int) – signal to send to child process
  - Raises
    KeyError – if id does not exist.
  - Returns
    future that resolves when the child process exits.
- start() → asyncio.tasks.Task¶
  Initializes event loop and prepares infinite pool check loop.
  - Returns
    forever loop task
- start_worker(worker_id: int)¶
  Starts a new worker for id.
  - Parameters
    worker_id (int) – logical id of created worker
- stop()¶
  Marks the supervisor as stopping.
- terminate()¶
  SIGTERM signal handler.
  Gracefully stops all workers, cleans up the event loop and exits the process.
- class aioworkerpool.master.ChildHandler(worker_id: int, loop: asyncio.events.AbstractEventLoop, worker_factory: Callable[[int, asyncio.events.AbstractEventLoop], aioworkerpool.worker.WorkerBase], *, stdout=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, stderr=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, worker_timeout=15.0, preserve_fds: Iterable[int] = ())¶
  Worker process handler.
- child_exists() → bool¶
  Checks if child process exists.
- cleanup_parent_loop()¶
  Cleans up and stops the parent event loop in the child process.
  Closes file descriptors not listed in preserve_fds.
- fork() → bool¶
  Creates the worker process with fork().
  Also initializes pipes between the parent and child processes and installs a pickling logging handler for the child process.
  - Returns
    True for parent process and False for child
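The fork-with-pipes pattern described for fork() can be sketched with the standard library alone. This is a Unix-only illustration under stated assumptions, not the ChildHandler code: fork_with_pipe and run_demo are hypothetical names, and a single one-way pipe stands in for whatever pipes the library actually creates.

```python
import os


def fork_with_pipe():
    """Fork and return (is_parent, fd, child_pid).

    The parent keeps the read end of the pipe, the child keeps the write end.
    """
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        os.close(read_fd)      # child only writes
        return False, write_fd, 0
    os.close(write_fd)         # parent only reads
    return True, read_fd, pid


def run_demo():
    is_parent, fd, child_pid = fork_with_pipe()
    if not is_parent:
        # Child branch: report through the pipe, then exit immediately
        # without running the parent's cleanup handlers.
        try:
            os.write(fd, b"hello from child")
        finally:
            os._exit(0)
    with os.fdopen(fd, "rb") as pipe:
        message = pipe.read()  # returns once the child closes its end
    _, status = os.waitpid(child_pid, 0)
    return message, os.WEXITSTATUS(status)
```

Closing the unused pipe end in each process matters: the parent's read only reaches end-of-file once every copy of the write end is closed.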
- init_worker()¶
  Constructs the worker object with the worker factory.
- init_worker_logging(logging_pipe: int)¶
  Initializes a handler for passing logs from the internal logger to the parent process through a pipe.
  - Parameters
    logging_pipe (int) – file descriptor number of the logging pipe
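Passing log records through a pipe usually means serializing each record on the child side and rebuilding it on the parent side. The sketch below shows one way to do that with pickle and a length prefix, similar in spirit to the standard library's SocketHandler. PipeLogHandler and read_record are hypothetical names, and the wire format is an assumption, not the library's actual protocol.

```python
import logging
import os
import pickle
import struct


class PipeLogHandler(logging.Handler):
    """Writes pickled log records to a file descriptor (child side)."""

    def __init__(self, fd):
        super().__init__()
        self._fd = fd

    def emit(self, record):
        try:
            data = dict(record.__dict__)
            data["msg"] = record.getMessage()  # pre-format the message
            data["args"] = None                # args may not be picklable
            data["exc_info"] = None            # tracebacks cannot be pickled
            payload = pickle.dumps(data)
            # 4-byte big-endian length prefix, then the pickled dict.
            os.write(self._fd, struct.pack(">L", len(payload)) + payload)
        except Exception:
            self.handleError(record)


def _read_exact(fd, size):
    chunks = b""
    while len(chunks) < size:
        chunk = os.read(fd, size - len(chunks))
        if not chunk:
            return None  # pipe closed before a full message arrived
        chunks += chunk
    return chunks


def read_record(fd):
    """Reads one record from the pipe (parent side); None on end-of-file."""
    header = _read_exact(fd, 4)
    if header is None:
        return None
    (size,) = struct.unpack(">L", header)
    payload = _read_exact(fd, size)
    if payload is None:
        return None
    return logging.makeLogRecord(pickle.loads(payload))


def run_demo():
    read_fd, write_fd = os.pipe()
    logger = logging.getLogger("sketch.worker")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = PipeLogHandler(write_fd)
    logger.addHandler(handler)
    try:
        logger.info("worker %d ready", 3)
        return read_record(read_fd)
    finally:
        logger.removeHandler(handler)
        os.close(read_fd)
        os.close(write_fd)
```

In a real master/worker setup the handler would live in the child process and read_record in the parent. The demo keeps both ends in one process only to stay self-contained.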
- is_running() → bool¶
  Returns worker running state.
- is_stale() → bool¶
  Checks whether the child process has hung.
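A staleness check of this kind is commonly built on a keep-alive timestamp compared against a timeout such as worker_timeout. The sketch below assumes that model. The KeepAlive class, its ping method, and the injectable clock are hypothetical and are not taken from the library.

```python
import time


class KeepAlive:
    """Tracks the last keep-alive ping and reports staleness."""

    def __init__(self, timeout, clock=time.monotonic):
        self._timeout = timeout
        self._clock = clock          # injectable so the logic can be tested
        self._last_seen = clock()

    def ping(self):
        """Record that the child has just reported in."""
        self._last_seen = self._clock()

    def is_stale(self):
        """True when no ping arrived within the timeout."""
        return self._clock() - self._last_seen > self._timeout
```

Using a monotonic clock keeps the check immune to wall-clock adjustments on the host.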
- on_child_exit(pid: int, return_code: int)¶
  Handles child process exit.
  Stops logging and keepalive tasks.
  - Parameters
    pid (int) – pid of exited process
    return_code (int) – exit code of process
- send_and_wait(sig: int = <Signals.SIGKILL: 9>) → asyncio.futures.Future¶
  Sends signal to child process and waits for process shutdown.
  - Return type
    Future
  - Parameters
    sig (int) – signal to send to child (TERM, INT, KILL)
  - Returns
    future with exitcode of child process
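The send-then-wait pattern can be demonstrated with an asyncio subprocess. This is a sketch under stated assumptions, not ChildHandler.send_and_wait itself: it uses a coroutine instead of returning a Future, spawns the child with asyncio.create_subprocess_exec rather than fork(), and runs on Unix only.

```python
import asyncio
import signal
import sys


async def send_and_wait(process, sig=signal.SIGKILL):
    """Send a signal to the child and wait for it to exit."""
    process.send_signal(sig)
    return await process.wait()


async def demo():
    # Hypothetical child: a Python process that would otherwise sleep a minute.
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)"
    )
    return await send_and_wait(process, signal.SIGTERM)
```

On Unix, a child killed by a signal reports a negative return code equal to the signal number, so a result of -signal.SIGTERM confirms the child died from the signal that was sent.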
- start()¶
  Starts new child process.