Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switching to boto3 only #693

Merged
merged 48 commits into from
Apr 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
b1169d7
Switch Boto2 to Boto3 for SQS messaging
Dec 30, 2016
fad9250
Fixed region support
Dec 30, 2016
39b7bd3
Add SQS FIFO queue support
Dec 30, 2016
04ee160
Add sensible defaults for message attributes
Jan 3, 2017
c270e92
Asynchronous support, plus boto3 for region endpoint lookups
Jan 12, 2017
1e4b1e3
Clean up imports
Jan 12, 2017
66870de
Fix Python 2 support
Jan 13, 2017
5f20c80
Fix receive_message tests
Jan 20, 2017
8a6fb0e
Reformat docstring
Jan 23, 2017
bc8e108
boto3 import changes for CI
Jan 23, 2017
759e09e
skip tests if boto3 not installed
Jan 23, 2017
c4b5564
skip tests if boto3 not installed
Jan 23, 2017
90c23b0
flake8
Jan 23, 2017
1389269
Merge commit 'b1169d7f5e564bae85f0026af0e58d2457d56ae2'
revmischa Feb 15, 2017
cd29d56
Merge remote-tracking branch 'pr/boto3-service-resource'
revmischa Feb 15, 2017
2d492a4
noboto
revmischa Feb 15, 2017
57ea7f1
ditching boto2. got queue URL fetching, async HTTP request generation…
revmischa Feb 15, 2017
fbab896
request signing working kinda
revmischa Feb 15, 2017
f8f568a
async parsing of SQS message response more or less working
revmischa Feb 15, 2017
053db8c
botocore sqs dep
revmischa Feb 15, 2017
bf8148e
ripping out more old boto2 stuff
revmischa Feb 15, 2017
aebabd8
removing tests that are no longer valid with boto3/SQS
revmischa Feb 15, 2017
17f4ab0
fix boto3 dep, min version and no botocore
revmischa Feb 15, 2017
a446e9d
no boto2 for test
revmischa Feb 15, 2017
b4911a4
cleaning up some SQS tests. fixing header parsing of response to msg
revmischa Feb 15, 2017
5729ff8
fixing some sqs tests
revmischa Feb 16, 2017
07bf709
removing response-parsing tests that are no longer necessary as we're…
revmischa Feb 16, 2017
b28bc7c
fixing more SQS tests
revmischa Feb 16, 2017
e67ec19
wants a region
revmischa Feb 16, 2017
78158a9
trying to fix py2 parsing of sqs message
revmischa Feb 16, 2017
1708986
lint
revmischa Feb 16, 2017
fefd287
py2/py2 message header parsing stupidness
revmischa Feb 16, 2017
56a38f0
forgot
revmischa Feb 16, 2017
a8c551f
python 2 sux
revmischa Feb 16, 2017
7c04093
flake8
revmischa Feb 16, 2017
b137cd3
Import boto3 from the right place
revmischa Feb 16, 2017
7377d71
Changes
adamszeptycki Feb 16, 2017
87d5f88
Update encode fuction
adamszeptycki Feb 16, 2017
e26deb7
Fix lint
adamszeptycki Feb 16, 2017
483f5bd
remove some unused things
revmischa Feb 17, 2017
4829931
Merge branch 'noboto2' of github.com:jetbridge/kombu into noboto2
revmischa Feb 17, 2017
c2d4e1a
removing unused stuff
revmischa Feb 17, 2017
6c09344
ugh
revmischa Feb 17, 2017
a738a00
ugh
revmischa Feb 17, 2017
ec7d0bf
ugh
revmischa Feb 17, 2017
1659c6c
landscape ignoring
revmischa Feb 17, 2017
06b56a4
shut up, landscape
revmischa Feb 17, 2017
5f66684
Merge branch 'master' into noboto2
auvipy Apr 14, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/hello_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import absolute_import, unicode_literals, print_function

from kombu import Connection
from kombu import Connection # noqa


with Connection('amqp://guest:guest@localhost:5672//') as conn:
Expand Down
229 changes: 98 additions & 131 deletions kombu/async/aws/connection.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,36 @@
# -*- coding: utf-8 -*-
# * coding: utf8 *
"""Amazon AWS Connection."""
from __future__ import absolute_import, unicode_literals

from io import BytesIO

from vine import promise, transform

from kombu.async.aws.ext import AWSRequest, get_response

from kombu.async.http import Headers, Request, get_client
from kombu.five import items, python_2_unicode_compatible

from .ext import (
boto, AWSAuthConnection, AWSQueryConnection, XmlHandler, ResultSet,
)

try:
from urllib.parse import urlunsplit
except ImportError:
from urlparse import urlunsplit # noqa
from xml.sax import parseString as sax_parse # noqa
import io

try: # pragma: no cover
from email import message_from_file
from email import message_from_bytes
from email.mime.message import MIMEMessage

# py3
def message_from_headers(hdr): # noqa
bs = "\r\n".join("{}: {}".format(*h) for h in hdr)
return message_from_bytes(bs.encode())

except ImportError: # pragma: no cover
from mimetools import Message as MIMEMessage # noqa

