Allow underlying queue to be customized #40

Open · wants to merge 1 commit into master
Conversation

@jordan-heemskerk jordan-heemskerk commented Jul 7, 2020

I've got a use case where I'd like to inject a custom queue type that behaves like multiprocessing.Queue. This change allows the user to supply a factory function to create new queue instances.

@jruere (Owner) commented Jul 8, 2020

Interesting. What is the use case for this?

@jordan-heemskerk (Author) commented
So, like I said, the core use case is being able to use a custom queue.

The long story is that I'm using some CUDA code with multiprocessing (and hopefully logging too!). CUDA will not work with multiprocessing's default fork start method on Unix-based systems, so I am constrained to the spawn context. Since a spawned child process does not get a copy of the parent process's memory space, the current implementation of multiprocessing-logging does not work there. I believe the reason it does not work on Windows/macOS is the same.
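For reference, the spawn context mentioned here is selected through the standard multiprocessing API; a spawned child starts a fresh interpreter rather than inheriting the parent's memory:

```python
import multiprocessing

# The "spawn" start method launches a fresh interpreter for each child,
# so module-level state (such as configured logging handlers) set up in
# the parent is not inherited, unlike with the Unix default, "fork".
ctx = multiprocessing.get_context("spawn")
assert ctx.get_start_method() == "spawn"
assert "spawn" in multiprocessing.get_all_start_methods()
```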

I've written some fairly repository-specific code that lets me log from multiprocessing with a spawn context. It overlaps heavily with the code in this repository; the largest deviation is that I need to use a multiprocessing.Manager().Queue() instead of a multiprocessing.Queue(). There is also some setup that needs to run in the child processes, but I think that is out of scope for multiprocessing-logging.

So, in the end, I'd really like to take advantage of the MultiProcessingHandler class you've written, but I need to be able to use multiprocessing.Manager().Queue(). I figured the best approach might be to allow any user to inject any multiprocessing.Queue()-like object. If there is another approach that you would prefer, I'd be happy to explore that.
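In miniature, the injection being proposed looks like this (the class and parameter names here are hypothetical placeholders for illustration, not the library's API):

```python
import multiprocessing
import queue


class QueueOwner:
    """Toy illustration of the factory-injection pattern: accept an
    optional queue_factory instead of hard-coding multiprocessing.Queue."""

    def __init__(self, queue_factory=None):
        factory = queue_factory or (lambda: multiprocessing.Queue(-1))
        self.queue = factory()


# Default behaviour: a plain multiprocessing.Queue.
owner = QueueOwner()

# Injected behaviour: any object with the same interface,
# e.g. a thread-safe queue.Queue.
custom = QueueOwner(queue_factory=queue.Queue)
custom.queue.put("x")
```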

@jruere (Owner) commented Jul 10, 2020

This is very interesting! Let me read a little and play with this.
Ideally, the library should choose the right queue automatically, but I don't know how feasible that is.
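The automatic choice mentioned here could, hypothetically, key off the configured start method. A minimal sketch (the function name and selection rule are assumptions, not library behaviour):

```python
import multiprocessing


def default_queue_factory(start_method=None):
    """Pick a queue factory appropriate for the given start method.

    Under fork, a plain multiprocessing.Queue works because children
    inherit the handler state. Under spawn/forkserver, children start
    fresh, so a manager-backed queue would be needed instead.
    """
    method = start_method or multiprocessing.get_start_method()
    if method == "fork":
        return lambda: multiprocessing.Queue(-1)
    # spawn / forkserver: a picklable, manager-backed queue is required.
    return lambda: multiprocessing.Manager().Queue(-1)
```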

@jordan-heemskerk (Author) commented

Ideally, the library should choose the right queue automatically, but I don't know how feasible that is.

Yeah, in a perfect world, I agree. Unfortunately, to get logging to work in my spawn context with a multiprocessing pool, I actually needed a lot more code, and I'm not sure it belongs in this library.

import logging
import logging.handlers
import multiprocessing
import multiprocessing.pool

from multiprocessing_logging import MultiProcessingHandler

logger = logging.getLogger(__name__)


class QueueProxy:
    """
    Exposes a multiprocessing.Queue interface over a queue.Queue-like object.
    """

    def __init__(self, q):
        self.q = q

    def qsize(self):
        return self.q.qsize()

    def empty(self):
        return self.q.empty()

    def full(self):
        return self.q.full()

    def put(self, *args, **kwargs):
        return self.q.put(*args, **kwargs)

    def put_nowait(self, *args, **kwargs):
        return self.q.put_nowait(*args, **kwargs)

    def get(self, *args, **kwargs):
        return self.q.get(*args, **kwargs)

    def get_nowait(self, *args, **kwargs):
        return self.q.get_nowait(*args, **kwargs)

    def close(self):
        pass

    def join_thread(self):
        pass

    def cancel_join_thread(self):
        pass


class ManagedQueueMultiProcessingHandler(MultiProcessingHandler):

    def __init__(self, name, sub_handler=None):
        super().__init__(name, sub_handler, lambda: QueueProxy(multiprocessing.Manager().Queue(-1)))


def _initwrap(queues, levels, initfunc, initargs):

    # Configure the logger for this process
    root_logger = logging.getLogger("")
    for handler in root_logger.handlers:
        root_logger.removeHandler(handler)
    root_logger.setLevel(0)
    for queue, level in zip(queues, levels):
        handler = logging.handlers.QueueHandler(queue)
        handler.setLevel(level)
        root_logger.addHandler(handler)

    # Call the original provided initfunc
    if initfunc is not None and initargs is not None:
        initfunc(*initargs)
    elif initfunc is not None:
        initfunc()



class LoggedSpawnerPool(multiprocessing.pool.Pool):
    def __init__(self, processes=None, initializer=None, initargs=None, maxtasksperchild=None, context=None):
        queues = []
        levels = []
        for handler in logger.parent.handlers:
            if isinstance(handler, MultiProcessingHandler):
                queues.append(handler.queue)
                levels.append(handler.level)
            else:
                logger.warning(f"Log handler {handler} is not a MultiProcessingHandler, so it will not receive log messages during multiprocessing")
        ctx = multiprocessing.get_context("spawn") if context is None else context
        super().__init__(processes, _initwrap, [queues, levels, initializer, initargs], maxtasksperchild, ctx)

which can then be used like

logger = logging.getLogger(__name__)

def worker(args):
    x = args
    logger.info(x)


if __name__ == "__main__":
    config_logging()  # sets handlers and wraps them in ManagedQueueMultiProcessingHandler
    with LoggedSpawnerPool(3) as pool:
        pool.map(worker, range(10))

Hopefully that can help give you a little more context.

Review comment on the diff:

def __init__(self, name, sub_handler=None, queue_factory=None):
    """
    :param name: The name of this handler
    :param sub_handler: The handler to foward events to. If None, use a new logging.SteamHandler

Suggested change:
- :param sub_handler: The handler to foward events to. If None, use a new logging.SteamHandler
+ :param sub_handler: The handler to forward events to. If None, use a new logging.StreamHandler
