Skip to content

Commit

Permalink
fix(smoke-test): remove stateful ingestion config check (datahub-proj…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and szalai1 committed Dec 22, 2022
1 parent 5f9574c commit a9c28c9
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions smoke-test/tests/test_stateful_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
from datahub.ingestion.api.committable import StatefulCommittable
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource
from datahub.ingestion.source.sql.sql_common import \
BaseSQLAlchemyCheckpointState
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.checkpoint import Checkpoint
from tests.utils import (get_gms_url, get_mysql_password, get_mysql_url,
get_mysql_username)
from tests.utils import (
get_gms_url,
get_mysql_password,
get_mysql_url,
get_mysql_username,
)


def test_stateful_ingestion(wait_for_healthchecks):
Expand Down Expand Up @@ -45,7 +48,7 @@ def validate_all_providers_have_committed_successfully(pipeline: Pipeline) -> No

def get_current_checkpoint_from_pipeline(
pipeline: Pipeline,
) -> Optional[Checkpoint]:
) -> Optional[Checkpoint[GenericCheckpointState]]:
mysql_source = cast(MySQLSource, pipeline.source)
return mysql_source.get_current_checkpoint(
mysql_source.stale_entity_removal_handler.job_id
Expand Down Expand Up @@ -111,8 +114,8 @@ def get_current_checkpoint_from_pipeline(
assert checkpoint2.state

# 5. Perform all assertions on the states
state1 = cast(BaseSQLAlchemyCheckpointState, checkpoint1.state)
state2 = cast(BaseSQLAlchemyCheckpointState, checkpoint2.state)
state1 = checkpoint1.state
state2 = checkpoint2.state
difference_urns = list(
state1.get_urns_not_in(type="*", other_checkpoint_state=state2)
)
Expand All @@ -122,13 +125,10 @@ def get_current_checkpoint_from_pipeline(
== "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.stateful_ingestion_test_t1,PROD)"
)

# 6. Perform all assertions on the config.
assert checkpoint1.config == checkpoint2.config

# 7. Cleanup table t2 as well to prevent other tests that rely on data in the smoke-test world.
# 6. Cleanup table t2 as well to prevent other tests that rely on data in the smoke-test world.
drop_table(mysql_engine, table_names[1])

# 8. Validate that all providers have committed successfully.
# 7. Validate that all providers have committed successfully.
# NOTE: The following validation asserts for presence of state as well
# and validates reporting.
validate_all_providers_have_committed_successfully(pipeline_run1)
Expand Down

0 comments on commit a9c28c9

Please sign in to comment.