From 4e76d2f2c1fefbd5c8e9d11d4f24b2cf3797d325 Mon Sep 17 00:00:00 2001 From: Gagan Date: Tue, 27 Aug 2024 17:43:15 -0600 Subject: [PATCH 1/4] log statements in google dataproc --- airflow/providers/google/cloud/hooks/dataproc.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py index 2eb8d8952c56e..586645d30c451 100644 --- a/airflow/providers/google/cloud/hooks/dataproc.py +++ b/airflow/providers/google/cloud/hooks/dataproc.py @@ -1851,6 +1851,7 @@ async def create_batch( """ client = self.get_batch_client(region) parent = f"projects/{project_id}/regions/{region}" + self.log.debug(f"Creating batch {batch}") result = await client.create_batch( request={ @@ -1890,6 +1891,7 @@ async def delete_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" + self.log.debug(f"Deleting batch {batch_id}") await client.delete_batch( request={ @@ -1925,6 +1927,7 @@ async def get_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" + self.log.debug(f"Getting batch {batch_id}") result = await client.get_batch( request={ From ccd4348d5bcda235a8901027ca920f3d48525bf2 Mon Sep 17 00:00:00 2001 From: Gagan Date: Tue, 27 Aug 2024 17:48:53 -0600 Subject: [PATCH 2/4] add more log statements --- airflow/providers/google/cloud/hooks/dataproc.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py index 586645d30c451..84ef778954a67 100644 --- a/airflow/providers/google/cloud/hooks/dataproc.py +++ b/airflow/providers/google/cloud/hooks/dataproc.py @@ -995,6 +995,8 @@ def create_batch( """ client = self.get_batch_client(region) parent = f"projects/{project_id}/regions/{region}" + + self.log.debug(f"Creating batch: {batch}") result = client.create_batch( request={ @@ -1034,6 +1036,8 @@ def delete_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" + + self.log.debug(f"Deleting batch with batch id: {batch_id}") client.delete_batch( request={ @@ -1069,6 +1073,8 @@ def get_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" + + self.log.debug(f"Getting batch with batch id: {batch_id}") result = client.get_batch( request={ @@ -1851,7 +1857,7 @@ async def create_batch( """ client = self.get_batch_client(region) parent = f"projects/{project_id}/regions/{region}" - self.log.debug(f"Creating batch {batch}") + self.log.debug(f"Creating batch: {batch}") result = await client.create_batch( request={ @@ -1891,7 +1897,7 @@ async def delete_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" - self.log.debug(f"Deleting batch {batch_id}") + self.log.debug(f"Deleting batch with batch id: {batch_id}") await client.delete_batch( request={ @@ -1927,7 +1933,7 @@ async def get_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" - self.log.debug(f"Getting batch {batch_id}") + self.log.debug(f"Getting batch with batch id: {batch_id}") result = await client.get_batch( request={ From 411bfe7757858472e98662131e72ed9b077fb2d7 Mon Sep 17 00:00:00 2001 From: Gagan Date: Wed, 28 Aug 2024 11:44:28 -0600 Subject: [PATCH 3/4] log --- airflow/providers/google/cloud/hooks/dataproc.py | 10 ---------- airflow/providers/google/cloud/operators/dataproc.py | 1 + 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py index 84ef778954a67..03d2974acb5d2 100644 --- a/airflow/providers/google/cloud/hooks/dataproc.py +++ b/airflow/providers/google/cloud/hooks/dataproc.py @@ -995,8 +995,6 @@ def create_batch( """ client = self.get_batch_client(region) parent = f"projects/{project_id}/regions/{region}" - - self.log.debug(f"Creating batch: {batch}") result = client.create_batch( request={ @@ -1036,9 +1034,6 @@ def delete_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" - - self.log.debug(f"Deleting batch with batch id: {batch_id}") - client.delete_batch( request={ "name": name, @@ -1073,8 +1068,6 @@ def get_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" - - self.log.debug(f"Getting batch with batch id: {batch_id}") result = client.get_batch( request={ @@ -1857,7 +1850,6 @@ async def create_batch( """ client = self.get_batch_client(region) parent = f"projects/{project_id}/regions/{region}" - self.log.debug(f"Creating batch: {batch}") result = await client.create_batch( request={ @@ -1897,7 +1889,6 @@ async def delete_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" - self.log.debug(f"Deleting batch with batch id: {batch_id}") await client.delete_batch( request={ @@ -1933,7 +1924,6 @@ async def get_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" - self.log.debug(f"Getting batch with batch id: {batch_id}") result = await client.get_batch( request={ diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index 2384bfbd6251a..a3cec5a910abe 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -3035,6 +3035,7 @@ def execute(self, context: Context): self.log.info("Starting batch. The batch ID will be generated since it was not provided.") try: + self.log.info("Creating batch: %s", self.batch) self.operation = self.hook.create_batch( region=self.region, project_id=self.project_id, From 3d797f356408839334b406dc76c6d0594938b17f Mon Sep 17 00:00:00 2001 From: Gagan Date: Mon, 2 Sep 2024 09:28:53 -0600 Subject: [PATCH 4/4] revert --- airflow/providers/google/cloud/hooks/dataproc.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py index 03d2974acb5d2..2eb8d8952c56e 100644 --- a/airflow/providers/google/cloud/hooks/dataproc.py +++ b/airflow/providers/google/cloud/hooks/dataproc.py @@ -1034,6 +1034,7 @@ def delete_batch( """ client = self.get_batch_client(region) name = f"projects/{project_id}/locations/{region}/batches/{batch_id}" + client.delete_batch( request={ "name": name,