Skip to content

Commit

Permalink
Protocol: make supported_sync_modes a required not empty list on `A…
Browse files Browse the repository at this point in the history
…irbyteStream` (#15591)
  • Loading branch information
alafanechere authored Oct 19, 2022
1 parent 4b0d1d4 commit b564f3e
Show file tree
Hide file tree
Showing 54 changed files with 307 additions and 153 deletions.
4 changes: 4 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.1.104

- Protocol change: `supported_sync_modes` is now a required properties on AirbyteStream. [#15591](https://github.com/airbytehq/airbyte/pull/15591)

## 0.1.103

- Low-code: added hash filter to jinja template
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class Config:

name: str = Field(..., description="Stream's name.")
json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.")
supported_sync_modes: Optional[List[SyncMode]] = None
supported_sync_modes: List[SyncMode] = Field(..., description="List of sync modes supported by this stream.", min_items=1)
source_defined_cursor: Optional[bool] = Field(
None,
description="If the source defines the cursor field, then any other cursor field inputs will be ignored. If it does not, either the user_provided one is used, or the default one is used as a backup.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def singer_catalog_to_airbyte_catalog(
for stream in singer_catalog.get("streams"): # type: ignore
name = stream.get("stream")
schema = stream.get("schema")
airbyte_stream = AirbyteStream(name=name, json_schema=schema)
airbyte_stream = AirbyteStream(name=name, json_schema=schema, supported_sync_modes=[SyncMode.full_refresh])
if name in sync_mode_overrides:
override_sync_modes(airbyte_stream, sync_mode_overrides[name])
else:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.103",
version="0.1.104",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch
dummy_catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(name="mystream", json_schema={"type": "object"}),
stream=AirbyteStream(name="mystream", json_schema={"type": "object"}, supported_sync_modes=[SyncMode.full_refresh]),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite,
)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/unit_tests/singer/test_singer_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_singer_discover_metadata(mock_read_catalog):
_user_stream = airbyte_catalog.streams[0]
_roles_stream = airbyte_catalog.streams[1]

assert _user_stream.supported_sync_modes is None
# assert _user_stream.supported_sync_modes is None
assert _user_stream.default_cursor_field is None
assert _roles_stream.supported_sync_modes == [SyncMode.incremental]
assert _roles_stream.default_cursor_field == ["name"]
Expand Down
10 changes: 7 additions & 3 deletions airbyte-cdk/python/unit_tests/sources/test_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ def catalog():
configured_catalog = {
"streams": [
{
"stream": {"name": "mock_http_stream", "json_schema": {}},
"stream": {"name": "mock_http_stream", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"destination_sync_mode": "overwrite",
"sync_mode": "full_refresh",
},
{
"stream": {"name": "mock_stream", "json_schema": {}},
"stream": {"name": "mock_stream", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
"destination_sync_mode": "overwrite",
"sync_mode": "full_refresh",
},
Expand Down Expand Up @@ -317,7 +317,11 @@ def test_read_catalog(source):
configured_catalog = {
"streams": [
{
"stream": {"name": "mystream", "json_schema": {"type": "object", "properties": {"k": "v"}}},
"stream": {
"name": "mystream",
"json_schema": {"type": "object", "properties": {"k": "v"}},
"supported_sync_modes": ["full_refresh"],
},
"destination_sync_mode": "overwrite",
"sync_mode": "full_refresh",
}
Expand Down
3 changes: 2 additions & 1 deletion airbyte-cdk/python/unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
AirbyteStream,
ConnectorSpecification,
Status,
SyncMode,
Type,
)
from airbyte_cdk.sources import Source
Expand Down Expand Up @@ -172,7 +173,7 @@ def test_run_check(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock

def test_run_discover(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
parsed_args = Namespace(command="discover", config="config_path")
expected = AirbyteCatalog(streams=[AirbyteStream(name="stream", json_schema={"k": "v"})])
expected = AirbyteCatalog(streams=[AirbyteStream(name="stream", json_schema={"k": "v"}, supported_sync_modes=[SyncMode.full_refresh])])
mocker.patch.object(MockSource, "discover", return_value=expected)
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
assert spec_mock.called
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
- name: Azure Table Storage
sourceDefinitionId: 798ae795-5189-42b6-b64e-3cb91db93338
dockerRepository: airbyte/source-azure-table
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.com/integrations/sources/azure-table
icon: azureblobstorage.svg
sourceType: database
Expand Down Expand Up @@ -263,7 +263,7 @@
- name: E2E Testing
sourceDefinitionId: d53f9084-fa6b-4a5a-976c-5b8392f4ad8a
dockerRepository: airbyte/source-e2e-test
dockerImageTag: 2.1.1
dockerImageTag: 2.1.3
documentationUrl: https://docs.airbyte.com/integrations/sources/e2e-test
icon: airbyte.svg
sourceType: api
Expand Down Expand Up @@ -418,7 +418,7 @@
- name: Google Sheets
sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
dockerRepository: airbyte/source-google-sheets
dockerImageTag: 0.2.20
dockerImageTag: 0.2.21
documentationUrl: https://docs.airbyte.com/integrations/sources/google-sheets
icon: google-sheets.svg
sourceType: file
Expand Down
8 changes: 4 additions & 4 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-azure-table:0.1.2"
- dockerImage: "airbyte/source-azure-table:0.1.3"
spec:
documentationUrl: "https://docsurl.com"
connectionSpecification:
Expand All @@ -1329,7 +1329,6 @@
required:
- "storage_account_name"
- "storage_access_key"
additionalProperties: false
properties:
storage_account_name:
title: "Account Name"
Expand Down Expand Up @@ -2410,7 +2409,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-e2e-test:2.1.1"
- dockerImage: "airbyte/source-e2e-test:2.1.3"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/e2e-test"
connectionSpecification:
Expand Down Expand Up @@ -2523,6 +2522,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
protocol_version: "0.2.1"
- dockerImage: "airbyte/source-exchange-rates:1.2.6"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/exchangeratesapi"
Expand Down Expand Up @@ -4353,7 +4353,7 @@
oauthFlowOutputParameters:
- - "access_token"
- - "refresh_token"
- dockerImage: "airbyte/source-google-sheets:0.2.20"
- dockerImage: "airbyte/source-google-sheets:0.2.21"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-sheets"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.collect.Lists;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

Expand All @@ -17,32 +19,37 @@ class S3OutputPathHelperTest {
public void testGetOutputPrefix() {
// No namespace
assertEquals("bucket_path/stream_name", S3OutputPathHelper
.getOutputPrefix("bucket_path", new AirbyteStream().withName("stream_name")));
.getOutputPrefix("bucket_path",
new AirbyteStream().withName("stream_name").withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH))));

// With namespace
assertEquals("bucket_path/namespace/stream_name", S3OutputPathHelper
.getOutputPrefix("bucket_path",
new AirbyteStream().withNamespace("namespace").withName("stream_name")));
new AirbyteStream().withNamespace("namespace").withName("stream_name")
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH))));

// With empty namespace
assertEquals("bucket_path/stream_name", S3OutputPathHelper
.getOutputPrefix("bucket_path",
new AirbyteStream().withNamespace("").withName("stream_name")));
new AirbyteStream().withNamespace("").withName("stream_name").withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH))));

