Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public class JdbcResource extends Resource {
public static final String CHECK_SUM = "checksum";
public static final String CREATE_TIME = "create_time";
public static final String TEST_CONNECTION = "test_connection";
public static final String FUNCTION_RULES = "function_rules";

private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
JDBC_URL,
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.datasource.ExternalFunctionRules;
import org.apache.doris.thrift.TJdbcTable;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TTableDescriptor;
Expand Down Expand Up @@ -103,6 +104,8 @@ public class JdbcTable extends Table {
private int connectionPoolMaxLifeTime;
private boolean connectionPoolKeepAlive;

private ExternalFunctionRules functionRules;

static {
Map<String, TOdbcTableType> tempMap = new CaseInsensitiveMap();
tempMap.put("mysql", TOdbcTableType.MYSQL);
Expand All @@ -128,6 +131,8 @@ public JdbcTable(long id, String name, List<Column> schema, Map<String, String>
throws DdlException {
super(id, name, TableType.JDBC, schema);
validate(properties);
// check and set external function rules
checkAndSetExternalFunctionRules(properties);
}

public JdbcTable(long id, String name, List<Column> schema, TableType type) {
Expand Down Expand Up @@ -412,6 +417,12 @@ private void validate(Map<String, String> properties) throws DdlException {
}
}

private void checkAndSetExternalFunctionRules(Map<String, String> properties) throws DdlException {
ExternalFunctionRules.check(properties.getOrDefault(JdbcResource.FUNCTION_RULES, ""));
this.functionRules = ExternalFunctionRules.create(jdbcTypeName,
properties.getOrDefault(JdbcResource.FUNCTION_RULES, ""));
}

/**
* Formats the provided name (for example, a database, table, or schema name) according to the specified parameters.
*
Expand Down Expand Up @@ -509,4 +520,13 @@ public static String properNameWithRemoteName(TOdbcTableType tableType, String r
public static String formatNameWithRemoteName(String remoteName, String wrapStart, String wrapEnd) {
return wrapStart + remoteName + wrapEnd;
}

// This is used when converting JdbcExternalTable to JdbcTable.
public void setExternalFunctionRules(ExternalFunctionRules functionRules) {
this.functionRules = functionRules;
}

public ExternalFunctionRules getExternalFunctionRules() {
return functionRules;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ public abstract class ExternalCatalog
private volatile Configuration cachedConf = null;
private byte[] confLock = new byte[0];

private volatile boolean isInitializing = false;

public ExternalCatalog() {
}

Expand Down Expand Up @@ -305,38 +307,46 @@ public boolean tableExistInLocal(String dbName, String tblName) {
* So you have to make sure the client of third system is initialized before any method was called.
*/
public final synchronized void makeSureInitialized() {
initLocalObjects();
if (!initialized) {
if (useMetaCache.get()) {
if (metaCache == null) {
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> getFilteredDatabaseNames(),
localDbName -> Optional.ofNullable(
buildDbForInit(null, localDbName, Util.genIdByName(name, localDbName), logType,
true)),
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized(invalidCacheInInit)));
}
setLastUpdateTime(System.currentTimeMillis());
} else {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeoutS();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(id, -1);
} catch (Exception e) {
Util.logAndThrowRuntimeException(LOG,
String.format("failed to forward init catalog %s operation to master.", name), e);
if (isInitializing) {
return;
}
isInitializing = true;
try {
initLocalObjects();
if (!initialized) {
if (useMetaCache.get()) {
if (metaCache == null) {
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> getFilteredDatabaseNames(),
localDbName -> Optional.ofNullable(
buildDbForInit(null, localDbName, Util.genIdByName(name, localDbName), logType,
true)),
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized(invalidCacheInInit)));
}
setLastUpdateTime(System.currentTimeMillis());
} else {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeoutS();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(id, -1);
} catch (Exception e) {
Util.logAndThrowRuntimeException(LOG,
String.format("failed to forward init catalog %s operation to master.", name), e);
}
return;
}
return;
init();
}
init();
initialized = true;
}
initialized = true;
} finally {
isInitializing = false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public abstract class ExternalDatabase<T extends ExternalTable>

private MetaCache<T> metaCache;

private volatile boolean isInitializing = false;

/**
* Create external database.
*
Expand Down Expand Up @@ -154,39 +156,49 @@ public boolean isInitialized() {
}

public final synchronized void makeSureInitialized() {
extCatalog.makeSureInitialized();
if (!initialized) {
if (extCatalog.getUseMetaCache().get()) {
if (metaCache == null) {
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> listTableNames(),
localTableName -> Optional.ofNullable(
buildTableForInit(null, localTableName,
Util.genIdByName(extCatalog.getName(), name, localTableName), extCatalog,
this, true)),
(key, value, cause) -> value.ifPresent(ExternalTable::unsetObjectCreated));
}
setLastUpdateTime(System.currentTimeMillis());
} else {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeoutS();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(extCatalog.getId(), id);
} catch (Exception e) {
Util.logAndThrowRuntimeException(LOG,
String.format("failed to forward init external db %s operation to master", name), e);
if (isInitializing) {
return;
}
isInitializing = true;
try {
extCatalog.makeSureInitialized();
if (!initialized) {
if (extCatalog.getUseMetaCache().get()) {
if (metaCache == null) {
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> listTableNames(),
localTableName -> Optional.ofNullable(
buildTableForInit(null, localTableName,
Util.genIdByName(extCatalog.getName(), name, localTableName),
extCatalog,
this, true)),
(key, value, cause) -> value.ifPresent(ExternalTable::unsetObjectCreated));
}
setLastUpdateTime(System.currentTimeMillis());
} else {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeoutS();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(extCatalog.getId(), id);
} catch (Exception e) {
Util.logAndThrowRuntimeException(LOG,
String.format("failed to forward init external db %s operation to master", name),
e);
}
return;
}
return;
init();
}
init();
initialized = true;
}
initialized = true;
} finally {
isInitializing = false;
}
}

Expand Down
Loading
Loading