Skip to content

Commit

Permalink
fix: Add new normalization config check
Browse files Browse the repository at this point in the history
Fixes dagster-io#13524.
The Airbyte API changed source of truth for normalization availability in airbytehq/airbyte#21005.
This commit adds an additional check for normalization configuration in
`does_dest_support_normalization`. The current check is kept to
ensure backward compatibility - thus, if any of the checks are true,
the function returns true.
  • Loading branch information
nina-j committed May 9, 2023
1 parent 628dfb7 commit 3cce17c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,10 @@ def get_source_schema(self, source_id: str) -> Mapping[str, Any]:

def does_dest_support_normalization(
self, destination_definition_id: str, workspace_id: str
) -> Dict[str, Any]:
return cast(
) -> bool:
# Airbyte API changed source of truth for normalization in PR
# https://github.com/airbytehq/airbyte/pull/21005
norm_dest_def_spec: bool = cast(
Dict[str, Any],
check.not_none(
self.make_request_cached(
Expand All @@ -528,6 +530,24 @@ def does_dest_support_normalization(
),
).get("supportsNormalization", False)

norm_dest_def: bool = (
cast(
Dict[str, Any],
check.not_none(
self.make_request_cached(
endpoint="/destination_definitions/get",
data={
"destinationDefinitionId": destination_definition_id,
},
)
),
)
.get("normalizationConfig", {})
.get("supported", False)
)

return any([norm_dest_def_spec, norm_dest_def])

def get_job_status(self, connection_id: str, job_id: int) -> Mapping[str, object]:
if self.forward_logs:
return check.not_none(self.make_request(endpoint="/jobs/get", data={"id": job_id}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,3 +495,53 @@ def test_sync_and_poll_timeout(
assert responses.assert_call_count(f"{ab_resource.api_base_url}/jobs/cancel", 1) is True
else:
assert responses.assert_call_count(f"{ab_resource.api_base_url}/jobs/cancel", 0) is True


@responses.activate
@pytest.mark.parametrize(
"supports_norm,norm_config_supported",
[
(True, True),
(True, False),
(False, True),
(False, False),
],
)
def test_normalization_support(
supports_norm: bool,
norm_config_supported: bool,
airbyte_instance_constructor: Callable[[Dict[str, Any]], AirbyteResource],
):
ab_resource = airbyte_instance_constructor(
{
"host": "some_host",
"port": "8000",
}
)
# See https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/destination_definition_specifications/get
responses.post(
url=ab_resource.api_base_url + "/destination_definition_specifications/get",
json={"supportsNormalization": supports_norm},
)
# See https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/destination_definitions/get
responses.post(
url=ab_resource.api_base_url + "/destination_definitions/get",
json={"normalizationConfig": {"supported": norm_config_supported}},
)

assert ab_resource.does_dest_support_normalization("some_destination", "some_workspace") == any(
[supports_norm, norm_config_supported]
)

# Check for expected behaviour when keys do not exist
responses.post(
url=ab_resource.api_base_url + "/destination_definition_specifications/get",
json={},
)
responses.post(
url=ab_resource.api_base_url + "/destination_definitions/get",
json={},
)
assert (
ab_resource.does_dest_support_normalization("some_destination", "some_workspace") is False
)

0 comments on commit 3cce17c

Please sign in to comment.