Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,26 @@ INSERT INTO test_paimon_time_travel_db.`tbl_time_travel$branch_b_2` VALUES
(20007, 8007, '2024-02-07', 378.45, '465 Lavender Avenue, Thorndale, WY 82201', 'PROCESSING', false),
(20008, 8008, '2024-02-08', 92.30, '729 Iris Lane, Riverside, MN 55987', 'SHIPPED', true),
(20009, 8009, '2024-02-09', 445.80, '856 Tulip Boulevard, Sunnydale, ND 58301', 'PENDING', false),
(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401', 'CANCELLED', true);
(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401', 'CANCELLED', true);


-- time travel schema change
ALTER TABLE test_paimon_time_travel_db.tbl_time_travel ADD COLUMNS (
new_col1 INT
);

-- - snapshot 5
INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
(6001, 9001, '2024-02-11', 456.80, '123 New Street, Downtown, CA 90210', 'COMPLETED', true, 100),
(6002, 9002, '2024-02-12', 289.45, '456 Updated Ave, Midtown, NY 10001', 'PROCESSING', false, 200),
(6003, 9003, '2024-02-13', 378.90, '789 Modern Blvd, Uptown, TX 75201', 'SHIPPED', true, 300);

CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_5', snapshot => 5);

-- - snapshot 6
INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
(6004, 9004, '2024-02-14', 199.99, '321 Future Lane, Innovation, WA 98001', 'PENDING', true, 400),
(6005, 9005, '2024-02-15', 567.25, '654 Progress Drive, Tech City, OR 97201', 'COMPLETED', false, 500),
(6006, 9006, '2024-02-16', 123.75, '987 Advanced Court, Silicon Valley, CA 94301', 'CANCELLED', true, 600);

CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag => 't_6', snapshot => 6);
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,45 @@ public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional<TableSnapshot> tableSnapshot,
Optional<TableScanParams> scanParams) {
makeSureInitialized();

// Current limitation: cannot specify both table snapshot and scan parameters simultaneously.
if (tableSnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag())) {
// If a snapshot is specified,
// use the specified snapshot and the corresponding schema(not the latest
// schema).
try {
Snapshot snapshot = PaimonUtil.getPaimonSnapshot(paimonTable, tableSnapshot, scanParams);
Table dataTable = paimonTable.copy(
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(snapshot.id())));
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), paimonTable));
new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), dataTable));
} catch (Exception e) {
LOG.warn("Failed to get Paimon snapshot for table {}", paimonTable.name(), e);
throw new RuntimeException(
"Failed to get Paimon snapshot: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()),
e);
}
} else if (scanParams.isPresent() && scanParams.get().isBranch()) {
try {
String branch = PaimonUtil.resolvePaimonBranch(scanParams.get(), paimonTable);
Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(getOrBuildNameMapping(), branch, null);
Optional<Snapshot> latestSnapshot = table.latestSnapshot();
long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
if (latestSnapshot.isPresent()) {
latestSnapshotId = latestSnapshot.get().id();
}
// Branches in Paimon can have independent schemas and snapshots.
// TODO: Add time travel support for paimon branch tables.
DataTable dataTable = (DataTable) table;
Long schemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L);
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
new PaimonSnapshot(latestSnapshotId, schemaId, dataTable));
} catch (Exception e) {
LOG.warn("Failed to get Paimon branch for table {}", paimonTable.name(), e);
throw new RuntimeException(
"Failed to get Paimon branch: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()),
e);
}
} else {
// Otherwise, use the latest snapshot and the latest schema.
return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HiveUtil;
import org.apache.doris.datasource.paimon.source.PaimonSource;
import org.apache.doris.thrift.TColumnType;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.schema.external.TArrayField;
Expand All @@ -48,8 +46,6 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
Expand Down Expand Up @@ -88,7 +84,6 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
Expand All @@ -103,22 +98,6 @@ public class PaimonUtil {
// URL-safe Base64 encoder configured to omit trailing padding characters.
private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding();
// Pattern for a run of one or more decimal digits; used with matches() to test
// whether a whole time travel value is numeric.
private static final Pattern DIGITAL_REGEX = Pattern.compile("\\d+");

// Options that are set to null (via excludePaimonConflictOptions) when a table copy
// is built for a timestamp-based scan (scan mode FROM_TIMESTAMP).
private static final List<ConfigOption<?>> PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS = Arrays.asList(
CoreOptions.SCAN_SNAPSHOT_ID,
CoreOptions.SCAN_TAG_NAME,
CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
CoreOptions.INCREMENTAL_BETWEEN,
CoreOptions.INCREMENTAL_TO_AUTO_TAG);

// Options that are set to null (via excludePaimonConflictOptions) when a table copy
// is built for a scan pinned to a snapshot id or a tag name.
private static final List<ConfigOption<?>> PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS = Arrays.asList(
CoreOptions.SCAN_TIMESTAMP_MILLIS,
CoreOptions.SCAN_TIMESTAMP,
CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
CoreOptions.INCREMENTAL_BETWEEN,
CoreOptions.INCREMENTAL_TO_AUTO_TAG);

public static List<InternalRow> read(
Table table, @Nullable int[] projection, @Nullable Predicate predicate,
Pair<ConfigOption<?>, String>... dynamicOptions)
Expand Down Expand Up @@ -430,100 +409,6 @@ public static <T> String encodeObjectToString(T t) {
}
}

/**
 * Resolves a time travel specification into a table copy configured for that point in history.
 *
 * <p>Dispatch rules:
 * <ul>
 *   <li>{@code TIME} with an all-digit value: handled as a millisecond timestamp.</li>
 *   <li>{@code TIME} with any other value: handled as a timestamp string.</li>
 *   <li>{@code VERSION} with an all-digit value: handled as a snapshot id.</li>
 *   <li>{@code VERSION} with any other value: handled as a tag name.</li>
 * </ul>
 *
 * @param baseTable the Paimon table whose configuration is copied
 * @param tableSnapshot the requested snapshot, carrying a version type and a raw value
 * @return a table instance configured for the requested time travel read
 * @throws UserException if the version type is not supported, or the value names a tag that does not exist
 */
public static Table getTableBySnapshot(Table baseTable, TableSnapshot tableSnapshot)
        throws UserException {
    final String rawValue = tableSnapshot.getValue();
    final TableSnapshot.VersionType versionType = tableSnapshot.getType();
    // A purely numeric value selects the id / epoch-millis flavour of each version type.
    final boolean allDigits = DIGITAL_REGEX.matcher(rawValue).matches();

    switch (versionType) {
        case TIME:
            if (allDigits) {
                return getTableBySnapshotTimestampMillis(baseTable, rawValue);
            }
            return getTableBySnapshotTime(baseTable, rawValue);

        case VERSION:
            return allDigits
                    ? getTableBySnapshotId(baseTable, rawValue)
                    : getTableByTag(baseTable, rawValue);

        default:
            throw new UserException(String.format("Unsupported version type: %s", versionType));
    }
}

/**
 * Copies the base table with dynamic options that pin the scan to a single snapshot id.
 *
 * @param baseTable the Paimon table whose configuration is copied
 * @param snapshotId the snapshot id, in string form
 * @return a table instance configured to read the given snapshot id
 */
private static Table getTableBySnapshotId(Table baseTable, String snapshotId) {
    // Three explicit entries below plus one entry per conflicting option.
    final int expectedEntries = PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3;
    final Map<String, String> dynamicOptions = new HashMap<>(expectedEntries);

    // Paimon's FROM_SNAPSHOT startup mode requires exactly one of
    // [scan_tag_name, scan_watermark, scan_snapshot_id]; only the snapshot id is kept here.
    dynamicOptions.put(CoreOptions.SCAN_TAG_NAME.key(), null);
    dynamicOptions.put(CoreOptions.SCAN_WATERMARK.key(), null);
    dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId);

    // Null out the remaining options that conflict with a snapshot-based scan.
    final Map<String, String> nulledConflicts = excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS);
    dynamicOptions.putAll(nulledConflicts);

    return baseTable.copy(dynamicOptions);
}

