Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ CREATE TABLE date_partition (
'file.format'='orc'
);

insert into date_partition values(1,date '2020-01-01');
insert into date_partition values(1,date '2020-01-01');

drop table if exists test_schema_change;
create table test_schema_change;
alter table test_schema_change add column id int;
insert into test_schema_change values(1);
CALL sys.create_tag(table => 'test_schema_change', tag => 'tag1', snapshot => 1);
CALL sys.create_branch('test_schema_change', 'branch1', 'tag1');
alter table test_schema_change add column name string;
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
package org.apache.doris.analysis;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.bouncycastle.util.Strings;

import java.util.List;
import java.util.Map;

public class TableScanParams {
public static final String PARAMS_NAME = "name";
public static String INCREMENTAL_READ = "incr";
public static String BRANCH = "branch";
public static String TAG = "tag";
public static final String INCREMENTAL_READ = "incr";
public static final String BRANCH = "branch";
public static final String TAG = "tag";
private static final ImmutableSet<String> VALID_PARAM_TYPES = ImmutableSet.of(
INCREMENTAL_READ,
BRANCH,
TAG);

private final String paramType;
// There are two ways to pass parameters to a function.
Expand All @@ -38,10 +43,18 @@ public class TableScanParams {
private final Map<String, String> mapParams;
private final List<String> listParams;

/**
 * Verifies that {@code paramType} is one of the entries of {@code VALID_PARAM_TYPES}.
 *
 * @throws IllegalArgumentException if {@code paramType} is not a recognised param type
 */
private void validate() {
    final boolean knownType = VALID_PARAM_TYPES.contains(paramType);
    if (knownType) {
        // TODO: validate mapParams and listParams for different param types
        return;
    }
    throw new IllegalArgumentException("Invalid param type: " + paramType);
}

/**
 * Builds scan params from a param type plus its map-style and list-style arguments.
 *
 * @param paramType  the param type; stored after being passed through {@code Strings.toLowerCase}
 * @param mapParams  key/value arguments; {@code null} is stored as an empty immutable map,
 *                   anything else is copied into an immutable map
 * @param listParams positional arguments; stored as given, without copying
 * @throws IllegalArgumentException if the normalised param type is rejected by {@code validate()}
 */
public TableScanParams(String paramType, Map<String, String> mapParams, List<String> listParams) {
    this.paramType = Strings.toLowerCase(paramType);
    if (mapParams == null) {
        this.mapParams = ImmutableMap.of();
    } else {
        this.mapParams = ImmutableMap.copyOf(mapParams);
    }
    this.listParams = listParams;
    // Validation runs last so that it sees the normalised param type.
    validate();
}

public List<String> getListParams() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,18 @@ public static long timeStringToLong(String timeStr, TimeZone timeZone) {
return d.atZone(timeZone.toZoneId()).toInstant().toEpochMilli();
}

/**
 * Parses {@code timeStr} with the formatter returned by
 * {@code getDatetimeMsFormatWithTimeZone()} and converts the result to epoch
 * milliseconds, interpreting the parsed local date-time in {@code timeZone}.
 *
 * @param timeStr  the date-time text to parse
 * @param timeZone the zone in which the parsed local date-time is interpreted
 * @return the epoch milliseconds, or {@code -1} if {@code timeStr} cannot be parsed
 */
public static long msTimeStringToLong(String timeStr, TimeZone timeZone) {
    // DateTimeFormatter is immutable: withZone() returns a new formatter, so its
    // result must be kept. Calling it as a bare statement has no effect.
    DateTimeFormatter dateFormatTimeZone = getDatetimeMsFormatWithTimeZone().withZone(timeZone.toZoneId());
    LocalDateTime d;
    try {
        d = LocalDateTime.parse(timeStr, dateFormatTimeZone);
    } catch (DateTimeParseException e) {
        // Callers treat a negative value as "could not parse".
        return -1;
    }
    return d.atZone(timeZone.toZoneId()).toInstant().toEpochMilli();
}

// Check if the time zone_value is valid
public static String checkTimeZoneValidAndStandardize(String value) throws DdlException {
Function<String, String> standardizeValue = s -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
Expand Down Expand Up @@ -98,10 +99,28 @@ public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
return getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable();
}

private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() {
private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional<TableSnapshot> tableSnapshot,
Optional<TableScanParams> scanParams) {
makeSureInitialized();
return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
.getPaimonSnapshot(this);
if (tableSnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag())) {
// If a snapshot is specified,
// use the specified snapshot and the corresponding schema(not the latest
// schema).
try {
Snapshot snapshot = PaimonUtil.getPaimonSnapshot(paimonTable, tableSnapshot, scanParams);
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
new PaimonSnapshot(snapshot.id(), snapshot.schemaId(), paimonTable));
} catch (Exception e) {
LOG.warn("Failed to get Paimon snapshot for table {}", paimonTable.name(), e);
throw new RuntimeException(
"Failed to get Paimon snapshot: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()),
e);
}
} else {
// Otherwise, use the latest snapshot and the latest schema.
return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
.getPaimonSnapshot(this);
}
}

