Skip to content

Commit

Permalink
Switching to boto3 only (#693)
Browse files Browse the repository at this point in the history
* Switch Boto2 to Boto3 for SQS messaging

* Fixed region support

* Add SQS FIFO queue support

* Add sensible defaults for message attributes

* Asynchronous support, plus boto3 for region endpoint lookups

* Clean up imports

* Fix Python 2 support

* Fix receive_message tests

* Reformat docstring

* boto3 import changes for CI

* skip tests if boto3 not installed

* skip tests if boto3 not installed

* flake8

* noboto

* ditching boto2. got queue URL fetching, async HTTP request generation and signing working.

* request signing working kinda

* async parsing of SQS message response more or less working

* botocore sqs dep

* ripping out more old boto2 stuff

* removing tests that are no longer valid with boto3/SQS

* fix boto3 dep, min version and no botocore

* no boto2 for test

* cleaning up some SQS tests. fixing header parsing of response to msg

* fixing some sqs tests

* removing response-parsing tests that are no longer necessary as we're using the botocore response parsing machinery instead of implementing SAX parsing in kombu.

* fixing more SQS tests

* wants a region

* trying to fix py2 parsing of sqs message

* lint

* py2/py2 message header parsing stupidness

* forgot

* python 2 sux

* flake8

* Import boto3 from the right place

* Changes

* Update encode fuction

* Fix lint

* remove some unused things

* removing unused stuff

* ugh

* ugh

* ugh

* landscape ignoring

* shut up, landscape
  • Loading branch information
revmischa authored and auvipy committed Apr 14, 2017
1 parent bf820b2 commit 129a9e4
Show file tree
Hide file tree
Showing 22 changed files with 491 additions and 803 deletions.
2 changes: 1 addition & 1 deletion examples/hello_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import absolute_import, unicode_literals, print_function

from kombu import Connection
from kombu import Connection # noqa


with Connection('amqp://guest:guest@localhost:5672//') as conn:
Expand Down
229 changes: 98 additions & 131 deletions kombu/async/aws/connection.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,36 @@
# -*- coding: utf-8 -*-
# * coding: utf8 *
"""Amazon AWS Connection."""
from __future__ import absolute_import, unicode_literals

from io import BytesIO

from vine import promise, transform

from kombu.async.aws.ext import AWSRequest, get_response

from kombu.async.http import Headers, Request, get_client
from kombu.five import items, python_2_unicode_compatible

from .ext import (
boto, AWSAuthConnection, AWSQueryConnection, XmlHandler, ResultSet,
)

try:
from urllib.parse import urlunsplit
except ImportError:
from urlparse import urlunsplit # noqa
from xml.sax import parseString as sax_parse # noqa
import io

try: # pragma: no cover
from email import message_from_file
from email import message_from_bytes
from email.mime.message import MIMEMessage

# py3
def message_from_headers(hdr): # noqa
bs = "\r\n".join("{}: {}".format(*h) for h in hdr)
return message_from_bytes(bs.encode())

except ImportError: # pragma: no cover
from mimetools import Message as MIMEMessage # noqa

def message_from_file(m): # noqa
return m
# py2
def message_from_headers(hdr): # noqa
return io.BytesIO(b'\r\n'.join(
b'{0}: {1}'.format(*h) for h in hdr
))

__all__ = [
'AsyncHTTPConnection', 'AsyncHTTPSConnection',
'AsyncHTTPResponse', 'AsyncConnection',
'AsyncAWSAuthConnection', 'AsyncAWSQueryConnection',
'AsyncHTTPSConnection', 'AsyncConnection',
]


Expand All @@ -56,11 +55,7 @@ def getheaders(self):
@property
def msg(self):
if self._msg is None:
self._msg = MIMEMessage(message_from_file(
BytesIO(b'\r\n'.join(
b'{0}: {1}'.format(*h) for h in self.getheaders())
)
))
self._msg = MIMEMessage(message_from_headers(self.getheaders()))
return self._msg

@property
Expand All @@ -78,7 +73,7 @@ def __repr__(self):


@python_2_unicode_compatible
class AsyncHTTPConnection(object):
class AsyncHTTPSConnection(object):
"""Async HTTP Connection."""

Request = Request
Expand All @@ -87,13 +82,9 @@ class AsyncHTTPConnection(object):
method = 'GET'
path = '/'
body = None
scheme = 'http'
default_ports = {'http': 80, 'https': 443}

def __init__(self, host, port=None,
strict=None, timeout=20.0, http_client=None, **kwargs):
self.host = host
self.port = port
def __init__(self, strict=None, timeout=20.0, http_client=None):
self.headers = []
self.timeout = timeout
self.strict = strict
Expand All @@ -112,14 +103,9 @@ def request(self, method, path, body=None, headers=None):
if headers is not None:
self.headers.extend(list(items(headers)))

def getrequest(self, scheme=None):
scheme = scheme if scheme else self.scheme
host = self.host
if self.port and self.port != self.default_ports[scheme]:
host = '{0}:{1}'.format(host, self.port)
url = urlunsplit((scheme, host, self.path, '', ''))
def getrequest(self):
headers = Headers(self.headers)
return self.Request(url, method=self.method, headers=headers,
return self.Request(self.path, method=self.method, headers=headers,
body=self.body, connect_timeout=self.timeout,
request_timeout=self.timeout, validate_cert=False)

Expand All @@ -137,7 +123,7 @@ def connect(self):
def close(self):
pass

def putrequest(self, method, path, **kwargs):
def putrequest(self, method, path):
self.method = method
self.path = path

Expand All @@ -157,139 +143,120 @@ def __repr__(self):
return '<AsyncHTTPConnection: {0!r}>'.format(self.getrequest())


class AsyncHTTPSConnection(AsyncHTTPConnection):
"""Async HTTPS Connection."""

scheme = 'https'


class AsyncConnection(object):
"""Async AWS Connection."""

def __init__(self, http_client=None, **kwargs):
if boto is None:
raise ImportError('boto is not installed')
def __init__(self, sqs_connection, http_client=None, **kwargs): # noqa
self.sqs_connection = sqs_connection
self._httpclient = http_client or get_client()

def get_http_connection(self, host, port, is_secure):
return (AsyncHTTPSConnection if is_secure else AsyncHTTPConnection)(
host, port, http_client=self._httpclient,
)
def get_http_connection(self):
return AsyncHTTPSConnection(http_client=self._httpclient)

def _mexe(self, request, sender=None, callback=None):
callback = callback or promise()
boto.log.debug(
'HTTP %s/%s headers=%s body=%s',
request.host, request.path,
request.headers, request.body,
)

conn = self.get_http_connection(
request.host, request.port, self.is_secure,
)
request.authorize(connection=self)
conn = self.get_http_connection()

if callable(sender):
sender(conn, request.method, request.path, request.body,
request.headers, callback)
else:
conn.request(request.method, request.path,
conn.request(request.method, request.url,
request.body, request.headers)
conn.getresponse(callback=callback)
return callback


class AsyncAWSAuthConnection(AsyncConnection, AWSAuthConnection):
"""Async AWS Authn Connection."""

def __init__(self, host,
http_client=None, http_client_params={}, **kwargs):
AsyncConnection.__init__(self, http_client, **http_client_params)
AWSAuthConnection.__init__(self, host, **kwargs)

def make_request(self, method, path, headers=None, data='', host=None,
auth_path=None, sender=None, callback=None, **kwargs):
req = self.build_base_http_request(
method, path, auth_path, {}, headers, data, host,
)
return self._mexe(req, sender=sender, callback=callback)


class AsyncAWSQueryConnection(AsyncConnection, AWSQueryConnection):
class AsyncAWSQueryConnection(AsyncConnection):
"""Async AWS Query Connection."""

def __init__(self, host,
http_client=None, http_client_params={}, **kwargs):
AsyncConnection.__init__(self, http_client, **http_client_params)
AWSAuthConnection.__init__(self, host, **kwargs)

def make_request(self, action, params, path, verb, callback=None):
request = self.build_base_http_request(
verb, path, None, params, {}, '', self.server_name())
if action:
request.params['Action'] = action
request.params['Version'] = self.APIVersion
return self._mexe(request, callback=callback)

def get_list(self, action, params, markers,
path='/', parent=None, verb='GET', callback=None):
def __init__(self, sqs_connection, http_client=None,
http_client_params=None, **kwargs):
if not http_client_params:
http_client_params = {}
AsyncConnection.__init__(self, sqs_connection, http_client,
**http_client_params)

def make_request(self, operation, params_, path, verb, callback=None): # noqa
params = params_.copy()
if operation:
params['Action'] = operation
signer = self.sqs_connection._request_signer # noqa

# defaults for non-get
signing_type = 'standard'
param_payload = {'data': params}
if verb.lower() == 'get':
# query-based opts
signing_type = 'presignurl'
param_payload = {'params': params}

request = AWSRequest(method=verb, url=path, **param_payload)
signer.sign(operation, request, signing_type=signing_type)
prepared_request = request.prepare()

# print(prepared_request.url)
# print(prepared_request.headers)
# print(prepared_request.body)
return self._mexe(prepared_request, callback=callback)

def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa
return self.make_request(
action, params, path, verb,
operation, params, path, verb,
callback=transform(
self._on_list_ready, callback, parent or self, markers,
operation
),
)

def get_object(self, action, params, cls,
path='/', parent=None, verb='GET', callback=None):
def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
return self.make_request(
action, params, path, verb,
operation, params, path, verb,
callback=transform(
self._on_obj_ready, callback, parent or self, cls,
self._on_obj_ready, callback, parent or self, operation
),
)

def get_status(self, action, params,
path='/', parent=None, verb='GET', callback=None):
def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
return self.make_request(
action, params, path, verb,
operation, params, path, verb,
callback=transform(
self._on_status_ready, callback, parent or self,
self._on_status_ready, callback, parent or self, operation
),
)

def _on_list_ready(self, parent, markers, response):
body = response.read()
if response.status == 200 and body:
rs = ResultSet(markers)
h = XmlHandler(rs, parent)
sax_parse(body, h)
return rs
def _on_list_ready(self, parent, markers, operation, response): # noqa
service_model = self.sqs_connection.meta.service_model
if response.status == 200:
_, parsed = get_response(
service_model.operation_model(operation), response.response
)
return parsed
else:
raise self._for_status(response, body)

def _on_obj_ready(self, parent, cls, response):
body = response.read()
if response.status == 200 and body:
obj = cls(parent)
h = XmlHandler(obj, parent)
sax_parse(body, h)
return obj
raise self._for_status(response, response.read())

def _on_obj_ready(self, parent, operation, response): # noqa
service_model = self.sqs_connection.meta.service_model
if response.status == 200:
_, parsed = get_response(
service_model.operation_model(operation), response.response
)
return parsed
else:
raise self._for_status(response, body)

def _on_status_ready(self, parent, response):
body = response.read()
if response.status == 200 and body:
rs = ResultSet()
h = XmlHandler(rs, parent)
sax_parse(body, h)
return rs.status
raise self._for_status(response, response.read())

def _on_status_ready(self, parent, operation, response): # noqa
service_model = self.sqs_connection.meta.service_model
if response.status == 200:
httpres, _ = get_response(
service_model.operation_model(operation), response.response
)
return httpres.code
else:
raise self._for_status(response, body)
raise self._for_status(response, response.read())

def _for_status(self, response, body):
context = 'Empty body' if not body else 'HTTP Error'
exc = self.ResponseError(response.status, response.reason, body)
boto.log.error('{0}: %r'.format(context), exc)
return exc
return Exception("Request {} HTTP {} {} ({})".format(
context, response.status, response.reason, body
))
31 changes: 14 additions & 17 deletions kombu/async/aws/ext.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
# -*- coding: utf-8 -*-
"""Amazon boto interface."""
"""Amazon boto3 interface."""
from __future__ import absolute_import, unicode_literals

try:
import boto
except ImportError: # pragma: no cover
boto = get_regions = ResultSet = RegionInfo = XmlHandler = None
import boto3
from botocore import exceptions
from botocore.awsrequest import AWSRequest
from botocore.response import get_response
except ImportError:
boto3 = None

class _void(object):
pass
AWSAuthConnection = AWSQueryConnection = _void # noqa

class BotoError(Exception):
class BotoCoreError(Exception):
pass
exception = _void()
exception.SQSError = BotoError
exception.SQSDecodeError = BotoError
else:
from boto import exception
from boto.connection import AWSAuthConnection, AWSQueryConnection
from boto.handler import XmlHandler
from boto.resultset import ResultSet
from boto.regioninfo import RegionInfo, get_regions
exceptions = _void()
exceptions.BotoCoreError = BotoCoreError
AWSRequest = _void()
get_response = _void()


__all__ = [
'exception', 'AWSAuthConnection', 'AWSQueryConnection',
'XmlHandler', 'ResultSet', 'RegionInfo', 'get_regions',
'exceptions', 'AWSRequest', 'get_response'
]
Loading

1 comment on commit 129a9e4

@revmischa
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

πŸ‘ πŸ’― πŸ¦„ 🌈 πŸ’― πŸ˜‚

Please sign in to comment.