Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel][Writes] Allow transaction retries for blind append #3055

Merged
merged 3 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.exceptions;

import io.delta.kernel.annotation.Evolving;

/**
 * Signals that the Delta table's metadata was modified by another writer after this transaction
 * started and before it could commit.
 *
 * @since 3.2.0
 */
@Evolving
public class MetadataChangedException extends ConcurrentWriteException {
    /** Fixed message carried by every instance; it asks the caller to retry the operation. */
    private static final String MESSAGE =
        "The metadata of the Delta table has been changed by a concurrent update. " +
            "Please try the operation again.";

    /** Creates the exception with the fixed metadata-conflict message. */
    public MetadataChangedException() {
        super(MESSAGE);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.exceptions;


import io.delta.kernel.annotation.Evolving;

/**
 * Signals that the Delta table's protocol was modified by another writer after this transaction
 * started and before it could commit.
 *
 * @since 3.2.0
 */
@Evolving
public class ProtocolChangedException extends ConcurrentWriteException {
    /** Message common to every protocol conflict. */
    private static final String CONFLICT_MESSAGE =
        "Transaction has encountered a conflict and can not be committed. " +
            "Query needs to be re-executed using the latest version of the table.";

    /** Hint appended only when the failed commit attempt targeted version 0. */
    private static final String NEW_TABLE_HINT =
        " This happens when multiple writers are writing to an empty directory. " +
            "Creating the table ahead of time will avoid this conflict.";

    /**
     * Creates the exception for a commit attempt that lost to a concurrent protocol change.
     *
     * @param attemptVersion version passed by the caller; when it is {@code 0} the message also
     *                       carries the hint about concurrent writers on an empty directory
     */
    public ProtocolChangedException(long attemptVersion) {
        super(messageFor(attemptVersion));
    }

    /** Builds the exception message, adding the new-table hint for version 0 only. */
    private static String messageFor(long attemptVersion) {
        if (attemptVersion == 0) {
            return CONFLICT_MESSAGE + NEW_TABLE_HINT;
        }
        return CONFLICT_MESSAGE;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ public static KernelException concurrentTransaction(
return new ConcurrentTransactionException(appId, txnVersion, lastUpdated);
}

/**
* Creates the exception that reports a concurrent change to the table metadata.
*
* @return a new {@link MetadataChangedException}
*/
public static KernelException metadataChangedException() {
return new MetadataChangedException();
}

/**
* Creates the exception that reports a concurrent change to the table protocol.
*
* @param attemptVersion passed through unchanged to the {@link ProtocolChangedException}
* constructor
* @return a new {@link ProtocolChangedException}
*/
public static KernelException protocolChangedException(long attemptVersion) {
return new ProtocolChangedException(attemptVersion);
}

/* ------------------------ HELPER METHODS ----------------------------- */
private static String formatTimestamp(long millisSinceEpochUTC) {
return new Timestamp(millisSinceEpochUTC).toInstant().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.*;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.delta.kernel.*;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
Expand All @@ -32,18 +35,30 @@
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.VectorUtils;
import static io.delta.kernel.internal.TableConfig.CHECKPOINT_INTERVAL;
import static io.delta.kernel.internal.actions.SingleAction.*;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.Preconditions.checkState;
import static io.delta.kernel.internal.util.Utils.toCloseableIterator;

public class TransactionImpl
implements Transaction {
private static final Logger logger = LoggerFactory.getLogger(TransactionImpl.class);

public static final int DEFAULT_READ_VERSION = 1;
public static final int DEFAULT_WRITE_VERSION = 2;

/**
* Maximum number of commit attempts made when concurrent writes are detected; conflicts are
* resolved before each new attempt. In Delta-Spark, for historical reasons, the limit is very
* high (10 million attempts). We are starting with a lower number for now. If this is not
* sufficient we can update it.
*/
private static final int NUM_TXN_RETRIES = 200;

private final UUID txnId = UUID.randomUUID();

private final boolean isNewTable; // the transaction is creating a new table
Expand Down Expand Up @@ -95,13 +110,45 @@ public StructType getSchema(Engine engine) {
}

/**
* Commits this transaction, retrying when the commit attempt fails because the target version
* already exists.
*
* <p>The first attempt targets {@code readSnapshot.getVersion(engine) + 1}. When
* {@code doCommit} throws {@link FileAlreadyExistsException}, conflicts are resolved through
* {@code ConflictChecker.resolveConflicts} and the next attempt targets the version after the
* latest version reported by the returned rebase state. At most {@code NUM_TXN_RETRIES}
* attempts are made. The transaction is marked closed on every exit path.
*
* @param engine {@link Engine} instance passed to the snapshot, the conflict checker and
* {@code doCommit}
* @param dataActions data actions forwarded to {@code doCommit} on every attempt
* @return the result of the first {@code doCommit} attempt that succeeds
* @throws ConcurrentWriteException when every attempt ended in a
* {@link FileAlreadyExistsException}; conflict resolution may raise its own
* exceptions as well (not visible here)
*/
@Override
public TransactionCommitResult commit(
public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions)
throws ConcurrentWriteException {
try {
checkState(!closed,
"Transaction is already attempted to commit. Create a new transaction.");

// The first attempt targets the version right after the snapshot this transaction read.
long commitAsVersion = readSnapshot.getVersion(engine) + 1;
int numRetries = 0;
do {
logger.info("Committing transaction as version = {}.", commitAsVersion);
try {
return doCommit(engine, commitAsVersion, dataActions);
} catch (FileAlreadyExistsException fnfe) {
// The commit file for this version already exists, which is treated as a concurrent write.
logger.info("Concurrent write detected when committing as version = {}. " +
"Trying to resolve conflicts and retry commit.", commitAsVersion);
TransactionRebaseState rebaseState = ConflictChecker
.resolveConflicts(engine, readSnapshot, commitAsVersion, this);
long newCommitAsVersion = rebaseState.getLatestVersion() + 1;
// Guard against a rebase that does not move the target version forward; retrying the same
// version would hit the same conflict again.
checkArgument(commitAsVersion < newCommitAsVersion,
"New commit version %d should be greater than the previous commit " +
"attempt version %d.", newCommitAsVersion, commitAsVersion);
commitAsVersion = newCommitAsVersion;
}
numRetries++;
} while (numRetries < NUM_TXN_RETRIES);
} finally {
// Mark the transaction closed on success and on failure so it cannot be committed again.
closed = true;
}

// we have exhausted the number of retries, give up.
logger.info("Exhausted maximum retries ({}) for committing transaction.", NUM_TXN_RETRIES);
throw new ConcurrentWriteException();
}

private TransactionCommitResult doCommit(
Engine engine,
long commitAsVersion,
CloseableIterable<Row> dataActions)
throws ConcurrentWriteException {
checkState(
!closed,
"Transaction is already attempted to commit. Create a new transaction.");
throws FileAlreadyExistsException {
List<Row> metadataActions = new ArrayList<>();
metadataActions.add(createCommitInfoSingleAction(generateCommitAction()));
if (isNewTable) {
Expand All @@ -117,35 +164,40 @@ public TransactionCommitResult commit(
CloseableIterator<Row> dataAndMetadataActions =
toCloseableIterator(metadataActions.iterator()).combine(stageDataIter);

try {
long readVersion = readSnapshot.getVersion(engine);
if (readVersion == -1) {
// New table, create a delta log directory
if (!engine.getFileSystemClient().mkdirs(logPath.toString())) {
throw new RuntimeException(
"Failed to create delta log directory: " + logPath);
}
if (commitAsVersion == 0) {
// New table, create a delta log directory
if (!engine.getFileSystemClient().mkdirs(logPath.toString())) {
throw new RuntimeException(
"Failed to create delta log directory: " + logPath);
}

long newVersion = readVersion + 1;
// Write the staged data to a delta file
engine.getJsonHandler().writeJsonFileAtomically(
FileNames.deltaFile(logPath, newVersion),
dataAndMetadataActions,
false /* overwrite */);

return new TransactionCommitResult(newVersion, isReadyForCheckpoint(newVersion));
} catch (FileAlreadyExistsException e) {
// TODO: Resolve conflicts and retry commit
throw new ConcurrentWriteException();
}

// Write the staged data to a delta file
engine.getJsonHandler().writeJsonFileAtomically(
FileNames.deltaFile(logPath, commitAsVersion),
dataAndMetadataActions,
false /* overwrite */);

return new TransactionCommitResult(
commitAsVersion,
isReadyForCheckpoint(commitAsVersion));
} catch (FileAlreadyExistsException e) {
throw e;
} catch (IOException ioe) {
throw new RuntimeException(ioe);
} finally {
closed = true;
}
}

/**
* Reports whether this transaction is a blind append.
*
* @return always {@code true} in the current implementation
*/
public boolean isBlindAppend() {
// For now, Kernel just supports blind append.
// Change this when read-after-write is supported.
return true;
}

/**
* Returns the optional {@link SetTransaction} held by this transaction.
*
* @return the value of the {@code setTxnOpt} field, returned as-is
*/
public Optional<SetTransaction> getSetTxnOpt() {
return setTxnOpt;
}

private Row generateCommitAction() {
return new CommitInfo(
System.currentTimeMillis(), /* timestamp */
Expand All @@ -162,12 +214,6 @@ private boolean isReadyForCheckpoint(long newVersion) {
return newVersion > 0 && newVersion % checkpointInterval == 0;
}

private boolean isBlindAppend() {
// For now, Kernel just supports blind append.
// Change this when read-after-write is supported.
return true;
}

private Map<String, String> getOperationParameters() {
if (isNewTable) {
List<String> partitionCols = VectorUtils.toJavaList(metadata.getPartitionColumns());
Expand All @@ -182,7 +228,7 @@ private Map<String, String> getOperationParameters() {
/**
* Get the part of the schema of the table that needs the statistics to be collected per file.
*
* @param engine {@link Engine} instance to use.
* @param engine {@link Engine} instance to use.
* @param transactionState State of the transaction
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ public class SingleAction {
// Once we start supporting updating CDC or domain metadata enabled tables, we should add the
// schema for those fields here.

/**
 * Schema to use when reading the winning commit files for conflict resolution. This schema is
 * only meant for resolving conflicts of a blind append. It does not cover the case where the
 * transaction reads data from the table and then updates the table.
 *
 * <p>Declared {@code final} so that this shared constant cannot be reassigned by callers.
 */
public static final StructType CONFLICT_RESOLUTION_SCHEMA = new StructType()
    .add("txn", SetTransaction.FULL_SCHEMA)
    // .add("add", AddFile.FULL_SCHEMA) // not needed for blind appends
    // .add("remove", RemoveFile.FULL_SCHEMA) // not needed for blind appends
    .add("metaData", Metadata.FULL_SCHEMA)
    .add("protocol", Protocol.FULL_SCHEMA);
// Once we start supporting domain metadata/row tracking enabled tables, we should add the
// schema for domain metadata fields here.

// Schema to use when writing out the single action to the Delta Log.
public static StructType FULL_SCHEMA = new StructType()
.add("txn", SetTransaction.FULL_SCHEMA)
Expand Down
Loading
Loading