@Override
Expand Down Expand Up @@ -216,7 +235,7 @@ public boolean isPartitionColumnAllowNull() {

@Override
public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot, Optional<TableScanParams> scanParams) {
return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue());
return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue(tableSnapshot, scanParams));
}

@Override
Expand Down Expand Up @@ -278,7 +297,7 @@ private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnaps
if (snapshot.isPresent()) {
return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
} else {
return getPaimonSnapshotCacheValue();
return getPaimonSnapshotCacheValue(Optional.empty(), Optional.empty());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -115,15 +117,15 @@ private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOE
Table snapshotTable = table;
// snapshotId and schemaId
Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
long latestSchemaId = 0L;
Optional<Snapshot> optionalSnapshot = table.latestSnapshot();
if (optionalSnapshot.isPresent()) {
latestSnapshotId = optionalSnapshot.get().id();
latestSchemaId = table.snapshot(latestSnapshotId).schemaId();
snapshotTable =
table.copy(
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), latestSnapshotId.toString()));
}
DataTable dataTable = (DataTable) table;
long latestSchemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L);
return new PaimonSnapshot(latestSnapshotId, latestSchemaId, snapshotTable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HiveUtil;
import org.apache.doris.datasource.paimon.source.PaimonSource;
Expand All @@ -49,6 +50,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
Expand All @@ -58,9 +60,11 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
Expand All @@ -75,8 +79,10 @@
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.DateTimeException;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
Expand All @@ -87,6 +93,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -577,6 +584,75 @@ public static Table getTableByTag(Table baseTable, String tagName) throws UserEx
return baseTable.copy(options);
}

/**
 * Gets snapshot info for a query such as 'for version/time as of' or '@tag'.
 *
 * <p>Exactly one selector is accepted: either {@code querySnapshot} is present, or
 * {@code scanParams} is present and is a tag.
 *
 * @param table         the Paimon table; it is cast to {@code DataTable}
 * @param querySnapshot the version/time selector of the query, if any
 * @param scanParams    the scan params of the query, if any
 * @return the snapshot selected by the query
 * @throws IllegalArgumentException if no selector is given, or if both
 *                                  {@code querySnapshot} and {@code scanParams} are present
 * @throws UserException            if the selected snapshot cannot be resolved
 */
public static Snapshot getPaimonSnapshot(Table table, Optional<TableSnapshot> querySnapshot,
        Optional<TableScanParams> scanParams) throws UserException {
    Preconditions.checkArgument(querySnapshot.isPresent() || (scanParams.isPresent() && scanParams.get().isTag()),
            "should spec version or time or tag");
    Preconditions.checkArgument(!(querySnapshot.isPresent() && scanParams.isPresent()),
            "should not spec both snapshot and scan params");

    DataTable dataTable = (DataTable) table;
    if (querySnapshot.isPresent()) {
        return getPaimonSnapshotByTableSnapshot(dataTable, querySnapshot.get());
    }
    // The first precondition guarantees that, when no table snapshot is given,
    // scanParams is present and is a tag, so no further check is needed here.
    return getPaimonSnapshotByTag(dataTable, extractBranchOrTagName(scanParams.get()));
}

/**
 * Resolves a snapshot from a {@code TableSnapshot} selector.
 *
 * <p>A {@code TIME} selector is resolved by timestamp. A {@code VERSION} selector is
 * resolved by snapshot id when its value matches {@code DIGITAL_REGEX}, and by tag
 * name otherwise.
 *
 * @param table         the table to look the snapshot up in
 * @param tableSnapshot the selector carrying the type and the raw value
 * @return the resolved snapshot
 * @throws UserException if the selector type is unsupported or the lookup fails
 */
private static Snapshot getPaimonSnapshotByTableSnapshot(DataTable table, TableSnapshot tableSnapshot)
        throws UserException {
    final String rawValue = tableSnapshot.getValue();
    final TableSnapshot.VersionType versionType = tableSnapshot.getType();
    final boolean numeric = DIGITAL_REGEX.matcher(rawValue).matches();

    switch (versionType) {
        case VERSION:
            if (numeric) {
                return getPaimonSnapshotBySnapshotId(table, rawValue);
            }
            return getPaimonSnapshotByTag(table, rawValue);
        case TIME:
            return getPaimonSnapshotByTimestamp(table, rawValue, numeric);
        default:
            throw new UserException("Unsupported snapshot type: " + versionType);
    }
}

