Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;

import com.google.common.collect.Lists;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -41,8 +48,7 @@ public class IcebergCherrypickSnapshotAction extends BaseIcebergAction {
public static final String SNAPSHOT_ID = "snapshot_id";

public IcebergCherrypickSnapshotAction(Map<String, String> properties,
Optional<PartitionNamesInfo> partitionNamesInfo,
Optional<Expression> whereCondition,
Optional<PartitionNamesInfo> partitionNamesInfo, Optional<Expression> whereCondition,
IcebergExternalTable icebergTable) {
super("cherrypick_snapshot", properties, partitionNamesInfo, whereCondition, icebergTable);
}
Expand All @@ -65,7 +71,38 @@ protected void validateIcebergAction() throws UserException {

@Override
protected List<String> executeAction(TableIf table) throws UserException {
throw new DdlException("Iceberg cherrypick_snapshot procedure is not implemented yet");
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
Long sourceSnapshotId = namedArguments.getLong(SNAPSHOT_ID);

try {
Snapshot targetSnapshot = icebergTable.snapshot(sourceSnapshotId);
if (targetSnapshot == null) {
throw new UserException("Snapshot not found in table");
}

icebergTable.manageSnapshots().cherrypick(sourceSnapshotId).commit();
Snapshot currentSnapshot = icebergTable.currentSnapshot();

// invalid iceberg catalog table cache.
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
return Lists.newArrayList(
String.valueOf(sourceSnapshotId),
String.valueOf(currentSnapshot.snapshotId()
)
);

} catch (Exception e) {
throw new UserException("Failed to cherry-pick snapshot " + sourceSnapshotId + ": " + e.getMessage(), e);
}
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(new Column("source_snapshot_id", Type.BIGINT, false,
"ID of the snapshot whose changes were cherry-picked into the current table state"),
new Column("current_snapshot_id", Type.BIGINT, false,
"ID of the new snapshot created as a result of the cherry-pick operation, "
+ "now set as the current snapshot"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;

import com.google.common.collect.Lists;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -49,11 +55,11 @@ public IcebergFastForwardAction(Map<String, String> properties,
protected void registerIcebergArguments() {
// Register required arguments for branch and to
namedArguments.registerRequiredArgument(BRANCH,
"Name of the target branch to fast-forward to",
"Name of the branch to fast-forward to",
ArgumentParsers.nonEmptyString(BRANCH));
namedArguments.registerRequiredArgument(TO,
"Target snapshot ID to fast-forward to",
ArgumentParsers.positiveLong(TO));
"Target branch to fast-forward to",
ArgumentParsers.nonEmptyString(TO));
}

@Override
Expand All @@ -65,7 +71,41 @@ protected void validateIcebergAction() throws UserException {

@Override
protected List<String> executeAction(TableIf table) throws UserException {
throw new DdlException("Iceberg fast_forward procedure is not implemented yet");
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();

String sourceBranch = namedArguments.getString(BRANCH);
String desBranch = namedArguments.getString(TO);

try {
Long snapshotBefore =
icebergTable.snapshot(sourceBranch) != null ? icebergTable.snapshot(sourceBranch).snapshotId()
: null;
icebergTable.manageSnapshots().fastForwardBranch(sourceBranch, desBranch).commit();
long snapshotAfter = icebergTable.snapshot(sourceBranch).snapshotId();
// invalid iceberg catalog table cache.
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
return Lists.newArrayList(
sourceBranch.trim(),
String.valueOf(snapshotBefore),
String.valueOf(snapshotAfter)
);

} catch (Exception e) {
throw new UserException(
"Failed to fast-forward branch " + sourceBranch + " to snapshot " + desBranch + ": "
+ e.getMessage(), e);
}
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(
new Column("branch_updated", Type.STRING, false,
"Name of the branch that was fast-forwarded to match the target branch"),
new Column("previous_ref", Type.BIGINT, true,
"Snapshot ID that the branch was pointing to before the fast-forward operation"),
new Column("updated_ref", Type.BIGINT, false,
"Snapshot ID that the branch is pointing to after the fast-forward operation"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;

import com.google.common.collect.Lists;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -62,7 +69,43 @@ protected void validateIcebergAction() throws UserException {

@Override
protected List<String> executeAction(TableIf table) throws UserException {
throw new DdlException("Iceberg rollback_to_snapshot procedure is not implemented yet");
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
Long targetSnapshotId = namedArguments.getLong(SNAPSHOT_ID);

Snapshot targetSnapshot = icebergTable.snapshot(targetSnapshotId);
if (targetSnapshot == null) {
throw new UserException("Snapshot " + targetSnapshotId + " not found in table " + icebergTable.name());
}

try {
Snapshot previousSnapshot = icebergTable.currentSnapshot();
Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;
if (previousSnapshot != null && previousSnapshot.snapshotId() == targetSnapshotId) {
return Lists.newArrayList(
String.valueOf(previousSnapshotId),
String.valueOf(targetSnapshotId)
);
}
icebergTable.manageSnapshots().rollbackTo(targetSnapshotId).commit();
// invalid iceberg catalog table cache.
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
return Lists.newArrayList(
String.valueOf(previousSnapshotId),
String.valueOf(targetSnapshotId)
);

} catch (Exception e) {
throw new UserException("Failed to rollback to snapshot " + targetSnapshotId + ": " + e.getMessage(), e);
}
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(
new Column("previous_snapshot_id", Type.BIGINT, false,
"ID of the snapshot that was current before the rollback operation"),
new Column("current_snapshot_id", Type.BIGINT, false,
"ID of the snapshot that is now current after rolling back to the specified snapshot"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;

import com.google.common.collect.Lists;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -34,6 +43,7 @@
* at a specific timestamp.
*/
public class IcebergRollbackToTimestampAction extends BaseIcebergAction {
private static final DateTimeFormatter DATETIME_MS_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

must with millionsecond?

public static final String TIMESTAMP = "timestamp";

public IcebergRollbackToTimestampAction(Map<String, String> properties,
Expand All @@ -48,7 +58,7 @@ protected void registerIcebergArguments() {
// Create a custom timestamp parser that supports both ISO datetime and
// millisecond formats
namedArguments.registerRequiredArgument(TIMESTAMP,
"A timestamp to rollback to (ISO datetime 'yyyy-MM-ddTHH:mm:ss' or milliseconds since epoch)",
"A timestamp to rollback to (formats: 'yyyy-MM-dd HH:mm:ss.SSS' or milliseconds since epoch)",
value -> {
if (value == null || value.trim().isEmpty()) {
throw new IllegalArgumentException("timestamp cannot be empty");
Expand All @@ -64,14 +74,13 @@ protected void registerIcebergArguments() {
}
return trimmed;
} catch (NumberFormatException e) {
// If not a number, try as ISO datetime format
// Second attempt: Parse as ISO datetime format (yyyy-MM-dd HH:mm:ss.SSS)
try {
java.time.LocalDateTime.parse(trimmed,
java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME);
java.time.LocalDateTime.parse(trimmed, DATETIME_MS_FORMAT);
return trimmed;
} catch (java.time.format.DateTimeParseException dte) {
throw new IllegalArgumentException("Invalid timestamp format. Expected ISO datetime "
+ "(yyyy-MM-ddTHH:mm:ss) or timestamp in milliseconds: " + trimmed);
+ "(yyyy-MM-dd HH:mm:ss.SSS) or timestamp in milliseconds: " + trimmed);
}
}
});
Expand All @@ -87,7 +96,38 @@ protected void validateIcebergAction() throws UserException {

@Override
protected List<String> executeAction(TableIf table) throws UserException {
throw new DdlException("Iceberg rollback_to_timestamp procedure is not implemented yet");
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();

String timestampStr = namedArguments.getString(TIMESTAMP);

Snapshot previousSnapshot = icebergTable.currentSnapshot();
Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;

try {
long targetTimestamp = TimeUtils.msTimeStringToLong(timestampStr, TimeUtils.getTimeZone());
icebergTable.manageSnapshots().rollbackToTime(targetTimestamp).commit();

Snapshot currentSnapshot = icebergTable.currentSnapshot();
Long currentSnapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
// invalid iceberg catalog table cache.
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
return Lists.newArrayList(
String.valueOf(previousSnapshotId),
String.valueOf(currentSnapshotId)
);

} catch (Exception e) {
throw new UserException("Failed to rollback to timestamp " + timestampStr + ": " + e.getMessage(), e);
}
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(
new Column("previous_snapshot_id", Type.BIGINT, false,
"ID of the snapshot that was current before the rollback operation"),
new Column("current_snapshot_id", Type.BIGINT, false,
"ID of the snapshot that was current at the specified timestamp and is now set as current"));
}

@Override
Expand Down
Loading
Loading