Logging

Aioworkerpool uses the standard logging library to track pool activity.

It defines several loggers:

  • 'aioworkerpool.Supervisor' to track process pool activity,

  • 'aioworkerpool.Handler' to track individual child process behavior,

  • 'aioworkerpool.Worker' to track what happens in a child process.
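These loggers can be tuned independently with the standard logging API. For example (levels chosen arbitrarily for illustration):

import logging

# Keep pool-level messages, silence per-child lifecycle noise below WARNING
# and let in-worker messages through at DEBUG.
logging.getLogger("aioworkerpool.Supervisor").setLevel(logging.INFO)
logging.getLogger("aioworkerpool.Handler").setLevel(logging.WARNING)
logging.getLogger("aioworkerpool.Worker").setLevel(logging.DEBUG)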

Default logging setup

By default, the master process does not configure any logging handlers. A worker process instead configures an aioworkerpool.logging.PickleStreamHandler instance that proxies all messages to the master process via a pipe. This handler is automatically added to the 'aioworkerpool' logger. It pickles the log record content and passes it to the master process, where the record is reconstructed and re-emitted with the original logger.
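As an illustration, the sketch below (assumed to run inside a worker process, after the worker bootstrap has installed the proxy handler) shows what this default setup looks like from the worker's point of view:

import logging

from aioworkerpool.logging import PickleStreamHandler

pool_logger = logging.getLogger("aioworkerpool")

# The proxy handler created by the worker bootstrap is attached to the
# 'aioworkerpool' logger and exposed as PickleStreamHandler.instance.
assert PickleStreamHandler.instance is not None
assert PickleStreamHandler.instance in pool_logger.handlers

# Records logged here are pickled, sent over the pipe and re-emitted by
# the master process.
pool_logger.info("this message is proxied to the master process")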

Custom logging

The master process may configure logging before supervisor startup, or during the startup phase.

Warning

On start, a worker process closes all file descriptors except stdout/stderr, so it is up to the developer to close all file/stream handlers or to add the corresponding file descriptors to the preserved descriptors list.

A worker process should re-configure logging during its startup phase. It may use PickleStreamHandler.instance to reuse the proxy handler.

Example

(see logs.py)

import asyncio
import logging
import sys

from aioworkerpool import master, worker
from aioworkerpool.logging import PickleStreamHandler


def init_master_logging():
    # Write all logs to stderr
    logging.basicConfig(
        level=logging.DEBUG, stream=sys.stderr,
        format="%(asctime)s [%(process)d] %(name)s: %(message)s")

    # Our special handler
    h = logging.StreamHandler(sys.stderr)
    h.setFormatter(logging.Formatter("Custom: %(message)s"))
    l = logging.getLogger("custom")
    l.propagate = False
    l.addHandler(h)


def init_worker_logging():
    # Remove all handlers initialized in the master process
    logging.root.handlers.clear()
    # Add the proxy handler to our custom logger so that all messages
    # are forwarded to the master process.
    l = logging.getLogger("custom")
    l.handlers.clear()
    l.addHandler(PickleStreamHandler.instance)


class WorkerHandler(worker.WorkerBase):

    def __init__(self, worker_id, loop):
        super().__init__(worker_id, loop)
        # re-initialize logging when the worker starts
        self.on_start(init_worker_logging)
        self.logger = logging.getLogger("custom")

    async def main(self):
        while self.is_running():
            # self.logger defaults to the aioworkerpool.Worker logger,
            # but __init__ replaced it with the "custom" logger
            self.logger.info("I am here!")
            await asyncio.sleep(1)


# Set up an aioworkerpool.master.Supervisor instance
s = master.Supervisor(WorkerHandler)

# Add a pre-start callback that configures logging in the master process
s.on_start(init_master_logging)

s.main()

Module reference

class aioworkerpool.logging.PickleStreamHandler(stream: Optional[TextIO] = None)

Pickle-to-pipe logging handler.

Formats the message record as a pickle and writes it to the stream.

Variables

instance – instance of PickleStreamHandler, if initialized

Parameters

stream – writable file-like object for pickled messages

format(record: logging.LogRecord) → str

Formats the log record as a base64-encoded pickled string.

Return type

str

Parameters

record (LogRecord) – log record

Returns

encoded string
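The exact wire format is an implementation detail of the library, but conceptually the round trip between format() in the worker and the reconstruction in the master process works along these lines (a stdlib-only sketch, not the library's code):

import base64
import logging
import pickle


def encode_record(record: logging.LogRecord) -> str:
    # Pickle the record's attributes and wrap them in base64 so the result
    # can be written to a text pipe as a single line.
    payload = pickle.dumps(record.__dict__)
    return base64.b64encode(payload).decode("ascii")


def decode_record(encoded: str) -> logging.LogRecord:
    # Reverse of encode_record(): rebuild a LogRecord that can be handed
    # back to logging.getLogger(record.name).handle() in the master process.
    attrs = pickle.loads(base64.b64decode(encoded))
    return logging.makeLogRecord(attrs)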

class aioworkerpool.logging.PicklePipeReader(fd: int, file: Optional[TextIO] = None, loop: Optional[asyncio.events.AbstractEventLoop] = None)

Reads encoded log records from the pipe and re-emits them in the master process.

Parameters
  • fd – pipe file descriptor

  • file – file to write to

  • loop – asyncio event loop

coroutine read_loop()

Reads, decodes and re-emits log records in an infinite loop.
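The supervisor wires a PicklePipeReader to each child's log pipe itself, so this class is rarely used directly. Purely as an illustrative sketch of how the documented constructor and read_loop() fit together (the pipe wiring below is an assumption, not library behaviour):

import asyncio

from aioworkerpool.logging import PicklePipeReader


async def consume_child_logs(read_fd: int) -> None:
    loop = asyncio.get_event_loop()
    reader = PicklePipeReader(read_fd, loop=loop)
    # Re-emits every record received from the child against the logger it
    # was originally issued on.
    await reader.read_loop()


# read_fd, write_fd = os.pipe()
# ... write_fd would be used by a PickleStreamHandler in the child ...
# asyncio.get_event_loop().create_task(consume_child_logs(read_fd))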