Skip to content

Commit

Permalink
AirbyteLib: Case insensitive missing column checks, deterministic column ordering in duckdb inserts (airbytehq#34824)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored and jatinyadav-cc committed Feb 21, 2024
1 parent b02e295 commit 99a1c59
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 14 deletions.
12 changes: 10 additions & 2 deletions airbyte-lib/airbyte_lib/_file_writers/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
FileWriterBatchHandle,
FileWriterConfigBase,
)
from airbyte_lib._util.text_util import lower_case_set


class ParquetWriterConfig(FileWriterConfigBase):
Expand Down Expand Up @@ -47,12 +48,19 @@ def _get_missing_columns(
stream_name: str,
record_batch: pa.Table,
) -> list[str]:
"""Return a list of columns that are missing in the batch."""
"""Return a list of columns that are missing in the batch.
The comparison is based on a case-insensitive comparison of the column names.
"""
if not self._catalog_manager:
raise exc.AirbyteLibInternalError(message="Catalog manager should exist but does not.")
stream = self._catalog_manager.get_stream_config(stream_name)
stream_property_names = stream.stream.json_schema["properties"].keys()
return [col for col in stream_property_names if col not in record_batch.schema.names]
return [
col
for col in stream_property_names
if col.lower() not in lower_case_set(record_batch.schema.names)
]

@overrides
def _write_batch(
Expand Down
15 changes: 15 additions & 0 deletions airbyte-lib/airbyte_lib/_util/text_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""Internal utility functions for dealing with text."""
from __future__ import annotations

from typing import TYPE_CHECKING


if TYPE_CHECKING:
from collections.abc import Iterable


def lower_case_set(str_iter: Iterable[str]) -> set[str]:
    """Return the distinct lower-cased forms of the given strings.

    Accepts any iterable of strings (list, dict keys view, generator, ...).
    Inputs that differ only by letter case collapse into a single entry,
    which makes the result suitable for case-insensitive membership checks
    against a value that has itself been passed through ``str.lower()``.
    """
    lowered: set[str] = set()
    for text in str_iter:
        lowered.add(text.lower())
    return lowered
12 changes: 10 additions & 2 deletions airbyte-lib/airbyte_lib/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airbyte_lib import exceptions as exc
from airbyte_lib._file_writers.base import FileWriterBase, FileWriterBatchHandle
from airbyte_lib._processors import BatchHandle, RecordProcessor
from airbyte_lib._util.text_util import lower_case_set
from airbyte_lib.caches._catalog_manager import CatalogManager
from airbyte_lib.config import CacheConfigBase
from airbyte_lib.datasets._sql import CachedDataset
Expand Down Expand Up @@ -407,12 +408,19 @@ def _ensure_compatible_table_schema(
stream_column_names: list[str] = json_schema["properties"].keys()
table_column_names: list[str] = self.get_sql_table(stream_name).columns.keys()

missing_columns: set[str] = set(stream_column_names) - set(table_column_names)
lower_case_table_column_names = lower_case_set(table_column_names)
missing_columns = [
stream_col
for stream_col in stream_column_names
if stream_col.lower() not in lower_case_table_column_names
]
if missing_columns:
if raise_on_error:
raise exc.AirbyteLibCacheTableValidationError(
violation="Cache table is missing expected columns.",
context={
"stream_column_names": stream_column_names,
"table_column_names": table_column_names,
"missing_columns": missing_columns,
},
)
Expand Down Expand Up @@ -870,7 +878,7 @@ def _emulated_merge_temp_table_to_final_table(

# Define a condition that checks for records in temp_table that do not have a corresponding
# record in final_table
where_not_exists_clause = final_table.c.id == null()
where_not_exists_clause = getattr(final_table.c, pk_columns[0]) == null()

# Select records from temp_table that are not in final_table
select_new_records_stmt = (
Expand Down
14 changes: 11 additions & 3 deletions airbyte-lib/airbyte_lib/caches/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from __future__ import annotations

from pathlib import Path
from textwrap import dedent
from textwrap import dedent, indent
from typing import cast

from overrides import overrides
Expand Down Expand Up @@ -172,12 +172,20 @@ def _write_files_to_new_table(
stream_name=stream_name,
batch_id=batch_id,
)
columns_list = list(self._get_sql_column_definitions(stream_name).keys())
columns_list_str = indent("\n, ".join(columns_list), " ")
files_list = ", ".join([f"'{f!s}'" for f in files])
insert_statement = dedent(
f"""
INSERT INTO {self.config.schema_name}.{temp_table_name}
SELECT * FROM read_parquet(
[{files_list}]
(
{columns_list_str}
)
SELECT
{columns_list_str}
FROM read_parquet(
[{files_list}],
union_by_name = true
)
"""
)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-lib/airbyte_lib/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def _get_secret_from_source(
if (
source in [SecretSource.GOOGLE_COLAB, SecretSource.ANY]
and colab_userdata is not None
and colab_userdata.get(secret_name, None)
and colab_userdata.get(secret_name)
):
return colab_userdata.get(secret_name)

Expand Down
13 changes: 9 additions & 4 deletions airbyte-lib/airbyte_lib/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

from airbyte_lib import exceptions as exc
from airbyte_lib._factories.cache_factories import get_default_cache
from airbyte_lib._util import protocol_util # Internal utility functions
from airbyte_lib._util import protocol_util
from airbyte_lib._util.text_util import lower_case_set # Internal utility functions
from airbyte_lib.datasets._lazy import LazyDataset
from airbyte_lib.progress import progress
from airbyte_lib.results import ReadResult
Expand Down Expand Up @@ -300,13 +301,17 @@ def get_records(self, stream: str) -> LazyDataset:
) from KeyError(stream)

configured_stream = configured_catalog.streams[0]
col_list = configured_stream.stream.json_schema["properties"].keys()
all_properties = set(configured_stream.stream.json_schema["properties"].keys())

def _with_missing_columns(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
"""Add missing columns to the record with null values."""
for record in records:
appended_columns = set(col_list) - set(record.keys())
appended_dict = {col: None for col in appended_columns}
existing_properties_lower = lower_case_set(record.keys())
appended_dict = {
prop: None
for prop in all_properties
if prop.lower() not in existing_properties_lower
}
yield {**record, **appended_dict}

iterator: Iterator[dict[str, Any]] = _with_missing_columns(
Expand Down
4 changes: 2 additions & 2 deletions docusaurus/src/components/AirbyteLibExample.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import CodeBlock from '@theme/CodeBlock';

/**
* Generate a fake config based on the spec.
*
*
* As our specs are not 100% consistent, errors may occur.
* Try to generate a few times before giving up.
*/
Expand Down Expand Up @@ -40,7 +40,7 @@ export const AirbyteLibExample = ({
config = ${fakeConfig}
result = ab.get_connector(
result = ab.get_source(
"${connector}",
config=config,
).read()
Expand Down

0 comments on commit 99a1c59

Please sign in to comment.