/**
 * Copies the base table with dynamic options that start the scan from a timestamp string.
 *
 * @param baseTable the Paimon table whose configuration is copied
 * @param timestampStr the timestamp, in string form
 * @return a table instance configured to read as of the given timestamp
 */
private static Table getTableBySnapshotTime(Table baseTable, String timestampStr) {
    // Three explicit entries below plus one entry per conflicting option.
    final int expectedEntries = PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3;
    final Map<String, String> dynamicOptions = new HashMap<>(expectedEntries);

    // Paimon's FROM_TIMESTAMP startup mode requires exactly one of
    // [scan_timestamp, scan_timestamp_millis]; only the string form is kept here.
    dynamicOptions.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString());
    dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP.key(), timestampStr);
    dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);

    // Null out the remaining options that conflict with a timestamp-based scan.
    final Map<String, String> nulledConflicts = excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS);
    dynamicOptions.putAll(nulledConflicts);

    return baseTable.copy(dynamicOptions);
}

/**
 * Copies the base table with dynamic options that start the scan from a millisecond timestamp.
 *
 * @param baseTable the Paimon table whose configuration is copied
 * @param timestampStr the timestamp in milliseconds, in string form
 * @return a table instance configured to read as of the given timestamp
 */
private static Table getTableBySnapshotTimestampMillis(Table baseTable, String timestampStr) {
    // Three explicit entries below plus one entry per conflicting option.
    final int expectedEntries = PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3;
    final Map<String, String> dynamicOptions = new HashMap<>(expectedEntries);

    // Paimon's FROM_TIMESTAMP startup mode requires exactly one of
    // [scan_timestamp, scan_timestamp_millis]; only the millisecond form is kept here.
    dynamicOptions.put(CoreOptions.SCAN_MODE.key(), StartupMode.FROM_TIMESTAMP.toString());
    dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP.key(), null);
    dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), timestampStr);

    // Null out the remaining options that conflict with a timestamp-based scan.
    final Map<String, String> nulledConflicts = excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS);
    dynamicOptions.putAll(nulledConflicts);

    return baseTable.copy(dynamicOptions);
}

