Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class JdbcResource extends Resource {
public static final String CHECK_SUM = "checksum";
public static final String CREATE_TIME = "create_time";
public static final String TEST_CONNECTION = "test_connection";
public static final String FUNCTION_RULES = "function_rules";

private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
JDBC_URL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.datasource.ExternalFunctionRules;
import org.apache.doris.thrift.TJdbcTable;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TTableDescriptor;
Expand Down Expand Up @@ -103,6 +104,8 @@ public class JdbcTable extends Table {
private int connectionPoolMaxLifeTime;
private boolean connectionPoolKeepAlive;

private ExternalFunctionRules functionRules;

static {
Map<String, TOdbcTableType> tempMap = new CaseInsensitiveMap();
tempMap.put("mysql", TOdbcTableType.MYSQL);
Expand All @@ -128,6 +131,8 @@ public JdbcTable(long id, String name, List<Column> schema, Map<String, String>
throws DdlException {
super(id, name, TableType.JDBC, schema);
validate(properties);
// check and set external function rules
checkAndSetExternalFunctionRules(properties);
}

public JdbcTable(long id, String name, List<Column> schema, TableType type) {
Expand Down Expand Up @@ -412,6 +417,12 @@ private void validate(Map<String, String> properties) throws DdlException {
}
}

private void checkAndSetExternalFunctionRules(Map<String, String> properties) throws DdlException {
ExternalFunctionRules.check(properties.getOrDefault(JdbcResource.FUNCTION_RULES, ""));
this.functionRules = ExternalFunctionRules.create(jdbcTypeName,
properties.getOrDefault(JdbcResource.FUNCTION_RULES, ""));
}

/**
* Formats the provided name (for example, a database, table, or schema name) according to the specified parameters.
*
Expand Down Expand Up @@ -509,4 +520,13 @@ public static String properNameWithRemoteName(TOdbcTableType tableType, String r
public static String formatNameWithRemoteName(String remoteName, String wrapStart, String wrapEnd) {
return wrapStart + remoteName + wrapEnd;
}

// This is used when converting JdbcExternalTable to JdbcTable.
public void setExternalFunctionRules(ExternalFunctionRules functionRules) {
this.functionRules = functionRules;
}

public ExternalFunctionRules getExternalFunctionRules() {
return functionRules;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
Expand Down Expand Up @@ -106,7 +105,6 @@
/**
* The abstract class for all types of external catalogs.
*/
@Data
public abstract class ExternalCatalog
implements CatalogIf<ExternalDatabase<? extends ExternalTable>>, Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(ExternalCatalog.class);
Expand Down Expand Up @@ -180,6 +178,8 @@ public abstract class ExternalCatalog
private volatile Configuration cachedConf = null;
private byte[] confLock = new byte[0];

private volatile boolean isInitializing = false;

public ExternalCatalog() {
}

Expand Down Expand Up @@ -303,38 +303,46 @@ public boolean tableExistInLocal(String dbName, String tblName) {
* So you have to make sure the client of third system is initialized before any method was called.
*/
public final synchronized void makeSureInitialized() {
initLocalObjects();
if (!initialized) {
if (useMetaCache.get()) {
if (metaCache == null) {
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> getFilteredDatabaseNames(),
localDbName -> Optional.ofNullable(
buildDbForInit(null, localDbName, Util.genIdByName(name, localDbName), logType,
true)),
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized(invalidCacheInInit)));
}
setLastUpdateTime(System.currentTimeMillis());
} else {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(id, -1);
} catch (Exception e) {
Util.logAndThrowRuntimeException(LOG,
String.format("failed to forward init catalog %s operation to master.", name), e);
if (isInitializing) {
return;
}
isInitializing = true;
try {
initLocalObjects();
if (!initialized) {
if (useMetaCache.get()) {
if (metaCache == null) {
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> getFilteredDatabaseNames(),
localDbName -> Optional.ofNullable(
buildDbForInit(null, localDbName, Util.genIdByName(name, localDbName), logType,
true)),
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized(invalidCacheInInit)));
}
return;
setLastUpdateTime(System.currentTimeMillis());
} else {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(id, -1);
} catch (Exception e) {
Util.logAndThrowRuntimeException(LOG,
String.format("failed to forward init catalog %s operation to master.", name), e);
}
return;
}
init();
}
init();
initialized = true;
}
initialized = true;
} finally {
isInitializing = false;
}
}

Expand All @@ -352,7 +360,7 @@ public boolean isInitialized() {
// check if all required properties are set when creating catalog
public void checkProperties() throws DdlException {
// check refresh parameter of catalog
Map<String, String> properties = getCatalogProperty().getProperties();
Map<String, String> properties = catalogProperty.getProperties();
if (properties.containsKey(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC)) {
try {
int metadataRefreshIntervalSec = Integer.parseInt(
Expand Down Expand Up @@ -385,7 +393,7 @@ public void checkProperties() throws DdlException {
* isDryRun: if true, it will try to create the custom access controller, but will not add it to the access manager.
*/
public void initAccessController(boolean isDryRun) {
Map<String, String> properties = getCatalogProperty().getProperties();
Map<String, String> properties = catalogProperty.getProperties();
// 1. get access controller class name
String className = properties.getOrDefault(CatalogMgr.ACCESS_CONTROLLER_CLASS_PROP, "");
if (Strings.isNullOrEmpty(className)) {
Expand Down Expand Up @@ -545,7 +553,7 @@ private List<Pair<String, String>> getFilteredDatabaseNames() {
* @param invalidCache if {@code true}, the catalog cache will be invalidated
* and reloaded during the refresh process.
*/
public void resetToUninitialized(boolean invalidCache) {
public synchronized void resetToUninitialized(boolean invalidCache) {
this.objectCreated = false;
this.initialized = false;
synchronized (this.propLock) {
Expand Down Expand Up @@ -998,6 +1006,14 @@ public void addDatabaseForTest(ExternalDatabase<? extends ExternalTable> db) {
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
}

/**
* Set the initialized status for testing purposes only.
* This method should only be used in test cases.
*/
public void setInitializedForTest(boolean initialized) {
this.initialized = initialized;
}

@Override
public void createDb(CreateDbStmt stmt) throws DdlException {
makeSureInitialized();
Expand Down Expand Up @@ -1249,4 +1265,24 @@ public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
Env.getCurrentEnv().getExtMetaCacheMgr().invalidSchemaCache(id);
}
}

public CatalogProperty getCatalogProperty() {
return catalogProperty;
}

public Optional<Boolean> getUseMetaCache() {
return useMetaCache;
}

public Map<Pair<String, String>, String> getTableAutoAnalyzePolicy() {
return tableAutoAnalyzePolicy;
}

public TransactionManager getTransactionManager() {
return transactionManager;
}

public ThreadPoolExecutor getThreadPoolWithPreAuth() {
return threadPoolWithPreAuth;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public abstract class ExternalDatabase<T extends ExternalTable>

private MetaCache<T> metaCache;

private volatile boolean isInitializing = false;

/**
* Create external database.
*
Expand Down Expand Up @@ -132,7 +134,7 @@ public void setTableExtCatalog(ExternalCatalog extCatalog) {
}
}

public void setUnInitialized(boolean invalidCache) {
public synchronized void setUnInitialized(boolean invalidCache) {
this.initialized = false;
this.invalidCacheInInit = invalidCache;
if (extCatalog.getUseMetaCache().isPresent()) {
Expand All @@ -154,39 +156,49 @@ public boolean isInitialized() {
}

public final synchronized void makeSureInitialized() {
extCatalog.makeSureInitialized();
if (!initialized) {
if (extCatalog.getUseMetaCache().get()) {
if (metaCache == null) {
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> listTableNames(),
localTableName -> Optional.ofNullable(
buildTableForInit(null, localTableName,
Util.genIdByName(extCatalog.getName(), name, localTableName), extCatalog,
this, true)),
(key, value, cause) -> value.ifPresent(ExternalTable::unsetObjectCreated));
}
setLastUpdateTime(System.currentTimeMillis());
} else {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(extCatalog.getId(), id);
} catch (Exception e) {
Util.logAndThrowRuntimeException(LOG,
String.format("failed to forward init external db %s operation to master", name), e);
if (isInitializing) {
return;
}
isInitializing = true;
try {
extCatalog.makeSureInitialized();
if (!initialized) {
if (extCatalog.getUseMetaCache().get()) {
if (metaCache == null) {
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> listTableNames(),
localTableName -> Optional.ofNullable(
buildTableForInit(null, localTableName,
Util.genIdByName(extCatalog.getName(), name, localTableName),
extCatalog,
this, true)),
(key, value, cause) -> value.ifPresent(ExternalTable::unsetObjectCreated));
}
setLastUpdateTime(System.currentTimeMillis());
} else {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeout();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(extCatalog.getId(), id);
} catch (Exception e) {
Util.logAndThrowRuntimeException(LOG,
String.format("failed to forward init external db %s operation to master", name),
e);
}
return;
}
return;
init();
}
init();
initialized = true;
}
initialized = true;
} finally {
isInitializing = false;
}
}

Expand Down
Loading