Skip to content

Commit

Permalink
Merge pull request #2262 from tseaver/2243-pubsub-gax-mediate-publish…
Browse files Browse the repository at this point in the history
…_timestamp

Map pb 'publish_time' -> JSON 'publishTimestamp' for received messges.
  • Loading branch information
tseaver authored Sep 7, 2016
2 parents 570cf29 + 250c397 commit d093302
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
2 changes: 2 additions & 0 deletions google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# pylint: disable=ungrouped-imports
from google.cloud._helpers import _to_bytes
from google.cloud._helpers import exc_to_code
from google.cloud._helpers import _pb_timestamp_to_rfc3339
from google.cloud.exceptions import Conflict
from google.cloud.exceptions import NotFound
# pylint: enable=ungrouped-imports
Expand Down Expand Up @@ -482,6 +483,7 @@ def _message_pb_to_mapping(message_pb):
'messageId': message_pb.message_id,
'data': message_pb.data,
'attributes': message_pb.attributes,
'publishTime': _pb_timestamp_to_rfc3339(message_pb.publish_time),
}


Expand Down
19 changes: 16 additions & 3 deletions unit_tests/pubsub/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,13 +585,25 @@ def test_subscription_modify_push_config_error(self):

def test_subscription_pull_explicit(self):
import base64
import datetime
from google.cloud._helpers import UTC
from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud._helpers import _datetime_to_rfc3339
NOW = datetime.datetime.utcnow().replace(tzinfo=UTC)
NOW_PB = _datetime_to_pb_timestamp(NOW)
NOW_RFC3339 = _datetime_to_rfc3339(NOW)
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD).decode('ascii')
ACK_ID = 'DEADBEEF'
MSG_ID = 'BEADCAFE'
MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}}
MESSAGE = {
'messageId': MSG_ID,
'data': B64,
'attributes': {'a': 'b'},
'publishTime': NOW_RFC3339,
}
RECEIVED = [{'ackId': ACK_ID, 'message': MESSAGE}]
message_pb = _PubsubMessagePB(MSG_ID, B64, {'a': 'b'})
message_pb = _PubsubMessagePB(MSG_ID, B64, {'a': 'b'}, NOW_PB)
response_pb = _PullResponsePB([_ReceivedMessagePB(ACK_ID, message_pb)])
gax_api = _GAXSubscriberAPI(_pull_response=response_pb)
api = self._makeOne(gax_api)
Expand Down Expand Up @@ -891,10 +903,11 @@ def __init__(self, push_endpoint):

class _PubsubMessagePB(object):

def __init__(self, message_id, data, attributes):
def __init__(self, message_id, data, attributes, publish_time):
self.message_id = message_id
self.data = data
self.attributes = attributes
self.publish_time = publish_time


class _ReceivedMessagePB(object):
Expand Down

0 comments on commit d093302

Please sign in to comment.