kombu.common.Broadcast does not accept custom queue name #1014

Closed
imbuedhope opened this issue Feb 25, 2019 · 8 comments

Comments

@imbuedhope

The following happens at the moment:

>>> import kombu.common
>>> queue_name = 'my_unique_queue'
>>> bcast = kombu.common.Broadcast(name='foo', queue=queue_name)
>>> bcast.queue == queue_name
False

This behavior is inconsistent with the docs for the class, which claim that you can pass a queue name to override the default behavior of generating a unique one.

This, in turn, makes it impossible to use kombu.common.Broadcast with celery.Celery.conf.task_queues and celery.Celery.conf.task_routes, since the queue name is different every time.

Changing the following line

queue = '{0}.{1}'.format(queue or 'bcast', uuid())

to

    if queue is None:
        queue = '{0}.{1}'.format(queue or 'bcast', uuid())

resolves the issue, however.
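
To make the proposed rule easier to check in isolation, here is a minimal sketch of it as a standalone function. It is not kombu's code: `resolve_queue_name` is a hypothetical helper, and the standard library's `uuid.uuid4` stands in for kombu's `uuid()`.

```python
import uuid


def resolve_queue_name(queue=None):
    """Return the caller's queue name, or generate a unique one.

    Hypothetical helper that mirrors the change proposed above.
    """
    if queue is None:
        # No name supplied: fall back to a unique 'bcast.<uuid>' name.
        queue = 'bcast.{0}'.format(uuid.uuid4())
    return queue


# A custom name is passed through unchanged.
print(resolve_queue_name('my_unique_queue'))
# Without a name, a fresh 'bcast.'-prefixed name is generated on each call.
print(resolve_queue_name())
```

With this rule, a name given in task_queues stays stable, so task_routes can refer to it.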

I can make a PR if there's interest.

@imbuedhope imbuedhope changed the title Kombu Broadcast does not accept custom queue name kombu.common.Broadcast does not accept custom queue name Feb 25, 2019
@clokep

clokep commented Feb 25, 2019

Might be a regression from #906?

@imbuedhope
Author

imbuedhope commented Feb 25, 2019

Looks like it would be. I am confused about why the changes in #906 were needed.

With the changes described above, I can set up Celery with the following:

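import celery
import kombu
import kombu.common

app = celery.Celery(
    __name__,
    backend='redis://localhost:6379/0',
    broker='redis://localhost:6379/1',
)
# Declare the default queue plus a broadcast queue with a fixed name.
app.conf.task_queues = (
    kombu.Queue('celery', exchange='celery', routing_key='celery'),
    kombu.common.Broadcast('bcast_tasks', queue='bcast_tasks'),
)
# Route every task matching bcast.tasks.* to the broadcast queue.
app.conf.task_routes = {
    'bcast.tasks.*': {'queue': 'bcast_tasks'},
}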

and tasks route as expected.

@imbuedhope
Author

> How about this behaviour?
> If a user provides a Queue object we will pass it directly.
> If it provides a name we will create a unique queue for it.

A Broadcast object is a sub-class of Queue. How does that work exactly? (tbh, digging around I can't find where the kwarg queue gets consumed since it isn't a kwarg for Queue or MaybeChannelBound)

> The purpose of a broadcast queue is to have a message fan out to each of the queues it owns.

My understanding was that Broadcast was a Queue that, if no exchange was provided, defaulted to using an Exchange with type='fanout'.
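
For readers unfamiliar with the term, fanout behavior can be sketched with a toy in-memory model. This is only an illustration under my own assumptions: `ToyFanoutExchange` is a hypothetical class, not kombu's Exchange, and the queue and message names are made up.

```python
from collections import defaultdict


class ToyFanoutExchange:
    """Toy in-memory model of a fanout exchange (not kombu's Exchange)."""

    def __init__(self, name):
        self.name = name
        # Maps each bound queue name to the messages delivered to it.
        self.queues = defaultdict(list)

    def bind(self, queue_name):
        # Accessing the key creates an empty queue if it does not exist yet.
        self.queues[queue_name]

    def publish(self, message):
        # Fanout: every bound queue receives its own copy of the message.
        for messages in self.queues.values():
            messages.append(message)


exchange = ToyFanoutExchange('bcast_tasks')
exchange.bind('bcast.worker1')
exchange.bind('bcast.worker2')
exchange.publish('reload-config')
print(dict(exchange.queues))
```

In this model, two consumers that bind the same queue name share a single message list, which is the usual argument for giving each worker its own queue name when every worker must see every message.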

That's what made sense to me based on the docs and from looking around the code: a subclass that exists to make things a bit more convenient, and a definition that saves everyone from rewriting the same class.


Does reverting #906 break anything in particular? I don't see why it would be odd to specify a unique queue name in scenarios where one is needed.

Generating a UUID-based custom queue name and passing it to a Broadcast object seems more reasonable than having Broadcast.__init__ generate a unique name and consequently be inconsistent with its superclass Queue.

@clokep

clokep commented May 2, 2019

I think I'll introduce a unique keyword argument instead...
That way, if you want to, you'll be able to create a queue per worker and have it bound to that exchange.

Just spent a few minutes tracking down where this was done, so for posterity this feature was introduced in #1033.
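
As a rough sketch of how a `unique` flag could reconcile both use cases: the function below is an assumption about what such a flag might do, not the code merged in #1033. `broadcast_queue_name` is a hypothetical helper, and `uuid.uuid4` stands in for kombu's `uuid()`.

```python
import uuid


def broadcast_queue_name(queue=None, unique=False):
    """Hypothetical naming rule for a Broadcast-like class with a `unique` flag."""
    if unique:
        # Always append a fresh suffix so each worker gets its own queue.
        return '{0}.{1}'.format(queue or 'bcast', uuid.uuid4())
    # Otherwise honor the caller's name, generating one only if none was given.
    return queue or 'bcast.{0}'.format(uuid.uuid4())


# Fixed name: usable from task_queues and task_routes.
print(broadcast_queue_name('bcast_tasks'))
# Per-worker name: the given name becomes a prefix.
print(broadcast_queue_name('bcast_tasks', unique=True))
```

Under this rule, a fixed name is kept by default, while unique=True restores the per-worker naming from the original line quoted at the top of the issue.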
