connectors-ci: re-enable publish tests #29149

Merged · 1 commit · Aug 7, 2023
108 changes: 47 additions & 61 deletions airbyte-ci/connectors/pipelines/tests/test_publish.py
@@ -15,41 +15,46 @@
]


@pytest.mark.skip(reason="Currently failing, should be fixed in the future")
@pytest.fixture
def publish_context(mocker, dagger_client, tmpdir):
return mocker.MagicMock(
dagger_client=dagger_client,
get_connector_dir=mocker.MagicMock(return_value=dagger_client.host().directory(str(tmpdir))),
docker_hub_username_secret=None,
docker_hub_password_secret=None,
docker_image="hello-world:latest",
)
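
The new publish_context fixture is a plain MagicMock: any attribute a publish step reads (spec_cache_bucket_name, spec_cache_gcs_credentials_secret, and so on) resolves to an auto-created child mock, and each test only overrides what it cares about, typically docker_image. A minimal sketch of that behaviour, separate from the diff and using an illustrative image tag:

# Sketch (not from the diff): why a single MagicMock fixture is enough.
from unittest import mock

ctx = mock.MagicMock(docker_image="hello-world:latest")
ctx.docker_image = "airbyte/source-faker:0.0.0"  # illustrative per-test override, as the tests below do
assert isinstance(ctx.spec_cache_bucket_name, mock.MagicMock)  # undeclared attributes auto-create
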


class TestCheckConnectorImageDoesNotExists:
@pytest.fixture(scope="class")
def three_random_connectors_image_names(self, oss_registry: dict) -> List[str]:
connectors = oss_registry["sources"] + oss_registry["destinations"]
random.shuffle(connectors)
return [f"{connector['dockerRepository']}:{connector['dockerImageTag']}" for connector in connectors[:3]]

async def test_run(self, mocker, dagger_client, three_random_connectors_image_names):
"""We pick the first three connectors from the OSS registry and check that they are already published."""
async def test_run_skipped_when_already_published(self, three_random_connectors_image_names, publish_context):
"""We pick three random connectors from the OSS registry. They should be published. We check that the step is skipped."""
for image_name in three_random_connectors_image_names:
context = mocker.MagicMock(dagger_client=dagger_client, docker_image_name=image_name)
step = publish.CheckConnectorImageDoesNotExist(context)
publish_context.docker_image = image_name
step = publish.CheckConnectorImageDoesNotExist(publish_context)
step_result = await step.run()
assert step_result.status == StepStatus.SKIPPED
image_name = "airbyte/source-pokeapi:0.0.0"
context = mocker.MagicMock(dagger_client=dagger_client, docker_image_name=image_name)
step = publish.CheckConnectorImageDoesNotExist(context)

async def test_run_success_when_already_published(self, publish_context):

publish_context.docker_image = "airbyte/source-pokeapi:0.0.0"
step = publish.CheckConnectorImageDoesNotExist(publish_context)
step_result = await step.run()
assert step_result.status == StepStatus.SUCCESS


@pytest.mark.skip(reason="Currently failing, should be fixed in the future")
class TestUploadSpecToCache:
@pytest.fixture(scope="class")
def random_connector(self, oss_registry):
return random.choice(oss_registry["sources"] + oss_registry["destinations"])

@pytest.fixture
def context(self, mocker, dagger_client, random_connector, tmpdir):
image_name = f"{random_connector['dockerRepository']}:{random_connector['dockerImageTag']}"
tmp_dir = dagger_client.host().directory(str(tmpdir))
return mocker.MagicMock(
dagger_client=dagger_client, get_connector_dir=mocker.MagicMock(return_value=tmp_dir), docker_image_name=image_name
)
def random_connector(self, oss_registry: dict) -> dict:
connectors = oss_registry["sources"] + oss_registry["destinations"]
random.shuffle(connectors)
return connectors[0]

@pytest.mark.parametrize(
"valid_spec, successful_upload",
@@ -60,14 +65,15 @@ def context(self, mocker, dagger_client, random_connector, tmpdir):
[False, False],
],
)
async def test_run(self, mocker, dagger_client, valid_spec, successful_upload, random_connector, context):
async def test_run(self, mocker, dagger_client, valid_spec, successful_upload, random_connector, publish_context):
"""Test that the spec is correctly uploaded to the spec cache bucket.
We pick a random connector from the oss registry, by nature this connector should have a valid spec and be published.
We use load this connector as a Dagger container and run spec against it.
We load this connector as a Dagger container and run spec against it.
We validate that the outputted spec is the same as the one in the OSS registry.
We also artificially set the spec to be invalid and check that the step fails.
"""
image_name = f"{random_connector['dockerRepository']}:{random_connector['dockerImageTag']}"
publish_context.docker_image = image_name
expected_spec = random_connector["spec"]
connector_container = dagger_client.container().from_(image_name)

@@ -80,15 +86,15 @@ async def test_run(self, mocker, dagger_client, valid_spec, successful_upload, r
publish.UploadSpecToCache, "_get_connector_spec", mocker.Mock(side_effect=publish.InvalidSpecOutputError("Invalid spec."))
)

step = publish.UploadSpecToCache(context)
step = publish.UploadSpecToCache(publish_context)
step_result = await step.run(connector_container)
if valid_spec:
publish.upload_to_gcs.assert_called_once_with(
context.dagger_client,
publish_context.dagger_client,
mocker.ANY,
f"specs/{image_name.replace(':', '/')}/spec.json",
context.spec_cache_bucket_name,
context.spec_cache_gcs_credentials_secret,
publish_context.spec_cache_bucket_name,
publish_context.spec_cache_gcs_credentials_secret,
flags=['--cache-control="no-cache"'],
)

@@ -110,27 +116,27 @@ async def test_run(self, mocker, dagger_client, valid_spec, successful_upload, r
assert step_result.stdout is None
publish.upload_to_gcs.assert_not_called()

def test_parse_spec_output_valid(self, context, random_connector):
step = publish.UploadSpecToCache(context)
def test_parse_spec_output_valid(self, publish_context, random_connector):
step = publish.UploadSpecToCache(publish_context)
correct_spec_message = json.dumps({"type": "SPEC", "spec": random_connector["spec"]})
spec_output = f'random_stuff\n{{"type": "RANDOM_MESSAGE"}}\n{correct_spec_message}'
result = step._parse_spec_output(spec_output)
assert json.loads(result) == random_connector["spec"]

def test_parse_spec_output_invalid_json(self, context):
step = publish.UploadSpecToCache(context)
def test_parse_spec_output_invalid_json(self, publish_context):
step = publish.UploadSpecToCache(publish_context)
spec_output = "Invalid JSON"
with pytest.raises(publish.InvalidSpecOutputError):
step._parse_spec_output(spec_output)

def test_parse_spec_output_invalid_key(self, context):
step = publish.UploadSpecToCache(context)
def test_parse_spec_output_invalid_key(self, publish_context):
step = publish.UploadSpecToCache(publish_context)
spec_output = '{"type": "SPEC", "spec": {"invalid_key": "value"}}'
with pytest.raises(publish.InvalidSpecOutputError):
step._parse_spec_output(spec_output)

def test_parse_spec_output_no_spec(self, context):
step = publish.UploadSpecToCache(context)
def test_parse_spec_output_no_spec(self, publish_context):
step = publish.UploadSpecToCache(publish_context)
spec_output = '{"type": "OTHER"}'
with pytest.raises(publish.InvalidSpecOutputError):
step._parse_spec_output(spec_output)
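
The four _parse_spec_output tests above pin down the parsing contract: scan the connector's stdout line by line, ignore anything that is not a SPEC message, and raise InvalidSpecOutputError when no usable spec is found. One sketch that would satisfy these tests, with a stand-in exception class rather than the real publish.InvalidSpecOutputError:

# Illustrative sketch only; the real logic lives in the publish module under test.
import json


class InvalidSpecOutputError(Exception):
    """Stand-in for publish.InvalidSpecOutputError."""


def _parse_spec_output(spec_output: str) -> str:
    for line in spec_output.splitlines():
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # e.g. "random_stuff" in the valid-output test
        if not isinstance(message, dict) or message.get("type") != "SPEC":
            continue  # e.g. {"type": "RANDOM_MESSAGE"} or {"type": "OTHER"}
        spec = message.get("spec")
        if isinstance(spec, dict) and "connectionSpecification" in spec:
            return json.dumps(spec)
    raise InvalidSpecOutputError("No valid SPEC message found in the connector output.")
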
@@ -175,12 +181,11 @@ async def test_run_connector_publish_pipeline_when_failed_validation(mocker, pre
)


@pytest.mark.skip(reason="Currently failing, should be fixed in the future")
@pytest.mark.parametrize(
"check_image_exists_status, pre_release",
[(StepStatus.SKIPPED, False), (StepStatus.SKIPPED, True), (StepStatus.FAILURE, True), (StepStatus.FAILURE, False)],
"check_image_exists_status",
[StepStatus.SKIPPED, StepStatus.FAILURE],
)
async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker, check_image_exists_status, pre_release):
async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker, check_image_exists_status, publish_context):
"""We validate that when the connector image exists or the check fails, we don't run the rest of the pipeline.
We also validate that the metadata upload step is called when the image exists (Skipped status).
We do this to ensure that the metadata is still updated in the case where the connector image already exists.
@@ -202,9 +207,8 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker

run_metadata_upload = publish.metadata.MetadataUpload.return_value.run

context = mocker.MagicMock(pre_release=pre_release)
semaphore = anyio.Semaphore(1)
report = await publish.run_connector_publish_pipeline(context, semaphore)
report = await publish.run_connector_publish_pipeline(publish_context, semaphore)
run_metadata_validation.assert_called_once()
run_check_connector_image_does_not_exist.assert_called_once()

@@ -213,22 +217,11 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
if to_mock not in ["MetadataValidation", "MetadataUpload", "CheckConnectorImageDoesNotExist", "UploadSpecToCache"]:
getattr(module, to_mock).return_value.run.assert_not_called()

if check_image_exists_status is StepStatus.SKIPPED and pre_release:
run_metadata_upload.assert_not_called()
assert (
report.steps_results
== context.report.steps_results
== [
run_metadata_validation.return_value,
run_check_connector_image_does_not_exist.return_value,
]
)

if check_image_exists_status is StepStatus.SKIPPED and not pre_release:
if check_image_exists_status is StepStatus.SKIPPED:
run_metadata_upload.assert_called_once()
assert (
report.steps_results
== context.report.steps_results
== publish_context.report.steps_results
== [
run_metadata_validation.return_value,
run_check_connector_image_does_not_exist.return_value,
Expand All @@ -241,15 +234,14 @@ async def test_run_connector_publish_pipeline_when_image_exists_or_failed(mocker
run_metadata_upload.assert_not_called()
assert (
report.steps_results
== context.report.steps_results
== publish_context.report.steps_results
== [
run_metadata_validation.return_value,
run_check_connector_image_does_not_exist.return_value,
]
)


@pytest.mark.skip(reason="Currently failing, should be fixed in the future")
@pytest.mark.parametrize(
"pre_release, build_step_status, push_step_status, pull_step_status, upload_to_spec_cache_step_status, metadata_upload_step_status",
[
@@ -318,15 +310,12 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
publish.PullConnectorImageFromRegistry.return_value.run,
]

if not pre_release:
steps_to_run += [publish.metadata.MetadataUpload.return_value.run]

for i, step_to_run in enumerate(steps_to_run):
if step_to_run.return_value.status is StepStatus.FAILURE or i == len(steps_to_run) - 1:
assert len(report.steps_results) == len(context.report.steps_results)

previous_steps = steps_to_run[:i]
for k, step_ran in enumerate(previous_steps):
for _, step_ran in enumerate(previous_steps):
step_ran.assert_called_once()
step_ran.return_value

@@ -341,6 +330,3 @@ async def test_run_connector_publish_pipeline_when_image_does_not_exist(
publish.PullConnectorImageFromRegistry.return_value.run.assert_not_called()
publish.UploadSpecToCache.return_value.run.assert_not_called()
publish.metadata.MetadataUpload.return_value.run.assert_not_called()

if pre_release:
publish.metadata.MetadataUpload.return_value.run.assert_not_called()
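
With the skip markers removed, the whole module is collected again. Locally, these tests presumably need a running Docker daemon for the Dagger client; a minimal way to run just this file from the repository root (exact tooling, such as poetry, may differ):

# Sketch: invoke pytest on this module only and stop at the first failure.
import pytest

pytest.main(["airbyte-ci/connectors/pipelines/tests/test_publish.py", "-x"])
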