Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ CREATE TABLE date_partition (
'file.format'='orc'
);

insert into date_partition values(1,date '2020-01-01');
insert into date_partition values(1,date '2020-01-01');

-- Fixture for schema-change time travel: the schema is altered after a tag and a branch already exist
drop table if exists test_schema_change;
create table test_schema_change;
alter table test_schema_change add column id int;
insert into test_schema_change values(1);
-- tag1 is pinned to snapshot 1 and branch1 is created from tag1, both before the `name` column is added
CALL sys.create_tag(table => 'test_schema_change', tag => 'tag1', snapshot => 1);
CALL sys.create_branch('test_schema_change', 'branch1', 'tag1');
-- Adding `name` last makes the latest schema (id, name) differ from the schema in effect when tag1 was created
alter table test_schema_change add column name string;
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,18 @@ public static long timeStringToLong(String timeStr, TimeZone timeZone) {
return d.atZone(timeZone.toZoneId()).toInstant().toEpochMilli();
}

/**
 * Parses a datetime string with the formatter returned by
 * {@code getDatetimeMsFormatWithTimeZone()} and converts it to epoch milliseconds,
 * interpreting the parsed local date-time in the given time zone.
 *
 * @param timeStr  the datetime text to parse
 * @param timeZone the zone in which the parsed local date-time is interpreted
 * @return epoch milliseconds, or -1 if {@code timeStr} does not match the formatter
 */
public static long msTimeStringToLong(String timeStr, TimeZone timeZone) {
    // DateTimeFormatter is immutable: withZone() returns a new formatter instead of
    // modifying this one, so calling it and dropping the result does nothing. The zone
    // is applied explicitly through atZone() below, which is all this method needs.
    DateTimeFormatter formatter = getDatetimeMsFormatWithTimeZone();
    LocalDateTime parsed;
    try {
        parsed = LocalDateTime.parse(timeStr, formatter);
    } catch (DateTimeParseException e) {
        // Callers treat a negative result as "could not parse".
        return -1;
    }
    return parsed.atZone(timeZone.toZoneId()).toInstant().toEpochMilli();
}

// Check if the time zone_value is valid
public static String checkTimeZoneValidAndStandardize(String value) throws DdlException {
Function<String, String> standardizeValue = s -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
Expand Down Expand Up @@ -98,10 +99,28 @@ public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
return getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable();
}

private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() {
private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional<TableSnapshot> tableSnapshot,
Optional<TableScanParams> scanParams) {
makeSureInitialized();
return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
.getPaimonSnapshot(this);
if (tableSnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag())) {
// If a snapshot is specified,
// use the specified snapshot and the corresponding schema(not the latest
// schema).
try {
Snapshot snapshot = PaimonUtil.getPaimonSnapshot(paimonTable, tableSnapshot, scanParams);
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), paimonTable));
} catch (Exception e) {
LOG.warn("Failed to get Paimon snapshot for table {}", paimonTable.name(), e);
throw new RuntimeException(
"Failed to get Paimon snapshot: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()),
e);
}
} else {
// Otherwise, use the latest snapshot and the latest schema.
return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
.getPaimonSnapshot(this);
}
}

@Override
Expand Down Expand Up @@ -212,7 +231,8 @@ public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws A

@Override
public long getNewestUpdateVersionOrTime() {
return getPaimonSnapshotCacheValue().getPartitionInfo().getNameToPartition().values().stream()
return getPaimonSnapshotCacheValue(Optional.empty(), Optional.empty()).getPartitionInfo().getNameToPartition()
.values().stream()
.mapToLong(Partition::lastFileCreationTime).max().orElse(0);
}

Expand All @@ -228,7 +248,7 @@ public boolean isPartitionColumnAllowNull() {

@Override
public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot, Optional<TableScanParams> scanParams) {
return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue());
return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue(tableSnapshot, scanParams));
}

