AirbyteLib: Support write strategies: 'merge' and 'auto' #34592
Conversation
os.environ["AIRBYTE_LOCAL_REGISTRY"] = "./tests/integration_tests/fixtures/registry.json"
os.environ["DO_NOT_TRACK"] = "true"
Moved to a function-scoped fixture and a locally-scoped patch method to avoid leaking 'AIRBYTE_LOCAL_REGISTRY' into other test files.
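For reference, a minimal sketch of that pattern using pytest's built-in, function-scoped `monkeypatch` fixture (the fixture name here is illustrative):

```python
import pytest

@pytest.fixture
def local_registry_env(monkeypatch: pytest.MonkeyPatch) -> None:
    # monkeypatch is function-scoped: both variables are restored after
    # each test, so AIRBYTE_LOCAL_REGISTRY never leaks into other files.
    monkeypatch.setenv(
        "AIRBYTE_LOCAL_REGISTRY",
        "./tests/integration_tests/fixtures/registry.json",
    )
    monkeypatch.setenv("DO_NOT_TRACK", "true")
```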
Still reviewing, but left some comments so far. It would be great to have a merge test for Snowflake and Postgres to make sure it works as expected.
Theoretically it should be covered by SQLAlchemy, but this seems important enough to warrant an e2e test.
airbyte-lib/pyproject.toml
Outdated
@@ -45,7 +45,7 @@ ruff = "^0.1.11"
types-jsonschema = "^4.20.0.0"
google-cloud-secret-manager = "^2.17.0"
types-requests = "2.31.0.4"
freezegun = "^1.4.0"
source-faker = {path = "../airbyte-integrations/connectors/source-faker", develop = true}
Why do we need source-faker installed like this? Shouldn't airbyte-lib handle it via a separate venv? It should also pull the local version.
Before deciding to include it, I checked the dependencies of `source-faker`. It only depends on `mimesis` and the CDK, and `mimesis` has no hard dependencies.
Given the light footprint, and that this is only used as a test dependency, the benefits are:
- Stateless execution. There's no time at which the connector is not installed.
- Faster test invocation. Tests can start instantly, without having to install the source or verify that its installation fixture is ready.
- Simplicity. Following from the statelessness of not needing fixtures to install it, the overall test framework is simpler. We can expect that `source-faker` is on `PATH`, and if not, it's an error.
- Tests the BYO model for connector executables.

I'm glad we have the `source-faker` example which installs itself and validates that part of the process. Given that we already have those checks, adding it as a dev dependency lets us focus on the value we want to get from this connector (running it) without additional complexity.

Sidebar: I see that `source-faker` is now published to PyPI 🙌, so we can pin to that specific published PyPI version instead of using the relative reference. This will ensure our tests are stable even if the faker source changes.
Tested around a bit and it looks mostly good; there is one important comment about primary keys not being set on the final table. Maybe we should add a test validating that as well? Seems worth it.
@@ -415,10 +441,15 @@ def _create_table(
    self,
    table_name: str,
    column_definition_str: str,
    primary_keys: list[str] | None = None,
Primary keys are not set on the final table.
I think it's because this argument is never passed in. It should be passed on line 404, where the final table is created: `self._create_table(table_name, column_definition_str, self._get_primary_keys(stream_name))`.
I'm not sure whether there is an advantage to setting the primary key on the loading tables (I would guess not; maybe the join required for the merge would be slightly faster, but OTOH the temporary tables are pretty small, so that shouldn't change a thing).
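To illustrate the suggestion, here is a hedged sketch of how a `primary_keys` argument could be folded into the CREATE TABLE DDL (a standalone helper for illustration only, not the actual airbyte-lib implementation):

```python
def build_create_table_sql(
    table_name: str,
    column_definition_str: str,
    primary_keys: list[str] | None = None,
) -> str:
    """Illustrative only: fold an optional PRIMARY KEY clause into the DDL."""
    pk_clause = (
        f",\n  PRIMARY KEY ({', '.join(primary_keys)})" if primary_keys else ""
    )
    return f"CREATE TABLE {table_name} (\n  {column_definition_str}{pk_clause}\n)"

# build_create_table_sql("products", "id INTEGER, name VARCHAR", ["id"])
# returns a CREATE TABLE statement whose last line is: PRIMARY KEY (id)
```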
assert catalog.streams[1].primary_key

read_result = source_faker_seed_a.read(duckdb_cache, write_strategy="append")
assert read_result.cache._get_primary_keys("products") == ["id"]
It would be nice to have a test for composite primary keys, as that logic is never called at the moment.
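Such a test might look roughly like this (the stream name, fixtures, and composite key are hypothetical; only `_get_primary_keys` comes from the diff above):

```python
def test_composite_primary_keys(source_with_composite_pk, duckdb_cache) -> None:
    # Hypothetical: assumes a stream declaring two primary-key columns.
    result = source_with_composite_pk.read(duckdb_cache, write_strategy="merge")
    assert result.cache._get_primary_keys("order_items") == ["order_id", "product_id"]
```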
assert len(list(result.cache.streams["purchases"])) == FAKER_SCALE_A

# Third run, seed B - should increase record count to the scale of B, which is greater than A.
# TODO: See if we can reliably predict the exact number of records, since we use fixed seeds.
From what I can tell, we can rely on this (especially now with the pinned version of source-faker).
…r-integration-tests
Co-authored-by: Joe Reuter <joe@airbyte.io>
This adds the write strategies 'merge' and 'auto'. It also adds primary key support for streams.

There's also a placeholder for `force_full_refresh`. Currently, all operations are full refresh, but it was necessary to add this option in order to make `replace` a viable write strategy. The `replace` write strategy will fail unless `force_full_refresh` is also set, because we don't want to replace previous sync results with a new incremental (aka partial) one. When we add state/incremental support, the `force_full_refresh` option will give a way to ignore (not pass) any state which was previously tracked.

This PR also adds primary key definitions to source-faker. While `source-faker` does have `id` columns, it didn't previously declare these as primary keys. The needed changes to `source-faker` have now been migrated to this PR:

Unfortunately, this PR can't merge until the above merges. Otherwise, we'll need to create a new test source with primary keys.
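For illustration, here is how the strategies described above might be exercised from the caller's side (a sketch based on the test snippets in this thread; the exact airbyte-lib entry points may differ):

```python
import airbyte_lib as ab

# Assumed entry points; the read() signature matches the tests above.
source = ab.get_connector("source-faker", config={"count": 100})
cache = ab.new_local_cache()

# 'auto' resolves to 'merge' when the stream declares primary keys,
# and falls back to 'append' otherwise.
source.read(cache, write_strategy="auto")

# 'replace' refuses to run without force_full_refresh, so a partial
# (incremental) sync can never silently overwrite previous results.
source.read(cache, write_strategy="replace", force_full_refresh=True)
```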
Implementation Details
There are two code paths for merge upsert behavior. As of this PR, we support a generic two-step merge upsert which should work on all SQL DB platforms. Essentially, this runs an UPDATE and then an INSERT operation. Because SQL dialects differ, it is written in SQLAlchemy abstractions, which are translated into the dialect of the respective database.
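A sketch of what that two-step emulation looks like in SQLAlchemy Core (table objects and the helper name are illustrative; this is not the exact airbyte-lib code):

```python
from sqlalchemy import Table, and_, not_, select
from sqlalchemy.engine import Connection

def emulated_merge_upsert(
    conn: Connection,
    final_table: Table,
    temp_table: Table,
    primary_keys: list[str],
    columns: list[str],
) -> None:
    pk_match = and_(*(final_table.c[pk] == temp_table.c[pk] for pk in primary_keys))

    # Step 1: UPDATE rows that already exist, pulling values from the temp table.
    conn.execute(
        final_table.update()
        .values({col: temp_table.c[col] for col in columns})
        .where(pk_match)
    )

    # Step 2: INSERT rows whose primary key is not yet in the final table.
    new_rows = select(*(temp_table.c[col] for col in columns)).where(
        not_(select(final_table.c[primary_keys[0]]).where(pk_match).exists())
    )
    conn.execute(final_table.insert().from_select(columns, new_rows))
```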
The second code path is the one-step native merge upsert. The base class does have an implementation of this, based on the Postgres dialect. However, if a cache does not declare `supports_merge_insert = True`, it will automatically fall back to the emulated (two-step) operation. To optimize loads, caches can set `supports_merge_insert = True` and use the base implementation, if compatible, or they can override the merge upsert method to match their native dialect.
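The dispatch described above might be structured roughly like this (class and method names are illustrative, not the actual airbyte-lib identifiers):

```python
class SqlCacheBase:
    # Dialects opt in when the base one-step implementation (written
    # against Postgres-style syntax) is compatible, or they override
    # _merge_insert with their own native MERGE statement.
    supports_merge_insert = False

    def write_merge(self, conn, final_table, temp_table, primary_keys, columns) -> None:
        if self.supports_merge_insert:
            self._merge_insert(conn, final_table, temp_table, primary_keys, columns)
        else:
            # Generic fallback: the two-step UPDATE + INSERT emulation above.
            self._emulated_merge_upsert(conn, final_table, temp_table, primary_keys, columns)

    def _merge_insert(self, conn, final_table, temp_table, primary_keys, columns) -> None:
        ...  # one-step native merge upsert (base: Postgres-style)

    def _emulated_merge_upsert(self, conn, final_table, temp_table, primary_keys, columns) -> None:
        ...  # two-step emulation, portable across dialects


class PostgresCache(SqlCacheBase):
    # Compatible with the base implementation, so simply opt in.
    supports_merge_insert = True
```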