/**
* Extracts the reference name (branch or tag name) from table scan parameters.
*
Expand All @@ -545,53 +430,6 @@ public static String extractBranchOrTagName(TableScanParams scanParams) {
}
}


/**
 * Loads the table for a named branch through the source's catalog.
 *
 * @param source the Paimon source supplying the catalog and the target table
 * @param baseTable the base Paimon table, used to verify that the branch exists
 * @param branchName the name of the branch to read
 * @return the table instance the catalog returns for the given branch
 * @throws UserException if the branch does not exist, or the existence check itself fails
 */
public static Table getTableByBranch(PaimonSource source, Table baseTable, String branchName) throws UserException {
    final boolean branchPresent = checkBranchExists(baseTable, branchName);
    if (!branchPresent) {
        throw new UserException(String.format("Branch '%s' does not exist", branchName));
    }

    // The branch table is fetched from the catalog under the target table's name mapping,
    // not derived from baseTable.
    final PaimonExternalCatalog paimonCatalog = (PaimonExternalCatalog) source.getCatalog();
    final ExternalTable targetTable = (ExternalTable) source.getTargetTable();
    return paimonCatalog.getPaimonTable(targetTable.getOrBuildNameMapping(), branchName, null);
}

/**
 * Copies the base table with dynamic options that pin the scan to a named tag.
 *
 * @param baseTable the Paimon table whose configuration is copied
 * @param tagName the name of the tag to read
 * @return a table instance configured to read the given tag
 * @throws UserException if the tag does not exist, or the existence check itself fails
 */
public static Table getTableByTag(Table baseTable, String tagName) throws UserException {
    final boolean tagPresent = checkTagsExists(baseTable, tagName);
    if (!tagPresent) {
        throw new UserException(String.format("Tag '%s' does not exist", tagName));
    }

    // Three explicit entries below plus one entry per conflicting option.
    final int expectedEntries = PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3;
    final Map<String, String> dynamicOptions = new HashMap<>(expectedEntries);

    // Paimon's FROM_SNAPSHOT startup mode requires exactly one of
    // [scan_tag_name, scan_watermark, scan_snapshot_id]; only the tag name is kept here.
    dynamicOptions.put(CoreOptions.SCAN_TAG_NAME.key(), tagName);
    dynamicOptions.put(CoreOptions.SCAN_WATERMARK.key(), null);
    dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);

    // Null out the remaining options that conflict with a snapshot-based scan.
    final Map<String, String> nulledConflicts = excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS);
    dynamicOptions.putAll(nulledConflicts);

    return baseTable.copy(dynamicOptions);
}

