Skip to content

Commit

Permalink
Remove orphans by loading all ancestor IDs simultaneously
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Coetzee <marcel@mooncoon.com>
  • Loading branch information
Pipboyguy committed Aug 26, 2024
1 parent 26ba0f5 commit 06e04d9
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 11 deletions.
35 changes: 25 additions & 10 deletions dlt/destinations/impl/lancedb/lancedb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
set_non_standard_providers_environment_variables,
generate_arrow_uuid_column,
get_default_arrow_value,
create_unique_table_lineage,
)
from dlt.destinations.job_impl import ReferenceFollowupJobRequest
from dlt.destinations.type_mapping import TypeMapper
Expand Down Expand Up @@ -711,9 +712,8 @@ def create_table_chain_completed_followup_jobs(
if table_chain[0].get("write_disposition") == "merge":
all_job_paths_ordered = [
job.file_path
for table in table_chain
for _ in table_chain
for job in completed_table_chain_jobs
if job.job_file_info.table_name == table.get("name")
]
root_table_file_name = FileStorage.get_file_name_from_file_path(
all_job_paths_ordered[0]
Expand Down Expand Up @@ -791,22 +791,35 @@ def run(self) -> None:
)
for file_path_ in self.references
]
table_lineage_unique: TTableLineage = create_unique_table_lineage(table_lineage)

for job in table_lineage:
for job in table_lineage_unique:
target_is_root_table = "parent" not in job.table_schema
fq_table_name = self._job_client.make_qualified_table_name(job.table_name)

if target_is_root_table:
target_table_id_field_name = "_dlt_id"
file_path = job.file_path
ancestors_file_paths = self.get_parent_paths(table_lineage, job.table_name)
else:
target_table_id_field_name = "_dlt_parent_id"
file_path = self.get_parent_path(table_lineage, job.table_schema.get("parent"))
ancestors_file_paths = self.get_parent_paths(
table_lineage, job.table_schema.get("parent")
)

with FileStorage.open_zipsafe_ro(file_path, mode="rb") as f:
payload_arrow_table: pa.Table = pq.read_table(f)
# `when_not_matched_by_source_delete` removes absent source IDs.
# Loading ancestors individually risks unintended ID deletion, necessitating simultaneous loading of all ancestor IDs.
payload_arrow_table = None
for file_path_ in ancestors_file_paths:
with FileStorage.open_zipsafe_ro(file_path_, mode="rb") as f:
ancestor_arrow_table: pa.Table = pq.read_table(f)
if payload_arrow_table is None:
payload_arrow_table = ancestor_arrow_table
else:
payload_arrow_table = pa.concat_tables(
[payload_arrow_table, ancestor_arrow_table]
)

# Get target table schema
# Get target table schema.
with FileStorage.open_zipsafe_ro(job.file_path, mode="rb") as f:
target_table_schema: pa.Schema = pq.read_schema(f)

Expand Down Expand Up @@ -843,5 +856,7 @@ def run(self) -> None:
)

@staticmethod
def get_parent_paths(table_lineage: TTableLineage, table: str) -> List[str]:
    """Return all load file paths recorded for ``table``.

    The paths are returned in the same order in which they appear in
    ``table_lineage`` — i.e. the order the files were loaded — thereby
    preserving the table's load history.

    Args:
        table_lineage: Lineage entries, each exposing ``table_name`` and
            ``file_path`` attributes.
        table: Name of the table whose load files are requested.

    Returns:
        Every ``file_path`` whose entry matches ``table``; an empty list
        when the table has no entries.
    """
    # NOTE(review): the scraped diff interleaved the removed single-path
    # `get_parent_path` with this replacement; this is the post-change
    # definition, which returns *all* ancestor files instead of only the
    # first match.
    return [entry.file_path for entry in table_lineage if entry.table_name == table]
26 changes: 25 additions & 1 deletion dlt/destinations/impl/lancedb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dlt.common.schema import TTableSchema
from dlt.common.schema.utils import get_columns_names_with_prop
from dlt.destinations.impl.lancedb.configuration import TEmbeddingProvider
from dlt.destinations.impl.lancedb.schema import TArrowDataType
from dlt.destinations.impl.lancedb.schema import TArrowDataType, TTableLineage


PROVIDER_ENVIRONMENT_VARIABLES_MAP: Dict[TEmbeddingProvider, str] = {
Expand Down Expand Up @@ -93,3 +93,27 @@ def get_default_arrow_value(field_type: TArrowDataType) -> object:
return datetime.now()
else:
raise ValueError(f"Unsupported data type: {field_type}")


def create_unique_table_lineage(table_lineage: TTableLineage) -> TTableLineage:
    """Deduplicate a table lineage by table name, keeping the newest job.

    Walks the lineage from newest to oldest so that, for every table name,
    only the most recent job survives; the survivors are then returned in
    their original order of appearance.

    Args:
        table_lineage: The full table lineage, possibly containing several
            jobs per table.

    Returns:
        A new list of TableJob objects with duplicates removed — the last
        occurrence of each unique table name wins — while the original
        relative ordering is preserved.
    """
    deduplicated = []
    encountered = set()

    # Iterate back-to-front so the first hit per name is the *last* job.
    for candidate in reversed(table_lineage):
        if candidate.table_name in encountered:
            continue
        encountered.add(candidate.table_name)
        deduplicated.append(candidate)

    # Restore the original front-to-back order before returning.
    deduplicated.reverse()
    return deduplicated


0 comments on commit 06e04d9

Please sign in to comment.