diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 29887658f7acee..af62171007b571 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -104,6 +104,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.stream.Collectors; @@ -111,10 +112,11 @@ public class ReportHandler extends Daemon { private static final Logger LOG = LogManager.getLogger(ReportHandler.class); - private BlockingQueue reportQueue = Queues.newLinkedBlockingQueue(); + private BlockingQueue reportQueue = Queues.newLinkedBlockingQueue(); + + private Map reportTasks = Maps.newHashMap(); private enum ReportType { - UNKNOWN, TASK, DISK, TABLET @@ -158,7 +160,7 @@ public TMasterResult handleReport(TReportRequest request) throws TException { Map partitionsVersion = null; long reportVersion = -1; - ReportType reportType = ReportType.UNKNOWN; + ReportType reportType = null; if (request.isSetTasks()) { tasks = request.getTasks(); @@ -189,8 +191,16 @@ public TMasterResult handleReport(TReportRequest request) throws TException { backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore()); } - ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, partitionsVersion, reportVersion, - request.getStoragePolicy(), request.getResource(), request.getNumCores(), + if (reportType == null) { + tStatus.setStatusCode(TStatusCode.INTERNAL_ERROR); + tStatus.setErrorMsgs(Lists.newArrayList("unknown report type")); + LOG.error("receive unknown report type from be {}. current queue size: {}", + backend.getId(), reportQueue.size()); + return result; + } + + ReportTask reportTask = new ReportTask(beId, reportType, tasks, disks, tablets, partitionsVersion, + reportVersion, request.getStoragePolicy(), request.getResource(), request.getNumCores(), request.getPipelineExecutorSize()); try { putToQueue(reportTask); @@ -202,8 +212,8 @@ public TMasterResult handleReport(TReportRequest request) throws TException { tStatus.setErrorMsgs(errorMsgs); return result; } - LOG.info("receive report from be {}. type: {}, current queue size: {}", - backend.getId(), reportType, reportQueue.size()); + LOG.info("receive report from be {}. type: {}, report version {}, current queue size: {}", + backend.getId(), reportType, reportVersion, reportQueue.size()); return result; } @@ -215,7 +225,14 @@ private void putToQueue(ReportTask reportTask) throws Exception { "the report queue size exceeds the limit: " + Config.report_queue_size + ". current: " + currentSize); } - reportQueue.put(reportTask); + + BackendReportType backendReportType = new BackendReportType(reportTask.beId, reportTask.reportType); + + synchronized (reportTasks) { + reportTasks.put(backendReportType, reportTask); + } + + reportQueue.put(backendReportType); } private Map buildTabletMap(List tabletList) { @@ -230,9 +247,38 @@ private Map buildTabletMap(List tabletList) { return tabletMap; } + private class BackendReportType { + private long beId; + private ReportType reportType; + + public BackendReportType(long beId, ReportType reportType) { + this.beId = beId; + this.reportType = reportType; + } + + @Override + public int hashCode() { + return Objects.hash(beId, reportType); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof BackendReportType)) { + return false; + } + BackendReportType otherBeReport = (BackendReportType) other; + return this.beId == otherBeReport.beId + && this.reportType == otherBeReport.reportType; + } + } + private class ReportTask extends MasterTask { private long beId; + private ReportType reportType; private Map> tasks; private Map disks; private Map tablets; @@ -244,12 +290,13 @@ private class ReportTask extends MasterTask { private int cpuCores; private int pipelineExecutorSize; - public ReportTask(long beId, Map> tasks, + public ReportTask(long beId, ReportType reportType, Map> tasks, Map disks, Map tablets, Map partitionsVersion, long reportVersion, List storagePolicies, List storageResources, int cpuCores, int pipelineExecutorSize) { this.beId = beId; + this.reportType = reportType; this.tasks = tasks; this.disks = disks; this.tablets = tablets; @@ -1383,13 +1430,28 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI @Override protected void runOneCycle() { while (true) { - ReportTask task = null; - try { - task = reportQueue.take(); + ReportTask task = takeReportTask(); + if (task != null) { task.exec(); - } catch (InterruptedException e) { - LOG.warn("got interupted exception when executing report", e); } } } + + private ReportTask takeReportTask() { + BackendReportType backendReportType; + try { + backendReportType = reportQueue.take(); + } catch (InterruptedException e) { + LOG.warn("got interupted exception when executing report", e); + return null; + } + + ReportTask task = null; + synchronized (reportTasks) { + task = reportTasks.get(backendReportType); + reportTasks.remove(backendReportType); + } + + return task; + } }