diff --git a/smoke-test/tests/test_stateful_ingestion.py b/smoke-test/tests/test_stateful_ingestion.py index b6861b18d2a576..e7b012a7888a0d 100644 --- a/smoke-test/tests/test_stateful_ingestion.py +++ b/smoke-test/tests/test_stateful_ingestion.py @@ -6,11 +6,14 @@ from datahub.ingestion.api.committable import StatefulCommittable from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource -from datahub.ingestion.source.sql.sql_common import \ - BaseSQLAlchemyCheckpointState +from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState from datahub.ingestion.source.state.checkpoint import Checkpoint -from tests.utils import (get_gms_url, get_mysql_password, get_mysql_url, - get_mysql_username) +from tests.utils import ( + get_gms_url, + get_mysql_password, + get_mysql_url, + get_mysql_username, +) def test_stateful_ingestion(wait_for_healthchecks): @@ -45,7 +48,7 @@ def validate_all_providers_have_committed_successfully(pipeline: Pipeline) -> No def get_current_checkpoint_from_pipeline( pipeline: Pipeline, - ) -> Optional[Checkpoint]: + ) -> Optional[Checkpoint[GenericCheckpointState]]: mysql_source = cast(MySQLSource, pipeline.source) return mysql_source.get_current_checkpoint( mysql_source.stale_entity_removal_handler.job_id @@ -111,8 +114,8 @@ def get_current_checkpoint_from_pipeline( assert checkpoint2.state # 5. Perform all assertions on the states - state1 = cast(BaseSQLAlchemyCheckpointState, checkpoint1.state) - state2 = cast(BaseSQLAlchemyCheckpointState, checkpoint2.state) + state1 = checkpoint1.state + state2 = checkpoint2.state difference_urns = list( state1.get_urns_not_in(type="*", other_checkpoint_state=state2) ) @@ -122,13 +125,10 @@ def get_current_checkpoint_from_pipeline( == "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.stateful_ingestion_test_t1,PROD)" ) - # 6. Perform all assertions on the config. 
- assert checkpoint1.config == checkpoint2.config - - # 7. Cleanup table t2 as well to prevent other tests that rely on data in the smoke-test world. + # 6. Clean up table t2 as well so that other tests that rely on data in the smoke-test world are not affected. drop_table(mysql_engine, table_names[1]) - # 8. Validate that all providers have committed successfully. + # 7. Validate that all providers have committed successfully. # NOTE: The following validation asserts for presence of state as well # and validates reporting. validate_all_providers_have_committed_successfully(pipeline_run1)