@Override
Expand Down Expand Up @@ -289,7 +309,7 @@ private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnaps
if (snapshot.isPresent()) {
return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
} else {
return getPaimonSnapshotCacheValue();
return getPaimonSnapshotCacheValue(Optional.empty(), Optional.empty());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -115,14 +117,14 @@ private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOE
Table snapshotTable = table;
// snapshotId and schemaId
Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
long latestSchemaId = 0L;
Optional<Snapshot> optionalSnapshot = table.latestSnapshot();
if (optionalSnapshot.isPresent()) {
latestSnapshotId = optionalSnapshot.get().id();
latestSchemaId = table.snapshot(latestSnapshotId).schemaId();
snapshotTable =
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), latestSnapshotId.toString()));
}
DataTable dataTable = (DataTable) table;
long latestSchemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L);
return new PaimonSnapshot(latestSnapshotId, latestSchemaId, snapshotTable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HiveUtil;
import org.apache.doris.datasource.paimon.source.PaimonSource;
Expand All @@ -49,6 +50,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
Expand All @@ -58,9 +60,11 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
Expand All @@ -75,8 +79,10 @@
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.DateTimeException;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
Expand All @@ -87,6 +93,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -656,6 +663,75 @@ public static Table getTableByTag(Table baseTable, String tagName) throws UserEx
return baseTable.copy(options);
}

/**
 * Resolves the Paimon snapshot addressed by a query, either through a table snapshot
 * clause (e.g. 'for version/time as of') or through tag scan params (e.g. '@tag').
 *
 * <p>Exactly one selector is accepted: the precondition checks reject the call when
 * neither a table snapshot nor tag scan params are given, and when a table snapshot
 * and scan params are both present.
 *
 * @param table         the Paimon table; it is cast to {@code DataTable}
 * @param querySnapshot the version/time selector, if any
 * @param scanParams    the scan params, used only when they denote a tag
 * @return the snapshot selected by the query
 * @throws UserException if the selected snapshot cannot be resolved
 */
public static Snapshot getPaimonSnapshot(Table table, Optional<TableSnapshot> querySnapshot,
        Optional<TableScanParams> scanParams) throws UserException {
    final boolean hasQuerySnapshot = querySnapshot.isPresent();
    Preconditions.checkArgument(
            hasQuerySnapshot || (scanParams.isPresent() && scanParams.get().isTag()),
            "should spec version or time or tag");
    Preconditions.checkArgument(!hasQuerySnapshot || !scanParams.isPresent(),
            "should not spec both snapshot and scan params");

    final DataTable dataTable = (DataTable) table;
    if (hasQuerySnapshot) {
        return getPaimonSnapshotByTableSnapshot(dataTable, querySnapshot.get());
    }
    if (scanParams.isPresent() && scanParams.get().isTag()) {
        return getPaimonSnapshotByTag(dataTable, extractBranchOrTagName(scanParams.get()));
    }
    // Defensive fallback; the first precondition already rules this case out.
    throw new UserException("should spec version or time or tag");
}

/**
 * Dispatches a table snapshot selector to the matching lookup.
 *
 * <p>TIME selectors are resolved by timestamp. VERSION selectors are resolved by
 * snapshot id when the value matches {@code DIGITAL_REGEX}, otherwise by tag name.
 *
 * @param table         the Paimon data table to search
 * @param tableSnapshot the selector carrying the type and raw value
 * @return the resolved snapshot
 * @throws UserException if the type is unsupported or the lookup fails
 */
private static Snapshot getPaimonSnapshotByTableSnapshot(DataTable table, TableSnapshot tableSnapshot)
        throws UserException {
    final String rawValue = tableSnapshot.getValue();
    final TableSnapshot.VersionType versionType = tableSnapshot.getType();
    final boolean numeric = DIGITAL_REGEX.matcher(rawValue).matches();

    switch (versionType) {
        case TIME:
            return getPaimonSnapshotByTimestamp(table, rawValue, numeric);
        case VERSION:
            if (numeric) {
                return getPaimonSnapshotBySnapshotId(table, rawValue);
            }
            // A non-numeric version is treated as a tag name.
            return getPaimonSnapshotByTag(table, rawValue);
        default:
            throw new UserException("Unsupported snapshot type: " + versionType);
    }
}

/**
 * Finds a snapshot for a point in time via
 * {@code snapshotManager().earlierOrEqualTimeMills(...)}.
 *
 * @param table     the Paimon data table to search
 * @param timestamp the time, either as a numeric string parsed with
 *                  {@code Long.parseLong} or as a datetime string parsed with
 *                  {@code TimeUtils.msTimeStringToLong} in {@code TimeUtils.getTimeZone()}
 * @param isDigital whether {@code timestamp} is the numeric form
 * @return the snapshot returned by the snapshot manager
 * @throws DateTimeException if the datetime string cannot be parsed
 * @throws UserException     if the snapshot manager returns no snapshot
 */