// get snapshot info from query like 'for version/time as of' or '@tag'
public static Snapshot getPaimonSnapshot(Table table, Optional<TableSnapshot> querySnapshot,
Optional<TableScanParams> scanParams) throws UserException {
Expand Down Expand Up @@ -632,14 +470,24 @@ private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String tim
if (isDigital) {
timestampMillis = Long.parseLong(timestamp);
} else {
timestampMillis = TimeUtils.msTimeStringToLong(timestamp, TimeUtils.getTimeZone());
// Supported formats include:yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss.SSS.
// use default local time zone.
timestampMillis = DateTimeUtils.parseTimestampData(timestamp, 3, TimeUtils.getTimeZone()).getMillisecond();
if (timestampMillis < 0) {
throw new DateTimeException("can't parse time: " + timestamp);
}
}
Snapshot snapshot = table.snapshotManager().earlierOrEqualTimeMills(timestampMillis);
if (snapshot == null) {
throw new UserException("can't find snapshot older than : " + timestamp);
Snapshot earliestSnapshot = table.snapshotManager().earliestSnapshot();
throw new UserException(
String.format(
"There is currently no snapshot earlier than or equal to timestamp [%s], "
+ "the earliest snapshot's timestamp is [%s]",
timestampMillis,
earliestSnapshot == null
? "null"
: String.valueOf(earliestSnapshot.timeMillis())));
}
return snapshot;
}
Expand All @@ -661,51 +509,18 @@ private static Snapshot getPaimonSnapshotByTag(DataTable table, String tagName)
return tag.orElseThrow(() -> new UserException("can't find snapshot by tag: " + tagName));
}

/**
 * Builds a map that associates the key of every given option with {@code null}.
 *
 * <p>Callers in this file merge the result into the dynamic options handed to {@code Table.copy}.
 *
 * @param illegalOptions the options whose keys should be mapped to {@code null}
 * @return a new {@link HashMap} holding one null-valued entry per distinct option key
 */
public static Map<String, String> excludePaimonConflictOptions(List<ConfigOption<?>> illegalOptions) {
    final HashMap<String, String> nulledOptions = new HashMap<>();
    for (ConfigOption<?> option : illegalOptions) {
        nulledOptions.put(option.key(), null);
    }
    return nulledOptions;
}

/**
* Checks if a tag exists in the given table.
*
* @param baseTable the Paimon table
* @param tagName the tag name to check
* @return true if tag exists, false otherwise
* @throws UserException if table is not a FileStoreTable
*/
public static boolean checkTagsExists(Table baseTable, String tagName) throws UserException {
public static String resolvePaimonBranch(TableScanParams tableScanParams, Table baseTable)
throws UserException {
String branchName = extractBranchOrTagName(tableScanParams);
if (!(baseTable instanceof FileStoreTable)) {
throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName());
}

final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
return fileStoreTable.tagManager().tagExists(tagName);
}

/**
* Checks if a branch exists in the given table.
*
* @param baseTable the Paimon table
* @param branchName the branch name to check
* @return true if branch exists, false otherwise
* @throws UserException if table is not a FileStoreTable
*/
public static boolean checkBranchExists(Table baseTable, String branchName) throws UserException {
if (!(baseTable instanceof FileStoreTable)) {
throw new UserException("Table type should be FileStoreTable but got: " + baseTable.getClass().getName());
if (!fileStoreTable.branchManager().branchExists(branchName)) {
throw new UserException("can't find branch: " + branchName);
}

final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
return fileStoreTable.branchManager().branchExists(branchName);
return branchName;
}

public static Map<String, String> getPartitionInfoMap(Table table, BinaryRow partitionValues, String timeZone) {
Expand Down
Loading