def message_from_file(m): # noqa
return m
# py2
def message_from_headers(hdr): # noqa
return io.BytesIO(b'\r\n'.join(
b'{0}: {1}'.format(*h) for h in hdr
))

__all__ = [
'AsyncHTTPConnection', 'AsyncHTTPSConnection',
'AsyncHTTPResponse', 'AsyncConnection',
'AsyncAWSAuthConnection', 'AsyncAWSQueryConnection',
'AsyncHTTPSConnection', 'AsyncConnection',
]


Expand All @@ -56,11 +55,7 @@ def getheaders(self):
@property
def msg(self):
if self._msg is None:
self._msg = MIMEMessage(message_from_file(
BytesIO(b'\r\n'.join(
b'{0}: {1}'.format(*h) for h in self.getheaders())
)
))
self._msg = MIMEMessage(message_from_headers(self.getheaders()))
return self._msg

@property
Expand All @@ -78,7 +73,7 @@ def __repr__(self):


@python_2_unicode_compatible
class AsyncHTTPConnection(object):
class AsyncHTTPSConnection(object):
"""Async HTTP Connection."""

Request = Request
Expand All @@ -87,13 +82,9 @@ class AsyncHTTPConnection(object):
method = 'GET'
path = '/'
body = None
scheme = 'http'
default_ports = {'http': 80, 'https': 443}

def __init__(self, host, port=None,
strict=None, timeout=20.0, http_client=None, **kwargs):
self.host = host
self.port = port
def __init__(self, strict=None, timeout=20.0, http_client=None):
self.headers = []
self.timeout = timeout
self.strict = strict
Expand All @@ -112,14 +103,9 @@ def request(self, method, path, body=None, headers=None):
if headers is not None:
self.headers.extend(list(items(headers)))