/**
 * Finds the snapshot returned by {@code earlierOrEqualTimeMills} for the given timestamp.
 *
 * @param table     the table to look the snapshot up in
 * @param timestamp either a numeric millisecond value or a date-time string
 * @param isDigital {@code true} if {@code timestamp} should be parsed as a number,
 *                  {@code false} if it should be parsed as a date-time string in the
 *                  zone returned by {@code TimeUtils.getTimeZone()}
 * @return the matching snapshot
 * @throws DateTimeException if the date-time string cannot be parsed
 * @throws UserException     if no snapshot is found for the timestamp
 */
private static Snapshot getPaimonSnapshotByTimestamp(DataTable table, String timestamp, boolean isDigital)
        throws UserException {
    final long epochMillis;
    if (!isDigital) {
        epochMillis = TimeUtils.msTimeStringToLong(timestamp, TimeUtils.getTimeZone());
        // A negative result signals that the string could not be parsed.
        if (epochMillis < 0) {
            throw new DateTimeException("can't parse time: " + timestamp);
        }
    } else {
        epochMillis = Long.parseLong(timestamp);
    }

    final Snapshot found = table.snapshotManager().earlierOrEqualTimeMills(epochMillis);
    if (found != null) {
        return found;
    }
    throw new UserException("can't find snapshot older than : " + timestamp);
}

/**
 * Looks a snapshot up by its numeric id.
 *
 * @param table          the table to look the snapshot up in
 * @param snapshotString the snapshot id as decimal text
 * @return the result of {@code tryGetSnapshot} for the parsed id
 * @throws UserException if the snapshot lookup reports {@code FileNotFoundException}
 */
private static Snapshot getPaimonSnapshotBySnapshotId(DataTable table, String snapshotString)
        throws UserException {
    final long id = Long.parseLong(snapshotString);
    try {
        return table.snapshotManager().tryGetSnapshot(id);
    } catch (FileNotFoundException notFound) {
        throw new UserException("can't find snapshot by id: " + id, notFound);
    }
}

/**
 * Looks a snapshot up by tag name through the table's tag manager.
 *
 * @param table   the table whose tag manager is queried
 * @param tagName the tag name to look up
 * @return the tag found under {@code tagName}, returned as a {@code Snapshot}
 * @throws UserException if the tag manager has no tag with that name
 */
private static Snapshot getPaimonSnapshotByTag(DataTable table, String tagName)
        throws UserException {
    return table.tagManager()
            .get(tagName)
            .orElseThrow(() -> new UserException("can't find snapshot by tag: " + tagName));
}

/**
* Creates a map of conflicting Paimon options with null values for exclusion.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -383,20 +382,16 @@ public Map<String, String> getIncrReadParams() throws UserException {

@VisibleForTesting
public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() throws UserException {
if (!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key())) {
// an empty table in PaimonSnapshotCacheValue
return Collections.emptyList();
}
Table paimonTable = getProcessedTable();
int[] projected = desc.getSlots().stream().mapToInt(
slot -> source.getPaimonTable().rowType()
.getFieldNames()
.stream()
.map(String::toLowerCase)
.collect(Collectors.toList())
.indexOf(slot.getColumn().getName()))
slot -> paimonTable.rowType()
.getFieldNames()
.stream()
.map(String::toLowerCase)
.collect(Collectors.toList())
.indexOf(slot.getColumn().getName()))
.filter(i -> i >= 0)
.toArray();

Table paimonTable = getProcessedTable();
ReadBuilder readBuilder = paimonTable.newReadBuilder();
return readBuilder.withFilter(predicates)
.withProjection(projected)
Expand Down Expand Up @@ -712,5 +707,3 @@ private Table getProcessedTable() throws UserException {
return baseTable;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ private void beforeMTMVRefresh() throws AnalysisException, DdlException {
}
if (tableIf instanceof MvccTable) {
MvccTable mvccTable = (MvccTable) tableIf;
MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(Optional.empty(), null);
MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(Optional.empty(), Optional.empty());
snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !desc_schema --
id int Yes true \N
name text Yes true \N

-- !select_table --
1 \N
2 \N
3 \N

-- !select_branch --
1 \N
2 \N
3 \N

-- !select_branch2 --
1 \N
2 \N
3 \N

-- !select_tag --
1
2
3

-- !select_tag2 --
1
2
3

Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,22 @@ new_col1 array<int> Yes true \N
-- !count_4 --
3

-- !desc_latest_schema --
id int Yes true \N
name varchar(2147483647) Yes true \N

-- !query_latest_schema --
1 \N

-- !time_travel_schema_branch --
1 \N

-- !time_travel_schema_tag --
1

-- !time_travel_schema_tag2 --
1

-- !time_travel_schema_snapshot --
1

Loading