diff --git a/kombu/transport/pubsub.py b/kombu/transport/pubsub.py index f2326b989..c0d1b6422 100644 --- a/kombu/transport/pubsub.py +++ b/kombu/transport/pubsub.py @@ -2,6 +2,7 @@ import os import sys +import time from dateutil import parser from threading import Thread @@ -15,13 +16,11 @@ from kombu.utils import cached_property, uuid from kombu.utils.compat import OrderedDict -try: - from google.cloud import pubsub_v1 - from google.cloud import tasks_v2 - from google.protobuf import timestamp_pb2 - from google.api_core.exceptions import AlreadyExists, DeadlineExceeded -except: - pubsub_v1 = None +import google.auth +from google.cloud import pubsub_v1 +from google.cloud import tasks_v2 +from google.protobuf import timestamp_pb2 +from google.api_core.exceptions import AlreadyExists, DeadlineExceeded logger = get_logger(__name__) @@ -36,8 +35,12 @@ def __init__(self, client, subscription_path, max_messages, queue): self.max_messages = max_messages self.start() + def callback(self, msg): + self.queue.put(msg, block=True) + def run(self): ''' run ''' + time.sleep(5) while True: logger.info("".join(["Pulling messsage using subscription ", self.subscription_path])) @@ -245,7 +248,7 @@ def basic_publish(self, message, exchange='', routing_key='', """ if loads(message['body'])['eta']: return self._create_cloud_task(exchange, message) - return self._publish(exchange, message, kwargs) + return self._publish(exchange, message, **kwargs) def _publish(self, topic, message, **kwargs): ''' publish the message ''' @@ -264,12 +267,15 @@ def _create_cloud_task(self, exchange, message): return self.cloud_task.create_task(self.cloud_task_queue_path, task) def _get_task(self, eta, exchange, message): - parsed_time = parser.parse(eta.strip()) + d = parser.parse(eta.strip()) ts = timestamp_pb2.Timestamp() - ts.FromDatetime(parsed_time) + ts.FromDatetime(d) return { "http_request": { "http_method": tasks_v2.enums.HttpMethod.POST, + "oidc_token": { + "service_account_email": self.service_account_email, + }, "headers": {"Content-type": "application/json"}, "url": self.transport_options.get("CLOUD_FUNCTION_PUBLISHER"), "body": dumps({ @@ -298,6 +304,11 @@ def cloud_task(self): """ Client connection for cloud task """ return tasks_v2.CloudTasksClient() + @cached_property + def service_account_email(self): + creds, _ = google.auth.default() + return creds.service_account_email + @cached_property def transport_options(self): """PubSub Transport sepcific configurations""" @@ -346,6 +357,4 @@ class Transport(virtual.Transport): driver_name = 'pubsub_v1' def __init__(self, *args, **kwargs): - if pubsub_v1 is None: - raise ImportError("The pubsub_v1 library is not installed") super(Transport, self).__init__(*args, **kwargs)