Snowflake Cortex destination: Bug fixes #38206
Conversation
```diff
@@ -16,7 +16,7 @@
 from destination_snowflake_cortex.config import ConfigModel
 from destination_snowflake_cortex.indexer import SnowflakeCortexIndexer

-BATCH_SIZE = 32
+BATCH_SIZE = 150
```
32 seemed too low in general. Each batch triggers one PyAirbyte write call.
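A minimal sketch of the batching behaviour this refers to (the class and method names here are illustrative, not the connector's actual API): records are buffered and flushed with one write call per full batch, so raising `BATCH_SIZE` from 32 to 150 cuts the number of write calls roughly 5x.

```python
BATCH_SIZE = 150

class _BufferingIndexer:
    """Illustrative stand-in for a batch-writing indexer (assumed names)."""

    def __init__(self) -> None:
        self.buffer: list[dict] = []
        self.write_calls = 0

    def index(self, record: dict) -> None:
        self.buffer.append(record)
        if len(self.buffer) >= BATCH_SIZE:
            self.flush()

    def flush(self) -> None:
        # One underlying (PyAirbyte) write per accumulated batch.
        if self.buffer:
            self.write_calls += 1
            self.buffer.clear()

indexer = _BufferingIndexer()
for i in range(1000):
    indexer.index({"id": i})
indexer.flush()  # flush the final partial batch
print(indexer.write_calls)  # 7 with BATCH_SIZE=150, vs 32 with BATCH_SIZE=32
```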
```diff
@@ -85,7 +86,7 @@ def _get_updated_catalog(self) -> ConfiguredAirbyteCatalog:
         metadata -> metadata of the record
         embedding -> embedding of the document content
         """
-        updated_catalog = self.catalog
+        updated_catalog = copy.deepcopy(self.catalog)
```
Needed so the original catalog isn't modified, since this method is called twice.
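The aliasing pitfall being fixed, reduced to a minimal example (plain dicts standing in for the real catalog objects):

```python
import copy

catalog = {"streams": [{"name": "users", "columns": ["id"]}]}

# Without a copy, the "updated" catalog is the same object, so the first
# call mutates the original and the second call sees the mutated version:
updated = catalog
updated["streams"][0]["columns"].append("embedding")
print(catalog["streams"][0]["columns"])  # ['id', 'embedding'] -- original changed

# With deepcopy, the original catalog survives repeated calls unchanged:
catalog = {"streams": [{"name": "users", "columns": ["id"]}]}
updated = copy.deepcopy(catalog)
updated["streams"][0]["columns"].append("embedding")
print(catalog["streams"][0]["columns"])  # ['id'] -- original intact
```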
```diff
         pass

+    def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
```
New method, meant to be implemented by vector DB destinations: it deletes existing records up front so overwrite mode works correctly.
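A hedged sketch of what a `pre_sync` implementation might look like for a vector DB indexer; the `_delete_records_for_stream` helper and the class are assumptions for illustration, not the connector's actual code:

```python
from airbyte_cdk.models import ConfiguredAirbyteCatalog, DestinationSyncMode

class ExampleVectorIndexer:
    def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
        """Before any batch is written, wipe streams that sync in overwrite mode."""
        for configured_stream in catalog.streams:
            if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
                self._delete_records_for_stream(configured_stream.stream.name)

    def _delete_records_for_stream(self, stream_name: str) -> None:
        # Assumed helper: e.g. issue a DELETE against the table backing this stream.
        ...
```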
```diff
@@ -144,7 +145,8 @@ def get_write_strategy(self, stream_name: str) -> WriteStrategy:
         for stream in self.catalog.streams:
             if stream.stream.name == stream_name:
                 if stream.destination_sync_mode == DestinationSyncMode.overwrite:
-                    return WriteStrategy.REPLACE
+                    # we will use append here since we will remove the existing records and add new ones.
+                    return WriteStrategy.APPEND
```
For overwrite mode we delete the existing records first and then just use APPEND in PyAirbyte, since data is sent in batches.
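Why APPEND plus an up-front delete is needed, as a toy illustration: with per-batch REPLACE semantics, every batch would wipe the previous ones, so only the last batch would survive a multi-batch sync.

```python
records = list(range(400))  # 400 records, BATCH_SIZE = 150 -> batches of 150/150/100
BATCH_SIZE = 150
batches = [records[i : i + BATCH_SIZE] for i in range(0, len(records), BATCH_SIZE)]

# Per-batch REPLACE: each write replaces the destination, dropping earlier batches.
table: list[int] = []
for batch in batches:
    table = list(batch)
print(len(table))  # 100 -- only the final batch survives

# Delete once up front (pre_sync), then APPEND every batch.
table = []  # pre_sync: existing records deleted once
for batch in batches:
    table.extend(batch)
print(len(table))  # 400 -- all records retained
```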
Approving with one caveat. When in 'replace' mode, we would ideally load to a stage table and then swap the table name with the existing table after the load is complete. The SQLProcessor class should do this automatically when in 'replace' mode, but it may require a refactor to actually implement this with confidence.
So, non-blocking, but something to think about for next iterations.
Otherwise, this all looks great!
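For reference, a sketch of the stage-and-swap pattern being described, using snowflake-connector-python; the table names and connection parameters are placeholders, not the connector's actual code. Snowflake's `ALTER TABLE ... SWAP WITH ...` exchanges the two tables atomically, so readers never see a half-loaded table.

```python
import snowflake.connector

# Placeholder credentials -- substitute real connection parameters.
conn = snowflake.connector.connect(
    account="my_account", user="my_user", password="my_password",
    warehouse="my_wh", database="my_db", schema="my_schema",
)
cur = conn.cursor()

# 1. Load the full sync into a staging table; the live table stays untouched.
cur.execute("CREATE OR REPLACE TABLE users_stage LIKE users")
# ... write all batches into users_stage ...

# 2. Atomically exchange live and staging tables once the load completes.
cur.execute("ALTER TABLE users SWAP WITH users_stage")

# 3. Drop the previous data, which now lives under the staging name.
cur.execute("DROP TABLE users_stage")
```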
Yea, that makes sense. Created this issue: https://github.com/airbytehq/airbyte-internal-issues/issues/7928
This PR addresses the following: when `destinationMode=Overwrite`, we first delete all records for the specified stream and then call the cortex processor with `WriteStrategy.append`. Otherwise the batch size enforced by vector_db_based results in records getting overwritten every time the batch size is met. We usually do this for all vector DBs; I missed it earlier.