Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion providers/google/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ PIP package Version required
``google-cloud-monitoring`` ``>=2.18.0``
``google-cloud-orchestration-airflow`` ``>=1.10.0``
``google-cloud-os-login`` ``>=2.9.1``
``google-cloud-pubsub`` ``>=2.21.3``
``google-cloud-pubsub`` ``>=2.24.0``
``google-cloud-redis`` ``>=2.12.0``
``google-cloud-secret-manager`` ``>=2.16.0``
``google-cloud-spanner`` ``>=3.50.0``
Expand Down
2 changes: 1 addition & 1 deletion providers/google/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ PIP package Version required
``google-cloud-monitoring`` ``>=2.18.0``
``google-cloud-orchestration-airflow`` ``>=1.10.0``
``google-cloud-os-login`` ``>=2.9.1``
``google-cloud-pubsub`` ``>=2.21.3``
``google-cloud-pubsub`` ``>=2.24.0``
``google-cloud-redis`` ``>=2.12.0``
``google-cloud-secret-manager`` ``>=2.16.0``
``google-cloud-spanner`` ``>=3.50.0``
Expand Down
2 changes: 1 addition & 1 deletion providers/google/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ dependencies = [
"google-cloud-monitoring>=2.18.0",
"google-cloud-orchestration-airflow>=1.10.0",
"google-cloud-os-login>=2.9.1",
"google-cloud-pubsub>=2.21.3",
"google-cloud-pubsub>=2.24.0",
"google-cloud-redis>=2.12.0",
"google-cloud-secret-manager>=2.16.0",
"google-cloud-spanner>=3.50.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def __init__(
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
enable_message_ordering: bool = False,
enable_open_telemetry_tracing: bool = False,
**kwargs,
) -> None:
super().__init__(
Expand All @@ -90,6 +91,7 @@ def __init__(
**kwargs,
)
self.enable_message_ordering = enable_message_ordering
self.enable_open_telemetry_tracing = enable_open_telemetry_tracing
self._client = None

def get_conn(self) -> PublisherClient:
Expand All @@ -104,6 +106,7 @@ def get_conn(self) -> PublisherClient:
client_info=CLIENT_INFO,
publisher_options=PublisherOptions(
enable_message_ordering=self.enable_message_ordering,
enable_open_telemetry_tracing=self.enable_open_telemetry_tracing,
),
)
return self._client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,9 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
ordering_key in PubsubMessage will be delivered to the subscribers in the order
in which they are received by the Pub/Sub system. Otherwise, they may be
delivered in any order. Default is False.
:param enable_open_telemetry_tracing: If true, enables OpenTelemetry tracing for
published messages. This allows distributed tracing of messages as they flow
through Pub/Sub topics. Default is False.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
Expand All @@ -695,6 +698,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
"topic",
"messages",
"enable_message_ordering",
"enable_open_telemetry_tracing",
"impersonation_chain",
)
ui_color = "#0273d4"
Expand All @@ -707,6 +711,7 @@ def __init__(
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
enable_message_ordering: bool = False,
enable_open_telemetry_tracing: bool = False,
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
Expand All @@ -716,6 +721,7 @@ def __init__(
self.messages = messages
self.gcp_conn_id = gcp_conn_id
self.enable_message_ordering = enable_message_ordering
self.enable_open_telemetry_tracing = enable_open_telemetry_tracing
self.impersonation_chain = impersonation_chain

@cached_property
Expand All @@ -724,6 +730,7 @@ def pubsub_hook(self):
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
enable_message_ordering=self.enable_message_ordering,
enable_open_telemetry_tracing=self.enable_open_telemetry_tracing,
)

def execute(self, context: Context) -> None:
Expand Down
50 changes: 49 additions & 1 deletion providers/google/tests/unit/google/cloud/hooks/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,59 @@ def test_publisher_client_creation(self, mock_client, mock_get_creds):
mock_client.assert_called_once_with(
credentials=mock_get_creds.return_value,
client_info=CLIENT_INFO,
publisher_options=PublisherOptions(enable_message_ordering=False),
publisher_options=PublisherOptions(
enable_message_ordering=False,
enable_open_telemetry_tracing=False,
),
)
assert mock_client.return_value == result
assert self.pubsub_hook._client == result

@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials")
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PublisherClient")
def test_publisher_client_creation_with_open_telemetry_tracing(self, mock_client, mock_get_creds):
with mock.patch(BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_init):
pubsub_hook_with_tracing = PubSubHook(
gcp_conn_id="test",
enable_open_telemetry_tracing=True,
)
assert pubsub_hook_with_tracing._client is None
result = pubsub_hook_with_tracing.get_conn()

mock_client.assert_called_once_with(
credentials=mock_get_creds.return_value,
client_info=CLIENT_INFO,
publisher_options=PublisherOptions(
enable_message_ordering=False,
enable_open_telemetry_tracing=True,
),
)
assert mock_client.return_value == result
assert pubsub_hook_with_tracing._client == result

@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials")
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PublisherClient")
def test_publisher_client_creation_with_message_ordering_and_tracing(self, mock_client, mock_get_creds):
with mock.patch(BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_init):
pubsub_hook_with_both = PubSubHook(
gcp_conn_id="test",
enable_message_ordering=True,
enable_open_telemetry_tracing=True,
)
assert pubsub_hook_with_both._client is None
result = pubsub_hook_with_both.get_conn()

mock_client.assert_called_once_with(
credentials=mock_get_creds.return_value,
client_info=CLIENT_INFO,
publisher_options=PublisherOptions(
enable_message_ordering=True,
enable_open_telemetry_tracing=True,
),
)
assert mock_client.return_value == result
assert pubsub_hook_with_both._client == result

@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials")
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.SubscriberClient")
def test_subscriber_client_creation(self, mock_client, mock_get_creds):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,49 @@ def test_publish_with_ordering_key(self, mock_hook):
project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES_ORDERING_KEY
)

@mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook")
def test_publish_with_open_telemetry_tracing(self, mock_hook):
operator = PubSubPublishMessageOperator(
task_id=TASK_ID,
project_id=TEST_PROJECT,
topic=TEST_TOPIC,
messages=TEST_MESSAGES,
enable_open_telemetry_tracing=True,
)

operator.execute(None)
mock_hook.assert_called_once_with(
gcp_conn_id="google_cloud_default",
impersonation_chain=None,
enable_message_ordering=False,
enable_open_telemetry_tracing=True,
)
mock_hook.return_value.publish.assert_called_once_with(
project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES
)

@mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook")
def test_publish_with_ordering_and_tracing(self, mock_hook):
operator = PubSubPublishMessageOperator(
task_id=TASK_ID,
project_id=TEST_PROJECT,
topic=TEST_TOPIC,
messages=TEST_MESSAGES_ORDERING_KEY,
enable_message_ordering=True,
enable_open_telemetry_tracing=True,
)

operator.execute(None)
mock_hook.assert_called_once_with(
gcp_conn_id="google_cloud_default",
impersonation_chain=None,
enable_message_ordering=True,
enable_open_telemetry_tracing=True,
)
mock_hook.return_value.publish.assert_called_once_with(
project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES_ORDERING_KEY
)

@pytest.mark.parametrize(
("project_id", "expected_dataset"),
[
Expand Down