Airbyte data source [CU-15xp9xt] #493

Merged: 31 commits into master, Jul 21, 2021

Conversation

mildbyte (Contributor)

Intro

A Splitgraph DataSource implementation for connectors that use the Airbyte standard.

Similar to Singer (some Airbyte sources wrap Singer taps), but with a few benefits:

  • Parameters come with a pre-specified JSONSchema
  • Connectors ship as Docker images (no need to mess around with virtualenvs and PEXes)
  • Support for custom cursor and PK fields
  • Normalization (converting raw data streams into actual table schemas) is decoupled from ingestion into a separate step, which also allows custom dbt transforms (not supported here)

This isn't exposed publicly: it's just an AirbyteDataSource class that can be inherited from, with a few overrides, to make a Splitgraph-compatible data source.

Example for MySQL:

# Import path assumed from this PR's module layout:
from splitgraph.ingestion.airbyte.data_source import AirbyteDataSource

class MySQLAirbyteDataSource(AirbyteDataSource):
    docker_image = "airbyte/source-mysql:latest"
    airbyte_name = "airbyte-mysql"

    credentials_schema = {"type": "object", "properties": {"password": {"type": "string"}}}
    params_schema = {
        "type": "object",
        "properties": {
            "host": {"type": "string"},
            "port": {"type": "integer"},
            "database": {"type": "string"},
            "username": {"type": "string"},
            "replication_method": {"type": "string"},
        },
        "required": ["host", "port", "database", "username", "replication_method"],
    }

    @classmethod
    def get_name(cls) -> str:
        return "MySQL (Airbyte)"

    @classmethod
    def get_description(cls) -> str:
        return "MySQL (Airbyte)"
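
For illustration, here's a hypothetical usage sketch. The constructor and method names (introspect, load) follow the generic Splitgraph DataSource interface and are assumptions here rather than something spelled out in this PR:

from splitgraph.core.repository import Repository
from splitgraph.engine import get_engine

# Hypothetical usage; signatures assumed from the generic DataSource API.
source = MySQLAirbyteDataSource(
    get_engine(),
    credentials={"password": "secret"},
    params={
        "host": "localhost",
        "port": 3306,
        "database": "mydb",
        "username": "user",
        "replication_method": "STANDARD",
    },
)

# Introspection runs the source in discovery mode and returns a
# best-effort schema for each stream.
tables = source.introspect()

# Initial load (full refresh) into a repository.
repo = Repository.from_schema("myns/mysql_data")
source.load(repo)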

Implementation:

  • JSONSchema: we could theoretically run the source with --spec to get this, but it's currently done out-of-band.
  • Introspection: run the source in discovery mode, parse the catalog and convert it to a Splitgraph schema. Note that there's no guarantee the actual schema will match what we output: we have no control over Airbyte's normalization and can't easily infer what it'll do or whether it'll split some substreams into separate tables. The only thing this lets the user do is ignore individual streams (which are named the same as the tables).
  • Load/sync:
    • Start the source container and the Postgres receiver container and pipe data between them (see the sketch after this list).
    • At the end of it, we get a schema with tables named _airbyte_raw_[streamname].
    • These are always append- or truncate-only, so we don't do an sgr checkout: we just manually link these tables to the previous version (extending the Splitgraph table).
    • Normalization: This converts the raw tables into the actual final tables using dbt. We check the raw tables out using LQ (which makes the dbt normalization step faster, as it always reads the whole table), run the Airbyte normalization container and then commit the new tables into the image.
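
Conceptually, the source-to-destination piping looks like the following. This is a simplified sketch that shells out to plain docker run; the image names, mounts and config paths are illustrative, and the real implementation also scrapes state and log messages from the streams:

import subprocess

# The source emits Airbyte JSON messages on stdout; the destination
# reads them on stdin and writes _airbyte_raw_* tables into Postgres.
# Config/catalog files are assumed to be mounted into both containers.
source = subprocess.Popen(
    ["docker", "run", "--rm", "-i", "-v", "/tmp/airbyte:/secrets",
     "airbyte/source-mysql:latest",
     "read", "--config", "/secrets/config.json",
     "--catalog", "/secrets/catalog.json"],
    stdout=subprocess.PIPE,
)
destination = subprocess.Popen(
    ["docker", "run", "--rm", "-i", "-v", "/tmp/airbyte:/secrets",
     "airbyte/destination-postgres:latest",
     "write", "--config", "/secrets/destination_config.json",
     "--catalog", "/secrets/catalog.json"],
    stdin=source.stdout,
)
source.stdout.close()  # let the destination see EOF when the source exits
source.wait()
destination.wait()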

Limitations

  • Normalization is a bit crude and doesn't detect some things like timestamps (Airbyte recommends writing custom dbt models for the raw data instead; we don't currently support those either)
  • The discovery step sometimes has issues when scraping the container logs for the catalog. I think this is a Docker-level problem and I haven't been able to reproduce it for the last couple of days
  • Our load maps to a series of Airbyte-level settings that are configurable separately per-stream but not through our interface:
    • sync mode: full_refresh (source ignores state)
    • destination sync mode: overwrite (always delete raw tables)
  • Similarly, our sync maps to:
    • sync mode: incremental (source uses the state)
    • destination sync mode: append_dedup (this appends to the raw tables and, when using dbt to normalize, always uses the primary key)
  • Sometimes the sources don't come with well-defined parameters:
    • primary key: required for deduplication at normalization time; if missing, normalization will break for append_dedup.
    • cursor: required for incremental loads; if missing, normalization will break for append/append_dedup.
    • Added a class-level override for these (see the sketch after this list); we might need to expose it in the plugin params instead.
  • We use a horrid regex hack to find out which sync mode Airbyte chose for each "raw" table (we can't infer it directly, because the algorithm for converting stream names into table names involves a lot of slugification and Unicode normalization).
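
As a sketch, the class-level cursor/PK override mentioned above might look like the following. The attribute names here are illustrative assumptions, not the exact API (per the commit log below, these were later also exposed as airbyte_cursor_field / airbyte_primary_key table params):

# Illustrative only: the override attribute names here are assumed.
class OrdersAirbyteDataSource(AirbyteDataSource):
    docker_image = "airbyte/source-some-api:latest"
    airbyte_name = "airbyte-some-api"

    # Per-stream fallbacks for sources that don't declare these:
    cursor_overrides = {"orders": ["updated_at"]}      # enables incremental sync
    primary_key_overrides = {"orders": ["order_id"]}   # enables append_dedup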


mildbyte added 22 commits July 12, 2021 16:24
…at can be referenced by non-Singer data sources.
…d use it for source containers too (in test we want to hit the MySQL Docker container from the host).
…it them; ignore log lines that aren't Airbyte messages.
…for the destination, we want to override the namespace since otherwise PG will write out to the wrong schema).
… from it into the codebase, since that's all we need. Update the Poetry lockfile.
…ar imports (since plugins are loaded in the commandline module).
@mildbyte mildbyte force-pushed the feature/airbyte-data-source-CU-15xp9xt branch from 7be2128 to 5302bef on July 20, 2021 11:58
mildbyte added 5 commits July 20, 2021 15:23
…hout various image creation routines instead of ad hoc numbers.
…ta sources (through `airbyte_cursor_field` and `airbyte_primary_key` table params). Also, report the plugin's default cursor/PK back to the user at introspection time (as suggested default table params).
@mildbyte mildbyte force-pushed the feature/airbyte-data-source-CU-15xp9xt branch from 6696b32 to 6cd734a on July 21, 2021 11:40
@mildbyte mildbyte merged commit 1affeba into master Jul 21, 2021
@mildbyte mildbyte deleted the feature/airbyte-data-source-CU-15xp9xt branch July 21, 2021 13:46
mildbyte added a commit that referenced this pull request Jul 26, 2021
  * API functionality to get the raw URL for a data source (#457)
  * LQ scan / filtering simplification to speed up writes / Singer loads (#464, #489)
  * API functionality for Airbyte support (`AirbyteDataSource` class, #493)
  * Speed up `sgr cloud load` by bulk API calls (#500)

Full set of changes: `v0.2.14...v0.2.15`