CAT: add validation for stream statuses (#34675)
artem1205 authored Feb 8, 2024
1 parent 89accff commit a96b7f4
Showing 7 changed files with 322 additions and 35 deletions.
@@ -1,13 +1,16 @@
# Changelog

## 3.5.0
Add `validate_stream_statuses` to TestBasicRead.test_read: validate that all statuses for all streams in the catalog were emitted in the correct order.

## 3.4.0
Add TestConnectorDocumentation suite for validating connectors documentation structure and content.

## 3.3.3
Fix `NoAdditionalPropertiesValidator` if no type found in `items`

## 3.3.2
Fix TestBasicRead.test_read.validate_schema: set `additionalProperties` to False recursively for objects
Fix TestBasicRead.test_read.validate_schema: set `additionalProperties` to False recursively for objects.

## 3.3.1
Fix TestSpec.test_oauth_is_default_method to skip connectors that don't have a predicate_key object.
@@ -188,6 +188,7 @@ class BasicReadTestConfig(BaseConfig):
)
expect_records: Optional[ExpectedRecordsConfig] = Field(description="Expected records from the read")
validate_schema: bool = Field(True, description="Ensure that records match the schema of the corresponding stream")
validate_stream_statuses: Optional[bool] = Field(None, description="Ensure that all streams emit status messages")
fail_on_extra_columns: bool = Field(True, description="Fail if extra top-level properties (i.e. columns) are detected in records.")
# TODO: remove this field after https://github.com/airbytehq/airbyte/issues/8312 is done
validate_data_points: bool = Field(
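For context, the new flag is tri-state: True or False when set explicitly in the test configuration, and None when omitted so that the fixture added further down can opt certified connectors in automatically (see the sketch after that fixture).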
@@ -408,3 +408,8 @@ def docs_path_fixture(base_path, connector_metadata) -> Path:
def connector_documentation_fixture(docs_path: str) -> str:
with open(docs_path, "r") as f:
return f.read().rstrip()


@pytest.fixture(name="is_connector_certified")
def connector_certification_status_fixture(connector_metadata: dict) -> bool:
return connector_metadata.get("data", {}).get("ab_internal", {}).get("ql", 0) >= 400
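For context, the certification check above reads the connector's quality level (`ql`) from its metadata; a minimal, self-contained illustration of the threshold, using made-up metadata payloads:

certified_metadata = {"data": {"ab_internal": {"ql": 400}}}
community_metadata = {"data": {"ab_internal": {"ql": 300}}}

def is_certified(connector_metadata: dict) -> bool:
    # Same lookup as the fixture above: ql >= 400 means certified.
    return connector_metadata.get("data", {}).get("ab_internal", {}).get("ql", 0) >= 400

assert is_certified(certified_metadata) is True
assert is_certified(community_metadata) is False
assert is_certified({}) is False  # missing metadata defaults to ql=0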
@@ -23,6 +23,8 @@
from airbyte_protocol.models import (
AirbyteRecordMessage,
AirbyteStream,
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
@@ -979,6 +981,14 @@ def should_validate_schema_fixture(self, inputs: BasicReadTestConfig, test_stric
else:
return inputs.validate_schema

@pytest.fixture(name="should_validate_stream_statuses")
def should_validate_stream_statuses_fixture(self, inputs: BasicReadTestConfig, is_connector_certified: bool):
if inputs.validate_stream_statuses is None and is_connector_certified:
return True
if not inputs.validate_stream_statuses and is_connector_certified:
pytest.fail("High strictness level error: validate_stream_statuses must be set to true in the basic read test configuration.")
return inputs.validate_stream_statuses

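For illustration, the decision made by this fixture can be restated as a self-contained helper (the function name is hypothetical; the logic paraphrases the fixture above):

from typing import Optional

def resolve_stream_status_validation(configured: Optional[bool], is_connector_certified: bool) -> bool:
    # Not set in acceptance-test-config.yml: certified connectors are opted in automatically.
    if configured is None:
        return is_connector_certified
    # Certified connectors may not opt out of the check.
    if configured is False and is_connector_certified:
        raise ValueError("validate_stream_statuses must be set to true for certified connectors")
    return configured

assert resolve_stream_status_validation(None, is_connector_certified=True) is True
assert resolve_stream_status_validation(None, is_connector_certified=False) is False
assert resolve_stream_status_validation(True, is_connector_certified=False) is True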
@pytest.fixture(name="should_fail_on_extra_columns")
def should_fail_on_extra_columns_fixture(self, inputs: BasicReadTestConfig):
# TODO (Ella): enforce this param once all connectors are passing
@@ -1030,6 +1040,7 @@ async def test_read(
expect_records_config: ExpectedRecordsConfig,
should_validate_schema: Boolean,
should_validate_data_points: Boolean,
should_validate_stream_statuses: Boolean,
should_fail_on_extra_columns: Boolean,
empty_streams: Set[EmptyStreamConfiguration],
ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]],
@@ -1039,6 +1050,7 @@ async def test_read(
certified_file_based_connector: bool,
):
output = await docker_runner.call_read(connector_config, configured_catalog)

records = [message.record for message in filter_output(output, Type.RECORD)]

if certified_file_based_connector:
@@ -1071,6 +1083,14 @@
detailed_logger=detailed_logger,
)

if should_validate_stream_statuses:
all_statuses = [
message.trace.stream_status
for message in filter_output(output, Type.TRACE)
if message.trace.type == TraceType.STREAM_STATUS
]
self._validate_stream_statuses(configured_catalog=configured_catalog, statuses=all_statuses)

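For reference, the messages collected here are trace messages whose type is STREAM_STATUS; a short sketch of one such message built from the airbyte_protocol models (imports repeated for completeness; the stream name is made up):

from airbyte_protocol.models import (
    AirbyteStreamStatus,
    AirbyteStreamStatusTraceMessage,
    AirbyteTraceMessage,
    StreamDescriptor,
    TraceType,
)

status_trace = AirbyteTraceMessage(
    type=TraceType.STREAM_STATUS,
    emitted_at=1700000000000,
    stream_status=AirbyteStreamStatusTraceMessage(
        stream_descriptor=StreamDescriptor(name="users"),
        status=AirbyteStreamStatus.STARTED,
    ),
)
# test_read keeps only trace messages of this type and passes their `stream_status`
# payloads to _validate_stream_statuses.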
async def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner):
if not inputs.expect_trace_message_on_failure:
pytest.skip("Skipping `test_airbyte_trace_message_on_failure` because `inputs.expect_trace_message_on_failure=False`")
@@ -1189,15 +1209,13 @@ def group_by_stream(records: List[AirbyteRecordMessage]) -> MutableMapping[str,
return result

@pytest.fixture(name="certified_file_based_connector")
def is_certified_file_based_connector(self, connector_metadata: Dict[str, Any]) -> bool:
def is_certified_file_based_connector(self, connector_metadata: Dict[str, Any], is_connector_certified: bool) -> bool:
metadata = connector_metadata.get("data", {})

# connector subtype is specified in data.connectorSubtype field
file_based_connector = metadata.get("connectorSubtype") == "file"
# a certified connector has ab_internal.ql value >= 400
certified_connector = metadata.get("ab_internal", {}).get("ql", 0) >= 400

return file_based_connector and certified_connector
return file_based_connector and is_connector_certified

@staticmethod
def _get_file_extension(file_name: str) -> str:
@@ -1237,6 +1255,29 @@ async def test_all_supported_file_types_present(self, certified_file_based_conne
"or add them to the `file_types -> unsupported_types` list in config."
)

@staticmethod
def _validate_stream_statuses(configured_catalog: ConfiguredAirbyteCatalog, statuses: List[AirbyteStreamStatusTraceMessage]):
"""Validate all statuses for all streams in the catalogs were emitted in correct order:
1. STARTED
2. RUNNING (can be >1)
3. COMPLETE
"""
stream_statuses = defaultdict(list)
for status in statuses:
stream_statuses[f"{status.stream_descriptor.namespace}-{status.stream_descriptor.name}"].append(status.status)

assert set(f"{x.stream.namespace}-{x.stream.name}" for x in configured_catalog.streams) == set(
stream_statuses
), "All stream must emit status"

for stream_name, status_list in stream_statuses.items():
assert (
len(status_list) >= 3
), f"Stream `{stream_name}` statuses should be emitted in the next order: `STARTED`, `RUNNING`,... `COMPLETE`"
assert status_list[0] == AirbyteStreamStatus.STARTED
assert status_list[-1] == AirbyteStreamStatus.COMPLETE
assert all(x == AirbyteStreamStatus.RUNNING for x in status_list[1:-1])

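As a usage note, the three assertions above encode the ordering contract from the docstring; a small self-contained illustration of sequences that pass and fail (stream names omitted for brevity):

from airbyte_protocol.models import AirbyteStreamStatus

valid_sequence = [
    AirbyteStreamStatus.STARTED,
    AirbyteStreamStatus.RUNNING,
    AirbyteStreamStatus.RUNNING,
    AirbyteStreamStatus.COMPLETE,
]
assert valid_sequence[0] == AirbyteStreamStatus.STARTED
assert valid_sequence[-1] == AirbyteStreamStatus.COMPLETE
assert all(s == AirbyteStreamStatus.RUNNING for s in valid_sequence[1:-1])

# [STARTED, COMPLETE] fails the `len(status_list) >= 3` check (at least one RUNNING is expected),
# and [STARTED, RUNNING] fails the final-status check because the stream never reports COMPLETE.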

@pytest.mark.default_timeout(TEN_MINUTES)
class TestConnectorAttributes(BaseTest):
@@ -1245,13 +1286,13 @@ class TestConnectorAttributes(BaseTest):
MANDATORY_FOR_TEST_STRICTNESS_LEVELS = []

@pytest.fixture(name="operational_certification_test")
async def operational_certification_test_fixture(self, connector_metadata: dict) -> bool:
async def operational_certification_test_fixture(self, is_connector_certified: bool) -> bool:
"""
Fixture that is used to skip a test that is reserved only for connectors that are supposed to be tested
against operational certification criteria
"""

if connector_metadata.get("data", {}).get("ab_internal", {}).get("ql") < 400:
if not is_connector_certified:
pytest.skip("Skipping operational connector certification test for uncertified connector")
return True

@@ -1350,12 +1391,12 @@ class TestConnectorDocumentation(BaseTest):
CONNECTOR_SPECIFIC_HEADINGS = "<Connector-specific features>"

@pytest.fixture(name="operational_certification_test")
async def operational_certification_test_fixture(self, connector_metadata: dict) -> bool:
async def operational_certification_test_fixture(self, is_connector_certified: bool) -> bool:
"""
Fixture that is used to skip a test that is reserved only for connectors that are supposed to be tested
against operational certification criteria
"""
if connector_metadata.get("data", {}).get("ab_internal", {}).get("ql") < 400:
if not is_connector_certified:
pytest.skip("Skipping testing source connector documentation due to low ql.")
return True

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "connector-acceptance-test"
version = "3.4.0"
version = "3.5.0"
description = "Contains acceptance tests for connectors."
authors = ["Airbyte <contact@airbyte.io>"]
license = "MIT"
@@ -90,40 +90,35 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):
@pytest.mark.parametrize(
"json_schema, record, should_fail",
[
(
{"type": "object", "properties": {"a": {"type": "string"}}},
{"a": "str", "b": "extra_string"},
True
),
({"type": "object", "properties": {"a": {"type": "string"}}}, {"a": "str", "b": "extra_string"}, True),
(
{"type": "object", "properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"]}}},
{"a": "str", "some_obj": {"b": "extra_string"}},
False
False,
),
(
{
"type": "object",
"properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"], "properties": {"a": {"type": "string"}}}},
},
{"a": "str", "some_obj": {"a": "str", "b": "extra_string"}},
True
True,
),
(
{"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "array", "items": {"type": "object"}}}},
{"a": "str", "b": [{"a": "extra_string"}]},
False
False,
),
(
{
"type": "object",
"properties": {
"a": {"type": "string"},
"b": {"type": "array", "items": {"type": "object", "properties": {"a": {"type": "string"}}}},
}
},
},
{"a": "str", "b": [{"a": "string", "b": "extra_string"}]},
True
True,
),
],
ids=[
@@ -136,7 +131,7 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):
)
def test_verify_records_schema_with_fail_on_extra_columns(configured_catalog: ConfiguredAirbyteCatalog, json_schema, record, should_fail):
"""Test that fail_on_extra_columns works correctly with nested objects, array of objects"""
configured_catalog.streams[0].stream.json_schema =json_schema
configured_catalog.streams[0].stream.json_schema = json_schema
records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0)]
streams_with_errors = verify_records_schema(records, configured_catalog, fail_on_extra_columns=True)
errors = [error.message for error in streams_with_errors["my_stream"].values()]