Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 57 additions & 53 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@

package org.apache.doris.catalog;

import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogFactory;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalCatalog;
Expand All @@ -35,6 +31,7 @@
import org.apache.doris.nereids.trees.plans.commands.refresh.RefreshCatalogCommand;
import org.apache.doris.persist.OperationType;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -57,10 +54,10 @@ public class RefreshManager {
private Map<Long, Integer[]> refreshMap = Maps.newConcurrentMap();

// Refresh catalog
public void handleRefreshCatalog(RefreshCatalogStmt stmt) throws UserException {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(stmt.getCatalogName());
CatalogLog log = CatalogFactory.createCatalogLog(catalog.getId(), stmt);
refreshCatalogInternal(catalog, log.isInvalidCache());
public void handleRefreshCatalog(String catalogName, boolean invalidCache) throws UserException {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(catalogName);
refreshCatalogInternal(catalog, invalidCache);
CatalogLog log = CatalogLog.createForRefreshCatalog(catalog.getId(), invalidCache);
Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_REFRESH_CATALOG, log);
}

Expand All @@ -82,9 +79,7 @@ private void refreshCatalogInternal(CatalogIf catalog, boolean invalidCache) {
}

// Refresh database
public void handleRefreshDb(RefreshDbStmt stmt) throws DdlException {
String catalogName = stmt.getCatalogName();
String dbName = stmt.getDbName();
public void handleRefreshDb(String catalogName, String dbName, boolean invalidCache) throws DdlException {
Env env = Env.getCurrentEnv();
CatalogIf catalog = catalogName != null ? env.getCatalogMgr().getCatalog(catalogName) : env.getCurrentCatalog();
if (catalog == null) {
Expand All @@ -94,40 +89,42 @@ public void handleRefreshDb(RefreshDbStmt stmt) throws DdlException {
throw new DdlException("Only support refresh database in external catalog");
}
DatabaseIf db = catalog.getDbOrDdlException(dbName);
((ExternalDatabase) db).setUnInitialized(stmt.isInvalidCache());
refreshDbInternal((ExternalDatabase) db, invalidCache);

ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setInvalidCache(stmt.isInvalidCache());
ExternalObjectLog log = ExternalObjectLog.createForRefreshDb(catalog.getId(), db.getFullName(), invalidCache);
Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log);
}

public void replayRefreshDb(ExternalObjectLog log) {
refreshDbInternal(log.getCatalogId(), log.getDbId(), log.isInvalidCache());
}
ExternalCatalog catalog = (ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(log.getCatalogId());
if (catalog == null) {
LOG.warn("failed to find catalog when replaying refresh db: {}", log.debugForRefreshDb());
}
Optional<ExternalDatabase<? extends ExternalTable>> db;
if (!Strings.isNullOrEmpty(log.getDbName())) {
db = catalog.getDbForReplay(log.getDbName());
} else {
db = catalog.getDbForReplay(log.getDbId());
}

private void refreshDbInternal(long catalogId, long dbId, boolean invalidCache) {
ExternalCatalog catalog = (ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
Optional<ExternalDatabase<? extends ExternalTable>> db = catalog.getDbForReplay(dbId);
// Database may not exist if 'use_meta_cache' is true.
// Because each FE fetch the meta data independently.
db.ifPresent(e -> {
e.setUnInitialized(invalidCache);
LOG.info("refresh database {} in catalog {} with invalidCache {}", e.getFullName(),
catalog.getName(), invalidCache);
});
if (!db.isPresent()) {
LOG.warn("failed to find db when replaying refresh db: {}", log.debugForRefreshDb());
} else {
refreshDbInternal(db.get(), log.isInvalidCache());
}
}

// Refresh table
public void handleRefreshTable(RefreshTableStmt stmt) throws UserException {
String catalogName = stmt.getCtl();
String dbName = stmt.getDbName();
String tableName = stmt.getTblName();
refreshTable(catalogName, dbName, tableName, false);
private void refreshDbInternal(ExternalDatabase db, boolean invalidCache) {
db.setUnInitialized();
if (invalidCache) {
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(db.getCatalog().getId(), db.getFullName());
}
LOG.info("refresh database {} in catalog {} with invalidCache {}", db.getFullName(),
db.getCatalog().getName(), invalidCache);
}

public void refreshTable(String catalogName, String dbName, String tableName, boolean ignoreIfNotExists)
// Refresh table
public void handleRefreshTable(String catalogName, String dbName, String tableName, boolean ignoreIfNotExists)
throws DdlException {
Env env = Env.getCurrentEnv();
CatalogIf catalog = catalogName != null ? env.getCatalogMgr().getCatalog(catalogName) : env.getCurrentCatalog();
Expand All @@ -153,33 +150,41 @@ public void refreshTable(String catalogName, String dbName, String tableName, bo
}
return;
}
refreshTableInternal(catalog, db, table, 0);
refreshTableInternal((ExternalDatabase) db, (ExternalTable) table, 0);

ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
log.setTableId(table.getId());
ExternalObjectLog log = ExternalObjectLog.createForRefreshTable(catalog.getId(), db.getFullName(),
table.getName());
Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
}

public void replayRefreshTable(ExternalObjectLog log) {
ExternalCatalog catalog = (ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(log.getCatalogId());
if (catalog == null) {
LOG.warn("failed to find catalog replaying refresh table {}", log.getCatalogId());
LOG.warn("failed to find catalog when replaying refresh table: {}", log.debugForRefreshTable());
return;
}
Optional<ExternalDatabase<? extends ExternalTable>> db = catalog.getDbForReplay(log.getDbId());
Optional<ExternalDatabase<? extends ExternalTable>> db;
if (!Strings.isNullOrEmpty(log.getDbName())) {
db = catalog.getDbForReplay(log.getDbName());
} else {
db = catalog.getDbForReplay(log.getDbId());
}
// See comment in refreshDbInternal for why db and table may be null.
if (!db.isPresent()) {
LOG.warn("failed to find db replaying refresh table {}", log.getDbId());
LOG.warn("failed to find db when replaying refresh table: {}", log.debugForRefreshTable());
return;
}
Optional<? extends ExternalTable> table = db.get().getTableForReplay(log.getTableId());
Optional<? extends ExternalTable> table;
if (!Strings.isNullOrEmpty(log.getTableName())) {
table = db.get().getTableForReplay(log.getTableName());
} else {
table = db.get().getTableForReplay(log.getTableId());
}
if (!table.isPresent()) {
LOG.warn("failed to find table replaying refresh table {}", log.getTableId());
LOG.warn("failed to find table when replaying refresh table: {}", log.debugForRefreshTable());
return;
}
refreshTableInternal(catalog, db.get(), table.get(), log.getLastUpdateTime());
refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
}

public void refreshExternalTableFromEvent(String catalogName, String dbName, String tableName,
Expand All @@ -200,18 +205,17 @@ public void refreshExternalTableFromEvent(String catalogName, String dbName, Str
if (table == null) {
return;
}
refreshTableInternal(catalog, db, table, updateTime);
refreshTableInternal((ExternalDatabase) db, (ExternalTable) table, updateTime);
}

public void refreshTableInternal(CatalogIf catalog, DatabaseIf db, TableIf table, long updateTime) {
if (table instanceof ExternalTable) {
((ExternalTable) table).unsetObjectCreated();
}
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
public void refreshTableInternal(ExternalDatabase db, ExternalTable table, long updateTime) {
table.unsetObjectCreated();
if (table instanceof HMSExternalTable && updateTime > 0) {
((HMSExternalTable) table).setEventUpdateTime(updateTime);
}
LOG.info("refresh table {} from db {} in catalog {}", table.getName(), db.getFullName(), catalog.getName());
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(table);
LOG.info("refresh table {}, id {} from db {} in catalog {}",
table.getName(), table.getId(), db.getFullName(), db.getCatalog().getName());
}

// Refresh partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
package org.apache.doris.datasource;

import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.DropCatalogStmt;
import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.common.DdlException;
Expand Down Expand Up @@ -48,22 +45,6 @@
public class CatalogFactory {
private static final Logger LOG = LogManager.getLogger(CatalogFactory.class);

/**
* Convert the sql statement into catalog log.
*/
public static CatalogLog createCatalogLog(long catalogId, StatementBase stmt) {
CatalogLog log = new CatalogLog();
if (stmt instanceof DropCatalogStmt) {
log.setCatalogId(catalogId);
} else if (stmt instanceof RefreshCatalogStmt) {
log.setCatalogId(catalogId);
log.setInvalidCache(((RefreshCatalogStmt) stmt).isInvalidCache());
} else {
throw new RuntimeException("Unknown stmt for catalog manager " + stmt.getClass().getSimpleName());
}
return log;
}

/**
* create the catalog instance from catalog log.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public class CatalogLog implements Writable {
@SerializedName(value = "comment")
private String comment;

public static CatalogLog createForRefreshCatalog(long catalogId, boolean invalidCache) {
CatalogLog log = new CatalogLog();
log.setCatalogId(catalogId);
log.setInvalidCache(invalidCache);
return log;
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ public abstract class ExternalCatalog
// db name does not contains "default_cluster"
protected Map<String, Long> dbNameToId = Maps.newConcurrentMap();
private boolean objectCreated = false;
protected boolean invalidCacheInInit = true;
protected ExternalMetadataOps metadataOps;
protected TransactionManager transactionManager;

Expand Down Expand Up @@ -354,7 +353,7 @@ private void buildMetaCache() {
localDbName -> Optional.ofNullable(
buildDbForInit(null, localDbName, Util.genIdByName(name, localDbName), logType,
true)),
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized(invalidCacheInInit)));
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized()));
}
}

Expand Down Expand Up @@ -580,11 +579,10 @@ private void refreshOnlyCatalogCache(boolean invalidCache) {
} else if (!useMetaCache.get()) {
this.initialized = false;
for (ExternalDatabase<? extends ExternalTable> db : idToDb.values()) {
db.setUnInitialized(invalidCache);
db.setUnInitialized();
}
}
}
this.invalidCacheInInit = invalidCache;
if (invalidCache) {
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
}
Expand Down Expand Up @@ -1103,6 +1101,7 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {
// we should get the table stored in Doris, and use local name in edit log.
CreateTableInfo info = new CreateTableInfo(getName(), stmt.getDbName(), stmt.getTableName());
Env.getCurrentEnv().getEditLog().logCreateTable(info);
LOG.info("finished to create table {}.{}.{}", getName(), stmt.getDbName(), stmt.getTableName());
}
return res;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public abstract class ExternalDatabase<T extends ExternalTable>
protected long lastUpdateTime;
protected final InitDatabaseLog.Type dbLogType;
protected ExternalCatalog extCatalog;
protected boolean invalidCacheInInit = true;

private MetaCache<T> metaCache;

Expand Down Expand Up @@ -131,9 +130,9 @@ public void setTableExtCatalog(ExternalCatalog extCatalog) {
}
}

public synchronized void setUnInitialized(boolean invalidCache) {
public synchronized void setUnInitialized() {
this.initialized = false;
this.invalidCacheInInit = invalidCache;
this.lowerCaseToTableName = Maps.newConcurrentMap();
if (extCatalog.getUseMetaCache().isPresent()) {
if (extCatalog.getUseMetaCache().get() && metaCache != null) {
metaCache.invalidateAll();
Expand All @@ -143,9 +142,6 @@ public synchronized void setUnInitialized(boolean invalidCache) {
}
}
}
if (invalidCache) {
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(extCatalog.getId(), name);
}
}

public boolean isInitialized() {
Expand Down Expand Up @@ -315,6 +311,7 @@ private void buildMetaCache() {

private List<Pair<String, String>> listTableNames() {
List<Pair<String, String>> tableNames;
lowerCaseToTableName.clear();
if (name.equals(InfoSchemaDb.DATABASE_NAME)) {
tableNames = ExternalInfoSchemaDatabase.listTableNames().stream()
.map(tableName -> {
Expand All @@ -339,6 +336,10 @@ private List<Pair<String, String>> listTableNames() {
return Pair.of(tableName, localTableName);
}).collect(Collectors.toList());
}
if (LOG.isDebugEnabled()) {
LOG.debug("after list tables in {}.{}, lowerCaseToTableName: {}",
getCatalog().getName(), getFullName(), lowerCaseToTableName);
}
// Check for conflicts when stored table names or meta names are case-insensitive
if (Boolean.parseBoolean(extCatalog.getLowerCaseMetaNames())
|| this.isStoredTableNamesLowerCase()
Expand Down Expand Up @@ -610,37 +611,44 @@ public Set<String> getTableNamesWithLock() {
@Override
public T getTableNullable(String tableName) {
makeSureInitialized();
String finalName = tableName;
if (this.isStoredTableNamesLowerCase()) {
tableName = tableName.toLowerCase();
finalName = tableName.toLowerCase();
}
if (this.isTableNamesCaseInsensitive()) {
String realTableName = lowerCaseToTableName.get(tableName.toLowerCase());
if (realTableName == null) {
finalName = lowerCaseToTableName.get(tableName.toLowerCase());
if (finalName == null) {
// Here we need to execute listTableNames() once to fill in lowerCaseToTableName
// to prevent lowerCaseToTableName from being empty in some cases
listTableNames();
tableName = lowerCaseToTableName.get(tableName.toLowerCase());
if (tableName == null) {
finalName = lowerCaseToTableName.get(tableName.toLowerCase());
if (finalName == null) {
if (LOG.isDebugEnabled()) {
LOG.info("failed to get final table name from: {}.{}.{}",
getCatalog().getName(), getFullName(), tableName);
}
return null;
}
} else {
tableName = realTableName;
}
}
if (extCatalog.getLowerCaseMetaNames().equalsIgnoreCase("true")
&& (this.isTableNamesCaseInsensitive())) {
tableName = tableName.toLowerCase();
finalName = tableName.toLowerCase();
}
if (LOG.isDebugEnabled()) {
LOG.info("get table {} from database: {}.{}, final name is: {}",
tableName, getCatalog().getName(), getFullName(), finalName);
}
if (extCatalog.getUseMetaCache().get()) {
// must use full qualified name to generate id.
// otherwise, if 2 databases have the same table name, the id will be the same.
return metaCache.getMetaObj(tableName,
Util.genIdByName(extCatalog.getName(), name, tableName)).orElse(null);
return metaCache.getMetaObj(finalName,
Util.genIdByName(extCatalog.getName(), name, finalName)).orElse(null);
} else {
if (!tableNameToId.containsKey(tableName)) {
if (!tableNameToId.containsKey(finalName)) {
return null;
}
return idToTbl.get(tableNameToId.get(tableName));
return idToTbl.get(tableNameToId.get(finalName));
}
}

Expand Down
Loading
Loading