Worker¶
On the child process side, everything starts from the worker.WorkerBase.main() method. After fork(), the child process:
- closes the parent event loop and disconnects signal handlers from it;
- creates a new event loop;
- calls worker.WorkerBase.on_start() callbacks;
- calls worker.WorkerBase.main();
- on main() exit, calls worker.WorkerBase.on_shutdown() callbacks;
- cleans up the event loop and exits the child process.
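The order of the steps above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: MiniWorker, its events list and run_demo() are hypothetical names, fork() and signal handling are left out, and a fresh event loop stands in for the one created in the child process. Callbacks may be plain functions or coroutine functions, as the on_start() and on_shutdown() signatures allow.

```python
import asyncio
import inspect


class MiniWorker:
    """Hypothetical stand-in that mimics the documented call order."""

    def __init__(self):
        self.loop = None
        self.events = []  # records the order of lifecycle steps
        self._start_callbacks = []
        self._shutdown_callbacks = []

    def on_start(self, callback):
        self._start_callbacks.append(callback)

    def on_shutdown(self, callback):
        self._shutdown_callbacks.append(callback)

    async def _run_callbacks(self, callbacks):
        for callback in callbacks:
            result = callback()
            # Callbacks may be plain functions or coroutine functions.
            if inspect.isawaitable(result):
                await result

    async def main(self):
        self.events.append("main")

    async def _lifecycle(self):
        await self._run_callbacks(self._start_callbacks)
        try:
            await self.main()
        finally:
            # Shutdown callbacks run once main() has exited.
            await self._run_callbacks(self._shutdown_callbacks)

    def run(self):
        # A fresh loop stands in for the one created after fork().
        self.loop = asyncio.new_event_loop()
        try:
            self.loop.run_until_complete(self._lifecycle())
        finally:
            self.loop.close()
            self.events.append("loop closed")
        return self.events


def run_demo():
    w = MiniWorker()

    async def async_init():
        w.events.append("on_start (async)")

    w.on_start(lambda: w.events.append("on_start (sync)"))
    w.on_start(async_init)
    w.on_shutdown(lambda: w.events.append("on_shutdown"))
    return w.run()
```

Calling run_demo() returns the recorded steps in the order they happened, which makes the sequence easy to check in a test.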
Quickstart¶
- Inherit from worker.WorkerBase and implement the main() coroutine or method. It should be a forever loop with a periodic WorkerBase.is_running() check.
- Add initialization callbacks with worker.WorkerBase.on_start().
Warning
Don't initialize anything related to asyncio before the on_start callbacks run: the parent event loop would be used, and once it is closed the child process will simply do nothing.
- Add cleanup callbacks with worker.WorkerBase.on_shutdown().
- Don't forget to re-initialize logging, if necessary.
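Why the warning matters can be seen with plain asyncio. The sketch below is only an illustration under stated assumptions: parent_loop and child_loop are hypothetical names, and no fork() takes place. It shows that a closed loop refuses new work, while a loop created afterwards (the situation an on_start callback runs in) works normally.

```python
import asyncio


def schedule_on_closed_parent_loop():
    # Stand-in for the parent's loop, which the child closes after fork().
    parent_loop = asyncio.new_event_loop()
    parent_loop.close()
    try:
        # Anything still bound to the closed loop cannot schedule work on it.
        parent_loop.call_soon(print, "never runs")
    except RuntimeError as exc:
        return str(exc)
    return "scheduled"


def schedule_on_fresh_loop():
    # What an on_start callback would see: a new, usable loop.
    child_loop = asyncio.new_event_loop()
    try:
        return child_loop.run_until_complete(asyncio.sleep(0, result="ok"))
    finally:
        child_loop.close()
```

Objects that capture an event loop at construction time therefore belong in an on_start callback, not in module-level code or in code that runs before the worker starts.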
Child termination¶
If the worker receives SIGTERM, worker.WorkerBase.stop() is called to mark the worker as not running. The worker then waits for main() to return and executes the shutdown sequence.
Warning
It is up to the developer to check worker.WorkerBase.is_running() periodically and to stop the main() loop gracefully once it returns False.
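The cooperative stop described above can be sketched as follows. This is a stand-in and not the library's code: StoppableLoop and demo() are hypothetical names, and loop.call_later() simulates the SIGTERM handler calling stop() so that no real signal has to be sent.

```python
import asyncio


class StoppableLoop:
    """Hypothetical stand-in for a worker with a running flag."""

    def __init__(self):
        self._running = True
        self.iterations = 0

    def is_running(self):
        return self._running

    def stop(self):
        # Only flips the flag; main() has to notice it on its own.
        self._running = False

    async def main(self):
        while self.is_running():
            self.iterations += 1
            await asyncio.sleep(0.01)
        return "stopped gracefully"


async def demo():
    w = StoppableLoop()
    loop = asyncio.get_running_loop()
    # Simulates the SIGTERM handler calling stop() a little later.
    loop.call_later(0.05, w.stop)
    result = await w.main()
    return result, w.iterations
```

If main() never checked is_running(), stop() would have no effect and the loop would keep running, which is exactly the situation the warning describes.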
If the worker receives SIGINT, it cancels main() if it is a coroutine. If not, the worker just executes the shutdown sequence:
- schedules worker.WorkerBase.on_shutdown() callbacks;
- schedules asyncio.AbstractEventLoop.stop();
- closes the event loop and exits the child process.
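For comparison with the SIGTERM path, interruption of a coroutine main() can be sketched with plain asyncio tasks. The names below (run_with_interrupt, shutdown_callback) are hypothetical, and loop.call_later() stands in for the SIGINT handler; the library's own handler may differ in detail.

```python
import asyncio


async def run_with_interrupt():
    events = []

    async def main():
        try:
            while True:  # a loop that never checks a running flag
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            events.append("main cancelled")
            raise

    async def shutdown_callback():
        events.append("on_shutdown")

    task = asyncio.ensure_future(main())
    loop = asyncio.get_running_loop()
    # Simulates the SIGINT handler cancelling main().
    loop.call_later(0.03, task.cancel)
    try:
        await task
    except asyncio.CancelledError:
        pass
    # After main() is gone, the shutdown callbacks still run.
    await shutdown_callback()
    return events
```

Cancellation reaches main() at its next await, so even a loop without an is_running() check is interrupted, provided it awaits regularly.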
Example¶
(see sockets.py)
import asyncio
import os
import socket

from aiohttp import web

from aioworkerpool import master, worker


def open_socket():
    # open socket and prepare it to be handled in worker processes
    s = socket.socket()
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    s.bind(('127.0.0.1', 8080))
    return s


class WorkerHandler(worker.WorkerBase):

    def __init__(self, worker_id, loop):
        super().__init__(worker_id, loop)
        # add custom initialization on worker startup
        self.on_start(self.init_web_app)
        # register cleanup so that shutdown_web_app() runs on worker shutdown
        self.on_shutdown(self.shutdown_web_app)
        self.server = self.app = self.handler = None

    async def pong(self, _):
        # aiohttp view handler
        return web.Response(text='pong from #%s %s!\n' %
                                 (self.id, os.getpid()))

    async def init_web_app(self):
        # initialize aiohttp.web.Application instance with new event
        # loop only after child process creation
        self.app = web.Application(loop=self.loop)
        self.app.router.add_route('GET', '/', self.pong)
        # make handler for app
        self.handler = self.app.make_handler()
        # start async tcp server on the module-level socket inherited
        # from the master process
        self.server = await self.loop.create_server(
            self.handler, sock=sock)
        # call app startup callbacks
        await self.app.startup()

    async def shutdown_web_app(self):
        # close tcp server
        self.server.close()
        # wait for closing
        await self.server.wait_closed()
        # call app shutdown callbacks
        await self.app.shutdown()
        # cleanup connections
        await self.handler.finish_connections(1.0)
        # call app cleanup callbacks
        await self.app.cleanup()

    async def main(self):
        # aiohttp application is already completely initialized and
        # receiving requests, so main loop just checks periodically
        # that worker is still running.
        while self.is_running():
            await asyncio.sleep(1)


# open socket in master process to share it with workers
sock = open_socket()

supervisor = master.Supervisor(
    worker_factory=WorkerHandler,
    # add opened socket to preserved file descriptor list
    preserve_fds=[sock.fileno()])

supervisor.main()
Module reference¶
- class aioworkerpool.worker.WorkerBase(worker_id: int, loop: asyncio.events.AbstractEventLoop)
  Abstract class for worker implementation.
- property id
  Logical worker id.
- interrupt() → asyncio.futures.Future
  SIGINT signal handler.
  Interrupts main(), cleans up the event loop and exits the child process.
- is_running()
  Returns worker state, running or terminating.
  Returns: True if worker is in running state
- property loop
  Current event loop instance.
- abstract main()
  Worker infinite loop.
  Must be implemented in descendant classes. If main() is a coroutine, it is run with loop.run_until_complete(). If not, the event loop is not running when main() is called.
- on_shutdown(callback: Union[Callable[[], None], Callable[[], Awaitable[None]]])
  Appends a callback to a shutdown callback list.
- on_start(callback: Union[Callable[[], None], Callable[[], Awaitable[None]]])
  Appends a callback to a startup callback list.
- start() → asyncio.futures.Future
  Worker entry point.
  Runs on_start callbacks, starts main() infinite loop, waits for main() exit, then executes on_shutdown callbacks.
  Returns: a future that completes when the event loop is stopped
- stop()
  Mark worker as stopping.
- terminate()
  SIGTERM signal handler.
  Marks worker as stopping, waits for main() exit, cleans up the event loop and exits the child process.
-
property