// With namespace with slash chart in the end
assertEquals("bucket_path/namespace/stream_name", S3OutputPathHelper
.getOutputPrefix("bucket_path",
new AirbyteStream().withNamespace("namespace/").withName("stream_name")));
new AirbyteStream().withNamespace("namespace/").withName("stream_name")
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH))));

// With namespace with slash chart in the name
assertEquals("bucket_path/namespace/subfolder/stream_name", S3OutputPathHelper
.getOutputPrefix("bucket_path",
new AirbyteStream().withNamespace("namespace/subfolder/").withName("stream_name")));
new AirbyteStream().withNamespace("namespace/subfolder/").withName("stream_name")
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH))));

// With an AWS Glue crawler
assertEquals("bucket_path/namespace/date=2022-03-15", S3OutputPathHelper
.getOutputPrefix("bucket_path",
new AirbyteStream().withNamespace("namespace").withName("date=2022-03-15")));
new AirbyteStream().withNamespace("namespace").withName("date=2022-03-15")
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH))));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.9
Update tests after protocol change making `supported_sync_modes` a required property of `AirbyteStream` [#15591](https://github.com/airbytehq/airbyte/pull/15591/)

## 0.2.8
Make full refresh tests tolerant to new records in a sequential read.[#17660](https://github.com/airbytehq/airbyte/pull/17660/)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.version=0.2.9
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/source-acceptance-test/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import setuptools

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1.56",
"airbyte-cdk~=0.1.104",
"docker~=5.0.3",
"PyYAML~=5.4",
"icdiff~=1.9",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def record_schema_fixture():
def catalog_fixture(request, record_schema) -> ConfiguredAirbyteCatalog:
record_schema = request.param if hasattr(request, "param") else record_schema
stream = ConfiguredAirbyteStream(
stream=AirbyteStream(name="my_stream", json_schema=record_schema),
stream=AirbyteStream(name="my_stream", json_schema=record_schema, supported_sync_modes=[SyncMode.full_refresh]),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.append,
)
Expand Down
Loading

0 comments on commit b564f3e

Please sign in to comment.