Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airbyte ingestion improvements #513

Merged
merged 1 commit into from
Aug 3, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion splitgraph/ingestion/airbyte/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from typing import Dict, Any, Iterable, Generator, Optional, List, Tuple

from psycopg2.sql import SQL, Identifier
from target_postgres.db_sync import column_type

from splitgraph.config import DEFAULT_CHUNK_SIZE
Expand Down Expand Up @@ -63,8 +64,27 @@ def _store_raw_airbyte_tables(
default_sync_mode,
)
sync_mode = default_sync_mode
logging.info("Storing %s. Sync mode: %s", raw_table, sync_mode)

# By default, the PK on the Airbyte table is _airbyte_ab_id which is a UUID. We also
# cluster by the PK, which means it will be used to detect overlaps and we'll be sorting
# by it, which is bad as currently it forces Splitgraph to materialize a table into PG
# (since it thinks parts of the table overwrite each other). Instead, as a semi-hack,
# we change the table's schema to set the PK as (_airbyte_emitted_at, _airbyte_ab_id)
# which will make it not overlap (as each new chunk will have a different
# _airbyte_emitted_at).

logging.info("Changing the table's primary key...")
repository.object_engine.run_sql(
SQL(
"ALTER TABLE {0}.{1} DROP CONSTRAINT {2}; "
"ALTER TABLE {0}.{1} ADD PRIMARY KEY (_airbyte_emitted_at, _airbyte_ab_id)"
).format(
Identifier(staging_schema), Identifier(raw_table), Identifier(raw_table + "_pkey")
)
)
repository.object_engine.commit()

logging.info("Storing %s. Sync mode: %s", raw_table, sync_mode)
# Make sure the raw table's schema didn't change (very rare, since it's
# just hash, JSON, timestamp)
new_schema = engine.get_full_table_schema(staging_schema, raw_table)
Expand Down