Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;

import com.google.common.collect.Lists;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -41,8 +48,7 @@ public class IcebergCherrypickSnapshotAction extends BaseIcebergAction {
public static final String SNAPSHOT_ID = "snapshot_id";

public IcebergCherrypickSnapshotAction(Map<String, String> properties,
Optional<PartitionNamesInfo> partitionNamesInfo,
Optional<Expression> whereCondition,
Optional<PartitionNamesInfo> partitionNamesInfo, Optional<Expression> whereCondition,
IcebergExternalTable icebergTable) {
super("cherrypick_snapshot", properties, partitionNamesInfo, whereCondition, icebergTable);
}
Expand All @@ -65,7 +71,38 @@ protected void validateIcebergAction() throws UserException {

@Override
protected List<String> executeAction(TableIf table) throws UserException {
throw new DdlException("Iceberg cherrypick_snapshot procedure is not implemented yet");
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
Long sourceSnapshotId = namedArguments.getLong(SNAPSHOT_ID);

try {
Snapshot targetSnapshot = icebergTable.snapshot(sourceSnapshotId);
if (targetSnapshot == null) {
throw new UserException("Snapshot not found in table");
}

icebergTable.manageSnapshots().cherrypick(sourceSnapshotId).commit();
Snapshot currentSnapshot = icebergTable.currentSnapshot();

// invalid iceberg catalog table cache.
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
return Lists.newArrayList(
String.valueOf(sourceSnapshotId),
String.valueOf(currentSnapshot.snapshotId()
)
);

} catch (Exception e) {
throw new UserException("Failed to cherry-pick snapshot " + sourceSnapshotId + ": " + e.getMessage(), e);
}
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(new Column("source_snapshot_id", Type.BIGINT, false,
"ID of the snapshot whose changes were cherry-picked into the current table state"),
new Column("current_snapshot_id", Type.BIGINT, false,
"ID of the new snapshot created as a result of the cherry-pick operation, "
+ "now set as the current snapshot"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;

import com.google.common.collect.Lists;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -49,11 +55,11 @@ public IcebergFastForwardAction(Map<String, String> properties,
protected void registerIcebergArguments() {
// Register required arguments for branch and to
namedArguments.registerRequiredArgument(BRANCH,
"Name of the target branch to fast-forward to",
"Name of the branch to fast-forward to",
ArgumentParsers.nonEmptyString(BRANCH));
namedArguments.registerRequiredArgument(TO,
"Target snapshot ID to fast-forward to",
ArgumentParsers.positiveLong(TO));
"Target branch to fast-forward to",
ArgumentParsers.nonEmptyString(TO));
}

@Override
Expand All @@ -65,7 +71,41 @@ protected void validateIcebergAction() throws UserException {

@Override
protected List<String> executeAction(TableIf table) throws UserException {
throw new DdlException("Iceberg fast_forward procedure is not implemented yet");
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();

String sourceBranch = namedArguments.getString(BRANCH);
String desBranch = namedArguments.getString(TO);

try {
Long snapshotBefore =
icebergTable.snapshot(sourceBranch) != null ? icebergTable.snapshot(sourceBranch).snapshotId()
: null;
icebergTable.manageSnapshots().fastForwardBranch(sourceBranch, desBranch).commit();
long snapshotAfter = icebergTable.snapshot(sourceBranch).snapshotId();
// invalid iceberg catalog table cache.
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
return Lists.newArrayList(
sourceBranch.trim(),
String.valueOf(snapshotBefore),
String.valueOf(snapshotAfter)
);

} catch (Exception e) {
throw new UserException(
"Failed to fast-forward branch " + sourceBranch + " to snapshot " + desBranch + ": "
+ e.getMessage(), e);
}
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(
new Column("branch_updated", Type.STRING, false,
"Name of the branch that was fast-forwarded to match the target branch"),
new Column("previous_ref", Type.BIGINT, true,
"Snapshot ID that the branch was pointing to before the fast-forward operation"),
new Column("updated_ref", Type.BIGINT, false,
"Snapshot ID that the branch is pointing to after the fast-forward operation"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;

import com.google.common.collect.Lists;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -62,7 +69,43 @@ protected void validateIcebergAction() throws UserException {

@Override
protected List<String> executeAction(TableIf table) throws UserException {
throw new DdlException("Iceberg rollback_to_snapshot procedure is not implemented yet");
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
Long targetSnapshotId = namedArguments.getLong(SNAPSHOT_ID);

Snapshot targetSnapshot = icebergTable.snapshot(targetSnapshotId);
if (targetSnapshot == null) {
throw new UserException("Snapshot " + targetSnapshotId + " not found in table " + icebergTable.name());
}

try {
Snapshot previousSnapshot = icebergTable.currentSnapshot();
Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;
if (previousSnapshot != null && previousSnapshot.snapshotId() == targetSnapshotId) {
return Lists.newArrayList(
String.valueOf(previousSnapshotId),
String.valueOf(targetSnapshotId)
);
}
icebergTable.manageSnapshots().rollbackTo(targetSnapshotId).commit();
// invalid iceberg catalog table cache.
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
return Lists.newArrayList(
String.valueOf(previousSnapshotId),
String.valueOf(targetSnapshotId)
);

} catch (Exception e) {
throw new UserException("Failed to rollback to snapshot " + targetSnapshotId + ": " + e.getMessage(), e);
}
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(
new Column("previous_snapshot_id", Type.BIGINT, false,
"ID of the snapshot that was current before the rollback operation"),
new Column("current_snapshot_id", Type.BIGINT, false,
"ID of the snapshot that is now current after rolling back to the specified snapshot"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;

import com.google.common.collect.Lists;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -34,6 +43,7 @@
* at a specific timestamp.
*/
public class IcebergRollbackToTimestampAction extends BaseIcebergAction {
private static final DateTimeFormatter DATETIME_MS_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static final String TIMESTAMP = "timestamp";

public IcebergRollbackToTimestampAction(Map<String, String> properties,
Expand All @@ -48,7 +58,7 @@ protected void registerIcebergArguments() {
// Create a custom timestamp parser that supports both ISO datetime and
// millisecond formats
namedArguments.registerRequiredArgument(TIMESTAMP,
"A timestamp to rollback to (ISO datetime 'yyyy-MM-ddTHH:mm:ss' or milliseconds since epoch)",
"A timestamp to rollback to (formats: 'yyyy-MM-dd HH:mm:ss.SSS' or milliseconds since epoch)",
value -> {
if (value == null || value.trim().isEmpty()) {
throw new IllegalArgumentException("timestamp cannot be empty");
Expand All @@ -64,14 +74,13 @@ protected void registerIcebergArguments() {
}
return trimmed;
} catch (NumberFormatException e) {
// If not a number, try as ISO datetime format
// Second attempt: Parse as ISO datetime format (yyyy-MM-dd HH:mm:ss.SSS)
try {
java.time.LocalDateTime.parse(trimmed,
java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME);
java.time.LocalDateTime.parse(trimmed, DATETIME_MS_FORMAT);
return trimmed;
} catch (java.time.format.DateTimeParseException dte) {
throw new IllegalArgumentException("Invalid timestamp format. Expected ISO datetime "
+ "(yyyy-MM-ddTHH:mm:ss) or timestamp in milliseconds: " + trimmed);
+ "(yyyy-MM-dd HH:mm:ss.SSS) or timestamp in milliseconds: " + trimmed);
}
}
});
Expand All @@ -87,7 +96,38 @@ protected void validateIcebergAction() throws UserException {

@Override
protected List<String> executeAction(TableIf table) throws UserException {
throw new DdlException("Iceberg rollback_to_timestamp procedure is not implemented yet");
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();

String timestampStr = namedArguments.getString(TIMESTAMP);

Snapshot previousSnapshot = icebergTable.currentSnapshot();
Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;

try {
long targetTimestamp = TimeUtils.msTimeStringToLong(timestampStr, TimeUtils.getTimeZone());
icebergTable.manageSnapshots().rollbackToTime(targetTimestamp).commit();

Snapshot currentSnapshot = icebergTable.currentSnapshot();
Long currentSnapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
// invalid iceberg catalog table cache.
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
return Lists.newArrayList(
String.valueOf(previousSnapshotId),
String.valueOf(currentSnapshotId)
);

} catch (Exception e) {
throw new UserException("Failed to rollback to timestamp " + timestampStr + ": " + e.getMessage(), e);
}
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(
new Column("previous_snapshot_id", Type.BIGINT, false,
"ID of the snapshot that was current before the rollback operation"),
new Column("current_snapshot_id", Type.BIGINT, false,
"ID of the snapshot that was current at the specified timestamp and is now set as current"));
}

@Override
Expand Down
Loading
Loading