From 4be0a3252797d91dc2b9ec3236ba33517a582646 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Tue, 6 Feb 2024 15:54:37 -0800 Subject: [PATCH] AirbyteLib: Case insensitive missing column checks, deterministic column ordering in duckdb inserts (#34824) --- airbyte-lib/airbyte_lib/_file_writers/parquet.py | 12 ++++++++++-- airbyte-lib/airbyte_lib/_util/text_util.py | 15 +++++++++++++++ airbyte-lib/airbyte_lib/caches/base.py | 12 ++++++++++-- airbyte-lib/airbyte_lib/caches/duckdb.py | 14 +++++++++++--- airbyte-lib/airbyte_lib/secrets.py | 2 +- airbyte-lib/airbyte_lib/source.py | 13 +++++++++---- docusaurus/src/components/AirbyteLibExample.jsx | 4 ++-- 7 files changed, 58 insertions(+), 14 deletions(-) create mode 100644 airbyte-lib/airbyte_lib/_util/text_util.py diff --git a/airbyte-lib/airbyte_lib/_file_writers/parquet.py b/airbyte-lib/airbyte_lib/_file_writers/parquet.py index fbe02776eb4e..bc7fbe9cd704 100644 --- a/airbyte-lib/airbyte_lib/_file_writers/parquet.py +++ b/airbyte-lib/airbyte_lib/_file_writers/parquet.py @@ -17,6 +17,7 @@ FileWriterBatchHandle, FileWriterConfigBase, ) +from airbyte_lib._util.text_util import lower_case_set class ParquetWriterConfig(FileWriterConfigBase): @@ -47,12 +48,19 @@ def _get_missing_columns( stream_name: str, record_batch: pa.Table, ) -> list[str]: - """Return a list of columns that are missing in the batch.""" + """Return a list of columns that are missing in the batch. + + The comparison is based on a case-insensitive comparison of the column names. + """ if not self._catalog_manager: raise exc.AirbyteLibInternalError(message="Catalog manager should exist but does not.") stream = self._catalog_manager.get_stream_config(stream_name) stream_property_names = stream.stream.json_schema["properties"].keys() - return [col for col in stream_property_names if col not in record_batch.schema.names] + return [ + col + for col in stream_property_names + if col.lower() not in lower_case_set(record_batch.schema.names) + ] @overrides def _write_batch( diff --git a/airbyte-lib/airbyte_lib/_util/text_util.py b/airbyte-lib/airbyte_lib/_util/text_util.py new file mode 100644 index 000000000000..d5f890993868 --- /dev/null +++ b/airbyte-lib/airbyte_lib/_util/text_util.py @@ -0,0 +1,15 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +"""Internal utility functions for dealing with text.""" +from __future__ import annotations + +from typing import TYPE_CHECKING + + +if TYPE_CHECKING: + from collections.abc import Iterable + + +def lower_case_set(str_iter: Iterable[str]) -> set[str]: + """Converts a list of strings to a set of lower case strings.""" + return {s.lower() for s in str_iter} diff --git a/airbyte-lib/airbyte_lib/caches/base.py b/airbyte-lib/airbyte_lib/caches/base.py index a05e2f525003..74c9516afc46 100644 --- a/airbyte-lib/airbyte_lib/caches/base.py +++ b/airbyte-lib/airbyte_lib/caches/base.py @@ -29,6 +29,7 @@ from airbyte_lib import exceptions as exc from airbyte_lib._file_writers.base import FileWriterBase, FileWriterBatchHandle from airbyte_lib._processors import BatchHandle, RecordProcessor +from airbyte_lib._util.text_util import lower_case_set from airbyte_lib.caches._catalog_manager import CatalogManager from airbyte_lib.config import CacheConfigBase from airbyte_lib.datasets._sql import CachedDataset @@ -407,12 +408,19 @@ def _ensure_compatible_table_schema( stream_column_names: list[str] = json_schema["properties"].keys() table_column_names: list[str] = self.get_sql_table(stream_name).columns.keys() - missing_columns: set[str] = set(stream_column_names) - set(table_column_names) + lower_case_table_column_names = lower_case_set(table_column_names) + missing_columns = [ + stream_col + for stream_col in stream_column_names + if stream_col.lower() not in lower_case_table_column_names + ] if missing_columns: if raise_on_error: raise exc.AirbyteLibCacheTableValidationError( violation="Cache table is missing expected columns.", context={ + "stream_column_names": stream_column_names, + "table_column_names": table_column_names, "missing_columns": missing_columns, }, ) @@ -870,7 +878,7 @@ def _emulated_merge_temp_table_to_final_table( # Define a condition that checks for records in temp_table that do not have a corresponding # record in final_table - where_not_exists_clause = final_table.c.id == null() + where_not_exists_clause = getattr(final_table.c, pk_columns[0]) == null() # Select records from temp_table that are not in final_table select_new_records_stmt = ( diff --git a/airbyte-lib/airbyte_lib/caches/duckdb.py b/airbyte-lib/airbyte_lib/caches/duckdb.py index 8a58e1d232a8..1b60c8e60c75 100644 --- a/airbyte-lib/airbyte_lib/caches/duckdb.py +++ b/airbyte-lib/airbyte_lib/caches/duckdb.py @@ -5,7 +5,7 @@ from __future__ import annotations from pathlib import Path -from textwrap import dedent +from textwrap import dedent, indent from typing import cast from overrides import overrides @@ -172,12 +172,20 @@ def _write_files_to_new_table( stream_name=stream_name, batch_id=batch_id, ) + columns_list = list(self._get_sql_column_definitions(stream_name).keys()) + columns_list_str = indent("\n, ".join(columns_list), " ") files_list = ", ".join([f"'{f!s}'" for f in files]) insert_statement = dedent( f""" INSERT INTO {self.config.schema_name}.{temp_table_name} - SELECT * FROM read_parquet( - [{files_list}] + ( + {columns_list_str} + ) + SELECT + {columns_list_str} + FROM read_parquet( + [{files_list}], + union_by_name = true ) """ ) diff --git a/airbyte-lib/airbyte_lib/secrets.py b/airbyte-lib/airbyte_lib/secrets.py index 90774e9ee9c2..2156eab7e1c2 100644 --- a/airbyte-lib/airbyte_lib/secrets.py +++ b/airbyte-lib/airbyte_lib/secrets.py @@ -76,7 +76,7 @@ def _get_secret_from_source( if ( source in [SecretSource.GOOGLE_COLAB, SecretSource.ANY] and colab_userdata is not None - and colab_userdata.get(secret_name, None) + and colab_userdata.get(secret_name) ): return colab_userdata.get(secret_name) diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index 2e43194d2075..9e792caeb448 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -26,7 +26,8 @@ from airbyte_lib import exceptions as exc from airbyte_lib._factories.cache_factories import get_default_cache -from airbyte_lib._util import protocol_util # Internal utility functions +from airbyte_lib._util import protocol_util +from airbyte_lib._util.text_util import lower_case_set # Internal utility functions from airbyte_lib.datasets._lazy import LazyDataset from airbyte_lib.progress import progress from airbyte_lib.results import ReadResult @@ -300,13 +301,17 @@ def get_records(self, stream: str) -> LazyDataset: ) from KeyError(stream) configured_stream = configured_catalog.streams[0] - col_list = configured_stream.stream.json_schema["properties"].keys() + all_properties = set(configured_stream.stream.json_schema["properties"].keys()) def _with_missing_columns(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: """Add missing columns to the record with null values.""" for record in records: - appended_columns = set(col_list) - set(record.keys()) - appended_dict = {col: None for col in appended_columns} + existing_properties_lower = lower_case_set(record.keys()) + appended_dict = { + prop: None + for prop in all_properties + if prop.lower() not in existing_properties_lower + } yield {**record, **appended_dict} iterator: Iterator[dict[str, Any]] = _with_missing_columns( diff --git a/docusaurus/src/components/AirbyteLibExample.jsx b/docusaurus/src/components/AirbyteLibExample.jsx index cc9a17638e63..403c80d99270 100644 --- a/docusaurus/src/components/AirbyteLibExample.jsx +++ b/docusaurus/src/components/AirbyteLibExample.jsx @@ -4,7 +4,7 @@ import CodeBlock from '@theme/CodeBlock'; /** * Generate a fake config based on the spec. - * + * * As our specs are not 100% consistent, errors may occur. * Try to generate a few times before giving up. */ @@ -40,7 +40,7 @@ export const AirbyteLibExample = ({ config = ${fakeConfig} -result = ab.get_connector( +result = ab.get_source( "${connector}", config=config, ).read()