Can't merge into existing deltalake with partitions #3013

Open
tzugen opened this issue Nov 21, 2024 · 3 comments
Labels
bug (Something isn't working), on-hold (Issues and Pull Requests that are on hold for some reason)

Comments

@tzugen

tzugen commented Nov 21, 2024

Environment

Delta-rs version:
0.21


Bug

I have an existing Delta Lake table that was created with Spark. I am now trying to write/merge into it with the new deltalake library.
The merge fails with "Generic error: Error partitioning record batch: Missing partition column: failed to parse".

from deltalake import DeltaTable, write_deltalake
import polars as pl
import pyarrow as pa
from datetime import date, datetime
from decimal import Decimal

DATABASE_NAME = "TEST_DB"

data = {
    "timestamp": [
        datetime(2024, 11, 25, 9, 44, 46, 660000),
        datetime(2024, 11, 25, 9, 47, 4, 240000)
    ],
    "date": [
        date(2024, 11, 25),
        date(2024, 11, 25)
    ],
    "value": [
        Decimal("823.0"),
        Decimal("823.0")
    ]
}

df = pl.DataFrame(data)

# Simplified schema
schema = pa.schema(
    [
        ("timestamp", pa.timestamp("us")),
        ("date", pa.date32()),
        ("value", pa.decimal128(6, 1)),
    ]
)

# Create new Delta table with partitioning
dt = DeltaTable.create(
    DATABASE_NAME,
    schema=schema,
    partition_by=["date"]
)

# Initial write
write_deltalake(dt, df.to_pandas(), mode="append")

# Read Delta table and display schema and content
dt_read = DeltaTable(DATABASE_NAME)
print("Schema:")
print(dt_read.schema())

# Display content
df_read = dt_read.to_pandas()
print("\nContent:")
print(df_read)

# Merge operation
dt.merge(
    source=df.to_pandas(),
    predicate="target.timestamp = source.timestamp AND target.value = source.value",
    source_alias="source",
    target_alias="target",
).when_matched_update_all().when_not_matched_insert_all().execute()


tzugen added the bug (Something isn't working) label Nov 21, 2024
@jntkit

jntkit commented Nov 22, 2024

I tried using date32 for the "date" field instead, and it works for me.

@ion-elgreco
Collaborator

@tzugen can you provide a proper reproducible example which I can run?

ion-elgreco added the on-hold (Issues and Pull Requests that are on hold for some reason) and mre-needed (Whether an MRE needs to be provided) labels Nov 24, 2024
@tzugen
Author

tzugen commented Nov 27, 2024

@ion-elgreco I have updated the code. You can now copy and paste it, and it should crash.

ion-elgreco removed the mre-needed (Whether an MRE needs to be provided) label Nov 27, 2024