def getrequest(self, scheme=None):
scheme = scheme if scheme else self.scheme
host = self.host
if self.port and self.port != self.default_ports[scheme]:
host = '{0}:{1}'.format(host, self.port)
url = urlunsplit((scheme, host, self.path, '', ''))
def getrequest(self):
headers = Headers(self.headers)
return self.Request(url, method=self.method, headers=headers,
return self.Request(self.path, method=self.method, headers=headers,
body=self.body, connect_timeout=self.timeout,
request_timeout=self.timeout, validate_cert=False)

Expand All @@ -137,7 +123,7 @@ def connect(self):
def close(self):
pass

def putrequest(self, method, path, **kwargs):
def putrequest(self, method, path):
self.method = method
self.path = path

Expand All @@ -157,139 +143,120 @@ def __repr__(self):
return '<AsyncHTTPConnection: {0!r}>'.format(self.getrequest())


class AsyncHTTPSConnection(AsyncHTTPConnection):
"""Async HTTPS Connection."""

scheme = 'https'


class AsyncConnection(object):
"""Async AWS Connection."""

def __init__(self, http_client=None, **kwargs):
if boto is None:
raise ImportError('boto is not installed')
def __init__(self, sqs_connection, http_client=None, **kwargs): # noqa
self.sqs_connection = sqs_connection
self._httpclient = http_client or get_client()

def get_http_connection(self, host, port, is_secure):
return (AsyncHTTPSConnection if is_secure else AsyncHTTPConnection)(
host, port, http_client=self._httpclient,
)
def get_http_connection(self):
return AsyncHTTPSConnection(http_client=self._httpclient)

def _mexe(self, request, sender=None, callback=None):
callback = callback or promise()
boto.log.debug(
'HTTP %s/%s headers=%s body=%s',
request.host, request.path,
request.headers, request.body,
)

conn = self.get_http_connection(
request.host, request.port, self.is_secure,
)
request.authorize(connection=self)
conn = self.get_http_connection()

if callable(sender):
sender(conn, request.method, request.path, request.body,
request.headers, callback)
else:
conn.request(request.method, request.path,
conn.request(request.method, request.url,
request.body, request.headers)
conn.getresponse(callback=callback)
return callback


class AsyncAWSAuthConnection(AsyncConnection, AWSAuthConnection):
"""Async AWS Authn Connection."""

def __init__(self, host,
http_client=None, http_client_params={}, **kwargs):
AsyncConnection.__init__(self, http_client, **http_client_params)
AWSAuthConnection.__init__(self, host, **kwargs)

def make_request(self, method, path, headers=None, data='', host=None,
auth_path=None, sender=None, callback=None, **kwargs):
req = self.build_base_http_request(
method, path, auth_path, {}, headers, data, host,
)
return self._mexe(req, sender=sender, callback=callback)


class AsyncAWSQueryConnection(AsyncConnection, AWSQueryConnection):
class AsyncAWSQueryConnection(AsyncConnection):
"""Async AWS Query Connection."""

def __init__(self, host,
http_client=None, http_client_params={}, **kwargs):
AsyncConnection.__init__(self, http_client, **http_client_params)
AWSAuthConnection.__init__(self, host, **kwargs)

def make_request(self, action, params, path, verb, callback=None):
request = self.build_base_http_request(
verb, path, None, params, {}, '', self.server_name())
if action:
request.params['Action'] = action
request.params['Version'] = self.APIVersion
return self._mexe(request, callback=callback)

def get_list(self, action, params, markers,
path='/', parent=None, verb='GET', callback=None):
def __init__(self, sqs_connection, http_client=None,
http_client_params=None, **kwargs):
if not http_client_params:
http_client_params = {}
AsyncConnection.__init__(self, sqs_connection, http_client,
**http_client_params)

def make_request(self, operation, params_, path, verb, callback=None): # noqa
params = params_.copy()
if operation:
params['Action'] = operation
signer = self.sqs_connection._request_signer # noqa

# defaults for non-get
signing_type = 'standard'
param_payload = {'data': params}
if verb.lower() == 'get':
# query-based opts
signing_type = 'presignurl'
param_payload = {'params': params}

request = AWSRequest(method=verb, url=path, **param_payload)
signer.sign(operation, request, signing_type=signing_type)
prepared_request = request.prepare()

# print(prepared_request.url)
# print(prepared_request.headers)
# print(prepared_request.body)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit of cleanup needed here

return self._mexe(prepared_request, callback=callback)

def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa
return self.make_request(
action, params, path, verb,
operation, params, path, verb,
callback=transform(
self._on_list_ready, callback, parent or self, markers,
operation
),
)

def get_object(self, action, params, cls,
path='/', parent=None, verb='GET', callback=None):
def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
return self.make_request(
action, params, path, verb,
operation, params, path, verb,
callback=transform(
self._on_obj_ready, callback, parent or self, cls,
self._on_obj_ready, callback, parent or self, operation
),
)

def get_status(self, action, params,
path='/', parent=None, verb='GET', callback=None):
def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
return self.make_request(
action, params, path, verb,
operation, params, path, verb,
callback=transform(
self._on_status_ready, callback, parent or self,
self._on_status_ready, callback, parent or self, operation
),
)

def _on_list_ready(self, parent, markers, response):
body = response.read()
if response.status == 200 and body:
rs = ResultSet(markers)
h = XmlHandler(rs, parent)
sax_parse(body, h)
return rs
def _on_list_ready(self, parent, markers, operation, response): # noqa
service_model = self.sqs_connection.meta.service_model
if response.status == 200:
_, parsed = get_response(
service_model.operation_model(operation), response.response
)
return parsed
else:
raise self._for_status(response, body)

def _on_obj_ready(self, parent, cls, response):
body = response.read()
if response.status == 200 and body:
obj = cls(parent)
h = XmlHandler(obj, parent)
sax_parse(body, h)
return obj
raise self._for_status(response, response.read())

def _on_obj_ready(self, parent, operation, response): # noqa
service_model = self.sqs_connection.meta.service_model
if response.status == 200:
_, parsed = get_response(
service_model.operation_model(operation), response.response
)
return parsed
else:
raise self._for_status(response, body)

def _on_status_ready(self, parent, response):
body = response.read()
if response.status == 200 and body:
rs = ResultSet()
h = XmlHandler(rs, parent)
sax_parse(body, h)
return rs.status
raise self._for_status(response, response.read())

def _on_status_ready(self, parent, operation, response): # noqa
service_model = self.sqs_connection.meta.service_model
if response.status == 200:
httpres, _ = get_response(
service_model.operation_model(operation), response.response
)
return httpres.code
else:
raise self._for_status(response, body)
raise self._for_status(response, response.read())

def _for_status(self, response, body):
context = 'Empty body' if not body else 'HTTP Error'
exc = self.ResponseError(response.status, response.reason, body)
boto.log.error('{0}: %r'.format(context), exc)
return exc
return Exception("Request {} HTTP {} {} ({})".format(
context, response.status, response.reason, body
))
31 changes: 14 additions & 17 deletions kombu/async/aws/ext.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
# -*- coding: utf-8 -*-
"""Amazon boto interface."""
"""Amazon boto3 interface."""
from __future__ import absolute_import, unicode_literals

try:
import boto
except ImportError: # pragma: no cover
boto = get_regions = ResultSet = RegionInfo = XmlHandler = None
import boto3
from botocore import exceptions
from botocore.awsrequest import AWSRequest
from botocore.response import get_response
except ImportError:
boto3 = None

class _void(object):
pass
AWSAuthConnection = AWSQueryConnection = _void # noqa

class BotoError(Exception):
class BotoCoreError(Exception):
pass
exception = _void()
exception.SQSError = BotoError
exception.SQSDecodeError = BotoError
else:
from boto import exception
from boto.connection import AWSAuthConnection, AWSQueryConnection
from boto.handler import XmlHandler
from boto.resultset import ResultSet
from boto.regioninfo import RegionInfo, get_regions
exceptions = _void()
exceptions.BotoCoreError = BotoCoreError
AWSRequest = _void()
get_response = _void()


__all__ = [
'exception', 'AWSAuthConnection', 'AWSQueryConnection',
'XmlHandler', 'ResultSet', 'RegionInfo', 'get_regions',
'exceptions', 'AWSRequest', 'get_response'
]
Loading