
Data loss bug in MergeIntoCommand #11765

Closed

BsoBird opened this issue Dec 12, 2024 · 10 comments
Labels
bug Something isn't working

Comments

@BsoBird

BsoBird commented Dec 12, 2024

Apache Iceberg version

1.7.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

Recently, I've noticed that when using the MERGE INTO statement in Spark to merge data, Spark SQL occasionally loses some of the original data. I looked up related material and found that delta-spark has run into similar issues (this is the related issue link for delta-spark). Therefore, I would like to know whether spark-iceberg has the same problem.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
BsoBird added the bug label Dec 12, 2024
@RussellSpitzer
Member

We don't have automatic schema merging, so I don't think we have the same issue as in the Delta issue (at least not yet). Do you have any more details about the data loss?

@BsoBird
Author

BsoBird commented Dec 13, 2024

We don't have automatic schema merging, so I don't think we have the same issue as in the Delta issue (at least not yet). Do you have any more details about the data loss?

Sir, if I run MERGE INTO commands for multiple tables in parallel in the same Spark application, is there any problem with that?

Because we always start running MERGE INTO commands on many tables in the early hours of the morning, it's hard to spot the problem right away. And if I test a MERGE INTO statement on its own, there's a good chance that I won't be able to reproduce the problem.

Currently, the MergeInto statements I am running have the following characteristics:

  1. I submit MERGE INTO statements for multiple different tables concurrently within a single Spark application.
  2. All SQL statements generally follow this pattern:
  merge into target_iceberg_table t using
   (
       select primary_key, modified_time
       from (
           select primary_key, modified_time, row_number() over (partition by primary_key order by modified_time desc) as rank
           from ods_table
       ) s1 where rank = 1
   ) s
  on t.primary_key = s.primary_key
  when matched and s.modified_time > t.modified_time then update set t.modified_time = s.modified_time
  when not matched then insert *

@sfc-gh-rspitzer

There shouldn't be any problem I'm aware of. If merges were applied incorrectly in that situation, it would mean the validations at commit time are failing.

@BsoBird
Author

BsoBird commented Dec 13, 2024

There shouldn't be any problem I'm aware of. If merges were applied incorrectly in that situation, it would mean the validations at commit time are failing.

Sir, currently it appears that I am losing some of the incremental new data writes; the clause when not matched then insert * seems not to be working properly.

@RussellSpitzer
Member

You'll need to elaborate a bit more. What does "losing some of the incremental new data writes" mean?

Can you give an example? We can't really debug the generic case since we don't even know what you are actually attempting. Other important info would be things like what serialization level you are using, what version of Spark, the queries you are using, how you are running them etc ...
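For reference when gathering that information: the commit-time validation behaviour for MERGE on an Iceberg table is controlled by table write properties. Below is a minimal sketch of checking and, if desired, pinning the isolation level from Spark SQL; the table name is the illustrative one used in this thread, and write.merge.isolation-level is the Iceberg write property (serializable is, to my understanding, the default).

  -- Sketch only: inspect the table's properties, including any isolation-level overrides.
  show tblproperties target_iceberg_table;

  -- Explicitly request serializable validation for MERGE commits (believed to be the default).
  alter table target_iceberg_table set tblproperties (
      'write.merge.isolation-level' = 'serializable'
  );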

@RussellSpitzer
Member

It would also be very helpful to know how you are determining there is data loss
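One way to quantify that, sketched below with the illustrative names from this thread (ods_table, target_iceberg_table, primary_key), is an anti-join counting source keys that are still absent from the target after the merge:

  -- Sketch only: keys that exist in the source but not in the target after the merge,
  -- i.e. rows that "when not matched then insert *" should have inserted.
  select count(*) as missing_rows
  from (
      select distinct primary_key
      from ods_table
  ) s
  left anti join target_iceberg_table t
  on s.primary_key = t.primary_key;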

@BsoBird
Author

BsoBird commented Dec 13, 2024

@RussellSpitzer
Sir, I am using Spark version 3.5.1, and the Iceberg version is 1.7.1/1.6.1.
SQL:

  merge into target_iceberg_table t using
   (
       select primary_key, modified_time
       from (
           select primary_key, modified_time, row_number() over (partition by primary_key order by modified_time desc) as rank
           from ods_table
       ) s1 where rank = 1
   ) s
  on t.primary_key = s.primary_key
  when matched and s.modified_time > t.modified_time then update set t.modified_time = s.modified_time
  when not matched then insert *

In this SQL, target_iceberg_table is a COW (Copy-On-Write) table.

However, sir, I might have discovered an issue. When executing a COW MERGE INTO command, Spark needs to read ods_table twice: the first pass matches data files against the incremental records, and the second pass performs the actual data merge. If the data in ods_table changes between the first and second read, could this lead to abnormal execution results? What would happen if the data in ods_table suddenly increases? What if it suddenly decreases?

Example: ods_table is a partitioned table, and while the MERGE INTO statement is executing, someone adds or deletes partitions.
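For illustration, such a concurrent change could look like the following; the partition column dt is hypothetical and does not appear anywhere in this thread:

  -- Hypothetical sketch only: 'dt' is an assumed partition column of ods_table.
  -- Session 1 is running the COW merge shown above; before its second pass over ods_table
  -- completes, Session 2 changes the source so the two passes no longer see the same rows.
  alter table ods_table drop if exists partition (dt = '2024-12-12');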

@RussellSpitzer
Member

However, sir, I might have discovered an issue. When executing a COW MERGE INTO command, Spark needs to read ods_table twice: the first pass matches data files against the incremental records, and the second pass performs the actual data merge. If the data in ods_table changes between the first and second read, could this lead to abnormal execution results? What would happen if the data in ods_table suddenly increases? What if it suddenly decreases?

Example: ods_table is a partitioned table, and while the MERGE INTO statement is executing, someone adds or deletes partitions.

Yes, this is true: the relation created from the source data must remain constant through the two different passes over the source data. The target (Iceberg table) can change. If the source query would return different results between the passes, I believe you could see odd behavior.

I think we have some hooks to prevent this when the source is Iceberg, but I don't believe we have any for non-Iceberg sources. I may be forgetting something else, but @aokolnychyi probably knows. I believe the workaround here is that for a non-idempotent subquery you should cache or persist it prior to merging.
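A minimal sketch of that cache-before-merge workaround in Spark SQL, reusing the column and table names from the query earlier in this thread (the view name ods_dedup is made up):

  -- Sketch only: materialize the deduplicated source once so both passes of the COW merge
  -- read an identical, stable set of rows even if ods_table changes underneath.
  create or replace temporary view ods_dedup as
  select primary_key, modified_time
  from (
      select primary_key, modified_time,
             row_number() over (partition by primary_key order by modified_time desc) as rank
      from ods_table
  ) s1
  where rank = 1;

  -- CACHE TABLE is eager by default, so the source result set is computed before the merge starts.
  cache table ods_dedup;

  merge into target_iceberg_table t
  using ods_dedup s
  on t.primary_key = s.primary_key
  when matched and s.modified_time > t.modified_time then update set t.modified_time = s.modified_time
  when not matched then insert *;

  uncache table ods_dedup;

If the source itself were an Iceberg table, pinning a snapshot in the subquery (for example with Spark's VERSION AS OF time-travel syntax) should give similar stability.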

@BsoBird
Author

BsoBird commented Dec 14, 2024

@RussellSpitzer
Sir, if you have time, please review this PR. I believe we need to warn users about this in the documentation.
#11787

@rdblue
Contributor

rdblue commented Dec 20, 2024

I just left a -1 on the docs PR. I don't think that this is the right place to put a warning and I also think that the warning is overly broad and would lead to confusion.

BsoBird closed this as completed Jan 2, 2025