Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(smoke-test): remove stateful ingestion config check #6781

Merged
merged 1 commit into from
Dec 16, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions smoke-test/tests/test_stateful_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
from datahub.ingestion.api.committable import StatefulCommittable
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource
from datahub.ingestion.source.sql.sql_common import \
BaseSQLAlchemyCheckpointState
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.checkpoint import Checkpoint
from tests.utils import (get_gms_url, get_mysql_password, get_mysql_url,
get_mysql_username)
from tests.utils import (
get_gms_url,
get_mysql_password,
get_mysql_url,
get_mysql_username,
)


def test_stateful_ingestion(wait_for_healthchecks):
Expand Down Expand Up @@ -45,7 +48,7 @@ def validate_all_providers_have_committed_successfully(pipeline: Pipeline) -> No

def get_current_checkpoint_from_pipeline(
pipeline: Pipeline,
) -> Optional[Checkpoint]:
) -> Optional[Checkpoint[GenericCheckpointState]]:
mysql_source = cast(MySQLSource, pipeline.source)
return mysql_source.get_current_checkpoint(
mysql_source.stale_entity_removal_handler.job_id
Expand Down Expand Up @@ -111,8 +114,8 @@ def get_current_checkpoint_from_pipeline(
assert checkpoint2.state

# 5. Perform all assertions on the states
state1 = cast(BaseSQLAlchemyCheckpointState, checkpoint1.state)
state2 = cast(BaseSQLAlchemyCheckpointState, checkpoint2.state)
state1 = checkpoint1.state
state2 = checkpoint2.state
difference_urns = list(
state1.get_urns_not_in(type="*", other_checkpoint_state=state2)
)
Expand All @@ -122,13 +125,10 @@ def get_current_checkpoint_from_pipeline(
== "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.stateful_ingestion_test_t1,PROD)"
)

# 6. Perform all assertions on the config.
assert checkpoint1.config == checkpoint2.config

# 7. Cleanup table t2 as well to prevent other tests that rely on data in the smoke-test world.
# 6. Cleanup table t2 as well to prevent other tests that rely on data in the smoke-test world.
drop_table(mysql_engine, table_names[1])

# 8. Validate that all providers have committed successfully.
# 7. Validate that all providers have committed successfully.
# NOTE: The following validation asserts for presence of state as well
# and validates reporting.
validate_all_providers_have_committed_successfully(pipeline_run1)
Expand Down