Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump aio pika to 8.1.8 #113

Closed
wants to merge 13 commits into from
Closed

Conversation

unkcpz
Copy link
Member

@unkcpz unkcpz commented Aug 5, 2022

  • pamqp will check the exchange channel name, _.*/ are not allowed. Using uuid only.
  • 'RobustConnection' object has no attribute 'connection'
  • ack and nack which called when requeue task are async function in aio-pika

@unkcpz unkcpz marked this pull request as draft August 5, 2022 09:07
@codecov
Copy link

codecov bot commented Aug 6, 2022

Codecov Report

Merging #113 (8c9da20) into develop (0e7d02d) will decrease coverage by 2.34%.
The diff coverage is 89.48%.

❗ Current head 8c9da20 differs from pull request most recent head 1b530fe. Consider uploading reports for the commit 1b530fe to get more accurate results

@@             Coverage Diff             @@
##           develop     #113      +/-   ##
===========================================
- Coverage    90.32%   87.98%   -2.33%     
===========================================
  Files           16       16              
  Lines         1146     1156      +10     
===========================================
- Hits          1035     1017      -18     
- Misses         111      139      +28     
Impacted Files Coverage Δ
kiwipy/rmq/tasks.py 94.18% <83.34%> (-0.91%) ⬇️
kiwipy/rmq/threadcomms.py 84.89% <93.34%> (-8.90%) ⬇️
kiwipy/rmq/communicator.py 86.85% <100.00%> (-3.75%) ⬇️
kiwipy/rmq/messages.py 93.53% <100.00%> (ø)

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@unkcpz unkcpz marked this pull request as ready for review August 6, 2022 11:01
@unkcpz
Copy link
Member Author

unkcpz commented Aug 6, 2022

@muhrin could you have a look at these changes?

A critical change I am not confident in is that since _on_task is now made async, it can not be added to task outcome future by fut.add_done_callback. I tried to chain the callback with fut object but it create an extra reference to outcome which is conflicting with the idea of using weakref to trigger the requeue. If you prefer we can have a discussion, it is a bit hard to explain it since it is still vague to me how weakref works.

Also pinning @sphuber @chrisjsewell for comment.

Copy link
Collaborator

@muhrin muhrin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's some problems with some of this PR. I'll come and have a chat and we can figure it out together.

def _outcome_destroyed(self, outcome_ref):
# This only happens if someone called self.process() and then let the future
# get destroyed without setting an outcome
assert outcome_ref is self._outcome_ref
# This task will not be processed
self._outcome_ref = None
self.requeue()
self._subscriber.loop().create_task(self.requeue())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it call requeue() using create_task when requeue() itself uses the loop_scheduler to await? Shouldn't this just be a requeue() call?

message_exchange = f'{__file__}.{shortuuid.uuid()}'
task_exchange = f'{__file__}.{shortuuid.uuid()}'
task_queue = f'{__file__}.{shortuuid.uuid()}'
message_exchange = f'{shortuuid.uuid()}'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove the __file__ part? Sometimes, I found it useful to have the file in there so that when looking at the RMQ web console I could tell which test the queues/exchanges belong to.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because pamqp will check the exchange channel name, special characters such as _.*/ are not allowed.

message_exchange = f'{__file__}.{shortuuid.uuid()}'
task_exchange = f'{__file__}.{shortuuid.uuid()}'
task_queue = f'{__file__}.{shortuuid.uuid()}'
message_exchange = f'{shortuuid.uuid()}'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to my comment above about __file__

kiwipy/rmq/tasks.py Outdated Show resolved Hide resolved
kiwipy/rmq/tasks.py Outdated Show resolved Hide resolved
kiwipy/rmq/threadcomms.py Show resolved Hide resolved
kiwipy/rmq/threadcomms.py Outdated Show resolved Hide resolved
@unkcpz
Copy link
Member Author

unkcpz commented Sep 5, 2022

Hi @muhrin, it's been so long that I think that probably better to set a quick meeting and check this together again.
I am now successfully putting the _on_task_done as a coroutine and schedule it in the event loop.
But when using the ThreadCommunicator this implementation comes up with a warning that coroutine call from another thread' and not able to execute.

@sphuber
Copy link
Collaborator

sphuber commented Oct 13, 2022

Superseded by #114

@sphuber sphuber closed this Oct 13, 2022
@unkcpz unkcpz deleted the bump-aio-pika branch October 13, 2022 15:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants