Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,26 @@ INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_2` VALUES
(20007, 8007, '2024-02-07', 378.45, '465 Lavender Avenue, Thorndale, WY 82201', 'PROCESSING', false),
(20008, 8008, '2024-02-08', 92.30, '729 Iris Lane, Riverside, MN 55987', 'SHIPPED', true),
(20009, 8009, '2024-02-09', 445.80, '856 Tulip Boulevard, Sunnydale, ND 58301', 'PENDING', false),
(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401', 'CANCELLED', true);
(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401', 'CANCELLED', true);


-- time travle schema change
ALTER TABLE test_paimon_time_travel_db.tbl_time_travel ADD COLUMNS (
new_col1 INT
);

-- - snpashot 5
INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
(6001, 9001, '2024-02-11', 456.80, '123 New Street, Downtown, CA 90210', 'COMPLETED', true, 100),
(6002, 9002, '2024-02-12', 289.45, '456 Updated Ave, Midtown, NY 10001', 'PROCESSING', false, 200),
(6003, 9003, '2024-02-13', 378.90, '789 Modern Blvd, Uptown, TX 75201', 'SHIPPED', true, 300);

CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_5', snapshot => 5);

-- - snapshot 6
INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
(6004, 9004, '2024-02-14', 199.99, '321 Future Lane, Innovation, WA 98001', 'PENDING', true, 400),
(6005, 9005, '2024-02-15', 567.25, '654 Progress Drive, Tech City, OR 97201', 'COMPLETED', false, 500),
(6006, 9006, '2024-02-16', 123.75, '987 Advanced Court, Silicon Valley, CA 94301', 'CANCELLED', true, 600);

CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_6', snapshot => 6);
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,45 @@ public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional<TableSnapshot> tableSnapshot,
Optional<TableScanParams> scanParams) {
makeSureInitialized();

// Current limitation: cannot specify both table snapshot and scan parameters simultaneously.
if (tableSnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag())) {
// If a snapshot is specified,
// use the specified snapshot and the corresponding schema(not the latest
// schema).
try {
Snapshot snapshot = PaimonUtil.getPaimonSnapshot(paimonTable, tableSnapshot, scanParams);
Table dataTable = paimonTable.copy(
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(snapshot.id())));
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), paimonTable));
new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), dataTable));
} catch (Exception e) {
LOG.warn("Failed to get Paimon snapshot for table {}", paimonTable.name(), e);
throw new RuntimeException(
"Failed to get Paimon snapshot: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()),
e);
}
} else if (scanParams.isPresent() && scanParams.get().isBranch()) {
try {
String branch = PaimonUtil.resolvePaimonBranch(scanParams.get(), paimonTable);
Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(getOrBuildNameMapping(), branch, null);
Optional<Snapshot> latestSnapshot = table.latestSnapshot();
long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
if (latestSnapshot.isPresent()) {
latestSnapshotId = latestSnapshot.get().id();
}
// Branches in Paimon can have independent schemas and snapshots.
// TODO: Add time travel support for paimon branch tables.
DataTable dataTable = (DataTable) table;
Long schemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L);
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
new PaimonSnapshot(latestSnapshotId, schemaId, dataTable));
} catch (Exception e) {
LOG.warn("Failed to get Paimon branch for table {}", paimonTable.name(), e);
throw new RuntimeException(
"Failed to get Paimon branch: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()),
e);
}
} else {
// Otherwise, use the latest snapshot and the latest schema.
return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HiveUtil;
import org.apache.doris.datasource.paimon.source.PaimonSource;
import org.apache.doris.thrift.TColumnType;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.schema.external.TArrayField;
Expand All @@ -48,8 +46,6 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
Expand Down Expand Up @@ -88,7 +84,6 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
Expand All @@ -103,22 +98,6 @@ public class PaimonUtil {
private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding();
private static final Pattern DIGITAL_REGEX = Pattern.compile("\\d+");

private static final List<ConfigOption<?>> PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS = Arrays.asList(
CoreOptions.SCAN_SNAPSHOT_ID,
CoreOptions.SCAN_TAG_NAME,
CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
CoreOptions.INCREMENTAL_BETWEEN,
CoreOptions.INCREMENTAL_TO_AUTO_TAG);

private static final List<ConfigOption<?>> PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS = Arrays.asList(
CoreOptions.SCAN_TIMESTAMP_MILLIS,
CoreOptions.SCAN_TIMESTAMP,
CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
CoreOptions.INCREMENTAL_BETWEEN,
CoreOptions.INCREMENTAL_TO_AUTO_TAG);

public static List<InternalRow> read(
Table table, @Nullable int[] projection, @Nullable Predicate predicate,
Pair<ConfigOption<?>, String>... dynamicOptions)
Expand Down Expand Up @@ -509,100 +488,6 @@ private static String serializePartitionValue(org.apache.paimon.types.DataType t
}
}

/**
* Builds a snapshot-specific table for time travel queries.
*
* @param baseTable the base Paimon table to copy configuration from
* @param tableSnapshot the snapshot specification (type + value)
* @return a Table instance configured for the specified time travel query
* @throws UserException if snapshot configuration is invalid
*/
public static Table getTableBySnapshot(Table baseTable, TableSnapshot tableSnapshot)
throws UserException {
final String value = tableSnapshot.getValue();
final TableSnapshot.VersionType type = tableSnapshot.getType();
final boolean isDigital = DIGITAL_REGEX.matcher(value).matches();

switch (type) {
case TIME:
return isDigital
? getTableBySnapshotTimestampMillis(baseTable, value)
: getTableBySnapshotTime(baseTable, value);

case VERSION:
if (isDigital) {
return getTableBySnapshotId(baseTable, value);
}
return getTableByTag(baseTable, value);

default:
throw new UserException(String.format("Unsupported version type: %s", type));
}
}

/**
* Builds a table configured to read from a specific snapshot ID.
*
* @param baseTable the base Paimon table to copy configuration from
* @param snapshotId the snapshot ID as a string
* @return a Table instance configured to read from the specified snapshot ID
*/
private static Table getTableBySnapshotId(Table baseTable, String snapshotId) {
Map<String, String> options = new HashMap<>(
PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3);

// For Paimon FROM_SNAPSHOT startup mode, must set only one key in:
// [scan_tag_name, scan_watermark, scan_snapshot_id]
options.put(CoreOptions.SCAN_TAG_NAME.key(), null);
options.put(CoreOptions.SCAN_WATERMARK.key(), null);
options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId);
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS));

return baseTable.copy(options);
}

/**
* Builds a table configured to read from a specific timestamp.
*
* @param baseTable the base Paimon table to copy configuration from
* @param timestampStr the timestamp as a string
* @return a Table instance configured to read from the specified timestamp
*/
private static Table getTableBySnapshotTime(Table baseTable, String timestampStr) {
Map<String, String> options = new HashMap<>(
PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3);

// For Paimon FROM_TIMESTAMP startup mode, must set only one key in:
// [scan_timestamp, scan_timestamp_millis]
options.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString());
options.put(CoreOptions.SCAN_TIMESTAMP.key(), timestampStr);
options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS));

return baseTable.copy(options);
}

/**
* Builds a table configured to read from a specific timestamp in milliseconds.
*
* @param baseTable the base Paimon table to copy configuration from
* @param timestampStr the timestamp in milliseconds as a string
* @return a Table instance configured to read from the specified timestamp
*/
private static Table getTableBySnapshotTimestampMillis(Table baseTable, String timestampStr) {
Map<String, String> options = new HashMap<>(
PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3);

// For Paimon FROM_TIMESTAMP startup mode, must set only one key in:
// [scan_timestamp, scan_timestamp_millis]
options.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString());
options.put(CoreOptions.SCAN_TIMESTAMP.key(), null);
options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), timestampStr);
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS));

return baseTable.copy(options);
}

/**
* Extracts the reference name (branch or tag name) from table scan parameters.
*
Expand All @@ -624,53 +509,6 @@ public static String extractBranchOrTagName(TableScanParams scanParams) {
}
}


/**
* Builds a branch-specific table for time travel queries.
*
* @param source the Paimon source containing catalog and table information
* @param baseTable the base Paimon table
* @param branchName the branch name
* @return a Table instance configured to read from the specified branch
* @throws UserException if branch does not exist
*/
public static Table getTableByBranch(PaimonSource source, Table baseTable, String branchName) throws UserException {

if (!checkBranchExists(baseTable, branchName)) {
throw new UserException(String.format("Branch '%s' does not exist", branchName));
}

PaimonExternalCatalog catalog = (PaimonExternalCatalog) source.getCatalog();
ExternalTable externalTable = (ExternalTable) source.getTargetTable();
return catalog.getPaimonTable(externalTable.getOrBuildNameMapping(), branchName, null);
}

/**
* Builds a tag-specific table for time travel queries.
*
* @param baseTable the base Paimon table to copy configuration from
* @param tagName the tag name
* @return a Table instance configured to read from the specified tag
* @throws UserException if tag does not exist
*/
public static Table getTableByTag(Table baseTable, String tagName) throws UserException {
if (!checkTagsExists(baseTable, tagName)) {
throw new UserException(String.format("Tag '%s' does not exist", tagName));
}

Map<String, String> options = new HashMap<>(
PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3);

// For Paimon FROM_SNAPSHOT startup mode, must set only one key in:
// [scan_tag_name, scan_watermark, scan_snapshot_id]
options.put(CoreOptions.SCAN_TAG_NAME.key(), tagName);
options.put(CoreOptions.SCAN_WATERMARK.key(), null);
options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS));

return baseTable.copy(options);
}

// get snapshot info from query like 'for version/time as of' or '@tag'
public static Snapshot getPaimonSnapshot(Table table, Optional<TableSnapshot> querySnapshot,
Optional<TableScanParams> scanParams) throws UserException {
Expand Down Expand Up @@ -711,14 +549,24 @@ private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String tim
if (isDigital) {
timestampMillis = Long.parseLong(timestamp);
} else {
timestampMillis = TimeUtils.msTimeStringToLong(timestamp, TimeUtils.getTimeZone());
// Supported formats include:yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss.SSS.
// use default local time zone.
timestampMillis = DateTimeUtils.parseTimestampData(timestamp, 3, TimeUtils.getTimeZone()).getMillisecond();
if (timestampMillis < 0) {
throw new DateTimeException("can't parse time: " + timestamp);
}
}
Snapshot snapshot = table.snapshotManager().earlierOrEqualTimeMills(timestampMillis);
if (snapshot == null) {
throw new UserException("can't find snapshot older than : " + timestamp);
Snapshot earliestSnapshot = table.snapshotManager().earliestSnapshot();
throw new UserException(
String.format(
"There is currently no snapshot earlier than or equal to timestamp [%s], "
+ "the earliest snapshot's timestamp is [%s]",
timestampMillis,
earliestSnapshot == null
? "null"
: String.valueOf(earliestSnapshot.timeMillis())));
}
return snapshot;
}
Expand All @@ -740,50 +588,17 @@ private static Snapshot getPaimonSnapshotByTag(DataTable table, String tagName)
return tag.orElseThrow(() -> new UserException("can't find snapshot by tag: " + tagName));
}

/**
* Creates a map of conflicting Paimon options with null values for exclusion.
*
* @param illegalOptions the list of ConfigOptions that should be set to null
* @return a HashMap containing the illegal options as keys with null values
*/
public static Map<String, String> excludePaimonConflictOptions(List<ConfigOption<?>> illegalOptions) {
return illegalOptions.stream()
.collect(HashMap::new,
(m, option) -> m.put(option.key(), null),
HashMap::putAll);
}

/**
* Checks if a tag exists in the given table.
*
* @param baseTable the Paimon table
* @param tagName the tag name to check
* @return true if tag exists, false otherwise
* @throws UserException if table is not a FileStoreTable
*/
public static boolean checkTagsExists(Table baseTable, String tagName) throws UserException {
public static String resolvePaimonBranch(TableScanParams tableScanParams, Table baseTable)
throws UserException {
String branchName = extractBranchOrTagName(tableScanParams);
if (!(baseTable instanceof FileStoreTable)) {
throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName());
}

final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
return fileStoreTable.tagManager().tagExists(tagName);
}

/**
* Checks if a branch exists in the given table.
*
* @param baseTable the Paimon table
* @param branchName the branch name to check
* @return true if branch exists, false otherwise
* @throws UserException if table is not a FileStoreTable
*/
public static boolean checkBranchExists(Table baseTable, String branchName) throws UserException {
if (!(baseTable instanceof FileStoreTable)) {
throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName());
if (!fileStoreTable.branchManager().branchExists(branchName)) {
throw new UserException("can't find branch: " + branchName);
}

final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
return fileStoreTable.branchManager().branchExists(branchName);
return branchName;
}
}
Loading