Use a join for upsert deduplication #1685

Merged
merged 11 commits into apache:main from fd-join on Feb 21, 2025
Conversation

@Fokko (Contributor) commented Feb 19, 2025

This changes the deduplication logic to use a join to deduplicate the rows. While the original design wasn't wrong, it is more efficient to push the work down into PyArrow, which gives better multi-threading and avoids the GIL.
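Roughly, the new path looks like the sketch below. This is not the exact PR code (fragments of the real implementation are quoted in the review comments further down, and the real code also casts the source to the target schema first), and the dict form of rename_columns assumes a recent PyArrow:

import functools
import operator

import pyarrow as pa
import pyarrow.compute as pc


def rows_to_update(source: pa.Table, target: pa.Table, join_cols: list[str]) -> pa.Table:
    """Return the source rows whose key exists in target but whose non-key values differ."""
    non_key_cols = set(source.column_names) - set(join_cols)

    # Inner-join on the key columns; colliding non-key columns get a per-side suffix.
    joined = source.join(
        target, keys=join_cols, join_type="inner", left_suffix="-lhs", right_suffix="-rhs"
    )

    # A row needs an update when any non-key column differs between the two sides.
    diff_expr = functools.reduce(
        operator.or_,
        [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols],
    )

    return (
        joined.filter(diff_expr)
        .drop_columns([f"{col}-rhs" for col in non_key_cols])
        # Strip the "-lhs" suffix so the result matches the source schema again.
        .rename_columns({f"{col}-lhs" if col not in join_cols else col: col for col in source.column_names})
    )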

I did a small benchmark:

import time
import pyarrow as pa

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, IntegerType


def _drop_table(catalog: Catalog, identifier: str) -> None:
    try:
        catalog.drop_table(identifier)
    except NoSuchTableError:
        pass


def test_vo(session_catalog: Catalog):
    catalog = session_catalog
    identifier = "default.test_upsert_benchmark"
    _drop_table(catalog, identifier)

    schema = Schema(
        NestedField(1, "idx", IntegerType(), required=True),
        NestedField(2, "number", IntegerType(), required=True),
        # Mark idx as the identifier field, also known as the primary key
        identifier_field_ids=[1],
    )

    tbl = catalog.create_table(identifier, schema=schema)

    arrow_schema = pa.schema(
        [
            pa.field("idx", pa.int32(), nullable=False),
            pa.field("number", pa.int32(), nullable=False),
        ]
    )

    # Write some data
    df = pa.Table.from_pylist(
        [
            {"idx": idx, "number": idx}
            for idx in range(1, 100000)
        ],
        schema=arrow_schema,
    )
    tbl.append(df)

    df_upsert = pa.Table.from_pylist(
        # Overlap: existing keys with unchanged values (should be a no-op)
        [{"idx": idx, "number": idx} for idx in range(80000, 90000)]
        # Update: existing keys with changed values
        + [{"idx": idx, "number": idx + 1} for idx in range(90000, 100000)]
        # Insert: new keys
        + [{"idx": idx, "number": idx} for idx in range(100000, 110000)],
        schema=arrow_schema,
    )

    start = time.time()

    tbl.upsert(df_upsert)

    stop = time.time()

    print(f"Took {stop-start} seconds")

And the result was:

Took 2.0412521362304688 seconds on the fd-join branch
Took 3.5236432552337646 seconds on latest main

@Fokko changed the title from "Use a join for deduplication" to "Use a join for upsert deduplication" on Feb 19, 2025
@kevinjqliu (Contributor) left a comment

LGTM! a few nit comments

return rows_to_update_table
non_key_cols = all_columns - join_cols_set

diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols])
Contributor

De Morgan's law in the wild 🥇
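(For illustration, with hypothetical column names: the filter keeps the rows where at least one non-key column differs, i.e. the complement of "all non-key columns are equal".)

import functools
import operator

import pyarrow.compute as pc

non_key_cols = ["number", "name"]  # hypothetical non-key columns
any_differs = functools.reduce(
    operator.or_, [pc.field(f"{c}-lhs") != pc.field(f"{c}-rhs") for c in non_key_cols]
)
all_equal = functools.reduce(
    operator.and_, [pc.field(f"{c}-lhs") == pc.field(f"{c}-rhs") for c in non_key_cols]
)
# any_differs selects exactly the rows that ~all_equal would select.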

source_table
# We already know that the schema is compatible, this is to fix large_ types
.cast(target_table.schema)
.join(target_table, keys=list(join_cols_set), join_type="inner", left_suffix="-lhs", right_suffix="-rhs")
Contributor

nit: should we add coalesce_keys=True here to avoid duplicates in the resulting join table?

Since we only check whether source_table has duplicates, the target_table might still produce duplicates.

Contributor Author

Great catch! Since we've already filtered the target_table, I think we could also do the check there; it isn't that expensive anymore.

Contributor Author

Included a test 👍
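As a toy illustration of the fan-out discussed above (hypothetical tables, not the PR's test): a duplicate key on the target side multiplies the matching source row in an inner join.

import pyarrow as pa

left = pa.table({"idx": [1], "number": [10]})
right = pa.table({"idx": [1, 1], "number": [10, 11]})  # duplicate key on the target side

joined = left.join(right, keys=["idx"], join_type="inner",
                   left_suffix="-lhs", right_suffix="-rhs")
print(joined.num_rows)  # 2 -- the single source row matched twice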

.join(target_table, keys=list(join_cols_set), join_type="inner", left_suffix="-lhs", right_suffix="-rhs")
.filter(diff_expr)
.drop_columns([f"{col}-rhs" for col in non_key_cols])
.rename_columns({f"{col}-lhs" if col not in join_cols else col: col for col in source_table.column_names})
Contributor

Oh, this is a dictionary! https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.rename_columns
And the non-join columns will be ignored by create_match_filter.

Contributor Author

Yes, only the non-join columns get suffixed :)
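A quick sketch of the mapping form (hypothetical table; assumes a PyArrow release where rename_columns accepts a dict, per the docs linked above). Mirroring the PR's comprehension, key columns map to themselves and suffixed columns get the suffix stripped:

import pyarrow as pa

t = pa.table({"idx": [1], "number-lhs": [10]})
t = t.rename_columns({"idx": "idx", "number-lhs": "number"})  # old name -> new name
print(t.column_names)  # ['idx', 'number']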

@kevinjqliu (Contributor) left a comment

LGTM! Thanks for adding the benchmark numbers too!

@kevinjqliu merged commit b95e792 into apache:main on Feb 21, 2025
7 checks passed