Skip to content

Commit

Permalink
[fix](cloud tvf) Fix tvf query run in cloud multi cluster (apache#37157)
Browse files (browse the repository at this point in the history)
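Fix the following error, which occurs when a table-valued function (TVF) query is run in cloud multi-cluster mode: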
```
mysql> select * from numbers("number" = "10");
ERROR 1105 (HY000): errCode = 2, detailMessage = There is no scanNode Backend available.[10002: not exist]
```
  • Loading branch information
deardeng committed Aug 12, 2024
1 parent c0e6d1e commit 54115f7
Show file tree
Hide file tree
Showing 64 changed files with 403 additions and 326 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import org.apache.doris.proto.InternalService.PFetchColIdsRequest;
import org.apache.doris.proto.InternalService.PFetchColIdsRequest.Builder;
Expand Down Expand Up @@ -156,14 +157,14 @@ public AlterLightSchemaChangeInfo callForColumnsInfo()
Map<Long, Future<PFetchColIdsResponse>> beIdToRespFuture = new HashMap<>();
try {
for (Long beId : beIdToRequest.keySet()) {
final Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beId);
final Backend backend = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().get(beId);
final TNetworkAddress address =
new TNetworkAddress(Objects.requireNonNull(backend).getHost(), backend.getBrpcPort());
final Future<PFetchColIdsResponse> responseFuture = BackendServiceProxy.getInstance()
.getColumnIdsByTabletIds(address, beIdToRequest.get(beId));
beIdToRespFuture.put(beId, responseFuture);
}
} catch (RpcException e) {
} catch (RpcException | UserException e) {
throw new IllegalStateException("fetch columnIds RPC failed", e);
}
// wait for and get results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,15 @@ private static void checkDecommissionWithReplicaAllocation(List<Backend> decommi
Set<Tag> decommissionTags = decommissionBackends.stream().map(be -> be.getLocationTag())
.collect(Collectors.toSet());
Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap();
for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
List<Backend> bes;
try {
bes = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().asList();
} catch (UserException e) {
LOG.warn("Failed to get current cluster backend by current cluster.", e);
return;
}

for (Backend backend : bes) {
long beId = backend.getId();
if (!backend.isScheduleAvailable()
|| decommissionBackends.stream().anyMatch(be -> be.getId() == beId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -36,8 +37,8 @@
public class AdminCancelRebalanceDiskStmt extends DdlStmt {
private List<Backend> backends = Lists.newArrayList();

public AdminCancelRebalanceDiskStmt(List<String> backends) {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend();
public AdminCancelRebalanceDiskStmt(List<String> backends) throws UserException {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
Map<String, Long> backendsID = new HashMap<String, Long>();
for (Backend backend : backendsInfo.values()) {
backendsID.put(NetUtils.getHostPortInAccessibleFormat(backend.getHost(), backend.getHeartbeatPort()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -36,8 +37,8 @@
public class AdminCleanTrashStmt extends DdlStmt {
private List<Backend> backends = Lists.newArrayList();

public AdminCleanTrashStmt(List<String> backends) {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend();
public AdminCleanTrashStmt(List<String> backends) throws UserException {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
Map<String, Long> backendsID = new HashMap<String, Long>();
for (Backend backend : backendsInfo.values()) {
backendsID.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,26 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AdminRebalanceDiskStmt extends DdlStmt {
private static final Logger LOG = LogManager.getLogger(AdminRebalanceDiskStmt.class);
private List<Backend> backends = Lists.newArrayList();
private long timeoutS = 0;

public AdminRebalanceDiskStmt(List<String> backends) {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend();
ImmutableMap<Long, Backend> backendsInfo;
try {
backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
} catch (AnalysisException e) {
LOG.warn("failed to get backends,", e);
return;
}
Map<String, Long> backendsID = new HashMap<String, Long>();
for (Backend backend : backendsInfo.values()) {
backendsID.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class ShowTrashDiskStmt extends ShowStmt {

private Backend backend;

public ShowTrashDiskStmt(String backendQuery) {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend();
public ShowTrashDiskStmt(String backendQuery) throws AnalysisException {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
for (Backend backend : backendsInfo.values()) {
String backendStr = NetUtils.getHostPortInAccessibleFormat(backend.getHost(), backend.getHeartbeatPort());
if (backendQuery.equals(backendStr)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
public class ShowTrashStmt extends ShowStmt {
private List<Backend> backends = Lists.newArrayList();

public ShowTrashStmt() {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend();
public ShowTrashStmt() throws AnalysisException {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
for (Backend backend : backendsInfo.values()) {
this.backends.add(backend);
}
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -6168,8 +6168,8 @@ public void cleanTrash(AdminCleanTrashStmt stmt) {
AgentTaskExecutor.submit(batchTask);
}

public void cleanUDFCacheTask(DropFunctionStmt stmt) {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend();
public void cleanUDFCacheTask(DropFunctionStmt stmt) throws UserException {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
String functionSignature = stmt.signatureString();
AgentBatchTask batchTask = new AgentBatchTask();
for (Backend backend : backendsInfo.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -176,7 +179,15 @@ public void createHdfsVault(StorageVault vault) throws DdlException {
}

private void alterSyncVaultTask() {
systemInfoService.getAllBackends().forEach(backend -> {
List<Backend> bes;
try {
// get system all backends
bes = systemInfoService.getAllBackendsByAllCluster().values().asList();
} catch (UserException e) {
LOG.warn("failed to get current cluster backends: {}", e);
return;
}
bes.forEach(backend -> {
TNetworkAddress address = backend.getBrpcAddress();
try {
BackendServiceProxy.getInstance().alterVaultSync(address, PAlterVaultSyncRequest.newBuilder().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.catalog;

import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
Expand Down Expand Up @@ -51,7 +52,13 @@ public TabletStatMgr() {

@Override
protected void runAfterCatalogReady() {
ImmutableMap<Long, Backend> backends = Env.getCurrentSystemInfo().getIdToBackend();
ImmutableMap<Long, Backend> backends;
try {
backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
} catch (AnalysisException e) {
LOG.warn("can't get backends info", e);
return;
}
long start = System.currentTimeMillis();
taskPool.submit(() -> {
// no need to get tablet stat if backend is not alive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,13 @@ public Rebalancer getRebalancer() {
* update working slots at the beginning of each round
*/
private boolean updateWorkingSlots() {
ImmutableMap<Long, Backend> backends = infoService.getAllBackendsMap();
ImmutableMap<Long, Backend> backends;
try {
backends = infoService.getAllBackendsByAllCluster();
} catch (AnalysisException e) {
LOG.warn("failed to get backends with current cluster", e);
return false;
}
for (Backend backend : backends.values()) {
if (!backend.hasPathHash() && backend.isAlive()) {
// when upgrading, backend may not get path info yet. so return false and wait for next round.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.cloud.proto.Cloud.ClusterPB;
import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
Expand Down Expand Up @@ -368,25 +369,18 @@ public int getMinPipelineExecutorSize() {
}

@Override
public List<Backend> getBackendsByCurrentCluster() throws UserException {
public ImmutableMap<Long, Backend> getBackendsByCurrentCluster() throws AnalysisException {
ConnectContext ctx = ConnectContext.get();
if (ctx == null) {
throw new UserException("connect context is null");
throw new AnalysisException("connect context is null");
}

String cluster = ctx.getCurrentCloudCluster();
if (Strings.isNullOrEmpty(cluster)) {
throw new UserException("cluster name is empty");
throw new AnalysisException("cluster name is empty");
}

//((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster);

return getBackendsByClusterName(cluster);
}

@Override
public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster() throws UserException {
List<Backend> backends = getBackendsByCurrentCluster();
List<Backend> backends = getBackendsByClusterName(cluster);
Map<Long, Backend> idToBackend = Maps.newHashMap();
for (Backend be : backends) {
idToBackend.put(be.getId(), be);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
Expand Down Expand Up @@ -57,8 +58,8 @@ public ReplicasProcNode(long tabletId, List<Replica> replicas) {
}

@Override
public ProcResult fetchResult() {
ImmutableMap<Long, Backend> backendMap = Env.getCurrentSystemInfo().getIdToBackend();
public ProcResult fetchResult() throws AnalysisException {
ImmutableMap<Long, Backend> backendMap = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();

BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ public TabletsProcDir(Table table, MaterializedIndex index) {
this.index = index;
}

public List<List<Comparable>> fetchComparableResult(long version, long backendId, Replica.ReplicaState state) {
public List<List<Comparable>> fetchComparableResult(long version, long backendId, Replica.ReplicaState state)
throws AnalysisException {
Preconditions.checkNotNull(table);
Preconditions.checkNotNull(index);
ImmutableMap<Long, Backend> backendMap = Env.getCurrentSystemInfo().getIdToBackend();
ImmutableMap<Long, Backend> backendMap = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();

List<List<Comparable>> tabletInfos = new ArrayList<List<Comparable>>();
Map<Long, String> pathHashToRoot = new HashMap<>();
Expand Down Expand Up @@ -179,12 +180,12 @@ public List<List<Comparable>> fetchComparableResult(long version, long backendId
return tabletInfos;
}

private List<List<Comparable>> fetchComparableResult() {
private List<List<Comparable>> fetchComparableResult() throws AnalysisException {
return fetchComparableResult(-1, -1, null);
}

@Override
public ProcResult fetchResult() {
public ProcResult fetchResult() throws AnalysisException {
List<List<Comparable>> tabletInfos = fetchComparableResult();
// sort by tabletId, replicaId
ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ public class TrashProcDir implements ProcDirInterface {
private List<Backend> backends = Lists.newArrayList();

public TrashProcDir() {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend();
ImmutableMap<Long, Backend> backendsInfo;
try {
backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
} catch (AnalysisException e) {
LOG.warn("Can't get backends info", e);
return;
}
for (Backend backend : backendsInfo.values()) {
this.backends.add(backend);
}
Expand Down
Loading

0 comments on commit 54115f7

Please sign in to comment.