Skip to content

Add rollback_to_timestamp table procedure in Iceberg #24926

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,29 @@ state of the table to a previous snapshot id:
ALTER TABLE testdb.customer_orders EXECUTE rollback_to_snapshot(8954597067493422955);
```

The table procedure `rollback_to_timestamp` allows the caller to roll back the
state of the table to the latest snapshot committed at or before the specified
timestamp:

```sql
ALTER TABLE testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01 10:10:10.000');
```

Assuming that the session time zone is `America/Los_Angeles`, the following statements are equivalent:

```sql
ALTER TABLE testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01');
```

```sql
ALTER TABLE example.testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01 00:00:00');
```

```sql
ALTER TABLE example.testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01 00:00:00.000 America/Los_Angeles');
```

#### `NOT NULL` column constraint

The Iceberg connector supports setting `NOT NULL` constraints on the table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,7 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
case OPTIMIZE -> getTableHandleForOptimize(tableHandle, icebergTable, executeProperties, retryMode);
case DROP_EXTENDED_STATS -> getTableHandleForDropExtendedStats(session, tableHandle);
case ROLLBACK_TO_SNAPSHOT -> getTableHandleForRollbackToSnapshot(session, tableHandle, executeProperties);
case ROLLBACK_TO_TIMESTAMP -> getTableHandleForRollbackToTimestamp(session, tableHandle, executeProperties);
case EXPIRE_SNAPSHOTS -> getTableHandleForExpireSnapshots(session, tableHandle, executeProperties);
case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties);
case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties);
Expand Down Expand Up @@ -1777,6 +1778,22 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForRollbackToSnapsho
icebergTable.io().properties()));
}

/**
 * Builds the execute handle for the {@code rollback_to_timestamp} table procedure.
 * <p>
 * The {@code snapshot_timestamp} argument is resolved to a snapshot id via
 * {@code getSnapshotIdAsOfTime} (millisecond resolution), and the returned handle is a
 * {@code ROLLBACK_TO_SNAPSHOT} handle for that id, so the rollback itself goes through
 * the same path as {@code rollback_to_snapshot}.
 *
 * @param session the session used to load the Iceberg table
 * @param tableHandle the table the procedure is executed on
 * @param executeProperties the procedure arguments; must contain {@code snapshot_timestamp} as an {@link Instant}
 * @return a handle that rolls the table back to the resolved snapshot
 */
private Optional<ConnectorTableExecuteHandle> getTableHandleForRollbackToTimestamp(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
{
    // Go through the shared mandatory-argument helper instead of a raw map lookup: with a raw
    // lookup a missing or NULL argument surfaces as a NullPointerException on instant.toEpochMilli().
    // NOTE(review): assumes requireProcedureArgument rejects null values - confirm against its body.
    Instant instant = (Instant) requireProcedureArgument(executeProperties, "snapshot_timestamp");
    Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

    long snapshotId = getSnapshotIdAsOfTime(icebergTable, instant.toEpochMilli());

    // reuse rollback_to_snapshot
    return Optional.of(new IcebergTableExecuteHandle(
            tableHandle.getSchemaTableName(),
            ROLLBACK_TO_SNAPSHOT,
            new IcebergRollbackToSnapshotHandle(snapshotId),
            icebergTable.location(),
            icebergTable.io().properties()));
}

private static Object requireProcedureArgument(Map<String, Object> properties, String name)
{
Object value = properties.get(name);
Expand All @@ -1793,6 +1810,7 @@ public Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession
return getLayoutForOptimize(session, executeHandle);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case ROLLBACK_TO_TIMESTAMP:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1823,6 +1841,7 @@ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle
return beginOptimize(session, executeHandle, table);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case ROLLBACK_TO_TIMESTAMP:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1869,6 +1888,7 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
return;
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case ROLLBACK_TO_TIMESTAMP:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotTableProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotTimestampTableProcedure;
import io.trino.plugin.iceberg.procedure.UnregisterTableProcedure;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
Expand Down Expand Up @@ -132,6 +133,7 @@ public void configure(Binder binder)
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RollbackToSnapshotTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RollbackToSnapshotTimestampTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
pageSorter);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case ROLLBACK_TO_TIMESTAMP:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public enum IcebergTableProcedureId
OPTIMIZE,
DROP_EXTENDED_STATS,
ROLLBACK_TO_SNAPSHOT,
ROLLBACK_TO_TIMESTAMP,
EXPIRE_SNAPSHOTS,
REMOVE_ORPHAN_FILES,
ADD_FILES,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.SqlTimestampWithTimeZone;

import java.time.Instant;

import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_TIMESTAMP;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;

/**
 * Supplies the metadata of the {@code ROLLBACK_TO_TIMESTAMP} table procedure: a coordinator-only
 * procedure with a single {@code snapshot_timestamp} argument of type
 * {@code timestamp(3) with time zone}, exposed to the connector as an {@link Instant}.
 */
public class RollbackToSnapshotTimestampTableProcedure
        implements Provider<TableProcedureMetadata>
{
    @Override
    public TableProcedureMetadata get()
    {
        PropertyMetadata<Instant> snapshotTimestamp = new PropertyMetadata<>(
                "snapshot_timestamp",
                "Snapshot timestamp",
                TIMESTAMP_TZ_MILLIS,
                Instant.class,
                null,
                false,
                RollbackToSnapshotTimestampTableProcedure::toInstant,
                RollbackToSnapshotTimestampTableProcedure::toSqlTimestamp);

        return new TableProcedureMetadata(
                ROLLBACK_TO_TIMESTAMP.name(),
                coordinatorOnly(),
                ImmutableList.<PropertyMetadata<?>>of(snapshotTimestamp));
    }

    // Decoder: converts the SQL-level argument value into the Instant handed to the connector.
    private static Instant toInstant(Object value)
    {
        SqlTimestampWithTimeZone timestamp = (SqlTimestampWithTimeZone) value;
        return timestamp.toZonedDateTime().toInstant();
    }

    // Encoder: renders an Instant back as a millisecond-precision SQL timestamp in UTC.
    private static Object toSqlTimestamp(Instant instant)
    {
        return SqlTimestampWithTimeZone.fromInstant(3, instant, UTC_KEY.getZoneId());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,49 @@ private void testRollbackSnapshot(String rollbackToSnapshotFormat)
assertUpdate("DROP TABLE test_rollback");
}

@Test
public void testRollbackTimestamp()
{
    // Run the same scenario with the positional and then the named-argument call syntax
    String[] statementTemplates = {
            "ALTER TABLE tpch.test_rollback_timestamp EXECUTE rollback_to_timestamp(TIMESTAMP '%s')",
            "ALTER TABLE tpch.test_rollback_timestamp EXECUTE rollback_to_timestamp(snapshot_timestamp => TIMESTAMP '%s')",
    };
    for (String statementTemplate : statementTemplates) {
        testRollbackTimestamp(statementTemplate);
    }
}

/**
 * Runs the rollback-by-timestamp scenario against a fresh {@code test_rollback_timestamp} table.
 *
 * @param rollbackToTimestampFormat statement template with a single {@code %s} placeholder
 *         that receives the formatted commit timestamp to roll back to
 */
private void testRollbackTimestamp(String rollbackToTimestampFormat)
{
    DateTimeFormatter commitTimeFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS VV");
    String selectAll = "SELECT * FROM test_rollback_timestamp ORDER BY col0";
    String firstRow = "VALUES (123, CAST(987 AS BIGINT))";

    assertUpdate("CREATE TABLE test_rollback_timestamp (col0 INTEGER, col1 BIGINT)");
    String createdAt = commitTimeFormat.format(getCurrentCommitTimestamp("test_rollback_timestamp"));

    assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (123, CAST(987 AS BIGINT))", 1);
    String firstInsertAt = commitTimeFormat.format(getCurrentCommitTimestamp("test_rollback_timestamp"));
    assertQuery(selectAll, firstRow);

    // Rolling back to the current snapshot's own timestamp must succeed and change nothing
    String rollbackToFirstInsert = format(rollbackToTimestampFormat, firstInsertAt);
    assertUpdate(rollbackToFirstInsert);
    assertQuery(selectAll, firstRow);

    assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1);
    assertQuery(selectAll, "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))");

    // The second insert disappears once the table is rolled back to the first insert
    assertUpdate(rollbackToFirstInsert);
    assertQuery(selectAll, firstRow);

    // Rolling back to the creation time leaves an empty table
    assertUpdate(format(rollbackToTimestampFormat, createdAt));
    long rowCount = (long) computeActual("SELECT COUNT(*) FROM test_rollback_timestamp").getOnlyValue();
    assertThat(rowCount).isEqualTo(0);

    assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1);
    String secondInsertAt = commitTimeFormat.format(getCurrentCommitTimestamp("test_rollback_timestamp"));

    // extra insert which should be dropped on rollback
    assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1);

    assertUpdate(format(rollbackToTimestampFormat, secondInsertAt));
    assertQuery(selectAll, "VALUES (789, CAST(987 AS BIGINT))");

    assertUpdate("DROP TABLE test_rollback_timestamp");
}

@Test
void testRollbackToSnapshotWithNullArgument()
{
Expand Down Expand Up @@ -8868,6 +8911,11 @@ private long getCurrentSnapshotId(String tableName)
return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
}

/**
 * Returns the greatest {@code committed_at} value from the {@code $snapshots} metadata table
 * of the given table.
 */
private ZonedDateTime getCurrentCommitTimestamp(String tableName)
{
    String snapshotsTable = "\"" + tableName + "$snapshots\"";
    String latestCommitQuery = "SELECT committed_at FROM " + snapshotsTable + " ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES";
    return (ZonedDateTime) computeScalar(latestCommitQuery);
}

private String getIcebergTableDataPath(String tableLocation)
{
return tableLocation + "/data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.function.Consumer;

Expand Down Expand Up @@ -366,13 +369,42 @@ public void testRollbackToSnapshot()
onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName));
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testRollbackToTimestamp()
        throws InterruptedException
{
    String tableName = "test_rollback_to_timestamp_" + randomNameSuffix();
    DateTimeFormatter commitTimeFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS VV");
    String dropTable = format("DROP TABLE IF EXISTS %s", tableName);

    onTrino().executeQuery("USE iceberg.default");
    onTrino().executeQuery(dropTable);
    onTrino().executeQuery(format("CREATE TABLE %s (a INTEGER)", tableName));

    // Pause briefly between commits, presumably so consecutive snapshots get distinct millisecond timestamps
    Thread.sleep(1);
    onTrino().executeQuery(format("INSERT INTO %s VALUES 1", tableName));
    String snapshotTimestamp = getCurrentCommitTimestamp(tableName)
            .toInstant()
            .atZone(ZoneId.of("UTC"))
            .format(commitTimeFormat);

    Thread.sleep(1);
    onTrino().executeQuery(format("INSERT INTO %s VALUES 2", tableName));

    // Rolling back to the first insert's commit time must discard the second insert
    String rollback = format("ALTER TABLE %s EXECUTE rollback_to_timestamp(TIMESTAMP '%s')", tableName, snapshotTimestamp);
    onTrino().executeQuery(rollback);
    assertThat(onTrino().executeQuery(format("SELECT * FROM %s", tableName)))
            .containsOnly(row(1));

    onTrino().executeQuery(dropTable);
}

/**
 * Returns the id of the earliest-committed snapshot that has a parent, read from the
 * {@code $snapshots} metadata table of the given table in {@code iceberg.default}.
 */
private long getSecondOldestTableSnapshot(String tableName)
{
    String query = format(
            "SELECT snapshot_id FROM iceberg.default.\"%s$snapshots\" WHERE parent_id IS NOT NULL ORDER BY committed_at FETCH FIRST 1 ROW WITH TIES",
            tableName);
    Object snapshotId = onTrino().executeQuery(query).getOnlyValue();
    return (Long) snapshotId;
}

/**
 * Returns the greatest {@code committed_at} value from the {@code $snapshots} metadata table
 * of the given table in {@code iceberg.default}.
 * <p>
 * The metadata table is fully qualified, like in {@code getSecondOldestTableSnapshot}, so the
 * helper does not depend on the session's current catalog and schema.
 *
 * @param tableName unqualified name of a table in {@code iceberg.default}
 * @return commit timestamp of the most recent snapshot
 */
private Timestamp getCurrentCommitTimestamp(String tableName)
{
    return (Timestamp) onTrino().executeQuery(
            format("SELECT committed_at FROM iceberg.default.\"%s$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES", tableName))
            .getOnlyValue();
}

@DataProvider
public static Object[][] fileFormats()
{
Expand Down
Loading