Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix JSONDecodeError in Datafusion operators #26202

Merged
merged 4 commits into from
Sep 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions airflow/providers/google/cloud/hooks/datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ def _cdap_request(
response = request(method=method, url=url, headers=headers, body=payload)
return response

@staticmethod
def _check_response_status_and_data(response, message: str) -> None:
if response.status != 200:
raise AirflowException(message)
if response.data is None:
raise AirflowException(
"Empty response received. Please, check for possible root "
"causes of this behavior either in DAG code or on Cloud Datafusion side"
)

def get_conn(self) -> Resource:
"""Retrieves connection to DataFusion."""
if not self._conn:
Expand Down Expand Up @@ -310,10 +320,9 @@ def create_pipeline(
"""
url = os.path.join(self._base_url(instance_url, namespace), quote(pipeline_name))
response = self._cdap_request(url=url, method="PUT", body=pipeline)
if response.status != 200:
raise AirflowException(
f"Creating a pipeline failed with code {response.status} while calling {url}"
)
self._check_response_status_and_data(
response, f"Creating a pipeline failed with code {response.status} while calling {url}"
)

def delete_pipeline(
self,
Expand All @@ -337,8 +346,9 @@ def delete_pipeline(
url = os.path.join(url, "versions", version_id)

response = self._cdap_request(url=url, method="DELETE", body=None)
if response.status != 200:
raise AirflowException(f"Deleting a pipeline failed with code {response.status}")
self._check_response_status_and_data(
response, f"Deleting a pipeline failed with code {response.status}"
)

def list_pipelines(
self,
Expand Down Expand Up @@ -367,8 +377,9 @@ def list_pipelines(
url = os.path.join(url, urlencode(query))

response = self._cdap_request(url=url, method="GET", body=None)
if response.status != 200:
raise AirflowException(f"Listing pipelines failed with code {response.status}")
self._check_response_status_and_data(
response, f"Listing pipelines failed with code {response.status}"
)
return json.loads(response.data)

def get_pipeline_workflow(
Expand All @@ -387,8 +398,9 @@ def get_pipeline_workflow(
quote(pipeline_id),
)
response = self._cdap_request(url=url, method="GET")
if response.status != 200:
raise AirflowException(f"Retrieving a pipeline state failed with code {response.status}")
self._check_response_status_and_data(
response, f"Retrieving a pipeline state failed with code {response.status}"
)
workflow = json.loads(response.data)
return workflow

Expand Down Expand Up @@ -429,9 +441,9 @@ def start_pipeline(
}
]
response = self._cdap_request(url=url, method="POST", body=body)
if response.status != 200:
raise AirflowException(f"Starting a pipeline failed with code {response.status}")

self._check_response_status_and_data(
response, f"Starting a pipeline failed with code {response.status}"
)
response_json = json.loads(response.data)
return response_json[0]["runId"]

Expand All @@ -453,5 +465,6 @@ def stop_pipeline(self, pipeline_name: str, instance_url: str, namespace: str =
"stop",
)
response = self._cdap_request(url=url, method="POST")
if response.status != 200:
raise AirflowException(f"Stopping a pipeline failed with code {response.status}")
self._check_response_status_and_data(
response, f"Stopping a pipeline failed with code {response.status}"
)
193 changes: 193 additions & 0 deletions tests/providers/google/cloud/hooks/test_datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import pytest

from airflow import AirflowException
from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id

Expand All @@ -31,6 +32,7 @@
INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
PROJECT_ID = "test_project_id"
PIPELINE_NAME = "shrubberyPipeline"
PIPELINE_ID = "123"
PIPELINE = {"test": "pipeline"}
INSTANCE_URL = "http://datafusion.instance.com"
RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}
Expand Down Expand Up @@ -158,6 +160,33 @@ def test_create_pipeline(self, mock_request, hook):
body=PIPELINE,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_create_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.create_pipeline(pipeline_name=PIPELINE_NAME, pipeline=PIPELINE, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
method="PUT",
body=PIPELINE,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_create_pipeline_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Creating a pipeline failed with code 404"):
hook.create_pipeline(pipeline_name=PIPELINE_NAME, pipeline=PIPELINE, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
method="PUT",
body=PIPELINE,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_delete_pipeline(self, mock_request, hook):
mock_request.return_value.status = 200
Expand All @@ -168,6 +197,33 @@ def test_delete_pipeline(self, mock_request, hook):
body=None,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_delete_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.delete_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
method="DELETE",
body=None,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_delete_pipeline_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Deleting a pipeline failed with code 404"):
hook.delete_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
method="DELETE",
body=None,
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_list_pipelines(self, mock_request, hook):
data = {"data": "test"}
Expand All @@ -179,6 +235,29 @@ def test_list_pipelines(self, mock_request, hook):
)
assert result == data

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_list_pipelines_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.list_pipelines(instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps", method="GET", body=None
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_list_pipelines_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Listing pipelines failed with code 404"):
hook.list_pipelines(instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps", method="GET", body=None
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline(self, mock_request, hook):
run_id = 1234
Expand All @@ -197,6 +276,49 @@ def test_start_pipeline(self, mock_request, hook):
url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", body=body
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.start_pipeline(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
)
body = [
{
"appId": PIPELINE_NAME,
"programType": "workflow",
"programId": "DataPipelineWorkflow",
"runtimeargs": RUNTIME_ARGS,
}
]
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", body=body
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Starting a pipeline failed with code 404"):
hook.start_pipeline(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
)
body = [
{
"appId": PIPELINE_NAME,
"programType": "workflow",
"programId": "DataPipelineWorkflow",
"runtimeargs": RUNTIME_ARGS,
}
]
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", body=body
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_stop_pipeline(self, mock_request, hook):
mock_request.return_value.status = 200
Expand All @@ -206,3 +328,74 @@ def test_stop_pipeline(self, mock_request, hook):
f"workflows/DataPipelineWorkflow/stop",
method="POST",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_stop_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.stop_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/stop",
method="POST",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_stop_pipeline_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Stopping a pipeline failed with code 404"):
hook.stop_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/stop",
method="POST",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_get_pipeline_workflow(self, mock_request, hook):
run_id = 1234
mock_request.return_value = mock.MagicMock(status=200, data=f'[{{"runId":{run_id}}}]')
hook.get_pipeline_workflow(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, pipeline_id=PIPELINE_ID
)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/runs/{PIPELINE_ID}",
method="GET",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_get_pipeline_workflow_should_fail_if_empty_data_response(self, mock_request, hook):
mock_request.return_value.status = 200
mock_request.return_value.data = None
with pytest.raises(
AirflowException,
match=r"Empty response received. Please, check for possible root causes "
r"of this behavior either in DAG code or on Cloud Datafusion side",
):
hook.get_pipeline_workflow(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, pipeline_id=PIPELINE_ID
)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/runs/{PIPELINE_ID}",
method="GET",
)

@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_get_pipeline_workflow_should_fail_if_status_not_200(self, mock_request, hook):
mock_request.return_value.status = 404
with pytest.raises(AirflowException, match=r"Retrieving a pipeline state failed with code 404"):
hook.get_pipeline_workflow(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, pipeline_id=PIPELINE_ID
)
mock_request.assert_called_once_with(
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
f"workflows/DataPipelineWorkflow/runs/{PIPELINE_ID}",
method="GET",
)