From 3f4e418b4d27d4981ad7d79979cf9dfc0bd73a32 Mon Sep 17 00:00:00 2001 From: siddhant Date: Thu, 8 Dec 2022 11:26:03 +0530 Subject: [PATCH 01/11] commcare connector --- .../connectors/source-commcare/.dockerignore | 6 + .../connectors/source-commcare/Dockerfile | 39 +++ .../connectors/source-commcare/README.md | 132 ++++++++ .../acceptance-test-config.yml | 24 ++ .../source-commcare/acceptance-test-docker.sh | 16 + .../connectors/source-commcare/build.gradle | 9 + .../integration_tests/__init__.py | 3 + .../integration_tests/abnormal_state.json | 5 + .../integration_tests/acceptance.py | 16 + .../integration_tests/catalog.json | 1 + .../integration_tests/configured_catalog.json | 24 ++ .../integration_tests/invalid_config.json | 5 + .../integration_tests/sample_config.json | 5 + .../integration_tests/sample_state.json | 5 + .../connectors/source-commcare/main.py | 13 + .../source-commcare/requirements.txt | 2 + .../sample_files/configured_catalog.json | 22 ++ .../connectors/source-commcare/setup.py | 32 ++ .../source_commcare/__init__.py | 8 + .../source_commcare/schemas/TODO.md | 25 ++ .../source-commcare/source_commcare/source.py | 311 ++++++++++++++++++ .../source-commcare/source_commcare/spec.yaml | 32 ++ .../source-commcare/unit_tests/__init__.py | 3 + .../source-commcare/unit_tests/test_source.py | 25 ++ 24 files changed, 763 insertions(+) create mode 100644 airbyte-integrations/connectors/source-commcare/.dockerignore create mode 100644 airbyte-integrations/connectors/source-commcare/Dockerfile create mode 100644 airbyte-integrations/connectors/source-commcare/README.md create mode 100644 airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-commcare/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-commcare/build.gradle create mode 100644 airbyte-integrations/connectors/source-commcare/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-commcare/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-commcare/integration_tests/catalog.json create mode 100644 airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json create mode 100644 airbyte-integrations/connectors/source-commcare/integration_tests/sample_state.json create mode 100644 airbyte-integrations/connectors/source-commcare/main.py create mode 100644 airbyte-integrations/connectors/source-commcare/requirements.txt create mode 100644 airbyte-integrations/connectors/source-commcare/sample_files/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-commcare/setup.py create mode 100644 airbyte-integrations/connectors/source-commcare/source_commcare/__init__.py create mode 100644 airbyte-integrations/connectors/source-commcare/source_commcare/schemas/TODO.md create mode 100644 airbyte-integrations/connectors/source-commcare/source_commcare/source.py create mode 100644 airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml create mode 100644 airbyte-integrations/connectors/source-commcare/unit_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py diff --git a/airbyte-integrations/connectors/source-commcare/.dockerignore b/airbyte-integrations/connectors/source-commcare/.dockerignore new file mode 100644 index 000000000000..5cc68fe9552e --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/.dockerignore @@ -0,0 +1,6 @@ +* +!Dockerfile +!main.py +!source_commcare +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-commcare/Dockerfile b/airbyte-integrations/connectors/source-commcare/Dockerfile new file mode 100644 index 000000000000..75fee2efa69c --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/Dockerfile @@ -0,0 +1,39 @@ +FROM python:3.9.15-slim-bullseye as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apt-get update && apt-get install -y && rm -rf /var/lib/apt/lists/* \ + && pip install --upgrade pip \ + && python3 -m pip install --upgrade setuptools + +RUN DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt-get -y install tzdata + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +# RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY source_commcare ./source_commcare + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-commcare diff --git a/airbyte-integrations/connectors/source-commcare/README.md b/airbyte-integrations/connectors/source-commcare/README.md new file mode 100644 index 000000000000..1505013ace3f --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/README.md @@ -0,0 +1,132 @@ +# Commcare Source + +This is the repository for the Commcare source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/commcare). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.9.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +pip install '.[tests]' +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-commcare:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/commcare) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_commcare/spec.yaml` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source commcare test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-commcare:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-commcare:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-commcare:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-commcare:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-commcare:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-commcare:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-commcare:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-commcare:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml new file mode 100644 index 000000000000..5af2a0bb2c17 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml @@ -0,0 +1,24 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-commcare:dev +tests: + spec: + - spec_path: "source_commcare/spec.yaml" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + incremental: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-commcare/acceptance-test-docker.sh new file mode 100644 index 000000000000..c51577d10690 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-commcare/build.gradle b/airbyte-integrations/connectors/source-commcare/build.gradle new file mode 100644 index 000000000000..344c8004cd66 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_commcare' +} diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/__init__.py b/airbyte-integrations/connectors/source-commcare/integration_tests/__init__.py new file mode 100644 index 000000000000..1100c1c58cf5 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json new file mode 100644 index 000000000000..aecfdd3556f3 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "ANC PNC Visit": { + "indexed_on": "2023-11-25T20:30:30.2423" + } +} diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-commcare/integration_tests/acceptance.py new file mode 100644 index 000000000000..1302b2f57e10 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/acceptance.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + # TODO: setup test dependencies if needed. otherwise remove the TODO comments + yield + # TODO: clean up test dependencies diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/catalog.json b/airbyte-integrations/connectors/source-commcare/integration_tests/catalog.json new file mode 100644 index 000000000000..0967ef424bce --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/catalog.json @@ -0,0 +1 @@ +{} diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json new file mode 100644 index 000000000000..9ddcdafae151 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json @@ -0,0 +1,24 @@ +{ + "streams": [ + { + "stream": { + "name": "ANC PNC Visit", + "json_schema": {}, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], + "source_defined_cursor": true, + "default_cursor_field": [ + "indexed_on" + ] + }, + "sync_mode": "incremental", + "cursor_field": [ + "indexed_on" + ], + "destination_sync_mode": "append", + "source_defined_primary_key": [["id"]] + } + ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json new file mode 100644 index 000000000000..f6029970347e --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json @@ -0,0 +1,5 @@ +{ + "app_id": "wrong app_id", + "api_key": "wrong api key", + "start_date": "This has the wrong format" +} diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json new file mode 100644 index 000000000000..6b42862c26cc --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json @@ -0,0 +1,5 @@ +{ + "app_id": "App ID", + "api_key": "API KEY", + "start_date": "Start Date" +} diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_state.json new file mode 100644 index 000000000000..3587e579822d --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_state.json @@ -0,0 +1,5 @@ +{ + "todo-stream-name": { + "todo-field-name": "value" + } +} diff --git a/airbyte-integrations/connectors/source-commcare/main.py b/airbyte-integrations/connectors/source-commcare/main.py new file mode 100644 index 000000000000..9ec4bb3ccec6 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_commcare import SourceCommcare + +if __name__ == "__main__": + source = SourceCommcare() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-commcare/requirements.txt b/airbyte-integrations/connectors/source-commcare/requirements.txt new file mode 100644 index 000000000000..0411042aa091 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-commcare/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-commcare/sample_files/configured_catalog.json new file mode 100644 index 000000000000..efb1c1536963 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/sample_files/configured_catalog.json @@ -0,0 +1,22 @@ +{ + "type": "CATALOG", + "catalog": { + "streams": [ + { + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": [ + "incremental", + "full_refresh" + ], + "supported_destination_sync_modes": [ + "overwrite", + "append_dedup" + ] + } + ] + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/setup.py b/airbyte-integrations/connectors/source-commcare/setup.py new file mode 100644 index 000000000000..a2ec6ab5ca25 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/setup.py @@ -0,0 +1,32 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk", + "bigquery_schema_generator~=1.5", + "gbqschema_converter~=1.2.0", + "flatten_json~=0.1.13", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6.1", + "source-acceptance-test", +] + +setup( + name="source_commcare", + description="Source implementation for Commcare.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/__init__.py b/airbyte-integrations/connectors/source-commcare/source_commcare/__init__.py new file mode 100644 index 000000000000..29bd9305294b --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from .source import SourceCommcare + +__all__ = ["SourceCommcare"] diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/schemas/TODO.md b/airbyte-integrations/connectors/source-commcare/source_commcare/schemas/TODO.md new file mode 100644 index 000000000000..cf1efadb3c9c --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/schemas/TODO.md @@ -0,0 +1,25 @@ +# TODO: Define your stream schemas +Your connector must describe the schema of each stream it can output using [JSONSchema](https://json-schema.org). + +The simplest way to do this is to describe the schema of your streams using one `.json` file per stream. You can also dynamically generate the schema of your stream in code, or you can combine both approaches: start with a `.json` file and dynamically add properties to it. + +The schema of a stream is the return value of `Stream.get_json_schema`. + +## Static schemas +By default, `Stream.get_json_schema` reads a `.json` file in the `schemas/` directory whose name is equal to the value of the `Stream.name` property. In turn `Stream.name` by default returns the name of the class in snake case. Therefore, if you have a class `class EmployeeBenefits(HttpStream)` the default behavior will look for a file called `schemas/employee_benefits.json`. You can override any of these behaviors as you need. + +Important note: any objects referenced via `$ref` should be placed in the `shared/` directory in their own `.json` files. + +## Dynamic schemas +If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org). + +## Dynamically modifying static schemas +Override `Stream.get_json_schema` to run the default behavior, edit the returned value, then return the edited value: +``` +def get_json_schema(self): + schema = super().get_json_schema() + schema['dynamically_determined_property'] = "property" + return schema +``` + +Delete this file once you're done. Or don't. Up to you :) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py new file mode 100644 index 000000000000..da52998b2f41 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -0,0 +1,311 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import re +from abc import ABC +from collections import defaultdict +from datetime import datetime, timedelta +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple +from urllib.parse import parse_qs + +import requests +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import IncrementalMixin, Stream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator +from flatten_json import flatten + + +# Basic full refresh stream +class CommcareStream(HttpStream, ABC): + url_base = "https://www.commcarehq.org/a/sc-baseline/api/v0.5/" + + # These class variables save state + # forms holds form ids and we filter cases which contain one of these form ids + # last_form_date stores the date of the last form read so the next cycle for forms and cases starts at the same timestamp + forms = set() + last_form_date = None + schemas = {} + unwantedfields = re.compile(r"^(case_|update_|meta|create_|commcare_).*$") + + @property + def dateformat(self): + return "%Y-%m-%dT%H:%M:%S.%f" + + def scrubUnwantedFields(self, form): + newform = {k: v for k, v in form.items() if not self.unwantedfields.match(k)} + return newform + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + try: + # Server returns status 500 when there are no more rows. + # raise an error if server returns an error + response.raise_for_status() + meta = response.json()["meta"] + return parse_qs(meta["next"][1:]) + except: + return None + return None + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + params = {"format": "json"} + return params + + +class Application(CommcareStream): + primary_key = "id" + + def __init__(self, app_id, **kwargs): + super().__init__(**kwargs) + self.app_id = app_id + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return f"application/{self.app_id}/" + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + params = {"format": "json", "extras": "true"} + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + yield response.json() + + +class IncrementalStream(CommcareStream, IncrementalMixin): + cursor_field = "indexed_on" + _cursor_value = None + + @property + def state(self) -> Mapping[str, Any]: + if self._cursor_value: + return {self.cursor_field: self._cursor_value} + + @state.setter + def state(self, value: Mapping[str, Any]): + self._cursor_value = datetime.strptime(value[self.cursor_field], self.dateformat) + + @property + def sync_mode(self): + return SyncMode.incremental + + @property + def supported_sync_modes(self): + return [SyncMode.incremental] + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + try: + # Server returns status 500 when there are no more rows. + # raise an error if server returns an error + response.raise_for_status() + meta = response.json()["meta"] + return parse_qs(meta["next"][1:]) + except: + return None + return None + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + params = {"format": "json"} + if next_page_token: + params.update(next_page_token) + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + for o in iter(response.json()["objects"]): + yield o + return None + + +class Case(IncrementalStream): + cursor_field = "indexed_on" + primary_key = "case_id" + + def __init__(self, start_date, app_id, schema, **kwargs): + super().__init__(**kwargs) + self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") + self.schema = schema + + def get_json_schema(self): + return self.schema + + @property + def name(self): + # Airbyte orders streams in alpha order but since we have dependent peers and we need to + # pull all forms before cases, we name this stream to + # ensure this stream gets pulled last (assuming ascii stream names only) + return "zzz_case" + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return "case" + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + # start date is what we saved for forms + ix = self.state[self.cursor_field] # if self.cursor_field in self.state else (CommcareStream.last_form_date or self.initial_date) + params = {"format": "json", "indexed_on_start": ix.strftime(self.dateformat), "order_by": "indexed_on", "limit": "5000"} + if next_page_token: + params.update(next_page_token) + return params + + def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: + for record in super().read_records(*args, **kwargs): + found = False + for f in record["xform_ids"]: + if f in CommcareStream.forms: + found = True + break + if found: + self._cursor_value = datetime.strptime(record[self.cursor_field], self.dateformat) + record.update({"streamname": "case", "indexed_on": record["indexed_on"] + "Z"}) # Make indexed_on tz aware + # convert xform_ids field from array to comma separated list so flattening won't create + # one field per item. This is because some cases have up to 2000 xform_ids and we don't want 2000 extra + # fields in the schema + record["xform_ids"] = ",".join(record["xform_ids"]) + frec = flatten(record) + # print(frec) + yield frec + if self._cursor_value.microsecond == 0: + # Airbyte converts the cursor_field value (datetime) to string when it saves the state and + # our state setter parses the saved state with a format that contains microseconds + # self._cursor_value must have non-zero microseconds for the formatting and parsing to work correctly. + # This issue would also occur if an incoming record had a timestamp with zero microseconds + self._cursor_value = self._cursor_value.replace(microsecond=10) + # This cycle of pull is complete so clear out the form ids we saved for this cycle + CommcareStream.forms.clear() + + +class Form(IncrementalStream): + cursor_field = "indexed_on" + primary_key = "id" + + def __init__(self, start_date, app_id, name, xmlns, schema, **kwargs): + super().__init__(**kwargs) + self.app_id = app_id + self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") + self.streamname = name + self.xmlns = xmlns + self.schema = schema + + @property + def name(self): + return self.streamname + + def get_json_schema(self): + return self.schema + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return "form" + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + ix = self.state[self.cursor_field] # if self.cursor_field in self.state else self.initial_date + params = { + "format": "json", + "app_id": self.app_id, + "indexed_on_start": ix.strftime(self.dateformat), + "order_by": "indexed_on", + "limit": "1000", + "xmlns": self.xmlns, + } + if next_page_token: + params.update(next_page_token) + return params + + def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: + upd = {"streamname": self.streamname, "xmlns": self.xmlns} + for record in super().read_records(*args, **kwargs): + self._cursor_value = datetime.strptime(record[self.cursor_field], self.dateformat) + CommcareStream.forms.add(record["id"]) + form = record["form"] + form.update(upd) + form.update({"id": record["id"], "indexed_on": record["indexed_on"] + "Z"}) # Append Z to make it timezone aware + newform = self.scrubUnwantedFields(form) + yield flatten(newform) + if self._cursor_value.microsecond == 0: + # Airbyte converts the cursor_field value (datetime) to string when it saves the state and + # our state setter parses the saved state with a format that contains microseconds + # self._cursor_value must have non-zero microseconds for the formatting and parsing to work correctly. + # This issue would also occur if an incoming record had a timestamp with zero microseconds + self._cursor_value = self._cursor_value.replace(microsecond=10) + + +# Source +class SourceCommcare(AbstractSource): + def check_connection(self, logger, config) -> Tuple[bool, any]: + if not "api_key" in config: + # print("Returning No") + return False, None + # print("Returning Yes") + return True, None + + def empty_schema(self): + return { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {}, + } + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + auth = TokenAuthenticator(config["api_key"], auth_method="ApiKey") + args = { + "authenticator": auth, + } + appdata = Application(**{**args, "app_id": config["app_id"]}).read_records(sync_mode=SyncMode.full_refresh) + + # Generate streams for forms, one per xmlns and one stream for cases. + streams = self.generate_streams(args, config, appdata) + return streams + + def generate_streams(self, args, config, appdata): + form_args = {**args, "app_id": config["app_id"], "start_date": config["start_date"]} + streams = [] + name2xmlns = {} + + # Collect the form names and xmlns from the application + for record in appdata: + mods = record["modules"] + for m in mods: + forms = m["forms"] + for f in forms: + xmlns = f["xmlns"] + formname = "" + if "en" in f["name"]: + formname = f["name"]["en"].strip() + else: + # Unknown forms are named UNNAMED_xxxxx where xxxxx are the last 5 difits of the XMLNS + # This convention gives us repeatable names + formname = f"Unnamed_{xmlns[-5:]}" + + name = formname + name2xmlns[name] = xmlns + + # Create the streams from the collected names + # Sorted by name + for k in sorted(name2xmlns): + key = name2xmlns[k] + stream = Form(**form_args, name=k, xmlns=key, schema=self.empty_schema()) + streams.append(stream) + + stream = Case(**args, app_id=config["app_id"], start_date=config["start_date"], schema=self.empty_schema()) + streams.append(stream) + + return streams diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml new file mode 100644 index 000000000000..dac076bc5006 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml @@ -0,0 +1,32 @@ +documentationUrl: https://docsurl.com +connectionSpecification: + $schema: http://json-schema.org/draft-07/schema# + title: Commcare Source Spec + type: object + required: + - api_key + - app_id + - start_date + properties: + api_key: + type: string + title: API Key + description: >- + Commcare API Key + airbyte_secret: true + order: 0 + app_id: + type: string + title: Application ID + description: >- + The Application ID we are interested in + airbyte_secret: true + order: 1 + start_date: + type: string + title: Start date for extracting records + pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$ + default: "2022-10-01T00:00:00Z" + description: >- + UTC date and time in the format 2017-01-25T00:00:00Z. Only records after this date will be replicated. + order: 2 diff --git a/airbyte-integrations/connectors/source-commcare/unit_tests/__init__.py b/airbyte-integrations/connectors/source-commcare/unit_tests/__init__.py new file mode 100644 index 000000000000..1100c1c58cf5 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/unit_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py b/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py new file mode 100644 index 000000000000..fb840f804c3d --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py @@ -0,0 +1,25 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock, Mock + +import pytest +from source_commcare.source import SourceCommcare + + +@pytest.fixture(name='config') +def config_fixture(): + return {'api_key': 'apikey', 'app_id': 'appid', 'start_date': '2022-01-01T00:00:00Z'} + + +def test_check_connection_ok(mocker, config): + source = SourceCommcare() + logger_mock=Mock() + assert source.check_connection(logger_mock, config=config) == (True, None) + +def test_check_connection_fail(mocker, config): + source = SourceCommcare() + logger_mock =MagicMock() + assert source.check_connection(logger_mock, config={}) == (False, None) + From ec25becdb5ebd569f99b2f69bdfac6a187a0b9a7 Mon Sep 17 00:00:00 2001 From: siddhant Date: Thu, 8 Dec 2022 12:02:52 +0530 Subject: [PATCH 02/11] flake improvement --- .../source-commcare/source_commcare/source.py | 15 ++++++--------- .../source-commcare/unit_tests/test_source.py | 6 +++--- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py index da52998b2f41..6410b872d5d4 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -4,8 +4,7 @@ import re from abc import ABC -from collections import defaultdict -from datetime import datetime, timedelta +from datetime import datetime from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple from urllib.parse import parse_qs @@ -45,9 +44,8 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, response.raise_for_status() meta = response.json()["meta"] return parse_qs(meta["next"][1:]) - except: - return None - return None + except Exception as ex: + return ex def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None @@ -108,9 +106,8 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, response.raise_for_status() meta = response.json()["meta"] return parse_qs(meta["next"][1:]) - except: - return None - return None + except Exception as ex: + return ex def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None @@ -251,7 +248,7 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: # Source class SourceCommcare(AbstractSource): def check_connection(self, logger, config) -> Tuple[bool, any]: - if not "api_key" in config: + if "api_key" not in config: # print("Returning No") return False, None # print("Returning Yes") diff --git a/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py b/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py index fb840f804c3d..d1f57393817f 100644 --- a/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py @@ -15,11 +15,11 @@ def config_fixture(): def test_check_connection_ok(mocker, config): source = SourceCommcare() - logger_mock=Mock() + logger_mock = Mock() assert source.check_connection(logger_mock, config=config) == (True, None) + def test_check_connection_fail(mocker, config): source = SourceCommcare() - logger_mock =MagicMock() + logger_mock = MagicMock() assert source.check_connection(logger_mock, config={}) == (False, None) - From a0d52492616ec2dd4429af4238d04cc6bf4d7519 Mon Sep 17 00:00:00 2001 From: siddhant Date: Thu, 8 Dec 2022 12:16:34 +0530 Subject: [PATCH 03/11] added the docs --- docs/integrations/sources/commcare.md | 39 +++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 docs/integrations/sources/commcare.md diff --git a/docs/integrations/sources/commcare.md b/docs/integrations/sources/commcare.md new file mode 100644 index 000000000000..045b2e89b5f5 --- /dev/null +++ b/docs/integrations/sources/commcare.md @@ -0,0 +1,39 @@ +# Commcare + +This page guides you through the process of setting up the Commcare source connector. + +## Prerequisites + +- Your Commcare API Key +- The Application ID you are interested in +- The start date to replicate records + +## Set up the Commcare source connector + +1. Log into your [Airbyte Cloud](https://cloud.airbyte.io/workspaces) or Airbyte Open Source account. +2. Click **Sources** and then click **+ New source**. +3. On the Set up the source page, select **Commcare** from the Source type dropdown. +4. Enter a name for your source. +5. For **API Key**, enter your Commcare API Key. +6. Click **Set up source**. + +## Supported sync modes + +The Commcare source connector supports the following [sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-sync-modes): + +- Full Refresh +- Overwrite +- Incremental + +## Supported Streams + +The Commcare source connector supports the following streams: + +- Application +- Case +- Form + +## Changelog + +| Version | Date | Pull Request | Subject | +| 0.1.0 | 2022-11-08 | [20220](https://github.com/airbytehq/airbyte/pull/20220) | Commcare Source Connector | \ No newline at end of file From 63a3aa2bd055ee406dd729e16231b0b797239915 Mon Sep 17 00:00:00 2001 From: siddhant Date: Thu, 8 Dec 2022 12:35:39 +0530 Subject: [PATCH 04/11] added the config --- .../init/src/main/resources/seed/source_definitions.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index f86f441d6e1c..0b1bab9117c6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -327,6 +327,13 @@ documentationUrl: https://docs.airbyte.com/integrations/sources/convertkit sourceType: api releaseStage: alpha +- name: Commcare + sourceDefinitionId: ee4532f4-15f0-4639-9w71-41719fb1fdff + dockerRepository: airbyte/source-commcare + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.com/integrations/sources/commcare + sourceType: api + releaseStage: alpha - name: Copper sourceDefinitionId: 44f3002f-2df9-4f6d-b21c-02cd3b47d0dc dockerRepository: airbyte/source-copper From 973781a0c63aa54ce08302ba332b6269da7f7098 Mon Sep 17 00:00:00 2001 From: siddhant Date: Tue, 20 Dec 2022 15:34:34 +0530 Subject: [PATCH 05/11] project_space added in the config --- .vscode/settings.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../acceptance-test-config.yml | 24 ++-- .../integration_tests/abnormal_state.json | 6 +- .../integration_tests/configured_catalog.json | 114 ++++++++++++++++-- .../integration_tests/invalid_config.json | 3 +- .../integration_tests/sample_config.json | 1 + .../source-commcare/source_commcare/source.py | 37 ++++-- .../source-commcare/source_commcare/spec.yaml | 10 +- 9 files changed, 156 insertions(+), 43 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index e52044ed1069..a328b59d4c58 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -33,7 +33,7 @@ }, "[json]": { "editor.formatOnSave": true, - "editor.defaultFormatter": "esbenp.prettier-vscode" + "editor.defaultFormatter": "vscode.json-language-features" }, "[scss]": { "editor.formatOnSave": true, diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index bc53489e356d..a67a8969fddd 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -328,7 +328,7 @@ sourceType: api releaseStage: alpha - name: Commcare - sourceDefinitionId: ee4532f4-15f0-4639-9w71-41719fb1fdff + sourceDefinitionId: f39208dc-7e1c-48b8-919b-5006360cc27f dockerRepository: airbyte/source-commcare dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.com/integrations/sources/commcare diff --git a/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml index 5af2a0bb2c17..51b6e95bd902 100644 --- a/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml @@ -1,6 +1,6 @@ # See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) # for more information about how to configure these tests -connector_image: airbyte/source-commcare:dev +connector_image: airbyte/source-commcare:0.1.0 tests: spec: - spec_path: "source_commcare/spec.yaml" @@ -11,14 +11,14 @@ tests: status: "failed" discovery: - config_path: "secrets/config.json" - basic_read: - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog.json" - empty_streams: [] - full_refresh: - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog.json" - incremental: - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog.json" - future_state_path: "integration_tests/abnormal_state.json" \ No newline at end of file + # basic_read: + # - config_path: "secrets/config.json" + # configured_catalog_path: "integration_tests/configured_catalog.json" + # empty_streams: [] + # full_refresh: + # - config_path: "secrets/config.json" + # configured_catalog_path: "integration_tests/configured_catalog.json" + # incremental: + # - config_path: "secrets/config.json" + # configured_catalog_path: "integration_tests/configured_catalog.json" + # future_state_path: "integration_tests/abnormal_state.json" \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json index aecfdd3556f3..210e13709225 100644 --- a/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json @@ -1,5 +1,5 @@ { - "ANC PNC Visit": { - "indexed_on": "2023-11-25T20:30:30.2423" + "Assess a referred patient": { + "indexed_on": "2022-11-25T20:30:30.2423" } -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json index 9ddcdafae151..f4830dba4559 100644 --- a/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json @@ -1,9 +1,14 @@ { - "streams": [ - { - "stream": { - "name": "ANC PNC Visit", - "json_schema": {}, + "type": "CATALOG", + "catalog": { + "streams": [ + { + "name": "Assess a referred patient", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, "supported_sync_modes": [ "full_refresh", "incremental" @@ -11,14 +16,97 @@ "source_defined_cursor": true, "default_cursor_field": [ "indexed_on" + ], + "source_defined_primary_key": [ + [ + "id" + ] + ] + }, + { + "name": "Birth Outcome", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], + "source_defined_cursor": true, + "default_cursor_field": [ + "indexed_on" + ], + "source_defined_primary_key": [ + [ + "id" + ] + ] + }, + { + "name": "Mid-Pregnancy Home Visit", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], + "source_defined_cursor": true, + "default_cursor_field": [ + "indexed_on" + ], + "source_defined_primary_key": [ + [ + "id" + ] + ] + }, + { + "name": "STEP 1: Register a new patient", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], + "source_defined_cursor": true, + "default_cursor_field": [ + "indexed_on" + ], + "source_defined_primary_key": [ + [ + "id" + ] ] }, - "sync_mode": "incremental", - "cursor_field": [ - "indexed_on" - ], - "destination_sync_mode": "append", - "source_defined_primary_key": [["id"]] - } - ] + { + "name": "zzz_case", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], + "source_defined_cursor": true, + "default_cursor_field": [ + "indexed_on" + ], + "source_defined_primary_key": [ + [ + "case_id" + ] + ] + } + ] + } } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json index f6029970347e..98da2559f717 100644 --- a/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json @@ -1,5 +1,6 @@ { "app_id": "wrong app_id", "api_key": "wrong api key", - "start_date": "This has the wrong format" + "start_date": "This has the wrong format", + "project_space": "project_space" } diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json index 6b42862c26cc..2613cb691748 100644 --- a/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json @@ -1,5 +1,6 @@ { "app_id": "App ID", "api_key": "API KEY", + "project_space": "project_space", "start_date": "Start Date" } diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py index 6410b872d5d4..197c3fd7c25e 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -19,7 +19,13 @@ # Basic full refresh stream class CommcareStream(HttpStream, ABC): - url_base = "https://www.commcarehq.org/a/sc-baseline/api/v0.5/" + def __init__(self, project_space, **kwargs): + super().__init__(**kwargs) + self.project_space = project_space + + @property + def url_base(self) -> str: + return f"https://www.commcarehq.org/a/{self.project_space}/api/v0.5/" # These class variables save state # forms holds form ids and we filter cases which contain one of these form ids @@ -67,6 +73,9 @@ def path( ) -> str: return f"application/{self.app_id}/" + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: @@ -105,9 +114,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, # raise an error if server returns an error response.raise_for_status() meta = response.json()["meta"] - return parse_qs(meta["next"][1:]) - except Exception as ex: - return ex + if meta["next"]: + return parse_qs(meta["next"][1:]) + return None + except Exception: + return None def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None @@ -249,9 +260,7 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: class SourceCommcare(AbstractSource): def check_connection(self, logger, config) -> Tuple[bool, any]: if "api_key" not in config: - # print("Returning No") return False, None - # print("Returning Yes") return True, None def empty_schema(self): @@ -266,14 +275,16 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: args = { "authenticator": auth, } - appdata = Application(**{**args, "app_id": config["app_id"]}).read_records(sync_mode=SyncMode.full_refresh) + appdata = Application(**{**args, "app_id": config["app_id"], "project_space": config["project_space"]}).read_records( + sync_mode=SyncMode.full_refresh + ) # Generate streams for forms, one per xmlns and one stream for cases. streams = self.generate_streams(args, config, appdata) return streams def generate_streams(self, args, config, appdata): - form_args = {**args, "app_id": config["app_id"], "start_date": config["start_date"]} + form_args = {"app_id": config["app_id"], "start_date": config["start_date"], "project_space": config["project_space"], **args} streams = [] name2xmlns = {} @@ -299,10 +310,16 @@ def generate_streams(self, args, config, appdata): # Sorted by name for k in sorted(name2xmlns): key = name2xmlns[k] - stream = Form(**form_args, name=k, xmlns=key, schema=self.empty_schema()) + stream = Form(name=k, xmlns=key, schema=self.empty_schema(), **form_args) streams.append(stream) - stream = Case(**args, app_id=config["app_id"], start_date=config["start_date"], schema=self.empty_schema()) + stream = Case( + app_id=config["app_id"], + start_date=config["start_date"], + schema=self.empty_schema(), + project_space=config["project_space"], + **args, + ) streams.append(stream) return streams diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml index dac076bc5006..97f9e4b43598 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml @@ -15,13 +15,19 @@ connectionSpecification: Commcare API Key airbyte_secret: true order: 0 + project_space: + type: string + title: Project Space + description: >- + Project Space for commcare + order: 1 app_id: type: string title: Application ID description: >- The Application ID we are interested in airbyte_secret: true - order: 1 + order: 2 start_date: type: string title: Start date for extracting records @@ -29,4 +35,4 @@ connectionSpecification: default: "2022-10-01T00:00:00Z" description: >- UTC date and time in the format 2017-01-25T00:00:00Z. Only records after this date will be replicated. - order: 2 + order: 3 From 3950f9ad5a5d8bbb364d0660eca980a19d6aab21 Mon Sep 17 00:00:00 2001 From: siddhant Date: Tue, 20 Dec 2022 15:35:18 +0530 Subject: [PATCH 06/11] project space is required --- .../connectors/source-commcare/source_commcare/spec.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml index 97f9e4b43598..20e62fa81984 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml @@ -7,6 +7,7 @@ connectionSpecification: - api_key - app_id - start_date + - project_space properties: api_key: type: string From 73e4c998c24801a9e34e31fc153bd3241afb841f Mon Sep 17 00:00:00 2001 From: siddhant Date: Tue, 20 Dec 2022 15:51:51 +0530 Subject: [PATCH 07/11] cleaning --- .../connectors/source-commcare/source_commcare/source.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py index 197c3fd7c25e..0a15f25554c3 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -185,7 +185,6 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: # fields in the schema record["xform_ids"] = ",".join(record["xform_ids"]) frec = flatten(record) - # print(frec) yield frec if self._cursor_value.microsecond == 0: # Airbyte converts the cursor_field value (datetime) to string when it saves the state and From bf4180ec25cafff7491435a50d662e8298c2fb3b Mon Sep 17 00:00:00 2001 From: siddhant Date: Tue, 20 Dec 2022 18:44:51 +0530 Subject: [PATCH 08/11] added test case --- .../acceptance-test-config.yml | 26 ++-- .../integration_tests/abnormal_state.json | 2 +- .../integration_tests/configured_catalog.json | 111 ++---------------- .../source-commcare/source_commcare/source.py | 65 +++++++--- .../source-commcare/source_commcare/spec.yaml | 1 - 5 files changed, 74 insertions(+), 131 deletions(-) diff --git a/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml index 51b6e95bd902..db8213d2fe8f 100644 --- a/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml @@ -1,6 +1,6 @@ # See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) # for more information about how to configure these tests -connector_image: airbyte/source-commcare:0.1.0 +connector_image: airbyte/source-commcare:dev tests: spec: - spec_path: "source_commcare/spec.yaml" @@ -11,14 +11,16 @@ tests: status: "failed" discovery: - config_path: "secrets/config.json" - # basic_read: - # - config_path: "secrets/config.json" - # configured_catalog_path: "integration_tests/configured_catalog.json" - # empty_streams: [] - # full_refresh: - # - config_path: "secrets/config.json" - # configured_catalog_path: "integration_tests/configured_catalog.json" - # incremental: - # - config_path: "secrets/config.json" - # configured_catalog_path: "integration_tests/configured_catalog.json" - # future_state_path: "integration_tests/abnormal_state.json" \ No newline at end of file + backward_compatibility_tests_config: + disable_for_version: "0.1.0" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + incremental: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json index 210e13709225..6cfc6686b6e0 100644 --- a/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json @@ -1,5 +1,5 @@ { "Assess a referred patient": { - "indexed_on": "2022-11-25T20:30:30.2423" + "indexed_on": "2023-11-25T20:30:30.2423" } } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json index f4830dba4559..d34c7a7f43ed 100644 --- a/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json @@ -1,14 +1,9 @@ { - "type": "CATALOG", - "catalog": { - "streams": [ - { + "streams": [ + { + "stream": { "name": "Assess a referred patient", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": {} - }, + "json_schema": {}, "supported_sync_modes": [ "full_refresh", "incremental" @@ -16,97 +11,13 @@ "source_defined_cursor": true, "default_cursor_field": [ "indexed_on" - ], - "source_defined_primary_key": [ - [ - "id" - ] - ] - }, - { - "name": "Birth Outcome", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": {} - }, - "supported_sync_modes": [ - "full_refresh", - "incremental" - ], - "source_defined_cursor": true, - "default_cursor_field": [ - "indexed_on" - ], - "source_defined_primary_key": [ - [ - "id" - ] - ] - }, - { - "name": "Mid-Pregnancy Home Visit", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": {} - }, - "supported_sync_modes": [ - "full_refresh", - "incremental" - ], - "source_defined_cursor": true, - "default_cursor_field": [ - "indexed_on" - ], - "source_defined_primary_key": [ - [ - "id" - ] - ] - }, - { - "name": "STEP 1: Register a new patient", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": {} - }, - "supported_sync_modes": [ - "full_refresh", - "incremental" - ], - "source_defined_cursor": true, - "default_cursor_field": [ - "indexed_on" - ], - "source_defined_primary_key": [ - [ - "id" - ] ] }, - { - "name": "zzz_case", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": {} - }, - "supported_sync_modes": [ - "full_refresh", - "incremental" - ], - "source_defined_cursor": true, - "default_cursor_field": [ - "indexed_on" - ], - "source_defined_primary_key": [ - [ - "case_id" - ] - ] - } - ] - } + "sync_mode": "incremental", + "cursor_field": [ + "indexed_on" + ], + "destination_sync_mode": "append" + } + ] } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py index 0a15f25554c3..e3a60c6fd8fd 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -40,7 +40,8 @@ def dateformat(self): return "%Y-%m-%dT%H:%M:%S.%f" def scrubUnwantedFields(self, form): - newform = {k: v for k, v in form.items() if not self.unwantedfields.match(k)} + newform = {k: v for k, v in form.items( + ) if not self.unwantedfields.match(k)} return newform def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: @@ -98,7 +99,8 @@ def state(self) -> Mapping[str, Any]: @state.setter def state(self, value: Mapping[str, Any]): - self._cursor_value = datetime.strptime(value[self.cursor_field], self.dateformat) + self._cursor_value = datetime.strptime( + value[self.cursor_field], self.dateformat) @property def sync_mode(self): @@ -136,12 +138,18 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class Case(IncrementalStream): + + """ + docs: https://www.commcarehq.org/a/[domain]/api/[version]/case/ + """ + cursor_field = "indexed_on" - primary_key = "case_id" + primary_key = "id" def __init__(self, start_date, app_id, schema, **kwargs): super().__init__(**kwargs) - self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") + self._cursor_value = datetime.strptime( + start_date, "%Y-%m-%dT%H:%M:%SZ") self.schema = schema def get_json_schema(self): @@ -164,8 +172,10 @@ def request_params( ) -> MutableMapping[str, Any]: # start date is what we saved for forms - ix = self.state[self.cursor_field] # if self.cursor_field in self.state else (CommcareStream.last_form_date or self.initial_date) - params = {"format": "json", "indexed_on_start": ix.strftime(self.dateformat), "order_by": "indexed_on", "limit": "5000"} + # if self.cursor_field in self.state else (CommcareStream.last_form_date or self.initial_date) + ix = self.state[self.cursor_field] + params = {"format": "json", "indexed_on_start": ix.strftime( + self.dateformat), "order_by": "indexed_on", "limit": "5000"} if next_page_token: params.update(next_page_token) return params @@ -178,8 +188,11 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: found = True break if found: - self._cursor_value = datetime.strptime(record[self.cursor_field], self.dateformat) - record.update({"streamname": "case", "indexed_on": record["indexed_on"] + "Z"}) # Make indexed_on tz aware + self._cursor_value = datetime.strptime( + record[self.cursor_field], self.dateformat) + # Make indexed_on tz aware + record.update( + {"streamname": "case", "indexed_on": record["indexed_on"] + "Z"}) # convert xform_ids field from array to comma separated list so flattening won't create # one field per item. This is because some cases have up to 2000 xform_ids and we don't want 2000 extra # fields in the schema @@ -197,13 +210,17 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: class Form(IncrementalStream): + """ + docs: https://www.commcarehq.org/a/[domain]/api/[version]/form/ + """ cursor_field = "indexed_on" primary_key = "id" def __init__(self, start_date, app_id, name, xmlns, schema, **kwargs): super().__init__(**kwargs) self.app_id = app_id - self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") + self._cursor_value = datetime.strptime( + start_date, "%Y-%m-%dT%H:%M:%SZ") self.streamname = name self.xmlns = xmlns self.schema = schema @@ -224,7 +241,8 @@ def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: - ix = self.state[self.cursor_field] # if self.cursor_field in self.state else self.initial_date + # if self.cursor_field in self.state else self.initial_date + ix = self.state[self.cursor_field] params = { "format": "json", "app_id": self.app_id, @@ -240,11 +258,14 @@ def request_params( def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: upd = {"streamname": self.streamname, "xmlns": self.xmlns} for record in super().read_records(*args, **kwargs): - self._cursor_value = datetime.strptime(record[self.cursor_field], self.dateformat) + self._cursor_value = datetime.strptime( + record[self.cursor_field], self.dateformat) CommcareStream.forms.add(record["id"]) form = record["form"] form.update(upd) - form.update({"id": record["id"], "indexed_on": record["indexed_on"] + "Z"}) # Append Z to make it timezone aware + # Append Z to make it timezone aware + form.update( + {"id": record["id"], "indexed_on": record["indexed_on"] + "Z"}) newform = self.scrubUnwantedFields(form) yield flatten(newform) if self._cursor_value.microsecond == 0: @@ -262,11 +283,19 @@ def check_connection(self, logger, config) -> Tuple[bool, any]: return False, None return True, None - def empty_schema(self): + def base_schema(self): return { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", - "properties": {}, + "properties": { + "id": { + "type": "string" + }, + "indexed_on": { + "type": "string", + "format": "date-time" + } + } } def streams(self, config: Mapping[str, Any]) -> List[Stream]: @@ -283,7 +312,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: return streams def generate_streams(self, args, config, appdata): - form_args = {"app_id": config["app_id"], "start_date": config["start_date"], "project_space": config["project_space"], **args} + form_args = {"app_id": config["app_id"], "start_date": config["start_date"], + "project_space": config["project_space"], **args} streams = [] name2xmlns = {} @@ -309,13 +339,14 @@ def generate_streams(self, args, config, appdata): # Sorted by name for k in sorted(name2xmlns): key = name2xmlns[k] - stream = Form(name=k, xmlns=key, schema=self.empty_schema(), **form_args) + stream = Form(name=k, xmlns=key, + schema=self.base_schema(), **form_args) streams.append(stream) stream = Case( app_id=config["app_id"], start_date=config["start_date"], - schema=self.empty_schema(), + schema=self.base_schema(), project_space=config["project_space"], **args, ) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml index 20e62fa81984..97f9e4b43598 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml @@ -7,7 +7,6 @@ connectionSpecification: - api_key - app_id - start_date - - project_space properties: api_key: type: string From d7e124ad2af41bbd1380e1c1607ea02b66bb5905 Mon Sep 17 00:00:00 2001 From: siddhant Date: Thu, 5 Jan 2023 23:33:27 +0530 Subject: [PATCH 09/11] disbaled the backward compatibility test --- .../connectors/source-commcare/acceptance-test-config.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml index db8213d2fe8f..4f8fe24e4ca6 100644 --- a/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml @@ -9,10 +9,10 @@ tests: status: "succeed" - config_path: "integration_tests/invalid_config.json" status: "failed" - discovery: - - config_path: "secrets/config.json" - backward_compatibility_tests_config: - disable_for_version: "0.1.0" + # discovery: + # - config_path: "secrets/config.json" + # backward_compatibility_tests_config: + # disable_for_version: "0.1.0" basic_read: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" From 3546eb8c1ec0789ff1d15bf4cbfe8f8a6338306c Mon Sep 17 00:00:00 2001 From: sajarin Date: Thu, 5 Jan 2023 13:54:18 -0500 Subject: [PATCH 10/11] feat: revert .vscode/settings.json --- .vscode/settings.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index a328b59d4c58..e52044ed1069 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -33,7 +33,7 @@ }, "[json]": { "editor.formatOnSave": true, - "editor.defaultFormatter": "vscode.json-language-features" + "editor.defaultFormatter": "esbenp.prettier-vscode" }, "[scss]": { "editor.formatOnSave": true, From c1161931ab9653f81b63585341a43e7db8c495af Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Thu, 5 Jan 2023 19:09:54 +0000 Subject: [PATCH 11/11] auto-bump connector version --- .../src/main/resources/seed/source_specs.yaml | 40 +++++++++++++++++ .../source-commcare/source_commcare/source.py | 44 ++++++------------- 2 files changed, 53 insertions(+), 31 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 56a59de04c41..da8a28c67873 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2799,6 +2799,46 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] +- dockerImage: "airbyte/source-commcare:0.1.0" + spec: + documentationUrl: "https://docsurl.com" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "Commcare Source Spec" + type: "object" + required: + - "api_key" + - "app_id" + - "start_date" + properties: + api_key: + type: "string" + title: "API Key" + description: "Commcare API Key" + airbyte_secret: true + order: 0 + project_space: + type: "string" + title: "Project Space" + description: "Project Space for commcare" + order: 1 + app_id: + type: "string" + title: "Application ID" + description: "The Application ID we are interested in" + airbyte_secret: true + order: 2 + start_date: + type: "string" + title: "Start date for extracting records" + pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" + default: "2022-10-01T00:00:00Z" + description: "UTC date and time in the format 2017-01-25T00:00:00Z. Only\ + \ records after this date will be replicated." + order: 3 + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] - dockerImage: "airbyte/source-copper:0.1.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/copper" diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py index e3a60c6fd8fd..df3b9f9ef40e 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -40,8 +40,7 @@ def dateformat(self): return "%Y-%m-%dT%H:%M:%S.%f" def scrubUnwantedFields(self, form): - newform = {k: v for k, v in form.items( - ) if not self.unwantedfields.match(k)} + newform = {k: v for k, v in form.items() if not self.unwantedfields.match(k)} return newform def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: @@ -99,8 +98,7 @@ def state(self) -> Mapping[str, Any]: @state.setter def state(self, value: Mapping[str, Any]): - self._cursor_value = datetime.strptime( - value[self.cursor_field], self.dateformat) + self._cursor_value = datetime.strptime(value[self.cursor_field], self.dateformat) @property def sync_mode(self): @@ -148,8 +146,7 @@ class Case(IncrementalStream): def __init__(self, start_date, app_id, schema, **kwargs): super().__init__(**kwargs) - self._cursor_value = datetime.strptime( - start_date, "%Y-%m-%dT%H:%M:%SZ") + self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") self.schema = schema def get_json_schema(self): @@ -174,8 +171,7 @@ def request_params( # start date is what we saved for forms # if self.cursor_field in self.state else (CommcareStream.last_form_date or self.initial_date) ix = self.state[self.cursor_field] - params = {"format": "json", "indexed_on_start": ix.strftime( - self.dateformat), "order_by": "indexed_on", "limit": "5000"} + params = {"format": "json", "indexed_on_start": ix.strftime(self.dateformat), "order_by": "indexed_on", "limit": "5000"} if next_page_token: params.update(next_page_token) return params @@ -188,11 +184,9 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: found = True break if found: - self._cursor_value = datetime.strptime( - record[self.cursor_field], self.dateformat) + self._cursor_value = datetime.strptime(record[self.cursor_field], self.dateformat) # Make indexed_on tz aware - record.update( - {"streamname": "case", "indexed_on": record["indexed_on"] + "Z"}) + record.update({"streamname": "case", "indexed_on": record["indexed_on"] + "Z"}) # convert xform_ids field from array to comma separated list so flattening won't create # one field per item. This is because some cases have up to 2000 xform_ids and we don't want 2000 extra # fields in the schema @@ -213,14 +207,14 @@ class Form(IncrementalStream): """ docs: https://www.commcarehq.org/a/[domain]/api/[version]/form/ """ + cursor_field = "indexed_on" primary_key = "id" def __init__(self, start_date, app_id, name, xmlns, schema, **kwargs): super().__init__(**kwargs) self.app_id = app_id - self._cursor_value = datetime.strptime( - start_date, "%Y-%m-%dT%H:%M:%SZ") + self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") self.streamname = name self.xmlns = xmlns self.schema = schema @@ -258,14 +252,12 @@ def request_params( def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: upd = {"streamname": self.streamname, "xmlns": self.xmlns} for record in super().read_records(*args, **kwargs): - self._cursor_value = datetime.strptime( - record[self.cursor_field], self.dateformat) + self._cursor_value = datetime.strptime(record[self.cursor_field], self.dateformat) CommcareStream.forms.add(record["id"]) form = record["form"] form.update(upd) # Append Z to make it timezone aware - form.update( - {"id": record["id"], "indexed_on": record["indexed_on"] + "Z"}) + form.update({"id": record["id"], "indexed_on": record["indexed_on"] + "Z"}) newform = self.scrubUnwantedFields(form) yield flatten(newform) if self._cursor_value.microsecond == 0: @@ -287,15 +279,7 @@ def base_schema(self): return { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", - "properties": { - "id": { - "type": "string" - }, - "indexed_on": { - "type": "string", - "format": "date-time" - } - } + "properties": {"id": {"type": "string"}, "indexed_on": {"type": "string", "format": "date-time"}}, } def streams(self, config: Mapping[str, Any]) -> List[Stream]: @@ -312,8 +296,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: return streams def generate_streams(self, args, config, appdata): - form_args = {"app_id": config["app_id"], "start_date": config["start_date"], - "project_space": config["project_space"], **args} + form_args = {"app_id": config["app_id"], "start_date": config["start_date"], "project_space": config["project_space"], **args} streams = [] name2xmlns = {} @@ -339,8 +322,7 @@ def generate_streams(self, args, config, appdata): # Sorted by name for k in sorted(name2xmlns): key = name2xmlns[k] - stream = Form(name=k, xmlns=key, - schema=self.base_schema(), **form_args) + stream = Form(name=k, xmlns=key, schema=self.base_schema(), **form_args) streams.append(stream) stream = Case(