Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into flash1293/airbyte-l…
Browse files Browse the repository at this point in the history
…ib-fix-processed-records
  • Loading branch information
Joe Reuter committed Feb 6, 2024
2 parents 0cf81f1 + 0b8496c commit 5e806aa
Show file tree
Hide file tree
Showing 50 changed files with 1,368 additions and 221 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.60.1
current_version = 0.60.2
commit = False

[bumpversion:file:setup.py]
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.60.2
Improve error messages for concurrent CDK

## 0.60.1
Emit state when no partitions are generated for ccdk and update StateBuilder

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.60.1
LABEL io.airbyte.version=0.60.2
LABEL io.airbyte.name=airbyte/source-declarative-manifest
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ def _prune_futures(self, futures: List[Future[Any]]) -> None:
if optional_exception:
# Exception handling should be done in the main thread. Hence, we only store the exception and expect the main
# thread to call raise_if_exception
self._most_recently_seen_exception = RuntimeError(f"Failed reading with error: {optional_exception}")
# We do not expect this error to happen. The futures created during concurrent syncs should catch the exception and
# push it to the queue. If this exception occurs, please review the futures and how they handle exceptions.
self._most_recently_seen_exception = RuntimeError(
f"Failed processing a future: {optional_exception}. Please contact the Airbyte team."
)
futures.pop(index)

def shutdown(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,19 @@ def _add_slice_to_state(self, partition: Partition) -> None:
)
elif self._most_recent_record:
if self._has_closed_at_least_one_slice:
# If we track state value using records cursor field, we can only do that if there is one partition. This is because we save
# the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
# boundaries. There are cases where partitions could not have boundaries:
# * The cursor should be per-partition
# * The stream state is actually the parent stream state
# There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for
# state management. For the specific user that was affected with this issue, we need to:
# * Fix state tracking (which is currently broken)
# * Make the new version available
# * (Probably) ask the user to reset the stream to avoid data loss
raise ValueError(
"Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
"expected."
"expected. Please contact the Airbyte team."
)

