From a605c66180f162f360d1f34406e78ce674424ebd Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Sat, 1 Mar 2025 14:46:19 +0530 Subject: [PATCH 01/10] Add method to retrieve Druid task status URL based on ingestion type --- .../src/airflow/providers/apache/druid/hooks/druid.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py index c44a6be346e4a..ba399a4326507 100644 --- a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py +++ b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py @@ -118,6 +118,13 @@ def get_verify(self) -> bool | str: return self.verify_ssl + def get_status_url(self, ingestion_type): + + if ingestion_type == IngestionType.MSQ: + return f"{self.conn.schema}://{self.conn.host}:{self.conn.port}/druid/indexer/v1/task" + else: + return self.get_conn_url(ingestion_type) + def submit_indexing_job( self, json_index_spec: dict[str, Any] | str, ingestion_type: IngestionType = IngestionType.BATCH ) -> None: @@ -141,7 +148,7 @@ def submit_indexing_job( druid_task_id = req_json["task"] else: druid_task_id = req_json["taskId"] - druid_task_status_url = f"{self.get_conn_url()}/{druid_task_id}/status" + druid_task_status_url = self.get_status_url(ingestion_type) + f"/{druid_task_id}/status" self.log.info("Druid indexing task-id: %s", druid_task_id) running = True From 7fb4e15828c9f420570f11ed3ebc6324c79695f5 Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Sat, 1 Mar 2025 14:51:37 +0530 Subject: [PATCH 02/10] Add method to retrieve Druid task status URL based on ingestion type --- .../druid/src/airflow/providers/apache/druid/hooks/druid.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py index ba399a4326507..f757aec08d2d0 100644 --- a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py +++ b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py @@ -119,7 +119,6 @@ def get_verify(self) -> bool | str: return self.verify_ssl def get_status_url(self, ingestion_type): - if ingestion_type == IngestionType.MSQ: return f"{self.conn.schema}://{self.conn.host}:{self.conn.port}/druid/indexer/v1/task" else: From d4f0d51324be9d4bf9616f746ed102fb00c209a7 Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Sat, 1 Mar 2025 15:22:11 +0530 Subject: [PATCH 03/10] Add method to retrieve Druid task status URL based on ingestion type --- .../tests/unit/apache/druid/hooks/test_druid.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py b/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py index 5b350904e65f2..a77fa58782896 100644 --- a/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py +++ b/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py @@ -316,6 +316,18 @@ def test_get_conn_url_with_ingestion_type_and_schema(self, mock_get_connection): hook = DruidHook(timeout=1, max_ingestion_time=5) assert hook.get_conn_url(IngestionType.MSQ) == "https://test_host:1/sql_ingest" + @patch("airflow.providers.apache.druid.hooks.druid.DruidHook.get_status_url") + def test_get_status_url(self, mock_get_status_url): + get_status_url = MagicMock() + get_status_url.host = "test_host" + get_status_url.conn_type = "http" + get_status_url.schema = "https" + get_status_url.port = "1" + get_status_url.extra_dejson = {"endpoint": "ingest", "msq_endpoint": "sql_ingest"} + mock_get_status_url.return_value = get_status_url + hook = DruidHook(timeout=1, max_ingestion_time=5) + assert hook.get_status_url(IngestionType.MSQ) == "https://test_host:1/druid/indexer/v1/task" + @patch("airflow.providers.apache.druid.hooks.druid.DruidHook.get_connection") def test_get_auth(self, mock_get_connection): get_conn_value = MagicMock() From f11992d25c21005f591e7452c260e8a5cf125a9a Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Sat, 1 Mar 2025 15:45:07 +0530 Subject: [PATCH 04/10] Add method to retrieve Druid task status URL based on ingestion type --- .../providers/apache/druid/hooks/druid.py | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py index f757aec08d2d0..b6c625907abe4 100644 --- a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py +++ b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py @@ -83,20 +83,31 @@ def __init__( def conn(self) -> Connection: return self.get_connection(self.druid_ingest_conn_id) - def get_conn_url(self, ingestion_type: IngestionType = IngestionType.BATCH) -> str: - """Get Druid connection url.""" - host = self.conn.host - port = self.conn.port + @property + def get_connection_type(self) -> str: if self.conn.schema: conn_type = self.conn.schema else: conn_type = self.conn.conn_type or "http" + return conn_type + + def get_conn_url(self, ingestion_type: IngestionType = IngestionType.BATCH) -> str: + """Get Druid connection url.""" + host = self.conn.host + port = self.conn.port + conn_type = self.get_connection_type if ingestion_type == IngestionType.BATCH: endpoint = self.conn.extra_dejson.get("endpoint", "") else: endpoint = self.conn.extra_dejson.get("msq_endpoint", "") return f"{conn_type}://{host}:{port}/{endpoint}" + def get_status_url(self, ingestion_type): + if ingestion_type == IngestionType.MSQ: + return f"{self.get_connection_type}://{self.conn.host}:{self.conn.port}/druid/indexer/v1/task" + else: + return self.get_conn_url(ingestion_type) + def get_auth(self) -> requests.auth.HTTPBasicAuth | None: """ Return username and password from connections tab as requests.auth.HTTPBasicAuth object. @@ -118,12 +129,6 @@ def get_verify(self) -> bool | str: return self.verify_ssl - def get_status_url(self, ingestion_type): - if ingestion_type == IngestionType.MSQ: - return f"{self.conn.schema}://{self.conn.host}:{self.conn.port}/druid/indexer/v1/task" - else: - return self.get_conn_url(ingestion_type) - def submit_indexing_job( self, json_index_spec: dict[str, Any] | str, ingestion_type: IngestionType = IngestionType.BATCH ) -> None: From 49c78d30bee0b516b53dba094a498414bf4140f0 Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Sat, 1 Mar 2025 16:59:19 +0530 Subject: [PATCH 05/10] Add method to retrieve Druid task status URL based on ingestion type --- .../druid/src/airflow/providers/apache/druid/hooks/druid.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py index b6c625907abe4..c375d989f15eb 100644 --- a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py +++ b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py @@ -103,8 +103,9 @@ def get_conn_url(self, ingestion_type: IngestionType = IngestionType.BATCH) -> s return f"{conn_type}://{host}:{port}/{endpoint}" def get_status_url(self, ingestion_type): - if ingestion_type == IngestionType.MSQ: - return f"{self.get_connection_type}://{self.conn.host}:{self.conn.port}/druid/indexer/v1/task" + if ingestion_type == IngestionType.MSQ and self.get_connection_type in {"http", "https"}: + status_link_suffix = self.conn.extra_dejson.get("status_endpoint", "druid/indexer/v1/task") + return f"{self.get_connection_type}://{self.conn.host}:{self.conn.port}/{status_link_suffix}" else: return self.get_conn_url(ingestion_type) From 1df09a163c54b1779330a8b439b7d447771bdbb7 Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Sat, 1 Mar 2025 19:08:44 +0530 Subject: [PATCH 06/10] Add method to retrieve Druid task status URL based on ingestion type --- .../src/airflow/providers/apache/druid/hooks/druid.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py index c375d989f15eb..b5bc680ec418d 100644 --- a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py +++ b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py @@ -103,9 +103,14 @@ def get_conn_url(self, ingestion_type: IngestionType = IngestionType.BATCH) -> s return f"{conn_type}://{host}:{port}/{endpoint}" def get_status_url(self, ingestion_type): - if ingestion_type == IngestionType.MSQ and self.get_connection_type in {"http", "https"}: + if ingestion_type == IngestionType.MSQ: + if self.get_connection_type == "druid": + conn_type = self.conn.extra_dejson.get("schema", "http") + else: + conn_type = self.get_connection_type + status_link_suffix = self.conn.extra_dejson.get("status_endpoint", "druid/indexer/v1/task") - return f"{self.get_connection_type}://{self.conn.host}:{self.conn.port}/{status_link_suffix}" + return f"{conn_type}://{self.conn.host}:{self.conn.port}/{status_link_suffix}" else: return self.get_conn_url(ingestion_type) From cba6ae5da3d4b0c1363b22b795806df3478c9396 Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Sat, 1 Mar 2025 20:58:05 +0530 Subject: [PATCH 07/10] Add method to retrieve Druid task status URL based on ingestion type --- .../unit/apache/druid/hooks/test_druid.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py b/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py index a77fa58782896..478509480ea9e 100644 --- a/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py +++ b/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py @@ -316,15 +316,15 @@ def test_get_conn_url_with_ingestion_type_and_schema(self, mock_get_connection): hook = DruidHook(timeout=1, max_ingestion_time=5) assert hook.get_conn_url(IngestionType.MSQ) == "https://test_host:1/sql_ingest" - @patch("airflow.providers.apache.druid.hooks.druid.DruidHook.get_status_url") - def test_get_status_url(self, mock_get_status_url): - get_status_url = MagicMock() - get_status_url.host = "test_host" - get_status_url.conn_type = "http" - get_status_url.schema = "https" - get_status_url.port = "1" - get_status_url.extra_dejson = {"endpoint": "ingest", "msq_endpoint": "sql_ingest"} - mock_get_status_url.return_value = get_status_url + @patch("airflow.providers.apache.druid.hooks.druid.DruidHook.get_connection") + def test_get_status_url(self, mock_get_connection): + get_conn_value = MagicMock() + get_conn_value.host = "test_host" + get_conn_value.conn_type = "http" + get_conn_value.schema = "https" + get_conn_value.port = "1" + get_conn_value.extra_dejson = {"endpoint": "ingest", "msq_endpoint": "sql_ingest"} + mock_get_connection.return_value = get_conn_value hook = DruidHook(timeout=1, max_ingestion_time=5) assert hook.get_status_url(IngestionType.MSQ) == "https://test_host:1/druid/indexer/v1/task" From 849b804109b1b09e2a02fe9a5bcd898d249f38e7 Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Sat, 1 Mar 2025 22:20:28 +0530 Subject: [PATCH 08/10] Add method to retrieve Druid task status URL based on ingestion type --- .../druid/src/airflow/providers/apache/druid/hooks/druid.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py index b5bc680ec418d..1dd0e5211ea07 100644 --- a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py +++ b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py @@ -79,6 +79,7 @@ def __init__( if self.timeout < 1: raise ValueError("Druid timeout should be equal or greater than 1") + self.default_status_url_suffix = "druid/indexer/v1/task" @cached_property def conn(self) -> Connection: return self.get_connection(self.druid_ingest_conn_id) @@ -109,8 +110,8 @@ def get_status_url(self, ingestion_type): else: conn_type = self.get_connection_type - status_link_suffix = self.conn.extra_dejson.get("status_endpoint", "druid/indexer/v1/task") - return f"{conn_type}://{self.conn.host}:{self.conn.port}/{status_link_suffix}" + status_url_suffix = self.conn.extra_dejson.get("status_endpoint", self.default_status_url_suffix) + return f"{conn_type}://{self.conn.host}:{self.conn.port}/{status_url_suffix}" else: return self.get_conn_url(ingestion_type) From 246c41c4aa1530818ebff404c404f5a95bfe9eaa Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Sun, 2 Mar 2025 09:01:43 +0530 Subject: [PATCH 09/10] Add method to retrieve Druid task status URL based on ingestion type --- .../druid/src/airflow/providers/apache/druid/hooks/druid.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py index 1dd0e5211ea07..2adb0d9fb2b00 100644 --- a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py +++ b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py @@ -104,6 +104,9 @@ def get_conn_url(self, ingestion_type: IngestionType = IngestionType.BATCH) -> s return f"{conn_type}://{host}:{port}/{endpoint}" def get_status_url(self, ingestion_type): + """ + Return Druid status url. + """ if ingestion_type == IngestionType.MSQ: if self.get_connection_type == "druid": conn_type = self.conn.extra_dejson.get("schema", "http") From a9843629fe5cc62d39c6685ea38ec9c08c4251bc Mon Sep 17 00:00:00 2001 From: k0d04mr Date: Mon, 3 Mar 2025 08:48:24 +0530 Subject: [PATCH 10/10] Add method to retrieve Druid task status URL based on ingestion type --- .../src/airflow/providers/apache/druid/hooks/druid.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py index 2adb0d9fb2b00..14899c94ed7a4 100644 --- a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py +++ b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py @@ -79,7 +79,8 @@ def __init__( if self.timeout < 1: raise ValueError("Druid timeout should be equal or greater than 1") - self.default_status_url_suffix = "druid/indexer/v1/task" + self.status_endpoint = "druid/indexer/v1/task" + @cached_property def conn(self) -> Connection: return self.get_connection(self.druid_ingest_conn_id) @@ -104,17 +105,15 @@ def get_conn_url(self, ingestion_type: IngestionType = IngestionType.BATCH) -> s return f"{conn_type}://{host}:{port}/{endpoint}" def get_status_url(self, ingestion_type): - """ - Return Druid status url. - """ + """Return Druid status url.""" if ingestion_type == IngestionType.MSQ: if self.get_connection_type == "druid": conn_type = self.conn.extra_dejson.get("schema", "http") else: conn_type = self.get_connection_type - status_url_suffix = self.conn.extra_dejson.get("status_endpoint", self.default_status_url_suffix) - return f"{conn_type}://{self.conn.host}:{self.conn.port}/{status_url_suffix}" + status_endpoint = self.conn.extra_dejson.get("status_endpoint", self.status_endpoint) + return f"{conn_type}://{self.conn.host}:{self.conn.port}/{status_endpoint}" else: return self.get_conn_url(ingestion_type)