Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel][Writes] Support idempotent writes #3051

Merged
merged 3 commits into from
May 6, 2024

Conversation

vkorukanti
Copy link
Collaborator

Description

(Split from #2944)

Adds an API on TransactionBuilder to take the transaction identifier for idempotent writes

    /*
     * Set the transaction identifier for idempotent writes. Incremental processing systems (e.g.,
     * streaming systems) that track progress using their own application-specific versions need to
     * record what progress has been made, in order to avoid duplicating data in the face of
     * failures and retries during writes. By setting the transaction identifier, the Delta table
     * can ensure that the data with same identifier is not written multiple times. For more
     * information refer to the Delta protocol section <a
     * href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers">
     * Transaction Identifiers</a>.
     *
     * @param engine             {@link Engine} instance to use.
     * @param applicationId      The application ID that is writing to the table.
     * @param transactionVersion The version of the transaction. This should be monotonically
     *                           increasing with each write for the same application ID.
     * @return updated {@link TransactionBuilder} instance.
     */
    TransactionBuilder withTransactionId(
            Engine engine,
            String applicationId,
            long transactionVersion);

During the transaction build, check the latest txn version of the given AppId. If it is not monotonically increasing throw ConcurrentTransactionException.

How was this patch tested?

Added to DeltaTableWriteSuite.scala

Copy link
Collaborator

@scottsand-db scottsand-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with minor comments

*
* @param engine {@link Engine} instance to use.
* @throws ConcurrentTransactionException if the table already has a committed transaction with
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we check during build but I am assuming and hoping we also check during the time of transaction commit, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, currently, we just fail if there is a txn conflict. I have PR coming shortly that resolves conflicts and as part of the that we check txn id is changed or not.

import io.delta.kernel.annotation.Evolving;

/**
* Thrown when concurrent transaction both attempt to update the same idempotent transaction.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Transaction" is pretty high-level here.

I'd also comment and point to some docs or the SetTransaction delta action to specifically indicate what we mean here, or mention the Transaction ID specified in the transaction builder

Else, to me, "Concurrent Transaction" seems like the same thing as "Concurrent Append"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added details and also link to the protocol txn identifiers section.

@@ -131,6 +148,16 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
SchemaUtils.validatePartitionColumns(
schema.get(), partitionColumns.orElse(Collections.emptyList()));
}

transactionId.ifPresent(txnId -> {
Optional<Long> lastTxnVersion = snapshot.getLatestTransactionVersion(txnId.getAppId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will the conflict checker check this during logical conflicts?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, see my prev comment resp here.

val ex = intercept[ConcurrentTransactionException] {
fn
}
assert(ex.getMessage.contains(s"This error occurs when multiple updates are using the " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @allisonport-db - what is our final decision with error codes? Checking errror codes would be a lot nicer/cleaner in tests

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we decided to punt on the error codes and add them later if the need arises.

@vkorukanti vkorukanti merged commit 96f87bf into delta-io:master May 6, 2024
10 checks passed
vkorukanti added a commit that referenced this pull request May 6, 2024
(Split from #2944)

Adds an API on `TransactionBuilder` to take the transaction identifier
for idempotent writes
```
    /*
     * Set the transaction identifier for idempotent writes. Incremental processing systems (e.g.,
     * streaming systems) that track progress using their own application-specific versions need to
     * record what progress has been made, in order to avoid duplicating data in the face of
     * failures and retries during writes. By setting the transaction identifier, the Delta table
     * can ensure that the data with same identifier is not written multiple times. For more
     * information refer to the Delta protocol section <a
     * href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers">
     * Transaction Identifiers</a>.
     *
     * @param engine             {@link Engine} instance to use.
     * @param applicationId      The application ID that is writing to the table.
     * @param transactionVersion The version of the transaction. This should be monotonically
     *                           increasing with each write for the same application ID.
     * @return updated {@link TransactionBuilder} instance.
     */
    TransactionBuilder withTransactionId(
            Engine engine,
            String applicationId,
            long transactionVersion);
```

During the transaction build, check the latest txn version of the given
AppId. If it is not monotonically increasing throw
`ConcurrentTransactionException`.

Added to `DeltaTableWriteSuite.scala`
@vkorukanti vkorukanti deleted the idempotentWrites branch May 9, 2024 02:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants