diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 1f0d402d11f28b..1ccdaea1d2877a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -95,6 +95,7 @@
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -182,10 +183,10 @@ public enum OlapTableState {
     private PartitionInfo partitionInfo;
     @SerializedName(value = "itp", alternate = {"idToPartition"})
     @Getter
-    private ConcurrentHashMap<Long, Partition> idToPartition = new ConcurrentHashMap<>();
+    protected ConcurrentHashMap<Long, Partition> idToPartition = new ConcurrentHashMap<>();
     // handled in postgsonprocess
     @Getter
-    private Map<String, Partition> nameToPartition = Maps.newTreeMap();
+    protected Map<String, Partition> nameToPartition = Maps.newTreeMap();
     @SerializedName(value = "di", alternate = {"distributionInfo"})
     private DistributionInfo defaultDistributionInfo;
@@ -3698,4 +3699,48 @@ public Index getInvertedIndex(Column column, List<String> subPath) {
                     .filter(Index::isAnalyzedInvertedIndex).findFirst().orElse(null);
         }
     }
+
+    /**
+     * caller should acquire the read lock and should not modify any field of the return obj
+     */
+    public OlapTable copyTableMeta() {
+        OlapTable table = new OlapTable();
+        // metaobj
+        table.signature = signature;
+        table.lastCheckTime = lastCheckTime;
+        // abstract table
+        table.id = id;
+        table.name = name;
+        table.qualifiedDbName = qualifiedDbName;
+        table.type = type;
+        table.createTime = createTime;
+        table.fullSchema = fullSchema;
+        table.comment = comment;
+        table.tableAttributes = tableAttributes;
+        // olap table
+        // NOTE: currently do not need temp partitions, colocateGroup, autoIncrementGenerator
+        table.idToPartition = new ConcurrentHashMap<>();
+        table.tempPartitions = new TempPartitions();
+
+        table.state = state;
+        table.indexIdToMeta = ImmutableMap.copyOf(indexIdToMeta);
+        table.indexNameToId = ImmutableMap.copyOf(indexNameToId);
+        table.keysType = keysType;
+        table.partitionInfo = partitionInfo;
+        table.defaultDistributionInfo = defaultDistributionInfo;
+        table.bfColumns = bfColumns;
+        table.bfFpp = bfFpp;
+        table.indexes = indexes;
+        table.baseIndexId = baseIndexId;
+        table.tableProperty = tableProperty;
+        return table;
+    }
+
+    public long getCatalogId() {
+        return Env.getCurrentInternalCatalog().getId();
+    }
+
+    public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() throws AnalysisException {
+        return Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index e25262f1c9cf0e..59626553d66e39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -437,6 +437,8 @@ public String toEngineName() {
                 return "iceberg";
             case DICTIONARY:
                 return "dictionary";
+            case DORIS_EXTERNAL_TABLE:
+                return "External_Doris";
             default:
                 return null;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index e777285a07f587..4932f3aa8f9d19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -22,6 +22,7 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.datasource.doris.DorisExternalMetaCacheMgr;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -100,6 +101,7 @@ public class ExternalMetaCacheMgr {
     private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
     private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
     private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
+    private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr;

     public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
         rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog,
@@ -131,6 +133,7 @@ public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
         icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
         maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
         paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
+        dorisExternalMetaCacheMgr = new DorisExternalMetaCacheMgr(commonRefreshExecutor);
     }

     private ExecutorService newThreadPool(boolean isCheckpointCatalog, int numThread, int queueSize,
@@ -219,6 +222,10 @@ public ExternalRowCountCache getRowCountCache() {
         return rowCountCache;
     }

+    public DorisExternalMetaCacheMgr getDorisExternalMetaCacheMgr() {
+        return dorisExternalMetaCacheMgr;
+    }
+
     public void removeCache(long catalogId) {
         if (cacheMap.remove(catalogId) != null) {
             LOG.info("remove hive metastore cache for catalog {}", catalogId);
@@ -232,6 +239,7 @@ public void removeCache(long catalogId) {
         icebergMetadataCacheMgr.removeCache(catalogId);
         maxComputeMetadataCacheMgr.removeCache(catalogId);
         paimonMetadataCacheMgr.removeCache(catalogId);
+        dorisExternalMetaCacheMgr.removeCache(catalogId);
     }

     public void invalidateTableCache(ExternalTable dorisTable) {
@@ -288,6 +296,7 @@ public void invalidateCatalogCache(long catalogId) {
         icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
         maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
         paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
+        dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid catalog cache for {}", catalogId);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/DorisExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/DorisExternalMetaCacheMgr.java
new file mode 100644
index 00000000000000..70f92853ccc526
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/DorisExternalMetaCacheMgr.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.system.Backend;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class DorisExternalMetaCacheMgr {
+    private static final Logger LOG = LogManager.getLogger(DorisExternalMetaCacheMgr.class);
+    private final LoadingCache<Long, ImmutableMap<Long, Backend>> backendsCache;
+
+    public DorisExternalMetaCacheMgr(ExecutorService executor) {
+        CacheFactory cacheFactory = new CacheFactory(
+                OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+                OptionalLong.of(Config.external_cache_refresh_time_minutes * 60),
+                20,
+                true,
+                null);
+        backendsCache = cacheFactory.buildCache(key -> loadBackends(key), executor);
+    }
+
+    private ImmutableMap<Long, Backend> loadBackends(Long catalogId) {
+        RemoteDorisExternalCatalog catalog = (RemoteDorisExternalCatalog) Env.getCurrentEnv().getCatalogMgr()
+                .getCatalog(catalogId);
+        List<Backend> backends = catalog.getFeServiceClient().listBackends();
+        if (LOG.isDebugEnabled()) {
+            List<String> names = backends.stream().map(b -> b.getAddress()).collect(Collectors.toList());
+            LOG.debug("load backends:{} from:{}", String.join(",", names), catalog.getName());
+        }
+        Map<Long, Backend> backendMap = Maps.newHashMap();
+        backends.forEach(backend -> backendMap.put(backend.getId(), backend));
+        return ImmutableMap.copyOf(backendMap);
+    }
+
+    public void removeCache(long catalogId) {
+        backendsCache.invalidate(catalogId);
+    }
+
+    public void invalidateBackendCache(long catalogId) {
+        backendsCache.invalidate(catalogId);
+    }
+
+    public void invalidateCatalogCache(long catalogId) {
+        invalidateBackendCache(catalogId);
+    }
+
+    public ImmutableMap<Long, Backend> getBackends(long catalogId) {
+        ImmutableMap<Long, Backend> backends = backendsCache.get(catalogId);
+        if (backends == null) {
+            return ImmutableMap.of();
+        }
+        return backends;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
new file mode 100644
index 00000000000000..56e28f605c6b0a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TGetBackendMetaRequest;
+import org.apache.doris.thrift.TGetBackendMetaResult;
+import org.apache.doris.thrift.TGetOlapTableMetaRequest;
+import org.apache.doris.thrift.TGetOlapTableMetaResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPartitionMeta;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class FeServiceClient {
+    private static final Logger LOG = LogManager.getLogger(FeServiceClient.class);
+
+    private final Random random = new Random(System.currentTimeMillis());
+    private final String name;
+    private final List<TNetworkAddress> addresses;
+    private volatile TNetworkAddress master;
+    private final String user;
+    private final String password;
+    private final int retryCount;
+    private final int timeout;
+
+    public FeServiceClient(String name, List<TNetworkAddress> addresses, String user, String password,
+            int retryCount, int timeout) {
+        this.name = name;
+        this.addresses = addresses;
+        this.user = user;
+        this.password = password;
+        this.retryCount = retryCount;
+        this.timeout = timeout;
+    }
+
+    private List<TNetworkAddress> getAddresses() {
+        return addresses;
+    }
+
+    private FrontendService.Client getRemoteFeClient(TNetworkAddress address, int timeout) {
+        try {
+            return ClientPool.frontendPool.borrowObject(address, timeout);
+        } catch (Exception e) {
+            String msg = String.format("failed to get remote doris:%s fe connection", name);
+            throw new RuntimeException(msg, e);
+        }
+    }
+
+    private void returnClient(TNetworkAddress address, FrontendService.Client client, boolean returnObj) {
+        if (returnObj) {
+            ClientPool.frontendPool.returnObject(address, client);
+        } else {
+            ClientPool.frontendPool.invalidateObject(address, client);
+        }
+    }
+
+    private <T> T randomCallWithRetry(ThriftCall<T> call, String errorMsg, int timeout) {
+        List<TNetworkAddress> addresses = getAddresses();
+        int retries = 0;
+        Exception lastException = null;
+        while (retries < retryCount) {
+            int index = random.nextInt(addresses.size());
+            FrontendService.Client client = null;
+            for (int i = 0; i < addresses.size() && retries < retryCount; i++) {
+                TNetworkAddress address = addresses.get((index + i) % addresses.size());
+                client = getRemoteFeClient(address, timeout);
+                boolean returnObj = false;
+                try {
+                    T result = call.call(client);
+                    returnObj = true;
+                    return result;
+                } catch (TException | IOException e) {
+                    lastException = e;
+                    retries++;
+                } catch (Exception e) {
+                    throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+                } finally {
+                    returnClient(address, client, returnObj);
+                }
+            }
+        }
+        throw new RuntimeException(errorMsg + ":" + lastException.getMessage(), lastException);
+    }
+
+    private <T> T callFromMaster(ThriftCall<MasterResult<T>> call, String errorMsg, int timeout) {
+        TNetworkAddress address = master;
+        FrontendService.Client client = null;
+        Exception lastException = null;
+        if (address != null) {
+            client = getRemoteFeClient(address, timeout);
+            boolean returnObj = false;
+            try {
+                MasterResult<T> ret = call.call(client);
+                returnObj = true;
+                if (ret.isMaster) {
+                    if (ret.hasError) {
+                        throw new RuntimeException(ret.errorMsg);
+                    }
+                    return ret.result;
+                }
+            } catch (TException | IOException e) {
+                lastException = e;
+            } catch (Exception e) {
+                throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+            } finally {
+                returnClient(address, client, returnObj);
+            }
+        }
+        master = null;
+        List<TNetworkAddress> addresses = getAddresses();
+        int retries = 0;
+        while (retries < retryCount) {
+            int index = random.nextInt(addresses.size());
+            for (int i = 0; i < addresses.size() && retries < retryCount; i++) {
+                address = addresses.get((index + i) % addresses.size());
+                client = getRemoteFeClient(address, timeout);
+                boolean returnObj = false;
+                try {
+                    MasterResult<T> ret = call.call(client);
+                    returnObj = true;
+                    if (ret.isMaster) {
+                        master = address;
+                        if (ret.hasError) {
+                            throw new RuntimeException(ret.errorMsg);
+                        }
+                        return ret.result;
+                    }
+                } catch (TException | IOException e) {
+                    lastException = e;
+                    retries++;
+                } catch (Exception e) {
+                    throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
+                } finally {
+                    returnClient(address, client, returnObj);
+                }
+            }
+        }
+        throw new RuntimeException(errorMsg + ":" + lastException.getMessage(), lastException);
+    }
+
+    public List<Backend> listBackends() {
+        TGetBackendMetaRequest request = new TGetBackendMetaRequest();
+        request.setUser(user);
+        request.setPasswd(password);
+        String msg = String.format("failed to get backends from remote doris:%s", name);
+        return callFromMaster(client -> {
+            TGetBackendMetaResult result = client.getBackendMeta(request);
+            if (result.getStatus().getStatusCode() == TStatusCode.NOT_MASTER) {
+                return MasterResult.notMaster();
+            }
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                return MasterResult.masterWithError(result.getStatus().toString());
+            }
+            List<Backend> backends = result.getBackends().stream()
+                    .map(b -> Backend.fromThrift(b))
+                    .collect(Collectors.toList());
+            return MasterResult.withResult(backends);
+        }, msg, timeout);
+    }
+
+    public RemoteOlapTable getOlapTable(String dbName, String table, long tableId, List<Partition> partitions) {
+        TGetOlapTableMetaRequest request = new TGetOlapTableMetaRequest();
+        request.setDb(dbName);
+        request.setTable(table);
+        request.setTableId(tableId);
+        request.setUser(user);
+        request.setPasswd(password);
+        request.setVersion(FeConstants.meta_version);
+        for (Partition partition : partitions) {
+            TPartitionMeta meta = new TPartitionMeta();
+            meta.setId(partition.getId());
+            meta.setVisibleVersion(partition.getVisibleVersion());
+            meta.setVisibleVersionTime(partition.getVisibleVersionTime());
+            request.addToPartitions(meta);
+        }
+        String msg = String.format("failed to get table meta from remote doris:%s", name);
+        return randomCallWithRetry(client -> {
+            TGetOlapTableMetaResult result = client.getOlapTableMeta(request);
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                throw new UserException(result.getStatus().toString());
+            }
+            RemoteOlapTable remoteOlapTable = null;
+            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(result.getTableMeta()))) {
+                OlapTable olapTable = OlapTable.read(in);
+                remoteOlapTable = RemoteOlapTable.fromOlapTable(olapTable);
+            }
+            List<Partition> updatedPartitions = new ArrayList<>(result.getUpdatedPartitionsSize());
+            if (result.getUpdatedPartitionsSize() > 0) {
+                for (ByteBuffer buffer : result.getUpdatedPartitions()) {
+                    try (ByteArrayInputStream in =
+                            new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining());
+                            DataInputStream dataInputStream = new DataInputStream(in)) {
+                        String partitionStr = Text.readString(dataInputStream);
+                        Partition partition = GsonUtils.GSON.fromJson(partitionStr, Partition.class);
+                        updatedPartitions.add(partition);
+                    }
+                }
+            }
+            List<Long> removedPartitions = result.getRemovedPartitions();
+            if (removedPartitions == null) {
+                removedPartitions = new ArrayList<>();
+            }
+            remoteOlapTable.rebuildPartitions(partitions, updatedPartitions, removedPartitions);
+            return remoteOlapTable;
+        }, msg, timeout);
+    }
+
+    private interface ThriftCall<T> {
+        public T call(FrontendService.Client client) throws Exception;
+    }
+
+    private static class MasterResult<T> {
+        boolean isMaster = true;
+        T result;
+        boolean hasError = false;
+        String errorMsg;
+
+        static <T> MasterResult<T> notMaster() {
+            MasterResult<T> ret = new MasterResult<>();
+            ret.isMaster = false;
+            return ret;
+        }
+
+        static <T> MasterResult<T> withResult(T result) {
+            MasterResult<T> ret = new MasterResult<>();
+            ret.isMaster = true;
+            ret.hasError = false;
+            ret.result = result;
+            return ret;
+        }
+
+        // is master but has error code
+        static <T> MasterResult<T> masterWithError(String errorMsg) {
+            MasterResult<T> ret = new MasterResult<>();
+            ret.isMaster = true;
+            ret.hasError = true;
+            ret.errorMsg = errorMsg;
+            return ret;
+        }
+
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
index b63a2a03b1f37a..76d42b70b4260b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalCatalog.java
@@ -17,17 +17,20 @@

 package org.apache.doris.datasource.doris;

+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
 import org.apache.doris.datasource.property.constants.RemoteDorisProperties;
+import org.apache.doris.thrift.TNetworkAddress;

 import com.google.common.collect.ImmutableList;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -36,11 +39,14 @@ public class RemoteDorisExternalCatalog extends ExternalCatalog {
     private static final Logger LOG = LogManager.getLogger(RemoteDorisExternalCatalog.class);

     private RemoteDorisRestClient dorisRestClient;
+    private FeServiceClient client;

     private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
+            RemoteDorisProperties.FE_THRIFT_HOSTS,
             RemoteDorisProperties.FE_HTTP_HOSTS,
             RemoteDorisProperties.FE_ARROW_HOSTS,
             RemoteDorisProperties.USER,
-            RemoteDorisProperties.PASSWORD
+            RemoteDorisProperties.PASSWORD,
+            RemoteDorisProperties.USE_ARROW_FLIGHT
     );

     /**
@@ -61,6 +67,11 @@ public void checkProperties() throws DdlException {
                 throw new DdlException("Required property '" + requiredProperty + "' is missing");
             }
         }
+        if (!useArrowFlight() && Config.isCloudMode()) {
+            // TODO we not validate it in cloud mode, so currently not support it
+            throw new DdlException("Cloud mode is not supported when "
+                    + RemoteDorisProperties.USE_ARROW_FLIGHT + " is false");
+        }
     }

     public List<String> getFeNodes() {
@@ -71,6 +82,19 @@ public List<String> getFeArrowNodes() {
         return parseArrowHosts(catalogProperty.getOrDefault(RemoteDorisProperties.FE_ARROW_HOSTS, ""));
     }

+    public List<TNetworkAddress> getFeThriftNodes() {
+        String addresses = catalogProperty.getOrDefault(RemoteDorisProperties.FE_THRIFT_HOSTS, "");
+        List<TNetworkAddress> tAddresses = new ArrayList<>();
+        for (String address : addresses.split(",")) {
+            int index = address.lastIndexOf(":");
+            String host = address.substring(0, index);
+            int port = Integer.parseInt(address.substring(index + 1));
+            TNetworkAddress thriftAddress = new TNetworkAddress(host, port);
+            tAddresses.add(thriftAddress);
+        }
+        return tAddresses;
+    }
+
     public String getUsername() {
         return catalogProperty.getOrDefault(RemoteDorisProperties.USER, "");
     }
@@ -139,6 +163,11 @@ public int getMetadataCallTimeoutSec() {
                 "0"));
     }

+    public boolean useArrowFlight() {
+        return Boolean.parseBoolean(catalogProperty.getOrDefault(RemoteDorisProperties.USE_ARROW_FLIGHT,
+                "true"));
+    }
+
     @Override
     protected void initLocalObjectsImpl() {
         if (isCompatible()) {
@@ -158,6 +187,8 @@ protected void initLocalObjectsImpl() {
             throw new RuntimeException("Failed to connect to Doris cluster,"
                     + " please check your Doris cluster or your Doris catalog configuration.");
         }
+        client = new FeServiceClient(name, getFeThriftNodes(), getUsername(), getPassword(),
+                getMetadataSyncRetryCount(), getMetadataReadTimeoutSec());
     }

     protected List<String> listDatabaseNames() {
@@ -181,6 +212,10 @@ public RemoteDorisRestClient getDorisRestClient() {
         return dorisRestClient;
     }

+    public FeServiceClient getFeServiceClient() {
+        return client;
+    }
+
     private List<String> parseHttpHosts(String hosts) {
         String[] hostUrls = hosts.trim().split(",");
         fillUrlsWithSchema(hostUrls, enableSsl());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
index 4b6117aa313d1f..c86d91af92ba48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisExternalTable.java
@@ -18,9 +18,13 @@
 package org.apache.doris.datasource.doris;

 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -28,6 +32,7 @@
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;

+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -36,9 +41,14 @@ public class RemoteDorisExternalTable extends ExternalTable {
     private static final Logger LOG = LogManager.getLogger(RemoteDorisExternalTable.class);

+    private volatile List<Partition> partitions = Lists.newArrayList();
+    private volatile long tableId = -1;
+    private volatile boolean isSyncOlapTable = false;
+    private volatile RemoteOlapTable remoteOlapTable = null;
+    private volatile Exception lastException = null;

     public RemoteDorisExternalTable(long id, String name, String remoteName,
-            RemoteDorisExternalCatalog catalog, ExternalDatabase db) {
+            RemoteDorisExternalCatalog catalog, ExternalDatabase db) {
         super(id, name, remoteName, catalog, db, TableType.DORIS_EXTERNAL_TABLE);
     }
@@ -50,6 +60,73 @@ protected synchronized void makeSureInitialized() {
         }
     }

+    private RemoteOlapTable getDorisOlapTable() {
+        if (!isSyncOlapTable) {
+            synchronized (this) {
+                if (!isSyncOlapTable) {
+                    try {
+                        isSyncOlapTable = true;
+                        remoteOlapTable = null;
+                        lastException = null; // clear previous exception
+
+                        List<Partition> cachedPartitions = Lists.newArrayList(partitions);
+                        RemoteOlapTable olapTable = ((RemoteDorisExternalCatalog) catalog).getFeServiceClient()
+                                .getOlapTable(dbName, remoteName, tableId, cachedPartitions);
+                        olapTable.setCatalog((RemoteDorisExternalCatalog) catalog);
+                        olapTable.setDatabase((RemoteDorisExternalDatabase) db);
+
+                        // Remove redundant nested synchronized block
+                        tableId = olapTable.getId();
+                        partitions = Lists.newArrayList(olapTable.getPartitions());
+
+                        olapTable.setId(id); // change id in case of possible conflicts
+                        olapTable.invalidateBackendsIfNeed();
+                        remoteOlapTable = olapTable;
+                    } catch (Exception e) {
+                        // Save exception for waiting threads
+                        lastException = e;
+                        LOG.warn("Failed to get remote doris olap table: {}.{}", dbName, remoteName, e);
+                        throw e; // Re-throw the exception
+                    } finally {
+                        isSyncOlapTable = false;
+                        this.notifyAll();
+                    }
+                    return remoteOlapTable;
+                }
+            }
+        }
+
+        synchronized (this) {
+            while (isSyncOlapTable) {
+                try {
+                    this.wait();
+                } catch (InterruptedException e) {
+                    throw new AnalysisException("interrupted while getting doris olap table", e);
+                }
+            }
+
+            // If there is a saved exception, throw it with more details
+            if (remoteOlapTable == null) {
+                if (lastException != null) {
+                    throw new AnalysisException(
+                            "failed to get remote doris olap table: " + Util.getRootCauseMessage(lastException),
+                            lastException);
+                }
+                throw new AnalysisException("failed to get remote doris olap table");
+            }
+            return remoteOlapTable;
+        }
+    }
+
+    public OlapTable getOlapTable() {
+        makeSureInitialized();
+        return getDorisOlapTable();
+    }
+
+    public boolean useArrowFlight() {
+        return ((RemoteDorisExternalCatalog) catalog).useArrowFlight();
+    }
+
     @Override
     public TTableDescriptor toThrift() {
         List<Column> schema = getFullSchema();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java
new file mode 100644
index 00000000000000..80089b3a14cdce
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteOlapTable.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RemoteOlapTable extends OlapTable {
+    private static final Logger LOG = LogManager.getLogger(RemoteOlapTable.class);
+
+    private RemoteDorisExternalCatalog catalog;
+    private RemoteDorisExternalDatabase database;
+
+    public RemoteDorisExternalCatalog getCatalog() {
+        return catalog;
+    }
+
+    public void setCatalog(RemoteDorisExternalCatalog catalog) {
+        this.catalog = catalog;
+    }
+
+    @Override
+    public RemoteDorisExternalDatabase getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(RemoteDorisExternalDatabase database) {
+        this.database = database;
+    }
+
+    public static RemoteOlapTable fromOlapTable(OlapTable olapTable) {
+        try {
+            RemoteOlapTable externalOlapTable = new RemoteOlapTable();
+            Class currentClass = olapTable.getClass();
+            while (currentClass != null) {
+                for (Field field : currentClass.getDeclaredFields()) {
+                    if (Modifier.isStatic(field.getModifiers())) {
+                        continue;
+                    }
+                    field.setAccessible(true);
+                    field.set(externalOlapTable, field.get(olapTable));
+                }
+                currentClass = currentClass.getSuperclass();
+            }
+            return externalOlapTable;
+        } catch (Exception e) {
+            throw new RuntimeException("failed to initial external olap table", e);
+        }
+    }
+
+    public void rebuildPartitions(List<Partition> oldPartitions, List<Partition> updatedPartitions,
+            List<Long> removedPartitions)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("rebuildPartitions oldPartitions: " + oldPartitions.size() + ", updatedPartitions: "
+                    + updatedPartitions.size() + ", removedPartitions: " + removedPartitions.size());
+        }
+        ConcurrentHashMap<Long, Partition> newIdToPartition = new ConcurrentHashMap<>();
+        for (Partition oldPartition : oldPartitions) {
+            newIdToPartition.put(oldPartition.getId(), oldPartition);
+        }
+        for (Long removedPartition : removedPartitions) {
+            newIdToPartition.remove(removedPartition);
+        }
+        for (Partition updatedPartition : updatedPartitions) {
+            newIdToPartition.put(updatedPartition.getId(), updatedPartition);
+        }
+        Map<String, Partition> newNameToPartition = Maps.newTreeMap();
+        for (Partition partition : newIdToPartition.values()) {
+            newNameToPartition.put(partition.getName(), partition);
+        }
+        this.idToPartition = newIdToPartition;
+        this.nameToPartition = newNameToPartition;
+    }
+
+    public void invalidateBackendsIfNeed() {
+        ImmutableMap<Long, Backend> backends =
+                Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr().getBackends(catalog.getId());
+        for (Partition partition : getPartitions()) {
+            for (Tablet tablet : partition.getBaseIndex().getTablets()) {
+                for (long backendId : tablet.getBackendIds()) {
+                    if (!backends.containsKey(backendId)) {
+                        Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr()
+                                .invalidateBackendCache(catalog.getId());
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public long getCatalogId() {
+        return catalog.getId();
+    }
+
+    public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() {
+        return Env.getCurrentEnv().getExtMetaCacheMgr().getDorisExternalMetaCacheMgr().getBackends(catalog.getId());
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
index c6bceed94cb3a4..54c5051b244ab6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/RemoteDorisProperties.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.property.constants;

 public class RemoteDorisProperties {
+    public static final String FE_THRIFT_HOSTS = "fe_thrift_hosts";
     public static final String FE_HTTP_HOSTS = "fe_http_hosts";
     public static final String FE_ARROW_HOSTS = "fe_arrow_hosts";
@@ -26,6 +27,9 @@ public class RemoteDorisProperties {

     public static final String ENABLE_PARALLEL_RESULT_SINK = "enable_parallel_result_sink";

+    // query remote doris use arrow flight or treat it as olap table
+    public static final String USE_ARROW_FLIGHT = "use_arrow_flight";
+
     // Supports older versions of remote Doris; enabling this may introduce some inaccuracies in schema parsing.
     public static final String COMPATIBLE = "compatible";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 4976a3810f5095..85b7cc1a9dbf25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -38,6 +38,7 @@
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.ExternalView;
+import org.apache.doris.datasource.doris.RemoteDorisExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
@@ -470,7 +471,24 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio
             case MAX_COMPUTE_EXTERNAL_TABLE:
             case TRINO_CONNECTOR_EXTERNAL_TABLE:
             case LAKESOUl_EXTERNAL_TABLE:
+                return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
+                        qualifierWithoutTableName, ImmutableList.of(),
+                        unboundRelation.getTableSample(),
+                        unboundRelation.getTableSnapshot(),
+                        Optional.ofNullable(unboundRelation.getScanParams()), Optional.empty());
             case DORIS_EXTERNAL_TABLE:
+                ConnectContext ctx = cascadesContext.getConnectContext();
+                RemoteDorisExternalTable externalTable = (RemoteDorisExternalTable) table;
+                if (!externalTable.useArrowFlight()) {
+                    if (!ctx.getSessionVariable().isEnableNereidsDistributePlanner()) {
+                        // use isEnableNereidsDistributePlanner instead of canUseNereidsDistributePlanner
+                        // because it cannot work in explain command
+                        throw new AnalysisException("query remote doris only support NereidsDistributePlanner"
+                                + " when catalog use_arrow_flight is false");
+                    }
+                    OlapTable olapTable = externalTable.getOlapTable();
+                    return makeOlapScan(olapTable, unboundRelation, qualifierWithoutTableName, cascadesContext);
+                }
                 return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
                         qualifierWithoutTableName, ImmutableList.of(),
                         unboundRelation.getTableSample(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index 8ea22e00e4297b..7404f01786fc24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -83,7 +83,8 @@ public static DistributionSpec convertDistribution(LogicalOlapScan olapScan) {
         // rounded robin algorithm. Therefore, the hashDistributedSpec can be broken except they are in
         // the same stable colocateGroup(CG)
         boolean isBelongStableCG = colocateTableIndex.isColocateTable(olapTable.getId())
-                && !colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()));
+                && !colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()))
+                && olapTable.getCatalogId() == Env.getCurrentInternalCatalog().getId();
         boolean isSelectUnpartition = olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED
                 || olapScan.getSelectedPartitionIds().size() == 1;
         // TODO: find a better way to handle both tablet num == 1 and colocate table together in future
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index f103c627143711..20aa06b9fa0964 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -17,6 +17,7 @@

 package org.apache.doris.nereids.trees.plans.distribute;

+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.nereids.StatementContext;
@@ -39,6 +40,7 @@
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.MultiCastDataSink;
 import org.apache.doris.planner.MultiCastPlanFragment;
+import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.qe.ConnectContext;
@@ -85,6 +87,7 @@ public FragmentIdMapping<DistributedPlan> plan() {
         try {
             BackendDistributedPlanWorkerManager workerManager = new BackendDistributedPlanWorkerManager(
                     statementContext.getConnectContext(), notNeedBackend, isLoadJob);
+            addExternalBackends(workerManager);
             LoadBalanceScanWorkerSelector workerSelector = new LoadBalanceScanWorkerSelector(workerManager);
             FragmentIdMapping<UnassignedJob> fragmentJobs
                     = UnassignedJobBuilder.buildJobs(workerSelector, statementContext, idToFragments);
@@ -119,6 +122,17 @@ public FragmentIdMapping<DistributedPlan> plan() {
         }
     }

+    private void addExternalBackends(BackendDistributedPlanWorkerManager workerManager) throws AnalysisException {
+        for (PlanFragment planFragment : idToFragments.values()) {
+            List<OlapScanNode> scanNodes = planFragment.getPlanRoot()
+                    .collectInCurrentFragment(OlapScanNode.class::isInstance);
+            for (OlapScanNode scanNode : scanNodes) {
+                workerManager.addBackends(scanNode.getCatalogId(),
+                        scanNode.getOlapTable().getAllBackendsByAllCluster());
+            }
+        }
+    }
+
     private FragmentIdMapping<DistributedPlan> buildDistributePlans(
             Map<PlanFragmentId, UnassignedJob> idToUnassignedJobs,
             ListMultimap<PlanFragmentId, AssignedJob> idToAssignedJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
index f67cd86891d1f5..52a31351c8cd08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/SelectedWorkers.java
@@ -17,6 +17,7 @@

 package org.apache.doris.nereids.trees.plans.distribute;

+import org.apache.doris.catalog.Env;
 import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
@@ -34,7 +35,7 @@
 /** SelectedWorkers */
 public class SelectedWorkers {
     private final DistributedPlanWorkerManager workerManager;
-    private final Map<TNetworkAddress, Long> usedWorkersAddressToBackendID;
+    private final Map<Long, Map<TNetworkAddress, Long>> usedWorkersAddressToBackendID;
     private final Set<DistributedPlanWorker> usedWorkers;

     public SelectedWorkers(DistributedPlanWorkerManager workerManager) {
@@ -48,7 +49,8 @@ public void onCreateAssignedJob(AssignedJob assignedJob) {
         BackendWorker worker = (BackendWorker) assignedJob.getAssignedWorker();
         if (usedWorkers.add(worker)) {
             Backend backend = worker.getBackend();
-            usedWorkersAddressToBackendID.put(
+            usedWorkersAddressToBackendID.computeIfAbsent(worker.getCatalogId(), k -> Maps.newLinkedHashMap());
+            usedWorkersAddressToBackendID.get(worker.getCatalogId()).put(
                     new TNetworkAddress(backend.getHost(), backend.getBePort()), backend.getId()
             );
         }
@@ -56,11 +58,19 @@ public void onCreateAssignedJob(AssignedJob assignedJob) {

     /** tryToSelectRandomUsedWorker */
     public DistributedPlanWorker tryToSelectRandomUsedWorker() {
+        long catalogId = Env.getCurrentInternalCatalog().getId();
         if (usedWorkers.isEmpty()) {
-            return workerManager.randomAvailableWorker();
+            return workerManager.randomAvailableWorker(catalogId);
         } else {
-            long id = workerManager.randomAvailableWorker(usedWorkersAddressToBackendID);
-            return workerManager.getWorker(id);
+            Map<TNetworkAddress, Long> backendIDs;
+            if (usedWorkersAddressToBackendID.containsKey(catalogId)) {
+                backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+            } else {
+                catalogId = usedWorkers.iterator().next().getCatalogId();
+                backendIDs = usedWorkersAddressToBackendID.get(catalogId);
+            }
+            long id = workerManager.randomAvailableWorker(backendIDs);
+            return workerManager.getWorker(catalogId, id);
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
index 5ca2c4bc6bf10e..e36165f10b4a9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java
@@ -53,19 +53,26 @@ public class BackendDistributedPlanWorkerManager implements DistributedPlanWorke
         DUMMY_BACKEND.setAlive(true);
     }

-    private final Supplier<ImmutableMap<Long, Backend>> allClusterBackends = Suppliers.memoize(() -> {
-        try {
-            return Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
-        } catch (Exception t) {
-            throw new NereidsException("Can not get backends: " + t, t);
-        }
-    });
+    private final Map<Long, Supplier<ImmutableMap<Long, Backend>>> allClusterBackends = Maps.newHashMap();

-    private final ImmutableMap<Long, Backend> currentClusterBackends;
+    private final Map<Long, ImmutableMap<Long, Backend>> currentClusterBackends;

+    /**
+     * Constructor
+     */
     public BackendDistributedPlanWorkerManager(
             ConnectContext context, boolean notNeedBackend, boolean isLoadJob) throws UserException {
-        this.currentClusterBackends = checkAndInitClusterBackends(context, notNeedBackend, isLoadJob);
+        this.currentClusterBackends = Maps.newHashMap();
+        ImmutableMap<Long, Backend> internalBackends = checkAndInitClusterBackends(context, notNeedBackend, isLoadJob);
+        this.currentClusterBackends.put(Env.getCurrentInternalCatalog().getId(), internalBackends);
+        allClusterBackends.put(Env.getCurrentInternalCatalog().getId(),
+                Suppliers.memoize(() -> {
+                    try {
+                        return Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
+                    } catch (Exception t) {
+                        throw new NereidsException("Can not get backends: " + t, t);
+                    }
+                }));
     }

     private ImmutableMap<Long, Backend> checkAndInitClusterBackends(
@@ -115,28 +122,38 @@ private ImmutableMap<Long, Backend> checkAndInitClusterBackends(
     }

     @Override
-    public DistributedPlanWorker getWorker(long backendId) {
-        ImmutableMap<Long, Backend> backends = this.allClusterBackends.get();
+    public void addBackends(long catalogId, ImmutableMap<Long, Backend> backends) {
+        if (!currentClusterBackends.containsKey(catalogId)) {
+            currentClusterBackends.put(catalogId, backends);
+        }
+        if (!allClusterBackends.containsKey(catalogId)) {
+            allClusterBackends.put(catalogId, Suppliers.ofInstance(backends));
+        }
+    }
+
+    @Override
+    public DistributedPlanWorker getWorker(long catalogId, long backendId) {
+        ImmutableMap<Long, Backend> backends = this.allClusterBackends.get(catalogId).get();
         Backend backend = backends.get(backendId);
         if (backend == null) {
             throw new IllegalStateException("Backend " + backendId + " is not exist");
         }
-        return new BackendWorker(backend);
+        return new BackendWorker(catalogId, backend);
     }

     @Override
-    public DistributedPlanWorker getWorker(Backend backend) {
-        return new BackendWorker(backend);
+    public DistributedPlanWorker getWorker(long catalogId, Backend backend) {
+        return new BackendWorker(catalogId, backend);
     }

     @Override
-    public DistributedPlanWorker randomAvailableWorker() {
+    public DistributedPlanWorker randomAvailableWorker(long catalogId) {
         try {
             Reference<Long> selectedBackendId = new Reference<>();
-            ImmutableMap<Long, Backend> backends = this.currentClusterBackends;
+            ImmutableMap<Long, Backend> backends = this.currentClusterBackends.get(catalogId);
             SimpleScheduler.getHost(backends, selectedBackendId);
             Backend selctedBackend = backends.get(selectedBackendId.getRef());
-            return new BackendWorker(selctedBackend);
+            return new BackendWorker(catalogId, selctedBackend);
         } catch (Exception t) {
             throw new NereidsException("Can not get backends: " + t, t);
         }
@@ -149,17 +166,17 @@ public long randomAvailableWorker(Map<TNetworkAddress, Long> addressToBackendID)
     }

     @Override
-    public List<Backend> getAllBackends(boolean needAlive) {
+    public List<Backend> getAllBackends(long catalogId, boolean needAlive) {
         List<Backend> backends = null;
         if (needAlive) {
             backends = Lists.newArrayList();
-            for (Map.Entry<Long, Backend> entry : this.allClusterBackends.get().entrySet()) {
+            for (Map.Entry<Long, Backend> entry : this.allClusterBackends.get(catalogId).get().entrySet()) {
                 if (entry.getValue().isQueryAvailable()) {
                     backends.add(entry.getValue());
                 }
             }
         } else {
-            backends = Lists.newArrayList(this.allClusterBackends.get().values());
+            backends = Lists.newArrayList(this.allClusterBackends.get(catalogId).get().values());
         }
         return backends;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
index e76934cf847597..99f03be0e6ab47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
@@ -24,15 +24,22 @@
 /** BackendWorker */
 public class BackendWorker implements DistributedPlanWorker {
     private final Backend backend;
+    private final long catalogId;

-    public BackendWorker(Backend backend) {
+    public BackendWorker(long catalogId, Backend backend) {
         this.backend = Objects.requireNonNull(backend, "backend can not be null");
+        this.catalogId = catalogId;
     }

     public Backend getBackend() {
         return backend;
     }

+    @Override
+    public long getCatalogId() {
+        return catalogId;
+    }
+
     @Override
     public long id() {
         return backend.getId();
@@ -70,7 +77,7 @@ public boolean available() {

     @Override
     public int hashCode() {
-        return Objects.hash(backend.getId());
+        return Objects.hash(backend.getId(), catalogId);
     }

     @Override
@@ -78,7 +85,8 @@ public boolean equals(Object obj) {
         if (!(obj instanceof BackendWorker)) {
             return false;
         }
-        return backend.getId() == ((BackendWorker) obj).backend.getId();
+        return backend.getId() == ((BackendWorker) obj).backend.getId()
+                && getCatalogId() == ((BackendWorker) obj).getCatalogId();
     }

     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
index 79f8b482d88c2f..8def27bd68ecb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java
@@ -21,6 +21,8 @@
  * DistributedPlanWorker: a worker who can execute the assigned job(instance) of the DistributedPlan
  */
 public interface DistributedPlanWorker extends Comparable<DistributedPlanWorker> {
+    long getCatalogId();
+
     long id();

     // ipv4/ipv6 address
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
index 35aa1701a1c027..1efc024d57462b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorkerManager.java
@@ -20,18 +20,22 @@
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;

+import com.google.common.collect.ImmutableMap;
+
 import java.util.List;
 import java.util.Map;

 /** DistributedPlanWorkerManager */
 public interface DistributedPlanWorkerManager {
-    DistributedPlanWorker getWorker(long backendId);
+    void addBackends(long catalogId, ImmutableMap<Long, Backend> backends);
+
+    DistributedPlanWorker getWorker(long catalogId, long backendId);

-    DistributedPlanWorker getWorker(Backend backend);
+    DistributedPlanWorker getWorker(long catalogId, Backend backend);

-    DistributedPlanWorker randomAvailableWorker();
+    DistributedPlanWorker randomAvailableWorker(long catalogId);

     long randomAvailableWorker(Map<TNetworkAddress, Long> addressToBackendID);

-    List<Backend> getAllBackends(boolean needAlive);
+    List<Backend> getAllBackends(long catalogId, boolean needAlive);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
index 9a7d2f42476233..09fbd40d8d7bb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java
@@ -17,12 +17,19 @@

 package org.apache.doris.nereids.trees.plans.distribute.worker;

+import org.apache.doris.catalog.Env;
+
 /** DummyWorker */
 public class DummyWorker implements DistributedPlanWorker {
     public static final DummyWorker INSTANCE = new DummyWorker();

     private DummyWorker() {}

+    @Override
+    public long getCatalogId() {
+        return Env.getCurrentInternalCatalog().getId();
+    }
+
     @Override
     public long id() {
         return 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
index b479d24a0c99a8..4293408fa885b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java
@@ -100,7 +100,7 @@ public Map<DistributedPlanWorker, UninstancedScanSource> selectReplicaAndWorkerW
             long bytes = getScanRangeSize(scanNode, onePartitionOneScanRangeLocation);

             WorkerScanRanges assigned = selectScanReplicaAndMinWorkloadWorker(
-                    onePartitionOneScanRangeLocation, bytes, orderedScanRangeLocations);
+                    onePartitionOneScanRangeLocation, bytes, orderedScanRangeLocations, scanNode.getCatalogId());
             UninstancedScanSource scanRanges = workerScanRanges.computeIfAbsent(
                     assigned.worker,
                     w -> new UninstancedScanSource(
@@ -202,7 +202,8 @@ private Map<DistributedPlanWorker, UninstancedScanSource> selectForBucket(
                     WorkerScanRanges replicaAndWorker = selectScanReplicaAndMinWorkloadWorker(
                             allPartitionTabletsInOneBucketInOneTable.get(0),
                             allScanNodeScanBytesInOneBucket,
-                            orderedScanRangeLocations
+                            orderedScanRangeLocations,
+                            scanNode.getCatalogId()
                     );
                     selectedWorker = replicaAndWorker.worker;
                     break;
@@ -240,11 +241,11 @@ private Map<DistributedPlanWorker, UninstancedScanSource> selectForBucket(
     }

     private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker(
-            TScanRangeLocations tabletLocation, long tabletBytes, boolean orderedScanRangeLocations) {
+            TScanRangeLocations tabletLocation, long tabletBytes, boolean orderedScanRangeLocations, long catalogId) {
         List<TScanRangeLocation> replicaLocations = tabletLocation.getLocations();
         if (replicaLocations.size() == 1) {
             TScanRangeLocation replicaLocation = replicaLocations.get(0);
-            DistributedPlanWorker worker = workerManager.getWorker(replicaLocation.getBackendId());
+            DistributedPlanWorker worker = workerManager.getWorker(catalogId, replicaLocation.getBackendId());
             ScanRanges scanRanges = new ScanRanges();
             TScanRangeParams scanReplicaParams =
                     ScanWorkerSelector.buildScanReplicaParams(tabletLocation, replicaLocation);
@@ -265,7 +266,7 @@ private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker(

         for (int i = 0; i < replicaNum; i++) {
             TScanRangeLocation replicaLocation = replicaLocations.get(i);
-            DistributedPlanWorker worker = workerManager.getWorker(replicaLocation.getBackendId());
+            DistributedPlanWorker worker = workerManager.getWorker(catalogId, replicaLocation.getBackendId());
             if (!worker.available()) {
                 continue;
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index 2657be975b2163..167ea3dc33419a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -17,6 +17,7 @@

 package org.apache.doris.nereids.trees.plans.distribute.worker.job;

+import org.apache.doris.catalog.Env;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -188,10 +189,14 @@ protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddP
     }

     protected List<AssignedJob> fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) {
+        long catalogId = Env.getCurrentInternalCatalog().getId();
+        if (scanNodes != null && scanNodes.size() > 0) {
+            catalogId = scanNodes.get(0).getCatalogId();
+        }
         return ImmutableList.of(
                 assignWorkerAndDataSources(0,
                         statementContext.getConnectContext().nextInstanceId(),
-                        workerManager.randomAvailableWorker(),
+                        workerManager.randomAvailableWorker(catalogId),
                         DefaultScanSource.empty())
         );
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
index 045b1af8a03579..e8b30730103cb0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
@@ -17,6 +17,7 @@

 package org.apache.doris.nereids.trees.plans.distribute.worker.job;

+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.datasource.mvcc.MvccUtil;
@@ -64,6 +65,10 @@ public List<AssignedJob> computeAssignedJobs(DistributeContext distributeContext
         DictionarySink sink = (DictionarySink) fragment.getSink();
         // it may be ScanNode or optimized to EmptySetNode. use universay function to get the deepest source.
         PlanNode rootNode = fragment.getDeepestLinearSource();
+        long catalogId = Env.getCurrentInternalCatalog().getId();
+        if (rootNode instanceof OlapScanNode) {
+            catalogId = ((OlapScanNode) rootNode).getCatalogId();
+        }

         List<Backend> bes;
         if (sink.allowAdaptiveLoad() && rootNode instanceof OlapScanNode) {
             Dictionary dictionary = sink.getDictionary();
@@ -75,21 +80,21 @@ public List<AssignedJob> computeAssignedJobs(DistributeContext distributeContext
             }
             if (usingVersion > lastVersion) { // load new data
-                bes = computeFullLoad(workerManager, inputJobs);
+                bes = computeFullLoad(workerManager, inputJobs, catalogId);
             } else { // try to load only for the BEs which is outdated
-                bes = computePartiallLoad(workerManager, inputJobs, dictionary, sink);
+                bes = computePartiallLoad(workerManager, inputJobs, dictionary, sink, catalogId);
                 statementContext.setPartialLoadDictionary(true);
             }
         } else {
             // we explicitly request all BEs to load data. or ExternalTable. (or EmptySetNode - should not happen)
-            bes = computeFullLoad(workerManager, inputJobs);
+            bes = computeFullLoad(workerManager, inputJobs, catalogId);
         }

         List<AssignedJob> assignedJobs = Lists.newArrayList();
         for (int i = 0; i < bes.size(); ++i) { // every time one BE is selected
-            DistributedPlanWorker worker = workerManager.getWorker(bes.get(i));
+            DistributedPlanWorker worker = workerManager.getWorker(catalogId, bes.get(i));
             if (worker != null) {
                 assignedJobs.add(assignWorkerAndDataSources(i, connectContext.nextInstanceId(), worker,
                         new DefaultScanSource(ImmutableMap.of())));
@@ -122,7 +127,7 @@ public List<AssignedJob> computeAssignedJobs(DistributeContext distributeContext
     }

     private List<Backend> computeFullLoad(DistributedPlanWorkerManager workerManager,
-            ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
+            ListMultimap<ExchangeNode, AssignedJob> inputJobs, long catalogId) {
         // input jobs from upstream fragment - may have many instances.
         ExchangeNode exchange = inputJobs.keySet().iterator().next(); // random one - should be same for any exchange.
         int expectInstanceNum = exchange.getNumInstances();
@@ -130,7 +135,7 @@ private List<Backend> computeFullLoad(DistributedPlanWorkerManager workerManager
         // for Coordinator to know the right parallelism of DictionarySink
         exchange.getFragment().setParallelExecNum(expectInstanceNum);

-        List<Backend> bes = workerManager.getAllBackends(true);
+        List<Backend> bes = workerManager.getAllBackends(catalogId, true);
         if (bes.size() != expectInstanceNum) { // BE number changed when planning
             throw new IllegalArgumentException("BE number should be " + expectInstanceNum + ", but is " + bes.size());
@@ -139,10 +144,11 @@
     }

     private List<Backend> computePartiallLoad(DistributedPlanWorkerManager workerManager,
-            ListMultimap<ExchangeNode, AssignedJob> inputJobs, Dictionary dictionary, DictionarySink sink) {
+            ListMultimap<ExchangeNode, AssignedJob> inputJobs, Dictionary dictionary, DictionarySink sink,
+            long catalogId) {
         // dictionary's src version(bundled with dictionary's version) is same with usingVersion(otherwise FullLoad)
         // so we can just use the src version to find the outdated backends
-        List<Backend> outdateBEs = dictionary.filterOutdatedBEs(workerManager.getAllBackends(true));
+        List<Backend> outdateBEs = dictionary.filterOutdatedBEs(workerManager.getAllBackends(catalogId, true));

         // reset all exchange node's instance number to the number of outdated backends
         PlanFragment fragment = inputJobs.keySet().iterator().next().getFragment(); // random one exchange
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
index bcd7218f9f4355..d4f32cce8961c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
@@ -17,6 +17,7 @@

 package org.apache.doris.nereids.trees.plans.distribute.worker.job;

+import org.apache.doris.catalog.Env;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
@@ -42,7 +43,8 @@ public UnassignedGroupCommitJob(StatementContext statementContext,
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
         TUniqueId instanceId = statementContext.getConnectContext().nextInstanceId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java index bcd7218f9f4355..d4f32cce8961c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.distribute.worker.job; +import org.apache.doris.catalog.Env; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.distribute.DistributeContext; import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker; @@ -42,7 +43,8 @@ public UnassignedGroupCommitJob(StatementContext statementContext, public List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs) { TUniqueId instanceId = statementContext.getConnectContext().nextInstanceId(); - BackendWorker selectBackend = new BackendWorker(statementContext.getGroupCommitMergeBackend()); + BackendWorker selectBackend = new BackendWorker(Env.getCurrentEnv().getInternalCatalog().getId(), + statementContext.getGroupCommitMergeBackend()); return ImmutableList.of( new StaticAssignedJob( 0, instanceId, this, selectBackend, DefaultScanSource.empty() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java index 472fafe451323c..44731efd3c728c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java @@ -455,7 +455,7 @@ private ListMultimap selectWorkerForMissingBucke for (Integer bucketIndex : selectBucketIndexes) { Long tabletIdInBucket = tabletIdsInOrder.get(bucketIndex); Tablet tabletInBucket = partition.getTablet(tabletIdInBucket); - List workers = getWorkersByReplicas(tabletInBucket); + List workers = getWorkersByReplicas(tabletInBucket, olapScanNode.getCatalogId()); if (workers.isEmpty()) { throw new IllegalStateException("Can not find available replica for bucket " + bucketIndex + ", table: " + olapScanNode); @@ -466,12 +466,12 @@ return fillUpWorkerToBuckets; } - private List getWorkersByReplicas(Tablet tablet) { + private List getWorkersByReplicas(Tablet tablet, long catalogId) { DistributedPlanWorkerManager workerManager = scanWorkerSelector.getWorkerManager(); List replicas = tablet.getReplicas(); List workers = Lists.newArrayListWithCapacity(replicas.size()); for (Replica replica : replicas) { - DistributedPlanWorker worker = workerManager.getWorker(replica.getBackendIdWithoutException()); + DistributedPlanWorker worker = workerManager.getWorker(catalogId, replica.getBackendIdWithoutException()); if (worker.available()) { workers.add(worker); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 0c57e4c481d754..897dc22283315d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -30,7 +30,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.DistributionInfo; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.Index; import org.apache.doris.catalog.MaterializedIndex; @@ -470,7 +469,7 @@ private void addScanRangeLocations(Partition partition, boolean isInvalidComputeGroup = ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup); boolean isNotCloudComputeGroup = computeGroup != null && !Config.isCloudMode(); - ImmutableMap allBackends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); + ImmutableMap allBackends = olapTable.getAllBackendsByAllCluster(); long partitionVisibleVersion = visibleVersion; String partitionVisibleVersionStr = fastToString(visibleVersion); for (Tablet tablet : tablets) { @@ -860,7 +859,7 @@ private void computeTabletInfo() throws UserException { Preconditions.checkState(scanBackendIds.isEmpty()); Preconditions.checkState(scanTabletIds.isEmpty()); Map>
backendAlivePathHashs = Maps.newHashMap(); - for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) { + for (Backend backend : olapTable.getAllBackendsByAllCluster().values()) { Set hashSet = Sets.newLinkedHashSet(); for (DiskInfo diskInfo : backend.getDisks().values()) { if (diskInfo.isAlive()) { @@ -1377,4 +1376,12 @@ public void addTopnLazyMaterializeOutputColumns(Column column) { public void setGlobalRowIdColumn(Column globalRowIdColumn) { this.globalRowIdColumn = globalRowIdColumn; } + + @Override + public long getCatalogId() { + if (olapTable != null) { + return olapTable.getCatalogId(); + } + return super.getCatalogId(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 850564b109ad66..1dc2dda2c25f67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -35,6 +35,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PartitionInfo; @@ -712,4 +713,8 @@ public boolean hasSerialScanChildren() { public void setDesc(TupleDescriptor desc) { this.desc = desc; } + + public long getCatalogId() { + return Env.getCurrentInternalCatalog().getId(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java index 3970acfd5fe259..54e607dd27afa0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java @@ -19,6 +19,7 @@ import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker; import org.apache.doris.qe.runtime.BackendFragmentId; import org.apache.doris.qe.runtime.MultiFragmentsPipelineTask; import org.apache.doris.qe.runtime.PipelineExecutionTask; @@ -122,8 +123,9 @@ private Map buildBackendFragmentT PipelineExecutionTask executionTask) { ImmutableMap.Builder backendFragmentTasks = ImmutableMap.builder(); - for (Entry backendTask : executionTask.getChildrenTasks().entrySet()) { - Long backendId = backendTask.getKey(); + for (Entry backendTask : + executionTask.getChildrenTasks().entrySet()) { + Long backendId = backendTask.getKey().id(); for (Entry fragmentIdToTask : backendTask.getValue() .getChildrenTasks().entrySet()) { Integer fragmentId = fragmentIdToTask.getKey(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java index 4d2da16ff26e4b..6a52e3a6d9f855 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java @@ -23,6 +23,7 @@ import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker; import org.apache.doris.proto.InternalService.PExecPlanFragmentResult; import org.apache.doris.qe.CoordinatorContext; import 
org.apache.doris.qe.SimpleScheduler; @@ -56,7 +57,7 @@ * * This class is used to describe which backend processes which fragments */ -public class PipelineExecutionTask extends AbstractRuntimeTask { +public class PipelineExecutionTask extends AbstractRuntimeTask { private static final Logger LOG = LogManager.getLogger(PipelineExecutionTask.class); // immutable parameters @@ -68,7 +69,7 @@ public class PipelineExecutionTask extends AbstractRuntimeTask fragmentTasks) { + Map fragmentTasks) { // insert into stmt needs a latch to wait for finish, but query stmt does not, because the result receiver can wait for finish super(new ChildrenRuntimeTasks<>(fragmentTasks)); this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null"); @@ -78,13 +79,13 @@ public PipelineExecutionTask( // flatten to fragment tasks to quickly index by BackendFragmentId when receiving the report message ImmutableMap.Builder backendFragmentTasks = ImmutableMap.builder(); - for (Entry backendTask : fragmentTasks.entrySet()) { - Long backendId = backendTask.getKey(); + for (Entry backendTask : fragmentTasks.entrySet()) { + BackendWorker worker = backendTask.getKey(); for (Entry fragmentIdToTask : backendTask.getValue() .getChildrenTasks().entrySet()) { Integer fragmentId = fragmentIdToTask.getKey(); SingleFragmentPipelineTask fragmentTask = fragmentIdToTask.getValue(); - backendFragmentTasks.put(new BackendFragmentId(backendId, fragmentId), fragmentTask); + backendFragmentTasks.put(new BackendFragmentId(worker.id(), fragmentId), fragmentTask); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java index 0da6f4a5fe2e43..d9503e3145a51c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java @@ -65,13 +65,13 @@ private PipelineExecutionTask buildTask(CoordinatorContext coordinatorContext, return pipelineExecutionTask; } - private Map buildMultiFragmentTasks( + private Map buildMultiFragmentTasks( CoordinatorContext coordinatorContext, BackendServiceProxy backendServiceProxy, Map workerToFragmentsParam) { Map workerToSerializeFragments = serializeFragments(workerToFragmentsParam); - Map fragmentTasks = Maps.newLinkedHashMap(); + Map fragmentTasks = Maps.newLinkedHashMap(); for (Entry kv : workerToFragmentsParam.entrySet()) { BackendWorker worker = (BackendWorker) kv.getKey(); @@ -80,7 +80,7 @@ private Map buildMultiFragmentTasks( Backend backend = worker.getBackend(); fragmentTasks.put( - worker.id(), + worker, new MultiFragmentsPipelineTask( coordinatorContext, backend,
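Keying the task map by BackendWorker rather than a bare backend id keeps the worker's catalog context available, while report handling still indexes tasks by (backendId, fragmentId). A simplified sketch of that flattening, with stand-in types for BackendWorker and MultiFragmentsPipelineTask:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the index rebuild in PipelineExecutionTask above. The record
// types are simplified stand-ins, not the real Doris runtime classes.
public class WorkerKeyedTaskIndexSketch {
    record Worker(long catalogId, long id) {}
    record FragmentKey(long backendId, int fragmentId) {} // plays BackendFragmentId
    record FragmentTask(String name) {}

    static Map<FragmentKey, FragmentTask> flatten(
            Map<Worker, Map<Integer, FragmentTask>> tasksByWorker) {
        Map<FragmentKey, FragmentTask> index = new LinkedHashMap<>();
        for (Map.Entry<Worker, Map<Integer, FragmentTask>> byWorker : tasksByWorker.entrySet()) {
            long backendId = byWorker.getKey().id(); // worker.id(), as in the diff
            for (Map.Entry<Integer, FragmentTask> byFragment : byWorker.getValue().entrySet()) {
                index.put(new FragmentKey(backendId, byFragment.getKey()), byFragment.getValue());
            }
        }
        return index;
    }

    public static void main(String[] args) {
        Worker w = new Worker(0L, 10001L);
        Map<Worker, Map<Integer, FragmentTask>> tasks =
                Map.of(w, Map.of(1, new FragmentTask("f1"), 2, new FragmentTask("f2")));
        // lookup by (backendId, fragmentId) when a report message arrives
        System.out.println(flatten(tasks).get(new FragmentKey(10001L, 2)));
    }
}
```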
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 2bb495dc494829..61c81c9eae218b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -57,6 +57,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; @@ -69,6 +70,7 @@ import org.apache.doris.common.ThriftServerEventProcessor; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; +import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.Util; import org.apache.doris.cooldown.CooldownDelete; @@ -90,6 +92,7 @@ import org.apache.doris.load.routineload.RoutineLoadJob.JobState; import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.master.MasterImpl; +import org.apache.doris.meta.MetaContext; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.PlanNodeAndHash; @@ -187,6 +190,8 @@ import org.apache.doris.thrift.TGetMetaRequest; import org.apache.doris.thrift.TGetMetaResult; import org.apache.doris.thrift.TGetMetaTable; +import org.apache.doris.thrift.TGetOlapTableMetaRequest; +import org.apache.doris.thrift.TGetOlapTableMetaResult; import org.apache.doris.thrift.TGetQueryStatsRequest; import org.apache.doris.thrift.TGetSnapshotRequest; import org.apache.doris.thrift.TGetSnapshotResult; @@ -223,6 +228,7 @@ import org.apache.doris.thrift.TNullableStringLiteral; import org.apache.doris.thrift.TOlapTableIndexTablets; import org.apache.doris.thrift.TOlapTablePartition; +import org.apache.doris.thrift.TPartitionMeta; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TPlsqlPackageResult; @@ -297,7 +303,10 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collection; @@ -4507,6 +4516,102 @@ public TGetTableTDEInfoResult getTableTDEInfo(TGetTableTDEInfoRequest request) t return result; } + /** + * Only copies the basic meta of the table; + * it does not copy the partitions. + * @param request + * @return + * @throws TException + */ + @Override + public TGetOlapTableMetaResult getOlapTableMeta(TGetOlapTableMetaRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + if (LOG.isDebugEnabled()) { + LOG.debug("receive getOlapTableMeta request: {}, client: {}", request, clientAddr); + } + TGetOlapTableMetaResult result = new TGetOlapTableMetaResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + try { + checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), + request.getTable(), clientAddr, PrivPredicate.SELECT); + String dbName = request.getDb(); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName); + if (db == null) { + throw new UserException("unknown database, database=" + dbName); + } + OlapTable table = (OlapTable) db.getTableNullable(request.getTable()); + if (table == null) { + throw new UserException("unknown table, table=" + request.getTable()); + } + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeConstants.meta_version); + metaContext.setThreadLocalInfo(); + table.readLock(); + try (ByteArrayOutputStream bOutputStream = new ByteArrayOutputStream(8192)) { + OlapTable copyTable = table.copyTableMeta(); + try (DataOutputStream out = new DataOutputStream(bOutputStream)) { + copyTable.write(out); + out.flush(); + result.setTableMeta(bOutputStream.toByteArray()); + }
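Before the partition loop that follows, the diff rule it implements can be read in isolation: every server-side partition is presumed updated, each cached entry whose (visibleVersion, visibleVersionTime) still matches is dropped from the updated set, and cached ids that no longer exist on the server (or whose table id no longer matches, in the real code) come back as removed. A self-contained sketch with simplified stand-ins for Partition and TPartitionMeta:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Standalone sketch of the partition diff computed below; types are
// simplified stand-ins, not the real Partition / TPartitionMeta.
public class PartitionDiffSketch {
    record CachedMeta(long id, long visibleVersion, long visibleVersionTime) {}
    record PartitionState(long visibleVersion, long visibleVersionTime) {}

    static void diff(Map<Long, PartitionState> serverPartitions, List<CachedMeta> clientMetas,
                     Set<Long> updatedOut, List<Long> removedOut) {
        updatedOut.addAll(serverPartitions.keySet()); // assume everything changed ...
        for (CachedMeta meta : clientMetas) {
            PartitionState current = serverPartitions.get(meta.id());
            if (current == null) {
                removedOut.add(meta.id()); // partition dropped on the server
            } else if (current.visibleVersion() == meta.visibleVersion()
                    && current.visibleVersionTime() == meta.visibleVersionTime()) {
                updatedOut.remove(meta.id()); // ... unless the client is already current
            }
        }
    }

    public static void main(String[] args) {
        Map<Long, PartitionState> server = new HashMap<>(Map.of(
                1L, new PartitionState(3, 100), 2L, new PartitionState(7, 200)));
        List<CachedMeta> client = List.of(
                new CachedMeta(1L, 3, 100),   // up to date -> not resent
                new CachedMeta(2L, 6, 150),   // stale -> resent
                new CachedMeta(9L, 1, 10));   // gone -> removed
        Set<Long> updated = new HashSet<>();
        List<Long> removed = new ArrayList<>();
        diff(server, client, updated, removed);
        System.out.println("updated=" + updated + " removed=" + removed); // [2] / [9]
    }
}
```

A caller would then deserialize the returned partitions, replace its cached entries, and evict the removed ids, which is presumably how the new Doris external meta cache keeps its copied OlapTable current.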
+ Set updatedPartitionIds = Sets.newHashSet(table.getPartitionIds()); + List partitionMetas = request.getPartitionsSize() == 0 ? Lists.newArrayList() + : request.getPartitions(); + for (TPartitionMeta partitionMeta : partitionMetas) { + if (request.getTableId() != table.getId()) { + result.addToRemovedPartitions(partitionMeta.getId()); + continue; + } + Partition partition = table.getPartition(partitionMeta.getId()); + if (partition == null) { + result.addToRemovedPartitions(partitionMeta.getId()); + continue; + } + if (partition.getVisibleVersion() == partitionMeta.getVisibleVersion() + && partition.getVisibleVersionTime() == partitionMeta.getVisibleVersionTime()) { + updatedPartitionIds.remove(partitionMeta.getId()); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("receive getOlapTableMeta db: {} table:{} updated partitions: {} removed partitions: {}", + request.getDb(), request.getTable(), updatedPartitionIds.size(), + result.getRemovedPartitionsSize()); + } + for (Long partitionId : updatedPartitionIds) { + bOutputStream.reset(); + Partition partition = table.getPartition(partitionId); + try (DataOutputStream out = new DataOutputStream(bOutputStream)) { + Text.writeString(out, GsonUtils.GSON.toJson(partition)); + out.flush(); + result.addToUpdatedPartitions(ByteBuffer.wrap(bOutputStream.toByteArray())); + } + } + return result; + } finally { + table.readUnlock(); + MetaContext.remove(); + } + } catch (AuthenticationException e) { + LOG.warn("failed to check user auth: {}", e); + status.setStatusCode(TStatusCode.NOT_AUTHORIZED); + status.addToErrorMsgs(e.getMessage()); + return result; + } catch (UserException e) { + LOG.warn("failed to get table meta db:{} table:{} : {}", + request.getDb(), request.getTable(), e); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } catch (Exception e) { + LOG.warn("unknown exception when getting table meta db:{} table:{} : {}", + request.getDb(), request.getTable(), e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(e.getMessage()); + return result; + } + } + private TStatus checkMaster() { TStatus status = new TStatus(TStatusCode.OK); if (!Env.getCurrentEnv().isMaster()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 3f54db60a86a4e..3eb001ba624be2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -33,6 +33,7 @@ import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.resource.Tag; import org.apache.doris.system.HeartbeatResponse.HbStatus; +import org.apache.doris.thrift.TBackend; import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStorageMedium; @@ -1125,5 +1126,16 @@ private void migrateEndpointTag(Map tagMap, String oldTagName, S } } + public static Backend fromThrift(TBackend backend) { + Backend result = new Backend(); + result.id = backend.getId(); + result.host = backend.getHost(); + result.httpPort = backend.getHttpPort(); + result.brpcPort = backend.getBrpcPort(); + result.bePort = backend.getBePort(); + result.setAlive(backend.isIsAlive()); + return result; + } + } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 88365ecba4562d..faf74d0b5f2e78 100644 --- a/gensrc/thrift/FrontendService.thrift +++
b/gensrc/thrift/FrontendService.thrift @@ -1678,6 +1678,29 @@ struct TGetTableTDEInfoResult { 2: optional AgentService.TEncryptionAlgorithm algorithm } +struct TPartitionMeta { + 1: optional i64 id + 2: optional i64 visible_version + 3: optional i64 visible_version_time +} + +struct TGetOlapTableMetaRequest { + 1: required string user + 2: required string passwd + 3: required string db + 4: required string table + 5: required i64 table_id + 6: optional i32 version // todo serialize according to the version + 7: optional list partitions // client owned partition meta +} + +struct TGetOlapTableMetaResult { + 1: required Status.TStatus status + 2: required binary table_meta + 3: optional list updated_partitions + 4: optional list removed_partitions +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1784,4 +1807,6 @@ service FrontendService { TGetEncryptionKeysResult getEncryptionKeys(1: TGetEncryptionKeysRequest request) TGetTableTDEInfoResult getTableTDEInfo(1: TGetTableTDEInfoRequest request) + + TGetOlapTableMetaResult getOlapTableMeta(1: TGetOlapTableMetaRequest request) } diff --git a/regression-test/data/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.out b/regression-test/data/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.out new file mode 100644 index 00000000000000..1bb50b2cc13860 --- /dev/null +++ b/regression-test/data/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.out @@ -0,0 +1,48 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +2025-05-18T01:00 true -128 -32768 -2147483648 -9223372036854775808 -1234567890123456790 -123.456 -123456.789 -123457 -123456789012346 -1234567890123456789012345678 1970-01-01 0000-01-01T00:00 A Hello Hello, Doris! ["apple", "banana", "orange"] {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"} +2025-05-18T02:00 \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N +2025-05-18T03:00 false 127 32767 2147483647 9223372036854775807 1234567890123456789 123.456 123456.789 123457 123456789012346 1234567890123456789012345678 9999-12-31 9999-12-31T23:59:59 [] {} {"f1":11, "f2":3.14, "f3":"Emily"} +2025-05-18T04:00 true 0 0 0 0 0 0.0 0 0 0 0 2023-10-01 2023-10-01T12:34:56 A Hello Hello, Doris! 
["apple", "banana", "orange"] {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"} + +-- !sql -- +2025-05-18T01:00 [1] [-128] [-32768] [-2147483648] [-9223372036854775808] [-1234567890123456790] [-123.456] [-123456.789] [-123457] [-123456789012346] [-1234567890123456789012345678] ["0000-01-01"] ["0000-01-01 00:00:00"] ["A"] ["Hello"] ["Hello, Doris!"] +2025-05-18T02:00 [null] [null] [null] [null] [null] [null] [null] [null] [null] [null] [null] [null] [null] [null] [null] [null] +2025-05-18T03:00 [0] [127] [32767] [2147483647] [9223372036854775807] [1234567890123456789] [123.456] [123456.789] [123457] [123456789012346] [1234567890123456789012345678] ["9999-12-31"] ["9999-12-31 23:59:59"] [""] [""] [""] +2025-05-18T04:00 [1] [0] [0] [0] [0] [0] [0] [0] [0] [0] [0] ["2023-10-01"] ["2023-10-01 12:34:56"] ["A"] ["Hello"] ["Hello, Doris!"] + +-- !sql -- +2025-05-18T01:00 2025-05-18T01:00 2025-05-18T01:00:00.100 2025-05-18T01:00:00.110 2025-05-18T01:00:00.111 2025-05-18T01:00:00.111100 2025-05-18T01:00:00.111110 2025-05-18T01:00:00.111111 + +-- !query_after_insert -- +2025-05-18T01:00 2025-05-18T01:00 2025-05-18T01:00:00.100 2025-05-18T01:00:00.110 2025-05-18T01:00:00.111 2025-05-18T01:00:00.111100 2025-05-18T01:00:00.111110 2025-05-18T01:00:00.111111 +2025-05-19T01:00 2025-05-19T01:00 2025-05-19T01:00:00.100 2025-05-19T01:00:00.110 2025-05-19T01:00:00.111 2025-05-19T01:00:00.111100 2025-05-19T01:00:00.111110 2025-05-19T01:00:00.111111 + +-- !after_insert_cmd -- +2025-05-18T01:00 true -128 -32768 -2147483648 -9223372036854775808 -1234567890123456790 -123.456 -123456.789 -123457 -123456789012346 -1234567890123456789012345678 1970-01-01 0000-01-01T00:00 A Hello Hello, Doris! ["apple", "banana", "orange"] {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"} +2025-05-18T02:00 \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N +2025-05-18T03:00 false 127 32767 2147483647 9223372036854775807 1234567890123456789 123.456 123456.789 123457 123456789012346 1234567890123456789012345678 9999-12-31 9999-12-31T23:59:59 [] {} {"f1":11, "f2":3.14, "f3":"Emily"} +2025-05-18T04:00 true 0 0 0 0 0 0.0 0 0 0 0 2023-10-01 2023-10-01T12:34:56 A Hello Hello, Doris! ["apple", "banana", "orange"] {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"} + +-- !after_insert_overwrite_cmd -- +2025-05-18T01:00 true -128 -32768 -2147483648 -9223372036854775808 -1234567890123456790 -123.456 -123456.789 -123457 -123456789012346 -1234567890123456789012345678 1970-01-01 0000-01-01T00:00 A Hello Hello, Doris! ["apple", "banana", "orange"] {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"} +2025-05-18T02:00 \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N +2025-05-18T03:00 false 127 32767 2147483647 9223372036854775807 1234567890123456789 123.456 123456.789 123457 123456789012346 1234567890123456789012345678 9999-12-31 9999-12-31T23:59:59 [] {} {"f1":11, "f2":3.14, "f3":"Emily"} +2025-05-18T04:00 true 0 0 0 0 0 0.0 0 0 0 0 2023-10-01 2023-10-01T12:34:56 A Hello Hello, Doris! 
["apple", "banana", "orange"] {"Emily":101, "age":25} {"f1":11, "f2":3.14, "f3":"Emily"} + +-- !join -- +1 reason1 2023-01-01 1 100 error1 1000 2023-01-01T00:00 +2 reason2 2023-01-01 2 200 error2 2000 2023-01-01T00:00 +3 reason3 2023-01-01 3 300 error3 3000 2023-01-01T00:00 + +-- !join_predicate -- +2 reason2 2023-01-01 2 200 error2 2000 2023-01-01T00:00 + +-- !join_partition -- +1 2023-01-01 reason1 2023-01-01 1 100 error1 1000 2023-01-01T00:00 +2 2023-01-02 reason2 2023-01-02 2 200 error2 2000 2023-01-02T00:00 +3 2023-01-03 reason3 2023-01-03 3 300 error3 3000 2023-01-03T00:00 + +-- !join_partition_predicate -- +2 2023-01-02 reason2 2023-01-02 2 200 error2 2000 2023-01-02T00:00 + diff --git a/regression-test/pipeline/external/conf/regression-conf.groovy b/regression-test/pipeline/external/conf/regression-conf.groovy index 0d3e94ace11a56..75299d7908292f 100644 --- a/regression-test/pipeline/external/conf/regression-conf.groovy +++ b/regression-test/pipeline/external/conf/regression-conf.groovy @@ -31,6 +31,7 @@ extArrowFlightSqlPort = 8081 extArrowFlightSqlUser = "root" extArrowFlightSqlPassword= "" extArrowFlightHttpPort= 8131 +extFeThriftPort = 9020 ccrDownstreamUrl = "jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true&allowLoadLocalInfile=true" ccrDownstreamUser = "root" diff --git a/regression-test/suites/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.groovy b/regression-test/suites/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.groovy new file mode 100644 index 00000000000000..50cb13deca88d9 --- /dev/null +++ b/regression-test/suites/external_table_p0/remote_doris/test_query_remote_doris_as_olap_table_select.groovy @@ -0,0 +1,350 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_query_remote_doris_as_olap_table_select", "p0,external,doris,external_docker,external_docker_doris") { + String remote_doris_host = context.config.otherConfigs.get("extArrowFlightSqlHost") + String remote_doris_arrow_port = context.config.otherConfigs.get("extArrowFlightSqlPort") + String remote_doris_http_port = context.config.otherConfigs.get("extArrowFlightHttpPort") + String remote_doris_user = context.config.otherConfigs.get("extArrowFlightSqlUser") + String remote_doris_psw = context.config.otherConfigs.get("extArrowFlightSqlPassword") + String remote_doris_thrift_port = context.config.otherConfigs.get("extFeThriftPort") + + def showres = sql "show frontends"; + remote_doris_arrow_port = showres[0][6] + remote_doris_http_port = showres[0][3] + remote_doris_thrift_port = showres[0][5] + log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: ${remote_doris_thrift_port}") + + def showres2 = sql "show backends"; + log.info("show backends log = ${showres2}") + + def db_name = "test_query_remote_doris_as_olap_table_select_db" + def catalog_name = "test_query_remote_doris_as_olap_table_select_catalog" + + sql """DROP DATABASE IF EXISTS ${db_name}""" + + sql """CREATE DATABASE IF NOT EXISTS ${db_name}""" + + sql """ + CREATE TABLE `${db_name}`.`test_remote_doris_all_types_select_t` ( + `id` datetime(3) NOT NULL, + `c_boolean` boolean NULL, + `c_tinyint` tinyint NULL, + `c_smallint` smallint NULL, + `c_int` int NULL, + `c_bigint` bigint NULL, + `c_largeint` largeint NULL, + `c_float` float NULL, + `c_double` double NULL, + `c_decimal9` decimal(9,0) NULL, + `c_decimal18` decimal(18,0) NULL, + `c_decimal32` decimal(32,0) NULL, + `c_date` date NULL, + `c_datetime` datetime NULL, + `c_char` char(1) NULL, + `c_varchar` varchar(65533) NULL, + `c_string` text NULL, + `c_array_s` array NULL, + `c_map` MAP NULL, + `c_struct` STRUCT NULL, + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t` values('2025-05-18 01:00:00.000', true, -128, -32768, -2147483648, -9223372036854775808, -1234567890123456790, -123.456, -123456.789, -123457, -123456789012346, -1234567890123456789012345678, '1970-01-01', '0000-01-01 00:00:00', 'A', 'Hello', 'Hello, Doris!', '["apple", "banana", "orange"]', {"Emily":101,"age":25} , {11, 3.14, "Emily"}) + """ + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t` values('2025-05-18 02:00:00.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL) + """ + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t` values('2025-05-18 03:00:00.000', false, 127, 32767, 2147483647, 9223372036854775807, 1234567890123456789, 123.456, 123456.789, 123457, 123456789012346, 1234567890123456789012345678, '9999-12-31', '9999-12-31 23:59:59', '', '', '', [], {}, {11, 3.14, "Emily"}) + """ + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t` values('2025-05-18 04:00:00.000', true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, '2023-10-01', '2023-10-01 12:34:56', 'A', 'Hello', 'Hello, Doris!', '["apple", "banana", "orange"]', {"Emily":101,"age":25} , {11, 3.14, "Emily"}); + """ + + sql """ + CREATE TABLE `${db_name}`.`test_remote_doris_all_types_select_t2` ( + `id` datetime(3) NOT NULL, + `a_boolean` array NULL, + `a_tinyint` array NULL, + 
`a_smallint` array NULL, + `a_int` array NULL, + `a_bigint` array NULL, + `a_largeint` array NULL, + `a_float` array NULL, + `a_double` array NULL, + `a_decimal9` array NULL, + `a_decimal18` array NULL, + `a_decimal32` array NULL, + `a_date` array NULL, + `a_datetime` array NULL, + `a_char` array NULL, + `a_varchar` array NULL, + `a_string` array NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2` values('2025-05-18 01:00:00.000', [true], [-128], [-32768], [-2147483648], [-9223372036854775808], [-1234567890123456790], [-123.456], [-123456.789], [-123457], [-123456789012346], [-1234567890123456789012345678], ['0000-01-01'], ['0000-01-01 00:00:00'], ['A'], ['Hello'], ['Hello, Doris!']) + """ + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2` values('2025-05-18 02:00:00.000', [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL], [NULL]) + """ + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2` values('2025-05-18 03:00:00.000', [false], [127], [32767], [2147483647], [9223372036854775807], [1234567890123456789], [123.456], [123456.789], [123457], [123456789012346], [1234567890123456789012345678], ['9999-12-31'], ['9999-12-31 23:59:59'], [''], [''], ['']) + """ + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t2` values('2025-05-18 04:00:00.000', [true], [0], [0], [0], [0], [0], [0], [0], [0], [0], [0], ['2023-10-01'], ['2023-10-01 12:34:56'], ['A'], ['Hello'], ['Hello, Doris!']); + """ + + sql """ + CREATE TABLE `${db_name}`.`test_remote_doris_all_types_select_t3` ( + `id` datetime NOT NULL, + `datetime_0` datetime(0) NULL, + `datetime_1` datetime(1) NULL, + `datetime_3` datetime(2) NULL, + `datetime_4` datetime(3) NULL, + `datetime_5` datetime(4) NULL, + `datetime_6` datetime(5) NULL, + `datetime_7` datetime(6) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t3` values('2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111', '2025-05-18 01:00:00.111111'); + """ + + + sql """ + DROP CATALOG IF EXISTS `${catalog_name}` + """ + + + sql """ + CREATE CATALOG `${catalog_name}` PROPERTIES ( + 'type' = 'doris', + 'fe_http_hosts' = 'http://${remote_doris_host}:${remote_doris_http_port}', + 'fe_arrow_hosts' = '${remote_doris_host}:${remote_doris_arrow_port}', + 'fe_thrift_hosts' = '${remote_doris_host}:${remote_doris_thrift_port}', + 'user' = '${remote_doris_user}', + 'password' = '${remote_doris_psw}', + 'use_arrow_flight' = 'false' + ); + """ + + qt_sql """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t` order by id + """ + + qt_sql """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t2` order by id + """ + + qt_sql """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from 
`${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t3` order by id + """ + + // test that select after insert also gets correct data, i.e. that we see the newly updated partition version + sql """ + INSERT INTO `${db_name}`.`test_remote_doris_all_types_select_t3` values('2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111', '2025-05-19 01:00:00.111111'); + """ + qt_query_after_insert """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t3` order by id + """ + + // test insert command + sql """ + CREATE TABLE `${db_name}`.`test_remote_doris_all_types_insert` ( + `id` datetime(3) NOT NULL, + `c_boolean` boolean NULL, + `c_tinyint` tinyint NULL, + `c_smallint` smallint NULL, + `c_int` int NULL, + `c_bigint` bigint NULL, + `c_largeint` largeint NULL, + `c_float` float NULL, + `c_double` double NULL, + `c_decimal9` decimal(9,0) NULL, + `c_decimal18` decimal(18,0) NULL, + `c_decimal32` decimal(32,0) NULL, + `c_date` date NULL, + `c_datetime` datetime NULL, + `c_char` char(1) NULL, + `c_varchar` varchar(65533) NULL, + `c_string` text NULL, + `c_array_s` array NULL, + `c_map` MAP NULL, + `c_struct` STRUCT NULL, + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + // test insert into + explain { + sql("insert into `${db_name}`.`test_remote_doris_all_types_insert` select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`") + contains("VOlapScanNode") + } + sql """ + insert into `${db_name}`.`test_remote_doris_all_types_insert` select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t` + """ + qt_after_insert_cmd """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${db_name}`.`test_remote_doris_all_types_insert` order by id + """ + // test insert overwrite + explain { + sql("insert OVERWRITE table `${db_name}`.`test_remote_doris_all_types_insert` select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t`") + contains("VOlapScanNode") + } + sql """ + insert OVERWRITE table `${db_name}`.`test_remote_doris_all_types_insert` select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${catalog_name}`.`${db_name}`.`test_remote_doris_all_types_select_t` + """ + qt_after_insert_overwrite_cmd """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${db_name}`.`test_remote_doris_all_types_insert` order by id + """ + + // test join operation + sql """ + CREATE TABLE `${db_name}`.`left_inner_table` ( + log_type INT NOT NULL, + reason VARCHAR(1024) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`log_type`) + DISTRIBUTED BY HASH(`log_type`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ + INSERT INTO `${db_name}`.`left_inner_table` VALUES + (1,'reason1'), + (2,'reason2'), + (3,'reason3'); + """ + sql """ + CREATE TABLE `${db_name}`.`right_remote_table` ( + log_time DATE NOT NULL,
log_type INT NOT NULL, + error_code INT, + error_msg VARCHAR(1024), + op_id BIGINT, + op_time DATETIME + ) ENGINE=OLAP + DUPLICATE KEY(log_time, log_type, error_code) + DISTRIBUTED BY HASH(`log_type`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ + INSERT INTO `${db_name}`.`right_remote_table` VALUES + ('2023-01-01',1,100,'error1',1000,'2023-01-01 00:00:00'), + ('2023-01-01',2,200,'error2',2000,'2023-01-01 00:00:00'), + ('2023-01-01',3,300,'error3',3000,'2023-01-01 00:00:00'); + """ + qt_join """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table` a + join `${catalog_name}`.`${db_name}`.`right_remote_table` b on a.`log_type` = b.`log_type` order by a.`log_type` + """ + qt_join_predicate """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table` a + join `${catalog_name}`.`${db_name}`.`right_remote_table` b on a.`log_type` = b.`log_type` and b.op_id=2000 order by a.`log_type` + """ + + // test partition table + sql """ + CREATE TABLE `${db_name}`.`left_inner_table_partition` ( + log_type INT NOT NULL, + `log_time` date NOT NULL, + reason VARCHAR(1024) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`log_type`) + PARTITION BY RANGE(`log_time`) + (PARTITION p20230101 VALUES [('2023-01-01'), ('2023-01-02')), + PARTITION p20230102 VALUES [('2023-01-02'), ('2023-01-03')), + PARTITION p20230103 VALUES [('2023-01-03'), ('2023-01-04'))) + DISTRIBUTED BY HASH(`log_type`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + INSERT INTO `${db_name}`.`left_inner_table_partition` VALUES + (1,'2023-01-01','reason1'), + (2,'2023-01-02','reason2'), + (3,'2023-01-03','reason3'); + """ + + sql """ + CREATE TABLE `${db_name}`.`right_remote_table_partition` ( + log_time DATE NOT NULL, + log_type INT NOT NULL, + error_code INT, + error_msg VARCHAR(1024), + op_id BIGINT, + op_time DATETIME + ) ENGINE=OLAP + DUPLICATE KEY(log_time, log_type, error_code) + PARTITION BY RANGE(`log_time`) + (PARTITION p20230101 VALUES [('2023-01-01'), ('2023-01-02')), + PARTITION p20230102 VALUES [('2023-01-02'), ('2023-01-03')), + PARTITION p20230103 VALUES [('2023-01-03'), ('2023-01-04'))) + DISTRIBUTED BY HASH(`log_type`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + INSERT INTO `${db_name}`.`right_remote_table_partition` VALUES + ('2023-01-01',1,100,'error1',1000,'2023-01-01 00:00:00'), + ('2023-01-02',2,200,'error2',2000,'2023-01-02 00:00:00'), + ('2023-01-03',3,300,'error3',3000,'2023-01-03 00:00:00'); + """ + + qt_join_partition """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table_partition` a + join `${catalog_name}`.`${db_name}`.`right_remote_table_partition` b on a.`log_type` = b.`log_type` and a.`log_time` = b.`log_time` order by a.`log_type` + """ + + qt_join_partition_predicate """ + select /*+ SET_VAR(enable_nereids_distribute_planner=true, enable_sql_cache=true) */ * from `${db_name}`.`left_inner_table_partition` a + join `${catalog_name}`.`${db_name}`.`right_remote_table_partition` b on a.`log_type` = b.`log_type` and a.`log_time` = b.`log_time` and b.log_time='2023-01-02' order by a.`log_type` + """ + + sql """ DROP DATABASE IF EXISTS ${db_name} """ + sql """ DROP CATALOG IF EXISTS `${catalog_name}` """ +} \ No newline at end of file diff --git 
a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy index a77d21f59f55b4..ef5bfcae8f653d 100644 --- a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy +++ b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_select.groovy @@ -21,11 +21,13 @@ suite("test_remote_doris_all_types_select", "p0,external,doris,external_docker,e String remote_doris_http_port = context.config.otherConfigs.get("extArrowFlightHttpPort") String remote_doris_user = context.config.otherConfigs.get("extArrowFlightSqlUser") String remote_doris_psw = context.config.otherConfigs.get("extArrowFlightSqlPassword") + String remote_doris_thrift_port = context.config.otherConfigs.get("extFeThriftPort") def showres = sql "show frontends"; remote_doris_arrow_port = showres[0][6] remote_doris_http_port = showres[0][3] - log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}") + remote_doris_thrift_port = showres[0][5] + log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: ${remote_doris_thrift_port}") def showres2 = sql "show backends"; log.info("show backends log = ${showres2}") @@ -150,8 +152,10 @@ suite("test_remote_doris_all_types_select", "p0,external,doris,external_docker,e 'type' = 'doris', 'fe_http_hosts' = 'http://${remote_doris_host}:${remote_doris_http_port}', 'fe_arrow_hosts' = '${remote_doris_host}:${remote_doris_arrow_port}', + 'fe_thrift_hosts' = '${remote_doris_host}:${remote_doris_thrift_port}', 'user' = '${remote_doris_user}', - 'password' = '${remote_doris_psw}' + 'password' = '${remote_doris_psw}', + 'use_arrow_flight' = 'true' ); """ diff --git a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy index ab9f89019e245f..5438617e09c829 100644 --- a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy +++ b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_all_types_show.groovy @@ -21,11 +21,13 @@ suite("test_remote_doris_all_types_show", "p0,external,doris,external_docker,ext String remote_doris_http_port = context.config.otherConfigs.get("extArrowFlightHttpPort") String remote_doris_user = context.config.otherConfigs.get("extArrowFlightSqlUser") String remote_doris_psw = context.config.otherConfigs.get("extArrowFlightSqlPassword") + String remote_doris_thrift_port = context.config.otherConfigs.get("extFeThriftPort") def showres = sql "show frontends"; remote_doris_arrow_port = showres[0][6] remote_doris_http_port = showres[0][3] - log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}") + remote_doris_thrift_port = showres[0][5] + log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: ${remote_doris_thrift_port}") def showres2 = sql "show backends"; log.info("show backends log = ${showres2}") @@ -141,10 +143,12 @@ suite("test_remote_doris_all_types_show", "p0,external,doris,external_docker,ext sql """ CREATE CATALOG `test_remote_doris_all_types_catalog` PROPERTIES ( 'type' = 'doris', + 'fe_thrift_hosts' = '${remote_doris_host}:${remote_doris_thrift_port}', 
'fe_http_hosts' = 'http://${remote_doris_host}:${remote_doris_http_port}', 'fe_arrow_hosts' = '${remote_doris_host}:${remote_doris_arrow_port}', 'user' = '${remote_doris_user}', - 'password' = '${remote_doris_psw}' + 'password' = '${remote_doris_psw}', + 'use_arrow_flight' = 'true' ); """ diff --git a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy index 82e2b2e3550c4f..710ee5ecb5477c 100644 --- a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy +++ b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_catalog.groovy @@ -21,11 +21,13 @@ suite("test_remote_doris_catalog", "p0,external,doris,external_docker,external_d String remote_doris_http_port = context.config.otherConfigs.get("extArrowFlightHttpPort") String remote_doris_user = context.config.otherConfigs.get("extArrowFlightSqlUser") String remote_doris_psw = context.config.otherConfigs.get("extArrowFlightSqlPassword") + String remote_doris_thrift_port = context.config.otherConfigs.get("extFeThriftPort") def showres = sql "show frontends"; remote_doris_arrow_port = showres[0][6] remote_doris_http_port = showres[0][3] - log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}") + remote_doris_thrift_port = showres[0][5] + log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: ${remote_doris_thrift_port}") def showres2 = sql "show backends"; log.info("show backends log = ${showres2}") @@ -39,10 +41,12 @@ suite("test_remote_doris_catalog", "p0,external,doris,external_docker,external_d sql """ CREATE CATALOG `test_remote_doris_catalog_catalog` PROPERTIES ( 'type' = 'doris', + 'fe_thrift_hosts' = '${remote_doris_host}:${remote_doris_thrift_port}', 'fe_http_hosts' = 'http://${remote_doris_host}:${remote_doris_http_port}', 'fe_arrow_hosts' = '${remote_doris_host}:${remote_doris_arrow_port}', 'user' = '${remote_doris_user}', - 'password' = '${remote_doris_psw}' + 'password' = '${remote_doris_psw}', + 'use_arrow_flight' = 'true' ); """ diff --git a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy index 3782f5c50a7c50..cf7dfd5ea31584 100644 --- a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy +++ b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_predict.groovy @@ -21,11 +21,13 @@ suite("test_remote_doris_predict", "p0,external,doris,external_docker,external_d String remote_doris_http_port = context.config.otherConfigs.get("extArrowFlightHttpPort") String remote_doris_user = context.config.otherConfigs.get("extArrowFlightSqlUser") String remote_doris_psw = context.config.otherConfigs.get("extArrowFlightSqlPassword") + String remote_doris_thrift_port = context.config.otherConfigs.get("extFeThriftPort") def showres = sql "show frontends"; remote_doris_arrow_port = showres[0][6] remote_doris_http_port = showres[0][3] - log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}") + remote_doris_thrift_port = showres[0][5] + log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: ${remote_doris_thrift_port}") def showres2 = sql "show backends"; 
log.info("show backends log = ${showres2}") @@ -63,10 +65,12 @@ suite("test_remote_doris_predict", "p0,external,doris,external_docker,external_d sql """ CREATE CATALOG `test_remote_doris_predict_catalog` PROPERTIES ( 'type' = 'doris', + 'fe_thrift_hosts' = '${remote_doris_host}:${remote_doris_thrift_port}', 'fe_http_hosts' = 'http://${remote_doris_host}:${remote_doris_http_port}', 'fe_arrow_hosts' = '${remote_doris_host}:${remote_doris_arrow_port}', 'user' = '${remote_doris_user}', - 'password' = '${remote_doris_psw}' + 'password' = '${remote_doris_psw}', + 'use_arrow_flight' = 'true' ); """ diff --git a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy index e7596eafffb7ec..02d1028c26b68c 100644 --- a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy +++ b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_refresh.groovy @@ -21,11 +21,13 @@ suite("test_remote_doris_refresh", "p0,external,doris,external_docker,external_d String remote_doris_http_port = context.config.otherConfigs.get("extArrowFlightHttpPort") String remote_doris_user = context.config.otherConfigs.get("extArrowFlightSqlUser") String remote_doris_psw = context.config.otherConfigs.get("extArrowFlightSqlPassword") + String remote_doris_thrift_port = context.config.otherConfigs.get("extFeThriftPort") def showres = sql "show frontends"; remote_doris_arrow_port = showres[0][6] remote_doris_http_port = showres[0][3] - log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}") + remote_doris_thrift_port = showres[0][5] + log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: ${remote_doris_thrift_port}") def showres2 = sql "show backends"; log.info("show backends log = ${showres2}") @@ -39,10 +41,12 @@ suite("test_remote_doris_refresh", "p0,external,doris,external_docker,external_d sql """ CREATE CATALOG `test_remote_doris_refresh_catalog` PROPERTIES ( 'type' = 'doris', + 'fe_thrift_hosts' = '${remote_doris_host}:${remote_doris_thrift_port}', 'fe_http_hosts' = 'http://${remote_doris_host}:${remote_doris_http_port}', 'fe_arrow_hosts' = '${remote_doris_host}:${remote_doris_arrow_port}', 'user' = '${remote_doris_user}', - 'password' = '${remote_doris_psw}' + 'password' = '${remote_doris_psw}', + 'use_arrow_flight' = 'true' ); """ diff --git a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy index f2633b8cd28559..223c294d81169e 100644 --- a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy +++ b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_statistics.groovy @@ -21,11 +21,13 @@ suite("test_remote_doris_statistics", "p0,external,doris,external_docker,externa String remote_doris_http_port = context.config.otherConfigs.get("extArrowFlightHttpPort") String remote_doris_user = context.config.otherConfigs.get("extArrowFlightSqlUser") String remote_doris_psw = context.config.otherConfigs.get("extArrowFlightSqlPassword") + String remote_doris_thrift_port = context.config.otherConfigs.get("extFeThriftPort") def showres = sql "show frontends"; remote_doris_arrow_port = showres[0][6] remote_doris_http_port = showres[0][3] - log.info("show frontends log = 
${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}") + remote_doris_thrift_port = showres[0][5] + log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: ${remote_doris_thrift_port}") def showres2 = sql "show backends"; log.info("show backends log = ${showres2}") @@ -71,10 +73,12 @@ suite("test_remote_doris_statistics", "p0,external,doris,external_docker,externa sql """ CREATE CATALOG `test_remote_doris_statistics_catalog` PROPERTIES ( 'type' = 'doris', + 'fe_thrift_hosts' = '${remote_doris_host}:${remote_doris_thrift_port}', 'fe_http_hosts' = 'http://${remote_doris_host}:${remote_doris_http_port}', 'fe_arrow_hosts' = '${remote_doris_host}:${remote_doris_arrow_port}', 'user' = '${remote_doris_user}', - 'password' = '${remote_doris_psw}' + 'password' = '${remote_doris_psw}', + 'use_arrow_flight' = 'true' ); """ diff --git a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy index a9dd083ffd9a59..e6de44e005a79d 100644 --- a/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy +++ b/regression-test/suites/external_table_p0/remote_doris/test_remote_doris_table_stats.groovy @@ -21,11 +21,13 @@ suite("test_remote_doris_table_stats", "p0,external,doris,external_docker,extern String remote_doris_http_port = context.config.otherConfigs.get("extArrowFlightHttpPort") String remote_doris_user = context.config.otherConfigs.get("extArrowFlightSqlUser") String remote_doris_psw = context.config.otherConfigs.get("extArrowFlightSqlPassword") + String remote_doris_thrift_port = context.config.otherConfigs.get("extFeThriftPort") def showres = sql "show frontends"; remote_doris_arrow_port = showres[0][6] remote_doris_http_port = showres[0][3] - log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}") + remote_doris_thrift_port = showres[0][5] + log.info("show frontends log = ${showres}, arrow: ${remote_doris_arrow_port}, http: ${remote_doris_http_port}, thrift: ${remote_doris_thrift_port}") def showres2 = sql "show backends"; log.info("show backends log = ${showres2}") @@ -71,10 +73,12 @@ suite("test_remote_doris_table_stats", "p0,external,doris,external_docker,extern sql """ CREATE CATALOG `test_remote_doris_table_stats_catalog` PROPERTIES ( 'type' = 'doris', + 'fe_thrift_hosts' = '${remote_doris_host}:${remote_doris_thrift_port}', 'fe_http_hosts' = 'http://${remote_doris_host}:${remote_doris_http_port}', 'fe_arrow_hosts' = '${remote_doris_host}:${remote_doris_arrow_port}', 'user' = '${remote_doris_user}', - 'password' = '${remote_doris_psw}' + 'password' = '${remote_doris_psw}', + 'use_arrow_flight' = 'true' ); """
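For reference outside the regression framework, the catalog DDL these suites now issue can be driven from any MySQL-protocol client. A minimal JDBC sketch; the host, ports, and credentials are placeholders rather than values from this patch, and the MySQL JDBC driver is assumed to be on the classpath:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Sketch of creating a remote-Doris catalog with the new fe_thrift_hosts
// property alongside the HTTP and Arrow Flight endpoints.
public class CreateRemoteDorisCatalog {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://127.0.0.1:9030/"; // local FE MySQL endpoint (assumed)
        try (Connection conn = DriverManager.getConnection(url, "root", "");
                Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE CATALOG `remote_doris` PROPERTIES (\n"
                    + "  'type' = 'doris',\n"
                    + "  'fe_http_hosts' = 'http://10.0.0.1:8030',\n"
                    + "  'fe_arrow_hosts' = '10.0.0.1:8070',\n"
                    + "  'fe_thrift_hosts' = '10.0.0.1:9020',\n" // new in this change
                    + "  'user' = 'root',\n"
                    + "  'password' = '',\n"
                    + "  'use_arrow_flight' = 'false'\n" // fetch meta/data via thrift path
                    + ")");
            System.out.println("catalog created");
        }
    }
}
```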