Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](query) prefer to choose tablet on alive disk #39467 #39654

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -919,13 +919,6 @@ void report_task_callback(const TMasterInfo& master_info) {
}

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) {
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests at the same time
// and can not be processed.
if (config::report_random_wait) {
random_sleep(5);
}

TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.disks = true;
Expand Down Expand Up @@ -966,8 +959,16 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.tablets = true;

uint64_t report_version = s_report_version;
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
uint64_t report_version;
for (int i = 0; i < 5; i++) {
request.tablets.clear();
report_version = s_report_version;
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
if (report_version == s_report_version) {
break;
}
}

if (report_version < s_report_version) {
// TODO llj This can only reduce the possibility for report error, but can't avoid it.
// If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
Expand Down
2 changes: 2 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,8 @@ int main(int argc, char** argv) {
exit(1);
}

exec_env->get_storage_engine()->notify_listeners();

while (!doris::k_doris_exit) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ public boolean hasPathHash() {
return pathHash != 0;
}

/**
 * Reports whether this disk is alive.
 *
 * @return {@code true} only when the disk state is {@code DiskState.ONLINE}
 */
public boolean isAlive() {
    return DiskState.ONLINE == state;
}

/**
 * Compares the given storage medium with the one recorded for this disk.
 *
 * @param storageMedium the medium to compare against
 * @return {@code true} when both refer to the same medium constant
 */
public boolean isStorageMediumMatch(TStorageMedium storageMedium) {
    return storageMedium == this.storageMedium;
}
Expand Down
28 changes: 20 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,11 @@ public Multimap<Long, Long> getNormalReplicaBackendPathMap() {
}

// for query
public List<Replica> getQueryableReplicas(long visibleVersion, boolean allowFailedVersion) {
public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Long>> backendAlivePathHashs,
boolean allowFailedVersion) {
List<Replica> allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size());
List<Replica> auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size());
List<Replica> deadPathReplica = Lists.newArrayList();
for (Replica replica : replicas) {
if (replica.isBad()) {
continue;
Expand All @@ -294,21 +296,31 @@ public List<Replica> getQueryableReplicas(long visibleVersion, boolean allowFail
continue;
}

if (!replica.checkVersionCatchUp(visibleVersion, false)) {
continue;
}

Set<Long> thisBeAlivePaths = backendAlivePathHashs.get(replica.getBackendId());
ReplicaState state = replica.getState();
if (state.canQuery()) {
if (replica.checkVersionCatchUp(visibleVersion, false)) {
allQueryableReplica.add(replica);
}
// If thisBeAlivePaths contains path hash 0, this BE has not reported its disk state yet,
// so the dead-path check is skipped for it.
if (replica.getPathHash() != -1 && thisBeAlivePaths != null
&& !thisBeAlivePaths.contains(replica.getPathHash())
&& !thisBeAlivePaths.contains(0L)) {
deadPathReplica.add(replica);
} else if (state.canQuery()) {
allQueryableReplica.add(replica);
} else if (state == ReplicaState.DECOMMISSION) {
if (replica.checkVersionCatchUp(visibleVersion, false)) {
auxiliaryReplica.add(replica);
}
auxiliaryReplica.add(replica);
}
}

if (allQueryableReplica.isEmpty()) {
allQueryableReplica = auxiliaryReplica;
}
if (allQueryableReplica.isEmpty()) {
allQueryableReplica = deadPathReplica;
}

if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) {
long minVersionCount = allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
Expand Down Expand Up @@ -808,6 +809,15 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Map<Object, Object> objectPool = new HashMap<Object, Object>();
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
Set<Long> backendHealthPathHashs;
if (backend == null) {
backendHealthPathHashs = Sets.newHashSet();
} else {
backendHealthPathHashs = backend.getDisks().values().stream()
.filter(DiskInfo::isAlive)
.map(DiskInfo::getPathHash).collect(Collectors.toSet());
}
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
Expand Down Expand Up @@ -863,7 +873,24 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
long currentBackendReportVersion = Env.getCurrentSystemInfo()
.getBackendReportVersion(backendId);
if (backendReportVersion < currentBackendReportVersion) {
continue;

// if backendHealthPathHashs contains health path hash 0,
// it means this backend hadn't reported disks state,
// should ignore this case.
boolean thisReplicaOnBadDisk = replica.getPathHash() != -1L
&& !backendHealthPathHashs.contains(replica.getPathHash())
&& !backendHealthPathHashs.contains(0L);

boolean existsOtherHealthReplica = tablet.getReplicas().stream()
.anyMatch(r -> r.getBackendId() != replica.getBackendId()
&& r.getVersion() >= replica.getVersion()
&& r.getLastFailedVersion() == -1L
&& !r.isBad());

// If the replica is on a bad disk and other healthy replicas exist, still delete it.
if (!(thisReplicaOnBadDisk && existsOtherHealthReplica)) {
continue;
}
}

BinlogConfig binlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
Expand Down Expand Up @@ -732,7 +733,7 @@ private Collection<Long> distributionPrune(
}

private void addScanRangeLocations(Partition partition,
List<Tablet> tablets) throws UserException {
List<Tablet> tablets, Map<Long, Set<Long>> backendAlivePathHashs) throws UserException {
long visibleVersion = partition.getVisibleVersion();
String visibleVersionStr = String.valueOf(visibleVersion);

Expand Down Expand Up @@ -776,7 +777,8 @@ private void addScanRangeLocations(Partition partition,
paloRange.setTabletId(tabletId);

// random shuffle List && only collect one copy
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion,
backendAlivePathHashs, skipMissingVersion);
if (replicas.isEmpty()) {
if (ConnectContext.get().getSessionVariable().skipBadTablet) {
continue;
Expand Down Expand Up @@ -1125,6 +1127,12 @@ private void computeTabletInfo() throws UserException {
*/
Preconditions.checkState(scanBackendIds.size() == 0);
Preconditions.checkState(scanTabletIds.size() == 0);
Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream()
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
}

for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
Expand Down Expand Up @@ -1166,7 +1174,7 @@ private void computeTabletInfo() throws UserException {

totalTabletsNum += selectedTable.getTablets().size();
selectedSplitNum += tablets.size();
addScanRangeLocations(partition, tablets);
addScanRangeLocations(partition, tablets, backendAlivePathHashs);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.system.Backend;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Verifies that {@code Tablet.getQueryableReplicas} stops returning replicas whose
 * path hash is not among the alive disk path hashes of their backend.
 */
public class QueryTabletTest extends TestWithFeService {

    // Three backends are needed because the test table uses replication_num = 3.
    @Override
    protected int backendNum() {
        return 3;
    }

    @Test
    public void testTabletOnBadDisks() throws Exception {
        createDatabase("db1");
        createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 1"
                + " properties('replication_num' = '3')");

        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("db1");
        OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
        Assertions.assertNotNull(tbl);

        // Take the first tablet of the first index of the first partition.
        Tablet tablet = tbl.getPartitions().iterator().next()
                .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
                .getTablets().iterator().next();

        List<Replica> replicas = tablet.getReplicas();
        Assertions.assertEquals(3, replicas.size());
        // Every replica must have been assigned a real path hash when it was created.
        for (Replica replica : replicas) {
            Assertions.assertNotEquals(-1L, replica.getPathHash());
        }

        // With all disks alive, every replica is queryable.
        List<Replica> queryableBefore = tablet.getQueryableReplicas(1L, getAlivePathHashs(), false);
        Assertions.assertEquals(replicas, queryableBefore);

        // disk mark as bad: take every disk of the first replica's backend offline
        Backend firstBackend = Env.getCurrentSystemInfo().getBackend(replicas.get(0).getBackendId());
        for (DiskInfo disk : firstBackend.getDisks().values()) {
            disk.setState(DiskInfo.DiskState.OFFLINE);
        }

        // lost disk: give the second replica a different path hash
        replicas.get(1).setPathHash(-123321L);

        // Only the third replica is expected to be returned now.
        List<Replica> expected = Lists.newArrayList(replicas.get(2));
        List<Replica> queryableAfter = tablet.getQueryableReplicas(1L, getAlivePathHashs(), false);
        Assertions.assertEquals(expected, queryableAfter);
    }

    // Builds a map from backend id to the path hashes of that backend's alive disks.
    private Map<Long, Set<Long>> getAlivePathHashs() {
        Map<Long, Set<Long>> alivePathsByBackend = Maps.newHashMap();
        for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
            Set<Long> alivePaths = backend.getDisks().values().stream()
                    .filter(DiskInfo::isAlive)
                    .map(DiskInfo::getPathHash)
                    .collect(Collectors.toSet());
            alivePathsByBackend.put(backend.getId(), alivePaths);
        }
        return alivePathsByBackend;
    }

}

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TCloneReq;
import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TDropTabletReq;
import org.apache.doris.thrift.TExecPlanFragmentParams;
Expand Down Expand Up @@ -83,7 +84,9 @@

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

/*
* This class is used to create mock backends.
Expand Down Expand Up @@ -191,13 +194,17 @@ public void run() {
TTaskType taskType = request.getTaskType();
switch (taskType) {
case CREATE:
++reportVersion;
handleCreateTablet(request, finishTaskRequest);
break;
case ALTER:
++reportVersion;
break;
case DROP:
handleDropTablet(request, finishTaskRequest);
break;
case CLONE:
++reportVersion;
handleCloneTablet(request, finishTaskRequest);
break;
case STORAGE_MEDIUM_MIGRATE:
Expand All @@ -223,6 +230,30 @@ public void run() {
}
}

/**
 * Handles a CREATE task on the mocked backend: picks a disk for the new tablet and
 * fills the finish-task request with one tablet info entry describing it.
 *
 * <p>Alive disks whose storage medium equals the requested one are preferred; if there
 * are none, any alive disk is used; if no disk is alive, the path hash is set to -1.
 *
 * @param request the agent task carrying the create-tablet request
 * @param finishTaskRequest the finish-task request that receives the tablet info list
 */
private void handleCreateTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) {
    TCreateTabletReq req = request.getCreateTabletReq();

    List<DiskInfo> aliveDisks = backendInFe.getDisks().values().stream()
            .filter(DiskInfo::isAlive)
            .collect(Collectors.toList());
    List<DiskInfo> candidates = aliveDisks.stream()
            .filter(disk -> req.storage_medium == disk.getStorageMedium())
            .collect(Collectors.toList());
    if (candidates.isEmpty()) {
        // No alive disk with the requested medium: fall back to any alive disk.
        candidates = aliveDisks;
    }

    // -1 is reported when there is no alive disk to place the tablet on.
    long pathHash = -1L;
    if (!candidates.isEmpty()) {
        int pick = new Random().nextInt(candidates.size());
        pathHash = candidates.get(pick).getPathHash();
    }

    TTabletInfo createdTablet = new TTabletInfo();
    createdTablet.setTabletId(req.tablet_id);
    createdTablet.setVersion(req.version);
    createdTablet.setPathHash(pathHash);
    createdTablet.setReplicaId(req.replica_id);
    createdTablet.setUsed(true);

    List<TTabletInfo> finishedTablets = Lists.newArrayList(createdTablet);
    finishTaskRequest.setFinishTabletInfos(finishedTablets);
}

private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) {
TDropTabletReq req = request.getDropTabletReq();
long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id));
Expand Down
Loading