diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7f48a86ce8cc..ff57d74c0287 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -423,7 +423,7 @@ - name: Faker sourceDefinitionId: dfd88b22-b603-4c3d-aad7-3701784586b1 dockerRepository: airbyte/source-faker - dockerImageTag: 0.2.0 + dockerImageTag: 0.2.1 documentationUrl: https://docs.airbyte.com/integrations/sources/faker sourceType: api releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 648f1a1afadf..a0d7c36d4104 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3655,7 +3655,7 @@ oauthFlowInitParameters: [] oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-faker:0.2.0" +- dockerImage: "airbyte/source-faker:0.2.1" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/faker" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-faker/Dockerfile b/airbyte-integrations/connectors/source-faker/Dockerfile index 86f7f29a80a0..67498507c5a7 100644 --- a/airbyte-integrations/connectors/source-faker/Dockerfile +++ b/airbyte-integrations/connectors/source-faker/Dockerfile @@ -34,5 +34,5 @@ COPY source_faker ./source_faker ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.0 +LABEL io.airbyte.version=0.2.1 LABEL io.airbyte.name=airbyte/source-faker diff --git a/airbyte-integrations/connectors/source-faker/acceptance-test-config.yml b/airbyte-integrations/connectors/source-faker/acceptance-test-config.yml index 8e505bb363ad..ca177ab8d1a6 100644 --- a/airbyte-integrations/connectors/source-faker/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-faker/acceptance-test-config.yml @@ -8,7 +8,7 @@ tests: - config_path: "secrets/config.json" status: "succeed" - config_path: "integration_tests/invalid_config.json" - status: "exception" + status: "failed" discovery: - config_path: "secrets/config.json" basic_read: diff --git a/airbyte-integrations/connectors/source-faker/setup.py b/airbyte-integrations/connectors/source-faker/setup.py index ab62499037f5..e6fe864c1265 100644 --- a/airbyte-integrations/connectors/source-faker/setup.py +++ b/airbyte-integrations/connectors/source-faker/setup.py @@ -5,10 +5,10 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "mimesis==6.1.1"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "mimesis==6.1.1"] TEST_REQUIREMENTS = [ - "pytest~=6.1", + "pytest~=7.0", "source-acceptance-test", ] diff --git a/airbyte-integrations/connectors/source-faker/source_faker/source.py b/airbyte-integrations/connectors/source-faker/source_faker/source.py index 6e664751df24..4a81b6337cb4 100644 --- a/airbyte-integrations/connectors/source-faker/source_faker/source.py +++ b/airbyte-integrations/connectors/source-faker/source_faker/source.py @@ -13,13 +13,17 @@ from airbyte_cdk.models import ( AirbyteCatalog, AirbyteConnectionStatus, + AirbyteEstimateTraceMessage, AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, AirbyteStream, + AirbyteTraceMessage, ConfiguredAirbyteCatalog, + EstimateType, Status, + TraceType, Type, ) from airbyte_cdk.sources import Source @@ -42,7 +46,10 @@ def check(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteConnect """ # As this is an in-memory source, it always succeeds - return AirbyteConnectionStatus(status=Status.SUCCEEDED) + if type(config["count"]) == int or type(config["count"]) == float: + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + else: + return AirbyteConnectionStatus(status=Status.FAILED) def discover(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteCatalog: """ @@ -136,6 +143,10 @@ def read( records_in_sync = 0 records_in_page = 0 + users_estimate = count - cursor + yield generate_estimate(stream.stream.name, users_estimate, 450) + yield generate_estimate("Purchases", users_estimate * 1.5, 230) # a fuzzy guess, some users have purchases, some don't + for i in range(cursor, count): user = generate_user(person, dt, i) yield generate_record(stream, user) @@ -162,6 +173,7 @@ def read( elif stream.stream.name == "Products": products = generate_products() + yield generate_estimate(stream.stream.name, len(products), 180) for p in products: yield generate_record(stream, p) yield generate_state(state, stream, {"product_count": len(products)}) @@ -204,6 +216,14 @@ def log_stream(stream_name: str): ) +def generate_estimate(stream_name: str, total: int, bytes_per_row: int): + emitted_at = int(datetime.datetime.now().timestamp() * 1000) + estimate = AirbyteEstimateTraceMessage( + type=EstimateType.STREAM, name=stream_name, row_estimate=round(total), byte_estimate=round(total * bytes_per_row) + ) + return AirbyteMessage(type=Type.TRACE, trace=AirbyteTraceMessage(type=TraceType.ESTIMATE, emitted_at=emitted_at, estimate=estimate)) + + def generate_state(state: Dict[str, any], stream: any, data: any): state[ stream.stream.name diff --git a/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py index 0db54325bffa..68a4351ba2b5 100644 --- a/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py @@ -44,7 +44,13 @@ def test_read_small_random_data(): logger = None config = {"count": 10} catalog = ConfiguredAirbyteCatalog( - streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}] + streams=[ + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + } + ] ) state = {} iterator = source.read(logger, config, catalog, state) @@ -70,8 +76,16 @@ def test_read_big_random_data(): config = {"count": 1000, "records_per_slice": 100, "records_per_sync": 1000} catalog = ConfiguredAirbyteCatalog( streams=[ - {"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, - {"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, + { + "stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, ] ) state = {} @@ -98,9 +112,21 @@ def test_with_purchases(): config = {"count": 1000, "records_per_sync": 1000} catalog = ConfiguredAirbyteCatalog( streams=[ - {"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, - {"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, - {"stream": {"name": "Purchases", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, + { + "stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, + { + "stream": {"name": "Purchases", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, ] ) state = {} @@ -128,7 +154,13 @@ def test_sync_ends_with_limit(): logger = None config = {"count": 100, "records_per_sync": 5} catalog = ConfiguredAirbyteCatalog( - streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}] + streams=[ + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + } + ] ) state = {} iterator = source.read(logger, config, catalog, state) @@ -157,7 +189,13 @@ def test_read_with_seed(): logger = None config = {"count": 1, "seed": 100} catalog = ConfiguredAirbyteCatalog( - streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}] + streams=[ + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + } + ] ) state = {} iterator = source.read(logger, config, catalog, state) diff --git a/docs/integrations/sources/faker.md b/docs/integrations/sources/faker.md index 54332a4f0ad0..c730c643d9dc 100644 --- a/docs/integrations/sources/faker.md +++ b/docs/integrations/sources/faker.md @@ -41,7 +41,8 @@ N/A | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------- | -| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! | +| 0.2.1 | 2022-10-14 | [19197](https://github.com/airbytehq/airbyte/pull/19197) | Emit `AirbyteEstimateTraceMessage` | +| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! | | 0.1.8 | 2022-10-12 | [17889](https://github.com/airbytehq/airbyte/pull/17889) | Bump to test publish command (2) | | 0.1.7 | 2022-10-11 | [17848](https://github.com/airbytehq/airbyte/pull/17848) | Bump to test publish command | | 0.1.6 | 2022-09-07 | [16418](https://github.com/airbytehq/airbyte/pull/16418) | Log start of each stream |