diff --git a/providers/google/README.rst b/providers/google/README.rst index 366caa9c10e2f..164560527cef7 100644 --- a/providers/google/README.rst +++ b/providers/google/README.rst @@ -101,7 +101,7 @@ PIP package Version required ``google-cloud-monitoring`` ``>=2.18.0`` ``google-cloud-orchestration-airflow`` ``>=1.10.0`` ``google-cloud-os-login`` ``>=2.9.1`` -``google-cloud-pubsub`` ``>=2.21.3`` +``google-cloud-pubsub`` ``>=2.24.0`` ``google-cloud-redis`` ``>=2.12.0`` ``google-cloud-secret-manager`` ``>=2.16.0`` ``google-cloud-spanner`` ``>=3.50.0`` diff --git a/providers/google/docs/index.rst b/providers/google/docs/index.rst index 85857cdb6976b..241bbeae21270 100644 --- a/providers/google/docs/index.rst +++ b/providers/google/docs/index.rst @@ -152,7 +152,7 @@ PIP package Version required ``google-cloud-monitoring`` ``>=2.18.0`` ``google-cloud-orchestration-airflow`` ``>=1.10.0`` ``google-cloud-os-login`` ``>=2.9.1`` -``google-cloud-pubsub`` ``>=2.21.3`` +``google-cloud-pubsub`` ``>=2.24.0`` ``google-cloud-redis`` ``>=2.12.0`` ``google-cloud-secret-manager`` ``>=2.16.0`` ``google-cloud-spanner`` ``>=3.50.0`` diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml index 7fe5536d162b8..54616cb84280a 100644 --- a/providers/google/pyproject.toml +++ b/providers/google/pyproject.toml @@ -105,7 +105,7 @@ dependencies = [ "google-cloud-monitoring>=2.18.0", "google-cloud-orchestration-airflow>=1.10.0", "google-cloud-os-login>=2.9.1", - "google-cloud-pubsub>=2.21.3", + "google-cloud-pubsub>=2.24.0", "google-cloud-redis>=2.12.0", "google-cloud-secret-manager>=2.16.0", "google-cloud-spanner>=3.50.0", diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py b/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py index 58237c5c2ae1e..b4da615f3894e 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py @@ -82,6 +82,7 @@ def __init__( gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, enable_message_ordering: bool = False, + enable_open_telemetry_tracing: bool = False, **kwargs, ) -> None: super().__init__( @@ -90,6 +91,7 @@ def __init__( **kwargs, ) self.enable_message_ordering = enable_message_ordering + self.enable_open_telemetry_tracing = enable_open_telemetry_tracing self._client = None def get_conn(self) -> PublisherClient: @@ -104,6 +106,7 @@ def get_conn(self) -> PublisherClient: client_info=CLIENT_INFO, publisher_options=PublisherOptions( enable_message_ordering=self.enable_message_ordering, + enable_open_telemetry_tracing=self.enable_open_telemetry_tracing, ), ) return self._client diff --git a/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py b/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py index 250bcfde4ef13..d02ac90171d63 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py @@ -680,6 +680,9 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator): ordering_key in PubsubMessage will be delivered to the subscribers in the order in which they are received by the Pub/Sub system. Otherwise, they may be delivered in any order. Default is False. + :param enable_open_telemetry_tracing: If true, enables OpenTelemetry tracing for + published messages. This allows distributed tracing of messages as they flow + through Pub/Sub topics. Default is False. :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. @@ -695,6 +698,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator): "topic", "messages", "enable_message_ordering", + "enable_open_telemetry_tracing", "impersonation_chain", ) ui_color = "#0273d4" @@ -707,6 +711,7 @@ def __init__( project_id: str = PROVIDE_PROJECT_ID, gcp_conn_id: str = "google_cloud_default", enable_message_ordering: bool = False, + enable_open_telemetry_tracing: bool = False, impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: @@ -716,6 +721,7 @@ def __init__( self.messages = messages self.gcp_conn_id = gcp_conn_id self.enable_message_ordering = enable_message_ordering + self.enable_open_telemetry_tracing = enable_open_telemetry_tracing self.impersonation_chain = impersonation_chain @cached_property @@ -724,6 +730,7 @@ def pubsub_hook(self): gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, enable_message_ordering=self.enable_message_ordering, + enable_open_telemetry_tracing=self.enable_open_telemetry_tracing, ) def execute(self, context: Context) -> None: diff --git a/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py b/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py index fbf10fe6ae878..1375b59207337 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py @@ -86,11 +86,59 @@ def test_publisher_client_creation(self, mock_client, mock_get_creds): mock_client.assert_called_once_with( credentials=mock_get_creds.return_value, client_info=CLIENT_INFO, - publisher_options=PublisherOptions(enable_message_ordering=False), + publisher_options=PublisherOptions( + enable_message_ordering=False, + enable_open_telemetry_tracing=False, + ), ) assert mock_client.return_value == result assert self.pubsub_hook._client == result + @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials") + @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PublisherClient") + def test_publisher_client_creation_with_open_telemetry_tracing(self, mock_client, mock_get_creds): + with mock.patch(BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_init): + pubsub_hook_with_tracing = PubSubHook( + gcp_conn_id="test", + enable_open_telemetry_tracing=True, + ) + assert pubsub_hook_with_tracing._client is None + result = pubsub_hook_with_tracing.get_conn() + + mock_client.assert_called_once_with( + credentials=mock_get_creds.return_value, + client_info=CLIENT_INFO, + publisher_options=PublisherOptions( + enable_message_ordering=False, + enable_open_telemetry_tracing=True, + ), + ) + assert mock_client.return_value == result + assert pubsub_hook_with_tracing._client == result + + @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials") + @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PublisherClient") + def test_publisher_client_creation_with_message_ordering_and_tracing(self, mock_client, mock_get_creds): + with mock.patch(BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_init): + pubsub_hook_with_both = PubSubHook( + gcp_conn_id="test", + enable_message_ordering=True, + enable_open_telemetry_tracing=True, + ) + assert pubsub_hook_with_both._client is None + result = pubsub_hook_with_both.get_conn() + + mock_client.assert_called_once_with( + credentials=mock_get_creds.return_value, + client_info=CLIENT_INFO, + publisher_options=PublisherOptions( + enable_message_ordering=True, + enable_open_telemetry_tracing=True, + ), + ) + assert mock_client.return_value == result + assert pubsub_hook_with_both._client == result + @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials") @mock.patch("airflow.providers.google.cloud.hooks.pubsub.SubscriberClient") def test_subscriber_client_creation(self, mock_client, mock_get_creds): diff --git a/providers/google/tests/unit/google/cloud/operators/test_pubsub.py b/providers/google/tests/unit/google/cloud/operators/test_pubsub.py index 289001e3bc881..c335c3baa73a8 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_pubsub.py +++ b/providers/google/tests/unit/google/cloud/operators/test_pubsub.py @@ -352,6 +352,49 @@ def test_publish_with_ordering_key(self, mock_hook): project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES_ORDERING_KEY ) + @mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook") + def test_publish_with_open_telemetry_tracing(self, mock_hook): + operator = PubSubPublishMessageOperator( + task_id=TASK_ID, + project_id=TEST_PROJECT, + topic=TEST_TOPIC, + messages=TEST_MESSAGES, + enable_open_telemetry_tracing=True, + ) + + operator.execute(None) + mock_hook.assert_called_once_with( + gcp_conn_id="google_cloud_default", + impersonation_chain=None, + enable_message_ordering=False, + enable_open_telemetry_tracing=True, + ) + mock_hook.return_value.publish.assert_called_once_with( + project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES + ) + + @mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook") + def test_publish_with_ordering_and_tracing(self, mock_hook): + operator = PubSubPublishMessageOperator( + task_id=TASK_ID, + project_id=TEST_PROJECT, + topic=TEST_TOPIC, + messages=TEST_MESSAGES_ORDERING_KEY, + enable_message_ordering=True, + enable_open_telemetry_tracing=True, + ) + + operator.execute(None) + mock_hook.assert_called_once_with( + gcp_conn_id="google_cloud_default", + impersonation_chain=None, + enable_message_ordering=True, + enable_open_telemetry_tracing=True, + ) + mock_hook.return_value.publish.assert_called_once_with( + project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES_ORDERING_KEY + ) + @pytest.mark.parametrize( ("project_id", "expected_dataset"), [