diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 9e53df5a2208..418fddb658f9 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -327,6 +327,13 @@ documentationUrl: https://docs.airbyte.com/integrations/sources/convertkit sourceType: api releaseStage: alpha +- name: Commcare + sourceDefinitionId: f39208dc-7e1c-48b8-919b-5006360cc27f + dockerRepository: airbyte/source-commcare + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.com/integrations/sources/commcare + sourceType: api + releaseStage: alpha - name: Copper sourceDefinitionId: 44f3002f-2df9-4f6d-b21c-02cd3b47d0dc dockerRepository: airbyte/source-copper diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 56a59de04c41..da8a28c67873 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2799,6 +2799,46 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] +- dockerImage: "airbyte/source-commcare:0.1.0" + spec: + documentationUrl: "https://docsurl.com" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "Commcare Source Spec" + type: "object" + required: + - "api_key" + - "app_id" + - "start_date" + properties: + api_key: + type: "string" + title: "API Key" + description: "Commcare API Key" + airbyte_secret: true + order: 0 + project_space: + type: "string" + title: "Project Space" + description: "Project Space for commcare" + order: 1 + app_id: + type: "string" + title: "Application ID" + description: "The Application ID we are interested in" + airbyte_secret: true + order: 2 + start_date: + type: "string" + title: "Start date for extracting records" + pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" + default: "2022-10-01T00:00:00Z" + description: "UTC date and time in the format 2017-01-25T00:00:00Z. Only\ + \ records after this date will be replicated." + order: 3 + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] - dockerImage: "airbyte/source-copper:0.1.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/copper" diff --git a/airbyte-integrations/connectors/source-commcare/.dockerignore b/airbyte-integrations/connectors/source-commcare/.dockerignore new file mode 100644 index 000000000000..5cc68fe9552e --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/.dockerignore @@ -0,0 +1,6 @@ +* +!Dockerfile +!main.py +!source_commcare +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-commcare/Dockerfile b/airbyte-integrations/connectors/source-commcare/Dockerfile new file mode 100644 index 000000000000..75fee2efa69c --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/Dockerfile @@ -0,0 +1,39 @@ +FROM python:3.9.15-slim-bullseye as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apt-get update && apt-get install -y && rm -rf /var/lib/apt/lists/* \ + && pip install --upgrade pip \ + && python3 -m pip install --upgrade setuptools + +RUN DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt-get -y install tzdata + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +# RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY source_commcare ./source_commcare + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-commcare diff --git a/airbyte-integrations/connectors/source-commcare/README.md b/airbyte-integrations/connectors/source-commcare/README.md new file mode 100644 index 000000000000..1505013ace3f --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/README.md @@ -0,0 +1,132 @@ +# Commcare Source + +This is the repository for the Commcare source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/commcare). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.9.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +pip install '.[tests]' +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-commcare:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/commcare) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_commcare/spec.yaml` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source commcare test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-commcare:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-commcare:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-commcare:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-commcare:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-commcare:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-commcare:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-commcare:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-commcare:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml new file mode 100644 index 000000000000..4f8fe24e4ca6 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/acceptance-test-config.yml @@ -0,0 +1,26 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-commcare:dev +tests: + spec: + - spec_path: "source_commcare/spec.yaml" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + # discovery: + # - config_path: "secrets/config.json" + # backward_compatibility_tests_config: + # disable_for_version: "0.1.0" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + incremental: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-commcare/acceptance-test-docker.sh new file mode 100644 index 000000000000..c51577d10690 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-commcare/build.gradle b/airbyte-integrations/connectors/source-commcare/build.gradle new file mode 100644 index 000000000000..344c8004cd66 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_commcare' +} diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/__init__.py b/airbyte-integrations/connectors/source-commcare/integration_tests/__init__.py new file mode 100644 index 000000000000..1100c1c58cf5 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json new file mode 100644 index 000000000000..6cfc6686b6e0 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "Assess a referred patient": { + "indexed_on": "2023-11-25T20:30:30.2423" + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-commcare/integration_tests/acceptance.py new file mode 100644 index 000000000000..1302b2f57e10 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/acceptance.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + # TODO: setup test dependencies if needed. otherwise remove the TODO comments + yield + # TODO: clean up test dependencies diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/catalog.json b/airbyte-integrations/connectors/source-commcare/integration_tests/catalog.json new file mode 100644 index 000000000000..0967ef424bce --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/catalog.json @@ -0,0 +1 @@ +{} diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json new file mode 100644 index 000000000000..d34c7a7f43ed --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/configured_catalog.json @@ -0,0 +1,23 @@ +{ + "streams": [ + { + "stream": { + "name": "Assess a referred patient", + "json_schema": {}, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], + "source_defined_cursor": true, + "default_cursor_field": [ + "indexed_on" + ] + }, + "sync_mode": "incremental", + "cursor_field": [ + "indexed_on" + ], + "destination_sync_mode": "append" + } + ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json new file mode 100644 index 000000000000..98da2559f717 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/invalid_config.json @@ -0,0 +1,6 @@ +{ + "app_id": "wrong app_id", + "api_key": "wrong api key", + "start_date": "This has the wrong format", + "project_space": "project_space" +} diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json new file mode 100644 index 000000000000..2613cb691748 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_config.json @@ -0,0 +1,6 @@ +{ + "app_id": "App ID", + "api_key": "API KEY", + "project_space": "project_space", + "start_date": "Start Date" +} diff --git a/airbyte-integrations/connectors/source-commcare/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_state.json new file mode 100644 index 000000000000..3587e579822d --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/integration_tests/sample_state.json @@ -0,0 +1,5 @@ +{ + "todo-stream-name": { + "todo-field-name": "value" + } +} diff --git a/airbyte-integrations/connectors/source-commcare/main.py b/airbyte-integrations/connectors/source-commcare/main.py new file mode 100644 index 000000000000..9ec4bb3ccec6 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_commcare import SourceCommcare + +if __name__ == "__main__": + source = SourceCommcare() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-commcare/requirements.txt b/airbyte-integrations/connectors/source-commcare/requirements.txt new file mode 100644 index 000000000000..0411042aa091 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-commcare/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-commcare/sample_files/configured_catalog.json new file mode 100644 index 000000000000..efb1c1536963 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/sample_files/configured_catalog.json @@ -0,0 +1,22 @@ +{ + "type": "CATALOG", + "catalog": { + "streams": [ + { + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": [ + "incremental", + "full_refresh" + ], + "supported_destination_sync_modes": [ + "overwrite", + "append_dedup" + ] + } + ] + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-commcare/setup.py b/airbyte-integrations/connectors/source-commcare/setup.py new file mode 100644 index 000000000000..a2ec6ab5ca25 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/setup.py @@ -0,0 +1,32 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk", + "bigquery_schema_generator~=1.5", + "gbqschema_converter~=1.2.0", + "flatten_json~=0.1.13", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6.1", + "source-acceptance-test", +] + +setup( + name="source_commcare", + description="Source implementation for Commcare.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/__init__.py b/airbyte-integrations/connectors/source-commcare/source_commcare/__init__.py new file mode 100644 index 000000000000..29bd9305294b --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from .source import SourceCommcare + +__all__ = ["SourceCommcare"] diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/schemas/TODO.md b/airbyte-integrations/connectors/source-commcare/source_commcare/schemas/TODO.md new file mode 100644 index 000000000000..cf1efadb3c9c --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/schemas/TODO.md @@ -0,0 +1,25 @@ +# TODO: Define your stream schemas +Your connector must describe the schema of each stream it can output using [JSONSchema](https://json-schema.org). + +The simplest way to do this is to describe the schema of your streams using one `.json` file per stream. You can also dynamically generate the schema of your stream in code, or you can combine both approaches: start with a `.json` file and dynamically add properties to it. + +The schema of a stream is the return value of `Stream.get_json_schema`. + +## Static schemas +By default, `Stream.get_json_schema` reads a `.json` file in the `schemas/` directory whose name is equal to the value of the `Stream.name` property. In turn `Stream.name` by default returns the name of the class in snake case. Therefore, if you have a class `class EmployeeBenefits(HttpStream)` the default behavior will look for a file called `schemas/employee_benefits.json`. You can override any of these behaviors as you need. + +Important note: any objects referenced via `$ref` should be placed in the `shared/` directory in their own `.json` files. + +## Dynamic schemas +If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org). + +## Dynamically modifying static schemas +Override `Stream.get_json_schema` to run the default behavior, edit the returned value, then return the edited value: +``` +def get_json_schema(self): + schema = super().get_json_schema() + schema['dynamically_determined_property'] = "property" + return schema +``` + +Delete this file once you're done. Or don't. Up to you :) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py new file mode 100644 index 000000000000..df3b9f9ef40e --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -0,0 +1,337 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import re +from abc import ABC +from datetime import datetime +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple +from urllib.parse import parse_qs + +import requests +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import IncrementalMixin, Stream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator +from flatten_json import flatten + + +# Basic full refresh stream +class CommcareStream(HttpStream, ABC): + def __init__(self, project_space, **kwargs): + super().__init__(**kwargs) + self.project_space = project_space + + @property + def url_base(self) -> str: + return f"https://www.commcarehq.org/a/{self.project_space}/api/v0.5/" + + # These class variables save state + # forms holds form ids and we filter cases which contain one of these form ids + # last_form_date stores the date of the last form read so the next cycle for forms and cases starts at the same timestamp + forms = set() + last_form_date = None + schemas = {} + unwantedfields = re.compile(r"^(case_|update_|meta|create_|commcare_).*$") + + @property + def dateformat(self): + return "%Y-%m-%dT%H:%M:%S.%f" + + def scrubUnwantedFields(self, form): + newform = {k: v for k, v in form.items() if not self.unwantedfields.match(k)} + return newform + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + try: + # Server returns status 500 when there are no more rows. + # raise an error if server returns an error + response.raise_for_status() + meta = response.json()["meta"] + return parse_qs(meta["next"][1:]) + except Exception as ex: + return ex + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + params = {"format": "json"} + return params + + +class Application(CommcareStream): + primary_key = "id" + + def __init__(self, app_id, **kwargs): + super().__init__(**kwargs) + self.app_id = app_id + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return f"application/{self.app_id}/" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + params = {"format": "json", "extras": "true"} + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + yield response.json() + + +class IncrementalStream(CommcareStream, IncrementalMixin): + cursor_field = "indexed_on" + _cursor_value = None + + @property + def state(self) -> Mapping[str, Any]: + if self._cursor_value: + return {self.cursor_field: self._cursor_value} + + @state.setter + def state(self, value: Mapping[str, Any]): + self._cursor_value = datetime.strptime(value[self.cursor_field], self.dateformat) + + @property + def sync_mode(self): + return SyncMode.incremental + + @property + def supported_sync_modes(self): + return [SyncMode.incremental] + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + try: + # Server returns status 500 when there are no more rows. + # raise an error if server returns an error + response.raise_for_status() + meta = response.json()["meta"] + if meta["next"]: + return parse_qs(meta["next"][1:]) + return None + except Exception: + return None + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + params = {"format": "json"} + if next_page_token: + params.update(next_page_token) + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + for o in iter(response.json()["objects"]): + yield o + return None + + +class Case(IncrementalStream): + + """ + docs: https://www.commcarehq.org/a/[domain]/api/[version]/case/ + """ + + cursor_field = "indexed_on" + primary_key = "id" + + def __init__(self, start_date, app_id, schema, **kwargs): + super().__init__(**kwargs) + self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") + self.schema = schema + + def get_json_schema(self): + return self.schema + + @property + def name(self): + # Airbyte orders streams in alpha order but since we have dependent peers and we need to + # pull all forms before cases, we name this stream to + # ensure this stream gets pulled last (assuming ascii stream names only) + return "zzz_case" + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return "case" + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + # start date is what we saved for forms + # if self.cursor_field in self.state else (CommcareStream.last_form_date or self.initial_date) + ix = self.state[self.cursor_field] + params = {"format": "json", "indexed_on_start": ix.strftime(self.dateformat), "order_by": "indexed_on", "limit": "5000"} + if next_page_token: + params.update(next_page_token) + return params + + def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: + for record in super().read_records(*args, **kwargs): + found = False + for f in record["xform_ids"]: + if f in CommcareStream.forms: + found = True + break + if found: + self._cursor_value = datetime.strptime(record[self.cursor_field], self.dateformat) + # Make indexed_on tz aware + record.update({"streamname": "case", "indexed_on": record["indexed_on"] + "Z"}) + # convert xform_ids field from array to comma separated list so flattening won't create + # one field per item. This is because some cases have up to 2000 xform_ids and we don't want 2000 extra + # fields in the schema + record["xform_ids"] = ",".join(record["xform_ids"]) + frec = flatten(record) + yield frec + if self._cursor_value.microsecond == 0: + # Airbyte converts the cursor_field value (datetime) to string when it saves the state and + # our state setter parses the saved state with a format that contains microseconds + # self._cursor_value must have non-zero microseconds for the formatting and parsing to work correctly. + # This issue would also occur if an incoming record had a timestamp with zero microseconds + self._cursor_value = self._cursor_value.replace(microsecond=10) + # This cycle of pull is complete so clear out the form ids we saved for this cycle + CommcareStream.forms.clear() + + +class Form(IncrementalStream): + """ + docs: https://www.commcarehq.org/a/[domain]/api/[version]/form/ + """ + + cursor_field = "indexed_on" + primary_key = "id" + + def __init__(self, start_date, app_id, name, xmlns, schema, **kwargs): + super().__init__(**kwargs) + self.app_id = app_id + self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") + self.streamname = name + self.xmlns = xmlns + self.schema = schema + + @property + def name(self): + return self.streamname + + def get_json_schema(self): + return self.schema + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return "form" + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + + # if self.cursor_field in self.state else self.initial_date + ix = self.state[self.cursor_field] + params = { + "format": "json", + "app_id": self.app_id, + "indexed_on_start": ix.strftime(self.dateformat), + "order_by": "indexed_on", + "limit": "1000", + "xmlns": self.xmlns, + } + if next_page_token: + params.update(next_page_token) + return params + + def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: + upd = {"streamname": self.streamname, "xmlns": self.xmlns} + for record in super().read_records(*args, **kwargs): + self._cursor_value = datetime.strptime(record[self.cursor_field], self.dateformat) + CommcareStream.forms.add(record["id"]) + form = record["form"] + form.update(upd) + # Append Z to make it timezone aware + form.update({"id": record["id"], "indexed_on": record["indexed_on"] + "Z"}) + newform = self.scrubUnwantedFields(form) + yield flatten(newform) + if self._cursor_value.microsecond == 0: + # Airbyte converts the cursor_field value (datetime) to string when it saves the state and + # our state setter parses the saved state with a format that contains microseconds + # self._cursor_value must have non-zero microseconds for the formatting and parsing to work correctly. + # This issue would also occur if an incoming record had a timestamp with zero microseconds + self._cursor_value = self._cursor_value.replace(microsecond=10) + + +# Source +class SourceCommcare(AbstractSource): + def check_connection(self, logger, config) -> Tuple[bool, any]: + if "api_key" not in config: + return False, None + return True, None + + def base_schema(self): + return { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {"id": {"type": "string"}, "indexed_on": {"type": "string", "format": "date-time"}}, + } + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + auth = TokenAuthenticator(config["api_key"], auth_method="ApiKey") + args = { + "authenticator": auth, + } + appdata = Application(**{**args, "app_id": config["app_id"], "project_space": config["project_space"]}).read_records( + sync_mode=SyncMode.full_refresh + ) + + # Generate streams for forms, one per xmlns and one stream for cases. + streams = self.generate_streams(args, config, appdata) + return streams + + def generate_streams(self, args, config, appdata): + form_args = {"app_id": config["app_id"], "start_date": config["start_date"], "project_space": config["project_space"], **args} + streams = [] + name2xmlns = {} + + # Collect the form names and xmlns from the application + for record in appdata: + mods = record["modules"] + for m in mods: + forms = m["forms"] + for f in forms: + xmlns = f["xmlns"] + formname = "" + if "en" in f["name"]: + formname = f["name"]["en"].strip() + else: + # Unknown forms are named UNNAMED_xxxxx where xxxxx are the last 5 difits of the XMLNS + # This convention gives us repeatable names + formname = f"Unnamed_{xmlns[-5:]}" + + name = formname + name2xmlns[name] = xmlns + + # Create the streams from the collected names + # Sorted by name + for k in sorted(name2xmlns): + key = name2xmlns[k] + stream = Form(name=k, xmlns=key, schema=self.base_schema(), **form_args) + streams.append(stream) + + stream = Case( + app_id=config["app_id"], + start_date=config["start_date"], + schema=self.base_schema(), + project_space=config["project_space"], + **args, + ) + streams.append(stream) + + return streams diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml new file mode 100644 index 000000000000..97f9e4b43598 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/spec.yaml @@ -0,0 +1,38 @@ +documentationUrl: https://docsurl.com +connectionSpecification: + $schema: http://json-schema.org/draft-07/schema# + title: Commcare Source Spec + type: object + required: + - api_key + - app_id + - start_date + properties: + api_key: + type: string + title: API Key + description: >- + Commcare API Key + airbyte_secret: true + order: 0 + project_space: + type: string + title: Project Space + description: >- + Project Space for commcare + order: 1 + app_id: + type: string + title: Application ID + description: >- + The Application ID we are interested in + airbyte_secret: true + order: 2 + start_date: + type: string + title: Start date for extracting records + pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$ + default: "2022-10-01T00:00:00Z" + description: >- + UTC date and time in the format 2017-01-25T00:00:00Z. Only records after this date will be replicated. + order: 3 diff --git a/airbyte-integrations/connectors/source-commcare/unit_tests/__init__.py b/airbyte-integrations/connectors/source-commcare/unit_tests/__init__.py new file mode 100644 index 000000000000..1100c1c58cf5 --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/unit_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py b/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py new file mode 100644 index 000000000000..d1f57393817f --- /dev/null +++ b/airbyte-integrations/connectors/source-commcare/unit_tests/test_source.py @@ -0,0 +1,25 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock, Mock + +import pytest +from source_commcare.source import SourceCommcare + + +@pytest.fixture(name='config') +def config_fixture(): + return {'api_key': 'apikey', 'app_id': 'appid', 'start_date': '2022-01-01T00:00:00Z'} + + +def test_check_connection_ok(mocker, config): + source = SourceCommcare() + logger_mock = Mock() + assert source.check_connection(logger_mock, config=config) == (True, None) + + +def test_check_connection_fail(mocker, config): + source = SourceCommcare() + logger_mock = MagicMock() + assert source.check_connection(logger_mock, config={}) == (False, None) diff --git a/docs/integrations/sources/commcare.md b/docs/integrations/sources/commcare.md new file mode 100644 index 000000000000..045b2e89b5f5 --- /dev/null +++ b/docs/integrations/sources/commcare.md @@ -0,0 +1,39 @@ +# Commcare + +This page guides you through the process of setting up the Commcare source connector. + +## Prerequisites + +- Your Commcare API Key +- The Application ID you are interested in +- The start date to replicate records + +## Set up the Commcare source connector + +1. Log into your [Airbyte Cloud](https://cloud.airbyte.io/workspaces) or Airbyte Open Source account. +2. Click **Sources** and then click **+ New source**. +3. On the Set up the source page, select **Commcare** from the Source type dropdown. +4. Enter a name for your source. +5. For **API Key**, enter your Commcare API Key. +6. Click **Set up source**. + +## Supported sync modes + +The Commcare source connector supports the following [sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-sync-modes): + +- Full Refresh +- Overwrite +- Incremental + +## Supported Streams + +The Commcare source connector supports the following streams: + +- Application +- Case +- Form + +## Changelog + +| Version | Date | Pull Request | Subject | +| 0.1.0 | 2022-11-08 | [20220](https://github.com/airbytehq/airbyte/pull/20220) | Commcare Source Connector | \ No newline at end of file