@@ -0,0 +1,37 @@
use demo.test_db;

create table schema_change_with_time_travel (c1 int);
insert into schema_change_with_time_travel values (1);

alter table schema_change_with_time_travel add column c2 int;
insert into schema_change_with_time_travel values (2,3);

alter table schema_change_with_time_travel add column c3 int;
insert into schema_change_with_time_travel values (4,5,6);

alter table schema_change_with_time_travel drop column c2;
insert into schema_change_with_time_travel values (7,8);

alter table schema_change_with_time_travel add column c2 int;
insert into schema_change_with_time_travel values (9,10,11);

alter table schema_change_with_time_travel add column c4 int;


create table schema_change_with_time_travel_orc (c1 int) tblproperties ("write.format.default"="orc");
insert into schema_change_with_time_travel_orc values (1);

alter table schema_change_with_time_travel_orc add column c2 int;
insert into schema_change_with_time_travel_orc values (2,3);

alter table schema_change_with_time_travel_orc add column c3 int;
insert into schema_change_with_time_travel_orc values (4,5,6);

alter table schema_change_with_time_travel_orc drop column c2;
insert into schema_change_with_time_travel_orc values (7,8);

alter table schema_change_with_time_travel_orc add column c2 int;
insert into schema_change_with_time_travel_orc values (9,10,11);

alter table schema_change_with_time_travel_orc add column c4 int;

11 changes: 0 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -143,17 +143,6 @@ default List<Column> getBaseSchemaOrEmpty() {

Column getColumn(String name);

default int getBaseColumnIdxByName(String colName) {
int i = 0;
for (Column col : getBaseSchema()) {
if (col.getName().equalsIgnoreCase(colName)) {
return i;
}
++i;
}
return -1;
}

String getMysqlType();

String getEngine();

@@ -133,6 +133,8 @@ public abstract class ExternalCatalog
CREATE_TIME,
USE_META_CACHE);

protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors();

// Unique id of this catalog, will be assigned after catalog is loaded.
@SerializedName(value = "id")
protected long id;

@@ -100,11 +100,9 @@ public void addSchemaForTest(String dbName, String tblName, ImmutableList<Column
}

public void invalidateTableCache(String dbName, String tblName) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
schemaCache.invalidate(key);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName, tblName, catalog.getName());
}
schemaCache.asMap().keySet().stream()
.filter(key -> key.dbName.equals(dbName) && key.tblName.equals(tblName))
.forEach(schemaCache::invalidate);
}

public void invalidateDbCache(String dbName) {
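The new invalidation path above drops every cached entry whose key matches the table name, not just one exact key. That matters because Iceberg schema cache entries are keyed per schema id (see IcebergSchemaCacheKey later in this diff), so a single table can own several entries. A minimal standalone sketch of the same pattern, assuming a Caffeine cache and a simplified key type rather than Doris's actual SchemaCacheKey:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.Objects;

public class SchemaCacheInvalidationSketch {
    // Hypothetical stand-in for SchemaCacheKey; Iceberg entries additionally carry a schema id,
    // so several keys can share the same db/table name.
    static final class Key {
        final String dbName;
        final String tblName;
        final long schemaId;

        Key(String dbName, String tblName, long schemaId) {
            this.dbName = dbName;
            this.tblName = tblName;
            this.schemaId = schemaId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key k = (Key) o;
            return schemaId == k.schemaId && dbName.equals(k.dbName) && tblName.equals(k.tblName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(dbName, tblName, schemaId);
        }
    }

    private final Cache<Key, Object> schemaCache = Caffeine.newBuilder().maximumSize(10_000).build();

    // Invalidate every schema version cached for one table, mirroring the
    // stream-and-filter approach in the diff above.
    public void invalidateTableCache(String dbName, String tblName) {
        schemaCache.asMap().keySet().stream()
                .filter(k -> k.dbName.equals(dbName) && k.tblName.equals(tblName))
                .forEach(schemaCache::invalidate);
    }
}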

@@ -252,7 +252,14 @@ private void setColumnPositionMapping()
}
SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId());
String colName = slotDesc.getColumn().getName();
int idx = tbl.getBaseColumnIdxByName(colName);
int idx = -1;
List<Column> columns = getColumns();
for (int i = 0; i < columns.size(); i++) {
if (columns.get(i).getName().equals(colName)) {
idx = i;
break;
}
}
if (idx == -1) {
throw new UserException("Column " + colName + " not found in table " + tbl.getName());
}

@@ -193,7 +193,7 @@ protected void setDefaultValueExprs(TableIf tbl,
TExpr tExpr = new TExpr();
tExpr.setNodes(Lists.newArrayList());

for (Column column : tbl.getBaseSchema()) {
for (Column column : getColumns()) {
Expr expr;
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {

@@ -73,4 +73,13 @@ abstract MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<Mv
@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
}

/**
* Whether this table can serve as a related table of a materialized view.
* For example, an Iceberg table may become unsupported after partition evolution.
* @return true if the table is supported as a related table
*/
protected boolean isValidRelatedTable() {
return true;
}
}

@@ -180,6 +180,12 @@ protected void initLocalObjectsImpl() {
String.valueOf(Config.hive_metastore_client_timeout_second));
}
HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
Integer.MAX_VALUE,
String.format("hms_iceberg_catalog_%s_executor_pool", name),
true,
preExecutionAuthenticator);
FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
this.bindBrokerName(), this.catalogProperty.getHadoopProperties());
this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
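The pool created above is sized by the ICEBERG_CATALOG_EXECUTOR_THREAD_NUM constant moved into ExternalCatalog earlier in this diff, i.e. the host's core count, instead of the previous hard-coded 16. A rough sketch of that sizing choice with plain java.util.concurrent; the pre-auth wrapper and ThreadPoolManager details are Doris-specific and not reproduced here:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class CatalogExecutorSizingSketch {
    // Mirrors the moved constant: one worker per available core.
    static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors();

    static ExecutorService newCatalogExecutor(String poolName) {
        ThreadFactory daemonFactory = runnable -> {
            Thread thread = new Thread(runnable, poolName);
            thread.setDaemon(true); // daemon threads so the pool never blocks JVM shutdown
            return thread;
        };
        return Executors.newFixedThreadPool(ICEBERG_CATALOG_EXECUTOR_THREAD_NUM, daemonFactory);
    }
}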

@@ -40,6 +40,7 @@
import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergMvccSnapshot;
import org.apache.doris.datasource.iceberg.IcebergSchemaCacheKey;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot;
@@ -209,7 +210,7 @@ protected synchronized void makeSureInitialized() {
} else {
if (supportedIcebergTable()) {
dlaType = DLAType.ICEBERG;
dlaTable = new HiveDlaTable(this);
dlaTable = new IcebergDlaTable(this);
} else if (supportedHoodieTable()) {
dlaType = DLAType.HUDI;
dlaTable = new HudiDlaTable(this);
@@ -313,6 +314,8 @@ public List<Column> getFullSchema() {
if (getDlaType() == DLAType.HUDI) {
return ((HudiDlaTable) dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this))
.getSchema();
} else if (getDlaType() == DLAType.ICEBERG) {
return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), getName());
}
Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(dbName, name);
return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
@@ -1047,8 +1050,12 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
if (getDlaType() == DLAType.HUDI) {
return new HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
} else if (getDlaType() == DLAType.ICEBERG) {
return new IcebergMvccSnapshot(
IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, getCatalog(), getDbName(), getName()));
} else {
return new EmptyMvccSnapshot();
}
return new EmptyMvccSnapshot();
}

public boolean firstColumnIsString() {
@@ -1084,4 +1091,9 @@ public List<SysTable> getSupportedSysTables() {
return Lists.newArrayList();
}
}

public boolean isValidRelatedTable() {
makeSureInitialized();
return dlaTable.isValidRelatedTable();
}
}
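The Iceberg branches added above resolve the schema through IcebergUtils and a snapshot cache value rather than the generic schema cache, because with time travel each snapshot can reference a different schema version. A hedged sketch of that lookup using the Iceberg API directly; obtaining the Table handle is assumed (in Doris it goes through the catalog and IcebergUtils):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class SnapshotSchemaSketch {
    // Resolve the schema that was current when a given snapshot was committed.
    static Schema schemaForSnapshot(Table table, long snapshotId) {
        Snapshot snapshot = table.snapshot(snapshotId);
        Integer schemaId = snapshot.schemaId(); // may be null for metadata written by old writers
        return schemaId == null ? table.schema() : table.schemas().get(schemaId);
    }
}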

@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
import org.apache.doris.mtmv.MTMVSnapshotIf;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class IcebergDlaTable extends HMSDlaTable {

private boolean isValidRelatedTableCached = false;
private boolean isValidRelatedTable = false;

public IcebergDlaTable(HMSExternalTable table) {
super(table);
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return Maps.newHashMap(
IcebergUtils.getOrFetchSnapshotCacheValue(
snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName())
.getPartitionInfo().getNameToPartitionItem());
}

@Override
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
return isValidRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
}

@Override
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
return getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet());
}

@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
IcebergSnapshotCacheValue snapshotValue =
IcebergUtils.getOrFetchSnapshotCacheValue(
snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName(),
snapshotValue.getSnapshot().getSchemaId());
return schemaValue.getPartitionColumns();
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException {
IcebergSnapshotCacheValue snapshotValue =
IcebergUtils.getOrFetchSnapshotCacheValue(
snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
if (latestSnapshotId <= 0) {
throw new AnalysisException("can not find partition: " + partitionName);
}
return new MTMVSnapshotIdSnapshot(latestSnapshotId);
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
throws AnalysisException {
hmsTable.makeSureInitialized();
IcebergSnapshotCacheValue snapshotValue =
IcebergUtils.getOrFetchSnapshotCacheValue(
snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
}

@Override
boolean isPartitionColumnAllowNull() {
return true;
}

@Override
protected boolean isValidRelatedTable() {
if (isValidRelatedTableCached) {
return isValidRelatedTable;
}
isValidRelatedTable = false;
Set<String> allFields = Sets.newHashSet();
Table table = IcebergUtils.getIcebergTable(
hmsTable.getCatalog(),
hmsTable.getDbName(),
hmsTable.getName()
);
for (PartitionSpec spec : table.specs().values()) {
if (spec == null) {
isValidRelatedTableCached = true;
return false;
}
List<PartitionField> fields = spec.fields();
if (fields.size() != 1) {
isValidRelatedTableCached = true;
return false;
}
PartitionField partitionField = spec.fields().get(0);
String transformName = partitionField.transform().toString();
if (!IcebergUtils.YEAR.equals(transformName)
&& !IcebergUtils.MONTH.equals(transformName)
&& !IcebergUtils.DAY.equals(transformName)
&& !IcebergUtils.HOUR.equals(transformName)) {
isValidRelatedTableCached = true;
return false;
}
allFields.add(table.schema().findColumnName(partitionField.sourceId()));
}
isValidRelatedTableCached = true;
isValidRelatedTable = allFields.size() == 1;
return isValidRelatedTable;
}
}
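isValidRelatedTable() above accepts only tables whose every historical partition spec has exactly one field, with a year/month/day/hour transform over a single source column. An illustrative check against a freshly built spec using the Iceberg API; the column names and the literal transform strings are assumptions matching the IcebergUtils constants:

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class RelatedTableSpecSketch {
    public static void main(String[] args) {
        // Hypothetical schema: the equivalent of a table partitioned by day(ts).
        Schema schema = new Schema(
                Types.NestedField.required(1, "ts", Types.TimestampType.withoutZone()),
                Types.NestedField.optional(2, "v", Types.IntegerType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).day("ts").build();

        // The same conditions the override enforces for every spec in table.specs().
        boolean valid = spec.fields().size() == 1;
        if (valid) {
            PartitionField field = spec.fields().get(0);
            String transform = field.transform().toString();
            valid = transform.equals("year") || transform.equals("month")
                    || transform.equals("day") || transform.equals("hour");
            System.out.println("partition source column: " + schema.findColumnName(field.sourceId()));
        }
        System.out.println(valid ? "usable as MTMV related table" : "not a valid related table");
    }
}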

@@ -45,7 +45,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
protected String icebergCatalogType;
protected Catalog catalog;
private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;

public IcebergExternalCatalog(long catalogId, String name, String comment) {
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);