private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String timestamp, boolean isDigital)
        throws UserException {
    final long epochMillis;
    if (!isDigital) {
        epochMillis = TimeUtils.msTimeStringToLong(timestamp, TimeUtils.getTimeZone());
        // A negative value signals that the text could not be parsed.
        if (epochMillis < 0) {
            throw new DateTimeException("can't parse time: " + timestamp);
        }
    } else {
        epochMillis = Long.parseLong(timestamp);
    }

    final Snapshot found = table.snapshotManager().earlierOrEqualTimeMills(epochMillis);
    if (found != null) {
        return found;
    }
    throw new UserException("can't find snapshot older than : " + timestamp);
}

/**
 * Looks up a snapshot by its numeric id.
 *
 * @param table          the Paimon data table to search
 * @param snapshotString the snapshot id as a decimal string, parsed with {@code Long.parseLong}
 * @return the result of {@code snapshotManager().tryGetSnapshot(id)}
 * @throws UserException if the lookup raises {@code FileNotFoundException}
 */
private static Snapshot getPaimonSnapshotBySnapshotId(DataTable table, String snapshotString)
        throws UserException {
    final long id = Long.parseLong(snapshotString);
    try {
        return table.snapshotManager().tryGetSnapshot(id);
    } catch (FileNotFoundException notFound) {
        throw new UserException("can't find snapshot by id: " + id, notFound);
    }
}

/**
 * Looks up the snapshot registered under a tag name.
 *
 * @param table   the Paimon data table whose tag manager is queried
 * @param tagName the tag to resolve
 * @return the tag entry returned by {@code tagManager().get(tagName)}
 * @throws UserException if no tag with that name exists
 */
private static Snapshot getPaimonSnapshotByTag(DataTable table, String tagName)
        throws UserException {
    final Optional<Tag> lookup = table.tagManager().get(tagName);
    if (!lookup.isPresent()) {
        throw new UserException("can't find snapshot by tag: " + tagName);
    }
    return lookup.get();
}

/**
* Creates a map of conflicting Paimon options with null values for exclusion.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -384,20 +383,16 @@ public Map<String, String> getIncrReadParams() throws UserException {

@VisibleForTesting
public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() throws UserException {
if (!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key())) {
// an empty table in PaimonSnapshotCacheValue
return Collections.emptyList();
}
Table paimonTable = getProcessedTable();
int[] projected = desc.getSlots().stream().mapToInt(
slot -> source.getPaimonTable().rowType()
.getFieldNames()
.stream()
.map(String::toLowerCase)
.collect(Collectors.toList())
.indexOf(slot.getColumn().getName()))
slot -> paimonTable.rowType()
.getFieldNames()
.stream()
.map(String::toLowerCase)
.collect(Collectors.toList())
.indexOf(slot.getColumn().getName()))
.filter(i -> i >= 0)
.toArray();

Table paimonTable = getProcessedTable();
ReadBuilder readBuilder = paimonTable.newReadBuilder();
return readBuilder.withFilter(predicates)
.withProjection(projected)
Expand Down Expand Up @@ -713,5 +708,3 @@ private Table getProcessedTable() throws UserException {
return baseTable;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ private void beforeMTMVRefresh() throws AnalysisException, DdlException {
}
if (tableIf instanceof MvccTable) {
MvccTable mvccTable = (MvccTable) tableIf;
MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(Optional.empty(), null);
MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(Optional.empty(), Optional.empty());
snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !desc_schema --
id int Yes true \N
name text Yes true \N

-- !select_table --
1 \N
2 \N
3 \N

-- !select_branch --
1 \N
2 \N
3 \N

-- !select_branch2 --
1 \N
2 \N
3 \N

-- !select_tag --
1
2
3

-- !select_tag2 --
1
2
3

Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,22 @@ new_col1 array<int> Yes true \N
-- !count_4 --
3

-- !desc_latest_schema --
id int Yes true \N
name varchar(2147483647) Yes true \N

-- !query_latest_schema --
1 \N

-- !time_travel_schema_branch --
1 \N

-- !time_travel_schema_tag --
1

-- !time_travel_schema_tag2 --
1

-- !time_travel_schema_snapshot --
1

Loading