-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add "consume()" and "cancel()" to the Twisted API #72
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The setupRead()
method and the _message_callback
attribute can now be removed, right?
Also, I think it would be a bit more pythonic if the consumer dict that is returned by consume()
and passed to cancel()
was an object (it could be a subclass of dict
).
Yeah, we'll need to make adjustments in the migration tools, but that's not hard.
Sure thing. I originally had them as namedtuples that were hashable, but then I wanted to add in a reference to the channel they're consuming on and thus mutate the tuple which... anyway, I think it got refactored a bit since I tried that out so I'm going to look at doing that again. Thanks for the feedback! |
0771a57
to
caf7898
Compare
Hey @abompard, I've done a bit more adding to the API here, but I'd like your thoughts before I start writing tests (and fixing the rest of the existing ones I broke). It's not broken out into proper commits or anything like that, so I'd recommend just looking at the function signatures and letting me know what you think. I'll tidy it up tomorrow, but I expect you'll wake up before I do 😄. The general idea is to stuff the config a user wants to persist across connections into the factory, and push everything else into the protocol class. That way a user sets up the factory with some consumers and can rest easy knowing they'll be automatically reconfigured when connection failures occur. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great, thanks!
It does not contain your one-channel-per-consumer changes yet, right?
fedora_messaging/twisted/factory.py
Outdated
exchanges=None, | ||
queues=None, | ||
bindings=None, | ||
consumers=None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it's useful to have the consumers
arg here as well as the consume()
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking was that you'd use this for the initial setup (before you've tried to start the service) and later call consume and cancel while the service is running. What made me add it was that for fmn-next I had a big set of existing consumers, but I wanted an API to add and remove consumers when people change their settings in the web UI.
That being said, I think the reason I did this was that originally the protocol needed to be set up and producing before you could call consume (or it'd call it for you), but it's no longer that way so having "one way to do it" with consume()
sounds nice.
) | ||
deferred = self._read(queue_object, consumer) | ||
deferred.addErrback( | ||
lambda f: _log.failure, "_read failed on consumer {c}", c=consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These 7 lines are duplicated in consume()
but I'm not sure it's worthwhile to factor them out ;-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll keep that in mind, it might be worth factoring them out if the testing is burdensome
fedora_messaging/twisted/protocol.py
Outdated
yield self._channel.basic_qos( | ||
prefetch_count=config.conf["qos"]["prefetch_count"], | ||
prefetch_size=config.conf["qos"]["prefetch_size"], | ||
all_channels=True, | ||
) | ||
if self._confirms: | ||
yield self._channel.confirm_delivery() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has already been done in _allocate_channel()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah right 👍
return | ||
|
||
try: | ||
yield consumer.channel.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Until every consumer is in their own channel, this will close the main channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeremycline what do you think of that comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, each consumer now has its own channel, allocated in the consume()
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do they? I'm not seing this, could you point me to the right line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, my bad, the Github UI was somehow showing an old version.
It does not yet, but I'm going to include that. |
7d83730
to
760d984
Compare
Okay, I think this is now ready for a review again. I'm very sorry it's gotten so big. Also, I'm not sure what the deal is with codecov. |
760d984
to
3590657
Compare
import socket | ||
|
||
from crochet import setup, wait_for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use pytest_twisted as elsewhere? Did you have issues with it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to run the actual reactor and start real connections, I think I just assumed twisted_pytest wasn't doing that, but it does seem to just use the normal reactor. I can re-write the tests to use that, I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, maybe I wanted timeouts so tests wouldn't hang forever? I don't remember.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you think using crochet is better, let's keep it and convert the other tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I had to fiddle a bit, but I think I've got things all going on pytest_twisted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the past I've seen tests pass when they should fail with several of the tools that try to integrate Twisted with unit tests runners. As a result I'm always a bit worried on that subject, we should be very careful that we're using them properly and not causing false negatives.
972f1b7
to
3dbd828
Compare
This combines the _queues set with the _consumers dictionary and uses a new representation that includes the consumer tag, the queue for the consumer, and the callback to be invoked when a message arrives. This offers several advantages: * Creating our own consumer tag rather than waiting for the server to create one for us, we can register consumers and return references to callers without actually calling out to the server. This will be used in the future with a ``consume`` and ``cancel`` API that doesn't require ``resumeProducing`` to be called first. * Allows for Twisted users to have different callbacks for different consumers without starting one service per callback. This is a prep-work commit and doesn't change the behavior of the Twisted protocol. Signed-off-by: Jeremy Cline <jcline@redhat.com>
Allow users to register and unregister new consumers with the Twisted protocol. Signed-off-by: Jeremy Cline <jcline@redhat.com>
3dbd828
to
9cb280f
Compare
Codecov Report
@@ Coverage Diff @@
## master #72 +/- ##
==========================================
- Coverage 97.29% 96.63% -0.67%
==========================================
Files 11 11
Lines 813 891 +78
Branches 115 117 +2
==========================================
+ Hits 791 861 +70
- Misses 10 19 +9
+ Partials 12 11 -1
Continue to review full report at Codecov.
|
Tests should be fixed up so this is ready for re-review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK this looks great, thanks for all that work!
Allow users to add and remove consumers to a Twisted connection. As a side-effect, each consumer can have its own callback.
I'm planning on a follow-up PR that gives each consumer its own channel to work with so one consumer stopping/messing up doesn't cancel all active consumers.