Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.es.EsExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
Expand Down Expand Up @@ -432,13 +433,13 @@ private void refreshOnlyCatalogCache(boolean invalidCache) {
}
}

public final Optional<SchemaCacheValue> getSchema(String dbName, String tblName) {
public final Optional<SchemaCacheValue> getSchema(SchemaCacheKey key) {
makeSureInitialized();
Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName);
Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(key.getDbName());
if (db.isPresent()) {
Optional<? extends ExternalTable> table = db.get().getTable(tblName);
Optional<? extends ExternalTable> table = db.get().getTable(key.getTblName());
if (table.isPresent()) {
return table.get().initSchemaAndUpdateTime();
return table.get().initSchemaAndUpdateTime(key);
}
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.paimon.PaimonMetadataCache;
import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.nereids.exceptions.NotSupportedException;

Expand Down Expand Up @@ -92,6 +94,7 @@ public class ExternalMetaCacheMgr {
private ExternalRowCountCache rowCountCache;
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;

public ExternalMetaCacheMgr() {
rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
Expand Down Expand Up @@ -122,6 +125,7 @@ public ExternalMetaCacheMgr() {
hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
}

public ExecutorService getFileListingExecutor() {
Expand Down Expand Up @@ -167,6 +171,10 @@ public IcebergMetadataCache getIcebergMetadataCache() {
return icebergMetadataCacheMgr.getIcebergMetadataCache();
}

public PaimonMetadataCache getPaimonMetadataCache() {
return paimonMetadataCacheMgr.getPaimonMetadataCache();
}

public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
}
Expand All @@ -189,6 +197,7 @@ public void removeCache(long catalogId) {
hudiPartitionMgr.removePartitionProcessor(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
paimonMetadataCacheMgr.removeCache(catalogId);
}

public void invalidateTableCache(long catalogId, String dbName, String tblName) {
Expand All @@ -204,6 +213,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName)
hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId);
}
Expand All @@ -222,6 +232,7 @@ public void invalidateDbCache(long catalogId, String dbName) {
hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
}
Expand All @@ -239,6 +250,7 @@ public void invalidateCatalogCache(long catalogId) {
hudiPartitionMgr.cleanPartitionProcess(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid catalog cache for {}", catalogId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Long getValue() {
}

private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) {
Optional<SchemaCacheValue> schema = catalog.getSchema(key.dbName, key.tblName);
Optional<SchemaCacheValue> schema = catalog.getSchema(key);
if (LOG.isDebugEnabled()) {
LOG.debug("load schema for {} in catalog {}", key, catalog.getName());
}
Expand All @@ -83,6 +83,10 @@ private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) {

public Optional<SchemaCacheValue> getSchemaValue(String dbName, String tblName) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
return getSchemaValue(key);
}

public Optional<SchemaCacheValue> getSchemaValue(SchemaCacheKey key) {
return schemaCache.get(key);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.persist.gson.GsonPostProcessable;
Expand Down Expand Up @@ -317,8 +318,12 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
*
* @return
*/
public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) {
schemaUpdateTime = System.currentTimeMillis();
return initSchema(key);
}

public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
return initSchema();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
Expand Down Expand Up @@ -501,6 +502,10 @@ public Set<String> getPartitionNames() {
}

@Override
public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) {
return initSchemaAndUpdateTime();
}

public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient()
.getTable(dbName, name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.mvcc;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.qe.ConnectContext;

import java.util.Optional;

public class MvccUtil {
/**
* get Snapshot From StatementContext
*
* @param tableIf
* @return MvccSnapshot
*/
public static Optional<MvccSnapshot> getSnapshotFromContext(TableIf tableIf) {
ConnectContext connectContext = ConnectContext.get();
if (connectContext == null) {
return Optional.empty();
}
StatementContext statementContext = connectContext.getStatementContext();
if (statementContext == null) {
return Optional.empty();
}
return statementContext.getSnapshot(tableIf);
}
}
Loading
Loading