self.state["slices"].append(
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
name="airbyte-cdk",
# The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be
# updated if our semver format changes such as using release candidate versions.
version="0.60.1",
version="0.60.2",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
3 changes: 3 additions & 0 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,15 @@ flowchart TD
build[Build connector docker image]
unit[Run unit tests]
integration[Run integration tests]
airbyte_lib_validation[Run airbyte-lib validation tests]
cat[Run connector acceptance tests]
secret[Load connector configuration]
unit-->secret
unit-->build
secret-->integration
secret-->cat
secret-->airbyte_lib_validation
build-->integration
build-->cat
end
Expand Down Expand Up @@ -610,6 +612,7 @@ E.G.: running `pytest` on a specific test folder:

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- |
| 3.10.2 | [#34044](https://github.com/airbytehq/airbyte/pull/34044) | Add pypi validation testing. |
| 3.10.1 | [#34756](https://github.com/airbytehq/airbyte/pull/34756) | Enable connectors tests in draft PRs. |
| 3.10.0 | [#34606](https://github.com/airbytehq/airbyte/pull/34606) | Allow configuration of separate check URL to check whether package exists already. |
| 3.9.0 | [#34606](https://github.com/airbytehq/airbyte/pull/34606) | Allow configuration of python registry URL via environment variable. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class CONNECTOR_TEST_STEP_ID(str, Enum):
BUILD = "build"
CHECK_BASE_IMAGE = "check_base_image"
INTEGRATION = "integration"
AIRBYTE_LIB_VALIDATION = "airbyte_lib_validation"
METADATA_VALIDATION = "metadata_validation"
QA_CHECKS = "qa_checks"
UNIT = "unit"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@
from abc import ABC, abstractmethod
from typing import List, Sequence, Tuple

import dpath.util
import pipelines.dagger.actions.python.common
import pipelines.dagger.actions.system.docker
from dagger import Container, File
from pipelines import hacks
from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, CheckBaseImageIsUsed
from pipelines.consts import LOCAL_BUILD_PLATFORM
from pipelines.dagger.actions import secrets
from pipelines.dagger.actions.python.poetry import with_poetry
from pipelines.helpers.execution.run_steps import STEP_TREE, StepToRun
from pipelines.models.steps import STEP_PARAMS, Step, StepResult

Expand Down Expand Up @@ -189,6 +192,49 @@ def default_params(self) -> STEP_PARAMS:
return super().default_params | coverage_options


class AirbyteLibValidation(Step):
"""A step to validate the connector will work with airbyte-lib, using the airbyte-lib validation helper."""

title = "AirbyteLib validation tests"

context: ConnectorContext

async def _run(self, connector_under_test: Container) -> StepResult:
"""Run all pytest tests declared in the test directory of the connector code.
Args:
connector_under_test (Container): The connector under test container.
Returns:
StepResult: Failure or success of the unit tests with stdout and stdout.
"""
if dpath.util.get(self.context.connector.metadata, "remoteRegistries/pypi/enabled", default=False) is False:
return self.skip("Connector is not published on pypi, skipping airbyte-lib validation.")

test_environment = await self.install_testing_environment(with_poetry(self.context))
test_execution = test_environment.with_(
hacks.never_fail_exec(["airbyte-lib-validate-source", "--connector-dir", ".", "--validate-install-only"])
)

return await self.get_step_result(test_execution)

async def install_testing_environment(
self,
built_connector_container: Container,
) -> Container:
"""Add airbyte-lib and secrets to the test environment."""
context: ConnectorContext = self.context

container_with_test_deps = await pipelines.dagger.actions.python.common.with_python_package(
self.context, built_connector_container.with_entrypoint([]), str(context.connector.code_directory)
)
return container_with_test_deps.with_exec(
[
"pip",
"install",
"airbyte-lib",
]
)


class IntegrationTests(PytestStep):
"""A step to run the connector integration tests with Pytest."""

Expand Down Expand Up @@ -218,6 +264,12 @@ def get_test_steps(context: ConnectorContext) -> STEP_TREE:
args=lambda results: {"connector_under_test": results[CONNECTOR_TEST_STEP_ID.BUILD].output_artifact[LOCAL_BUILD_PLATFORM]},
depends_on=[CONNECTOR_TEST_STEP_ID.BUILD],
),
StepToRun(
id=CONNECTOR_TEST_STEP_ID.AIRBYTE_LIB_VALIDATION,
step=AirbyteLibValidation(context),
args=lambda results: {"connector_under_test": results[CONNECTOR_TEST_STEP_ID.BUILD].output_artifact[LOCAL_BUILD_PLATFORM]},
depends_on=[CONNECTOR_TEST_STEP_ID.BUILD],
),
StepToRun(
id=CONNECTOR_TEST_STEP_ID.ACCEPTANCE,
step=AcceptanceTests(context, context.concurrent_cat),
Expand Down
13 changes: 12 additions & 1 deletion airbyte-ci/connectors/pipelines/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "3.10.1"
version = "3.10.2"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <contact@airbyte.io>"]

Expand All @@ -30,6 +30,7 @@ certifi = "^2023.11.17"
tomli = "^2.0.1"
tomli-w = "^1.0.0"
types-requests = "2.28.2"
dpath = "^2.1.6"

[tool.poetry.group.dev.dependencies]
freezegun = "^1.2.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from unittest.mock import patch

import pytest
from connector_ops.utils import Connector, ConnectorLanguage
from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.test.steps.python_connectors import UnitTests
from pipelines.models.steps import StepResult
from pipelines.airbyte_ci.connectors.test.steps.python_connectors import AirbyteLibValidation, UnitTests
from pipelines.models.steps import StepResult, StepStatus

pytestmark = [
pytest.mark.anyio,
Expand Down Expand Up @@ -105,3 +107,72 @@ def test_params(self, context_for_certified_connector_with_setup):
f"--cov={context_for_certified_connector_with_setup.connector.technical_name.replace('-', '_')}",
f"--cov-fail-under={step.MINIMUM_COVERAGE_FOR_CERTIFIED_CONNECTORS}",
]


class TestAirbyteLibValidationTests:
@pytest.fixture
def compatible_connector(self):
return Connector("source-faker")

@pytest.fixture
def incompatible_connector(self):
return Connector("source-postgres")

@pytest.fixture
def context_for_valid_connector(self, compatible_connector, dagger_client, current_platform):
context = ConnectorContext(
pipeline_name="test airbyte-lib validation",
connector=compatible_connector,
git_branch="test",
git_revision="test",
report_output_prefix="test",
is_local=True,
use_remote_secrets=True,
targeted_platforms=[current_platform],
)
context.dagger_client = dagger_client
return context

@pytest.fixture
def context_for_invalid_connector(self, incompatible_connector, dagger_client, current_platform):
context = ConnectorContext(
pipeline_name="test airbyte-lib validation",
connector=incompatible_connector,
git_branch="test",
git_revision="test",
report_output_prefix="test",
is_local=True,
use_remote_secrets=True,
targeted_platforms=[current_platform],
)
context.dagger_client = dagger_client
return context

async def test__run_validation_success(self, mocker, context_for_valid_connector: ConnectorContext):
result = await AirbyteLibValidation(context_for_valid_connector)._run(mocker.MagicMock())
assert isinstance(result, StepResult)
assert result.status == StepStatus.SUCCESS
assert "Creating source and validating spec is returned successfully..." in result.stdout

async def test__run_validation_skip_unpublished_connector(
self,
mocker,
context_for_invalid_connector: ConnectorContext,
):
result = await AirbyteLibValidation(context_for_invalid_connector)._run(mocker.MagicMock())
assert isinstance(result, StepResult)
assert result.status == StepStatus.SKIPPED

async def test__run_validation_fail(
self,
mocker,
context_for_invalid_connector: ConnectorContext,
):
metadata = context_for_invalid_connector.connector.metadata
metadata["remoteRegistries"] = {"pypi": {"enabled": True, "packageName": "airbyte-source-postgres"}}
metadata_mock = mocker.PropertyMock(return_value=metadata)
with patch.object(Connector, "metadata", metadata_mock):
result = await AirbyteLibValidation(context_for_invalid_connector)._run(mocker.MagicMock())
assert isinstance(result, StepResult)
assert result.status == StepStatus.FAILURE
assert "does not appear to be a Python project" in result.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ data:
packageName: airbyte-source-github
registries:
cloud:
dockerImageTag: 1.5.7
enabled: true
oss:
enabled: true
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gong/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_gong ./source_gong
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-gong
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gong/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 32382e40-3b49-4b99-9c5c-4076501914e7
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
dockerRepository: airbyte/source-gong
githubIssueLabel: source-gong
icon: gong.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
},
"isPrivate": {
"type": ["null", "boolean"]
},
"calendarEventId": {
"type": ["null", "string"]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
"emailAddress": {
"type": ["null", "string"]
},
"trustedEmailAddress": {
"type": ["null", "string"]
},
"created": {
"type": ["null", "string"],
"format": "date-time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.6
dockerImageTag: 1.2.7
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Loading

0 comments on commit 5e806aa

Please sign in to comment.