-
-
Notifications
You must be signed in to change notification settings - Fork 940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Switching to boto3 only #693
Conversation
… and signing working.
@@ -160,15 +152,23 @@ def _reset_cycle(self): | |||
|
|||
def entity_name(self, name, table=CHARS_REPLACE_TABLE): | |||
"""Format AMQP queue name into a legal SQS queue name.""" | |||
return text_t(safe_str(name)).translate(table) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@revmischa why is there a differentiation for queues with suffix .fifo
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jseutter did that not me.
.fifo suffix means it's a FIFO queue, i.e. retains ordering. this is a new feature in SQS. non-FIFO queues don't preserve ordering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that a necessary change in the scope of removing boto2? I am pointing this out because the changes are already very extensive. I think it would be advisable to keep the diff to a minimum.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think FIFO support is absolutely a requirement. Otherwise Kombu will likely break if someone tries to use a new-style queue. However I did not implement this, I'm including the changes from @jseutter in my PR because they're based off of his branch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, it is basically an enhancement, adding SQS FIFO support :) . I believe it belongs to a separate Pull Request, but I will not insist any further.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't disagree with that. It's not my commit though.
requirements/extras/sqs.txt
Outdated
@@ -1,2 +1,3 @@ | |||
boto>=2.8 | |||
boto3 | |||
botocore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think boto3
already depends on botocore
. Also, perhaps it would be a good idea to specify (at least) a minimum version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good point
17f4ab0
t/unit/async/aws/case.py
Outdated
@@ -8,6 +8,7 @@ | |||
|
|||
@skip.if_pypy() | |||
@skip.unless_module('boto') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps this line should be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup
I found this project that seems quite active. Perhaps it could be used for integration tests? What do you think? Could be useful for Celery testing as well. |
Maybe usesful @georgepsarakis I'm just trying to get all the wacky original SQS tests passing again which is no small feat. They were extremely coupled to the boto2 behaviors. |
@jseutter check out the work I did and let me know if you have any questions. i'm not planning to spend any more time on this right this second. theres no boto2 left anywhere. |
@revmischa thanks for the work. what reamain in your thought? @jseutter review and comment what need to be change |
I managed to get all the tests passing, but I wasn't sure if all the modifications I made were proper. Really the code needs review by someone who understands Kombu better than me. I was trying to perform some major surgery without having any idea of how anything was supposed to fit together. I changed everything I could find but I wasn't really sure how to test all the cases. Probably the channel creation (esp with FIFO support) needs proper testing. I also made big changes to the message serialization/deserialization stuff. |
@revmischa could you please rebase? |
You rebase it. I gave up on Celery and wrote something better using PostgreSQL's |
@revmischa So are you (and the rest of the @jetbridge organization) formally abandoning this PR? I'm interested in getting this merged. Getting away from boto2 will be a large win for Kombu moving forward as boto2:
@georgepsarakis What are your feelings about this PR? How far is it from completion? What else needs to be done before you'd feel comfortable merging? I'm just stepping into this code right now but am willing to put in some hours if need be. I'm running off of the PR branch on my local machine and it seems to be close to working (my message bodies appear to be encoded in Base64, need to figure out why that is, quite possibly not related to this PR). @jseutter Do you have any thoughts on what needs to be done for this to be called "complete"? |
I'll merge in changes if you want but I don't have anything further to contribute to this PR personally. |
a rebase would be fine |
i rebased and squashed. plz test and report bugs |
@revmischa thanks for your efforts on this PR 😃 . @alukach @auvipy I will try to add integration tests in Celery using Atlassian LocalStack, which I recently discovered, and has SQS support. |
that would be really awesome! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying this out on my machine, I think there are some small issues around encoding to/from base64
when using Python3.4
|
||
# print(prepared_request.url) | ||
# print(prepared_request.headers) | ||
# print(prepared_request.body) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit of cleanup needed here
@@ -8,6 +8,7 @@ | |||
@skip.unless_environ('AWS_ACCESS_KEY_ID') | |||
@skip.unless_environ('AWS_SECRET_ACCESS_KEY') | |||
@skip.unless_module('boto') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No longer needed (I believe)
"""Json serialized message.""" | ||
def encode(self, value): | ||
"""Encode/decode the value using Base64 encoding.""" | ||
return base64.b64encode(value).decode('utf-8') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This appears to break with Python3.4. base64.b64encode()
expects a bytes-like object.
I think this line should be return base64.b64encode(value.encode()).decode()
|
||
def _message_to_python(self, message, queue_name, queue): | ||
payload = loads(bytes_to_str(message.get_body())) | ||
payload = loads(bytes_to_str(message['Body'])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this moment, won't the message be in in base64
?
Recommend changing this to:
body = base64.b64decode(message['Body'].encode())
payload = loads(bytes_to_str(body))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@revmischa I know you're signing off on this PR, however would you be able to comment on this? I was unable to get the code running with modifying the code as mentioned above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure why base64 was used, if I recall correctly it was done in the boto2 version and I didn't change it. I was developing on python3.5 but I definitely had some issues with the encoding/decoding differences between py2 and p3 and I suggest anything you can do to simplify all of that is a good idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I now understand that the boto2 did the base64 encoding automatically. Will make a PR in a few...
@@ -209,14 +224,14 @@ def _message_to_python(self, message, queue_name, queue): | |||
delivery_info = {} | |||
properties = {'delivery_info': delivery_info} | |||
payload.update({ | |||
'body': bytes_to_str(message.get_body()), | |||
'body': bytes_to_str(message['Body']), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... and then use the body
variable here.
@auvipy @georgepsarakis Thanks for rebasing, however I think the merge may have been a bit pre-emptive. Did anyone try to test this on their machines? I was unable to get workers to consume tasks without modifying the code. |
@alukach would you mind opening a separate Pull Request with your changes? It would be valuable to merge fixes since you seem to be testing with SQS directly. Thanks! |
@georgepsarakis PR made. Take a look. |
Yay! 🎉 @revmischa Thank you thank you for all the initial work done to support SQS!! it is much appreciated Thank you @alukach and kombu team for iterating on this patch, this was holding me back from using celery + sqs! |
Thanks! I was waiting for this merge :). When can I expect release with those changes? |
This change pins Kombu to an unreleased commit that fixes Celery 4 to work properly with SQS. While testing on the dev environment it became clear that while the celery worker was able to connect to the SQS queue it wasn't actually receiving any messages off it. The issue linked below [1] shows that Celery 4.0.2 does not work correctly with SQS. While there is a fix that that has been merged on Kombu [2] this has not made it into a release yet which is why this change pins Kombu to a commit on master. [1] celery/celery#3672 [2] celery/kombu#693
This change pins Kombu to an unreleased commit that fixes Celery 4 to work properly with SQS. While testing on the dev environment it became clear that while the celery worker was able to connect to the SQS queue it wasn't actually receiving any messages off it. The issue linked below [1] shows that Celery 4.0.2 does not work correctly with SQS. While there is a fix that that has been merged on Kombu [2] this has not made it into a release yet which is why this change pins Kombu to a commit on master. I have tested the Kombu commit with a minimal Django 1.11 app using SQS as the broker. [1] celery/celery#3672 [2] celery/kombu#693
Kombu is a library that celery uses under the hood. Kombu is pinned to a specific commit that brings in fixes for SQS - see celery/kombu#693. Kombu v4.0.2 (which ships with celery v4.0.2) doesn't work with SQS due to problems with their use of boto2, so Kombu migrated to boto3 - We're waiting for that to get a release version, and then to get a new version of celery that pulls that in. Until that point, we should override the kombu version to get these SQS fixes. Additionally, we don't need boto2 any more :)
I have removed all traces of boto2 that I could find. I got async message signing and parsing more or less working. Probably still needs some cleanup and testing.
Much farther along now on the journey to boto3.