diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
index 8768eb34ce1..d9746dcd6e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
@@ -22,11 +22,13 @@ public class OpProfileDef {
   public int operatorId;
   public int operatorType;
   public int incomingCount;
+  public long optimalMemoryAllocation;

-  public OpProfileDef(int operatorId, int operatorType, int incomingCount) {
+  public OpProfileDef(int operatorId, int operatorType, int incomingCount, long optimalMemoryAllocation) {
     this.operatorId = operatorId;
     this.operatorType = operatorType;
     this.incomingCount = incomingCount;
+    this.optimalMemoryAllocation = optimalMemoryAllocation;
   }
   public int getOperatorId(){
     return operatorId;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index d47e8d9c00f..c512959dc79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -57,7 +57,7 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl conte
     } else {
       OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(),
-          OperatorUtilities.getChildCount(popConfig));
+          OperatorUtilities.getChildCount(popConfig), popConfig.getMaxAllocation());
       this.stats = context.getStats().newOperatorStats(def, allocator);
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 67a8b80f280..f682104aa1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -45,7 +45,7 @@ public class OperatorStats {
   public long[] recordsReceivedByInput;
   public long[] batchesReceivedByInput;
   private long[] schemaCountByInput;
-
+  private long optimalMemoryAllocation;

   private boolean inProcessing = false;
   private boolean inSetup = false;
@@ -62,7 +62,7 @@ public class OperatorStats {
   private int inputCount;

   public OperatorStats(OpProfileDef def, BufferAllocator allocator){
-    this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator);
+    this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator, def.optimalMemoryAllocation);
   }

   /**
@@ -74,7 +74,7 @@ public OperatorStats(OpProfileDef def, BufferAllocator allocator){
    */
   public OperatorStats(OperatorStats original, boolean isClean) {
-    this(original.operatorId, original.operatorType, original.inputCount, original.allocator);
+    this(original.operatorId, original.operatorType, original.inputCount, original.allocator, original.optimalMemoryAllocation);

     if ( !isClean ) {
       inProcessing = original.inProcessing;
@@ -88,7 +88,7 @@ public OperatorStats(OperatorStats original, boolean isClean) {
   }

   @VisibleForTesting
-  public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) {
+  public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long initialAllocation) {
     super();
     this.allocator = allocator;
     this.operatorId = operatorId;
@@ -97,6 +97,7 @@ public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAll
     this.recordsReceivedByInput = new long[inputCount];
     this.batchesReceivedByInput = new long[inputCount];
     this.schemaCountByInput = new long[inputCount];
+    this.optimalMemoryAllocation = initialAllocation;
   }

   private String assertionError(String msg){
@@ -207,6 +208,7 @@ public OperatorProfile getProfile() {
         .setOperatorId(operatorId) //
         .setSetupNanos(setupNanos) //
         .setProcessNanos(processingNanos)
+        .setOptimalMemAllocation(optimalMemoryAllocation)
         .setWaitNanos(waitNanos);

     if (allocator != null) {
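Taken together, the three files above thread the planner's memory budget into the runtime statistics. A minimal sketch of the flow, assuming a materialized operator pop whose setMaxAllocation(...) was set during plan materialization (see Materializer.java below), plus a fragment's stats registry and allocator in scope; the wiring is illustrative, not a new API:

    OpProfileDef def = new OpProfileDef(pop.getOperatorId(), pop.getOperatorType(),
        OperatorUtilities.getChildCount(pop), pop.getMaxAllocation());
    OperatorStats stats = context.getStats().newOperatorStats(def, allocator);
    // ... operator runs; when the query profile is assembled,
    // stats.getProfile() now emits optimal_mem_allocation (proto field 10).
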
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 126ad0751d4..07693c4c9c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -101,18 +101,6 @@ public boolean enforceWidth() {
     return getMinParallelizationWidth() > 1;
   }

-  @Override
-  @JsonIgnore
-  public long getInitialAllocation() {
-    return 0;
-  }
-
-  @Override
-  @JsonIgnore
-  public long getMaxAllocation() {
-    return 0;
-  }
-
   @Override
   @JsonIgnore
   public boolean canPushdownProjects(List<SchemaPath> columns) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 95a1235017a..9c1ba591435 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -54,7 +54,7 @@ public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorCon
     }
     //Creating new stat for appending to list
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
-        config.getOperatorType(), OperatorUtilities.getChildCount(config)),
+        config.getOperatorType(), OperatorUtilities.getChildCount(config), config.getMaxAllocation()),
       this.oContext.getAllocator());
     fragmentContext.getStats().addOperatorStats(this.stats);
     this.fragmentContext = fragmentContext;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java
index fecea5e7f6e..541130b1670 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.fragment;

+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.function.CheckedConsumer;
@@ -29,7 +30,7 @@
 import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig;
 import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
 import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -46,6 +47,7 @@
  * fragment is based on the cluster state and provided queue configuration.
 */
public class DistributedQueueParallelizer extends SimpleParallelizer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class);
   private final boolean planHasMemory;
   private final QueryContext queryContext;
   private final QueryResourceManager rm;
@@ -65,9 +67,13 @@ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
       if (!planHasMemory) {
         final DrillNode drillEndpointNode = DrillNode.create(endpoint);
         if (operator.isBufferedOperator(queryContext)) {
-          return operators.get(drillEndpointNode).get(operator);
+          Long operatorsMemory = operators.get(drillEndpointNode).get(operator);
+          logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory);
+          return operatorsMemory;
         } else {
-          return operator.getMaxAllocation();
+          Long nonBufferedMemory = (long)operator.getCost().getMemoryCost();
+          logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, nonBufferedMemory);
+          return nonBufferedMemory;
         }
       }
       else {
@@ -92,10 +98,11 @@ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory()
    */
   public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
                            Map<DrillbitEndpoint, String> onlineEndpointUUIDs) throws ExecutionSetupException {
     if (planHasMemory) {
+      logger.debug(" Plan already has memory settings. Adjustment of the memory is skipped");
       return;
     }
+    logger.info(" Memory adjustment phase triggered");

     final Map<DrillNode, String> onlineDrillNodeUUIDs = onlineEndpointUUIDs.entrySet().stream()
       .collect(Collectors.toMap(x -> DrillNode.create(x.getKey()), x -> x.getValue()));
@@ -112,7 +119,7 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
     for (Wrapper wrapper : roots) {
       traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> {
-        MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext);
+        MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext, rm.minimumOperatorMemory());
         fragment.getNode().getRoot().accept(calculator, fragment);
         NodeResources.merge(totalNodeResources, fragment.getResourceMap());
         operators.entrySet()
@@ -122,6 +129,10 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
       }));
     }

+    if (logger.isDebugEnabled()) {
+      logger.debug(" Total node resource requirements for the plan is {}", getJSONFromResourcesMap(totalNodeResources));
+    }
+
     final QueryQueueConfig queueConfig;
     try {
       queueConfig = this.rm.selectQueue(max(totalNodeResources.values()));
@@ -130,8 +141,10 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
     }

-    Map<DrillNode, List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators = ensureOperatorMemoryWithinLimits(operators, totalNodeResources,
-      queueConfig.getMaxQueryMemoryInMBPerNode());
+    Map<DrillNode, List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators =
+      ensureOperatorMemoryWithinLimits(operators, totalNodeResources,
+        convertMBToBytes(Math.min(queueConfig.getMaxQueryMemoryInMBPerNode(),
+          queueConfig.getQueueTotalMemoryInMB(onlineEndpointUUIDs.size()))));
     memoryAdjustedOperators.entrySet().stream().forEach((x) -> {
       Map<PhysicalOperator, Long> memoryPerOperator = x.getValue().stream()
         .collect(Collectors.toMap(operatorLongPair -> operatorLongPair.getLeft(),
@@ -140,9 +153,17 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
       this.operators.put(x.getKey(), memoryPerOperator);
     });

+    if (logger.isDebugEnabled()) {
+      logger.debug(" Total node resource requirements after adjustment {}", getJSONFromResourcesMap(totalNodeResources));
+    }
+
     this.rm.setCost(convertToUUID(totalNodeResources, onlineDrillNodeUUIDs));
   }

+  private long convertMBToBytes(long value) {
+    return value * 1024 * 1024;
+  }
+
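To make the new per-node limit concrete, run it with the values the unit tests in this patch mock (10 MB max query memory per node, 100 MB total queue memory; production numbers will differ):

    long maxPerNodeMB = queueConfig.getMaxQueryMemoryInMBPerNode();          // 10
    long queueTotalMB = queueConfig.getQueueTotalMemoryInMB(numOnlineNodes); // 100; numOnlineNodes stands in for onlineEndpointUUIDs.size()
    long nodeLimit = convertMBToBytes(Math.min(maxPerNodeMB, queueTotalMB)); // 10 * 1024 * 1024 = 10485760 bytes

The tighter of the two queue bounds wins, and everything downstream of this point works in bytes.
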
   private Map<String, NodeResources> convertToUUID(Map<DrillNode, NodeResources> nodeResourcesMap,
                                                    Map<DrillNode, String> onlineDrillNodeUUIDs) {
     Map<String, NodeResources> nodeResourcesPerUUID = new HashMap<>();
@@ -172,50 +193,81 @@ private NodeResources max(Collection<NodeResources> resources) {
    */
   private Map<DrillNode, List<Pair<PhysicalOperator, Long>>> ensureOperatorMemoryWithinLimits(Map<DrillNode, List<Pair<PhysicalOperator, Long>>> memoryPerOperator,
-                                                  Map<DrillNode, NodeResources> nodeResourceMap, long nodeLimit) {
+                                                  Map<DrillNode, NodeResources> nodeResourceMap, long nodeLimit) throws ExecutionSetupException {
     // Get the physical operators which are above the node memory limit.
-    Map<DrillNode, List<Pair<PhysicalOperator, Long>>> onlyMemoryAboveLimitOperators = new HashMap<>();
-    memoryPerOperator.entrySet().stream().forEach((entry) -> {
-      onlyMemoryAboveLimitOperators.putIfAbsent(entry.getKey(), new ArrayList<>());
-      if (nodeResourceMap.get(entry.getKey()).getMemoryInBytes() > nodeLimit) {
-        onlyMemoryAboveLimitOperators.get(entry.getKey()).addAll(entry.getValue());
-      }
-    });
-
+    Map<DrillNode, List<Pair<PhysicalOperator, Long>>> onlyMemoryAboveLimitOperators = memoryPerOperator.entrySet()
+      .stream()
+      .filter(entry -> nodeResourceMap.get(entry.getKey()).getMemoryInBytes() > nodeLimit)
+      .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));

     // Compute the total memory required by the physical operators on the drillbits which are above node limit.
     // Then use the total memory to adjust the memory requirement based on the permissible node limit.
     Map<DrillNode, List<Pair<PhysicalOperator, Long>>> memoryAdjustedDrillbits = new HashMap<>();
     onlyMemoryAboveLimitOperators.entrySet().stream().forEach(
-      entry -> {
-        Long totalMemory = entry.getValue().stream().mapToLong(Pair::getValue).sum();
-        List<Pair<PhysicalOperator, Long>> adjustedMemory = entry.getValue().stream().map(operatorMemory -> {
+      CheckedConsumer.throwingConsumerWrapper(entry -> {
+        Long totalBufferedOperatorsMemoryReq = entry.getValue().stream().mapToLong(Pair::getValue).sum();
+        Long nonBufferedOperatorsMemoryReq = nodeResourceMap.get(entry.getKey()).getMemoryInBytes() - totalBufferedOperatorsMemoryReq;
+        Long bufferedOperatorsMemoryLimit = nodeLimit - nonBufferedOperatorsMemoryReq;
+        if (bufferedOperatorsMemoryLimit < 0 || nonBufferedOperatorsMemoryReq < 0) {
+          logger.error(" Memory limit for buffered operators {} or memory requirement for non-buffered operators {} is negative",
+            bufferedOperatorsMemoryLimit, nonBufferedOperatorsMemoryReq);
+          throw new ExecutionSetupException("Memory limit for buffered operators " + bufferedOperatorsMemoryLimit
+            + " or memory requirement for non-buffered operators " + nonBufferedOperatorsMemoryReq + " is less than zero");
+        }
+        List<Pair<PhysicalOperator, Long>> adjustedMemory = entry.getValue().stream().map(operatorAndMemory -> {
           // formula to adjust the memory is (optimalMemory / totalMemory(this is for all the operators)) * permissible_node_limit.
-          return Pair.of(operatorMemory.getKey(), (long) Math.ceil(operatorMemory.getValue()/totalMemory * nodeLimit));
+          return Pair.of(operatorAndMemory.getKey(),
+            Math.max(this.rm.minimumOperatorMemory(),
+              (long) Math.ceil((double) operatorAndMemory.getValue() / totalBufferedOperatorsMemoryReq * bufferedOperatorsMemoryLimit)));
         }).collect(Collectors.toList());
         memoryAdjustedDrillbits.put(entry.getKey(), adjustedMemory);
         NodeResources nodeResources = nodeResourceMap.get(entry.getKey());
-        nodeResources.setMemoryInBytes(adjustedMemory.stream().mapToLong(Pair::getValue).sum());
-      }
+        nodeResources.setMemoryInBytes(nonBufferedOperatorsMemoryReq + adjustedMemory.stream().mapToLong(Pair::getValue).sum());
+      })
     );
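A worked pass through the rescaling above, with assumed numbers: a nodeLimit of 100 MB, 30 MB already claimed by non-buffered operators, and two buffered operators asking for 60 MB each (120 MB total):

    // bufferedOperatorsMemoryLimit = 100 MB - 30 MB = 70 MB
    // each operator: max(rm.minimumOperatorMemory(), ceil(60.0 / 120 * 70)) = 35 MB
    // adjusted node total: 30 + 35 + 35 = 100 MB == nodeLimit

so the node lands exactly on its limit and the checkIfWithinLimit(...) call that follows passes; had the shares still summed above the limit (the minimum-memory floor can force this), that check would throw.
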
+    checkIfWithinLimit(nodeResourceMap, nodeLimit);
+
     // Get all the operations on drillbits which were adjusted for memory and merge them with operators which are not
     // adjusted for memory.
-    Map<DrillNode, List<Pair<PhysicalOperator, Long>>> allDrillbits = new HashMap<>();
-    memoryPerOperator.entrySet().stream().filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey())).forEach(
-      operatorMemory -> {
-        allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue());
-      }
-    );
+    Map<DrillNode, List<Pair<PhysicalOperator, Long>>> allDrillbits = memoryPerOperator.entrySet()
+      .stream()
+      .filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey()))
+      .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));

     memoryAdjustedDrillbits.entrySet().stream().forEach(
-      operatorMemory -> {
-        allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue());
-      }
-    );
+      operatorMemory -> allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue()));

     // At this point allDrillbits contains the operators on all drillbits. The memory also is adjusted based on the nodeLimit and
     // the ratio of their requirements.
     return allDrillbits;
   }
+
+  private void checkIfWithinLimit(Map<DrillNode, NodeResources> nodeResourcesMap, long nodeLimit) throws ExecutionSetupException {
+    for (Map.Entry<DrillNode, NodeResources> entry : nodeResourcesMap.entrySet()) {
+      if (entry.getValue().getMemoryInBytes() > nodeLimit) {
+        logger.error(" Memory requirement for the query cannot be adjusted."
+          + " Memory requirement {} (in bytes) for a node {} is greater than limit {}", entry.getValue()
+          .getMemoryInBytes(), entry.getKey(), nodeLimit);
+        throw new ExecutionSetupException("Minimum memory requirement "
+          + entry.getValue().getMemoryInBytes() + " for a node " + entry.getKey() + " is greater than limit: " + nodeLimit);
+      }
+    }
+  }
+
+  private String getJSONFromResourcesMap(Map<DrillNode, NodeResources> resourcesMap) {
+    String json = "";
+    try {
+      json = new ObjectMapper().writeValueAsString(resourcesMap.entrySet()
+        .stream()
+        .collect(Collectors.toMap(entry -> entry.getKey()
+          .toString(), Map.Entry::getValue)));
+    } catch (JsonProcessingException exception) {
+      logger.error(" Cannot convert the node resources map to JSON");
+    }
+
+    return json;
+  }
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
index 0212e088211..3d28067d563 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
@@ -17,19 +17,17 @@
  */
 package org.apache.drill.exec.planner.fragment;

-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.Exchange;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.util.memory.ZKQueueMemoryAllocationUtilities;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;

+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
 public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fragment.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index fdfa95c393a..d806b52e20a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -55,6 +55,7 @@ public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNo PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child); materializedSender.setOperatorId(0); materializedSender.setCost(exchange.getCost()); + materializedSender.setMaxAllocation(exchange.getMaxAllocation()); // logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child); return materializedSender; @@ -62,6 +63,7 @@ public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNo // receiving exchange. PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId()); materializedReceiver.setOperatorId(Short.MAX_VALUE & exchange.getOperatorId()); + materializedReceiver.setMaxAllocation(exchange.getMaxAllocation()); // logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver); materializedReceiver.setCost(exchange.getCost()); return materializedReceiver; @@ -70,8 +72,10 @@ public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNo @Override public PhysicalOperator visitGroupScan(GroupScan groupScan, IndexedFragmentNode iNode) throws ExecutionSetupException { + iNode.addAllocation(groupScan); SubScan child = groupScan.getSpecificScan(iNode.getMinorFragmentId()); child.setOperatorId(Short.MAX_VALUE & groupScan.getOperatorId()); + child.setMaxAllocation(groupScan.getMaxAllocation()); // remember the subscan for future use iNode.addSubScan(child); return child; @@ -89,11 +93,11 @@ public PhysicalOperator visitSubScan(SubScan subScan, IndexedFragmentNode value) @Override public PhysicalOperator visitStore(Store store, IndexedFragmentNode iNode) throws ExecutionSetupException { PhysicalOperator child = store.getChild().accept(this, iNode); - iNode.addAllocation(store); try { PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId()); + o.setMaxAllocation(store.getMaxAllocation()); o.setOperatorId(Short.MAX_VALUE & store.getOperatorId()); // logger.debug("New materialized store node {} with child {}", o, child); return o; @@ -112,6 +116,7 @@ public PhysicalOperator visitOp(PhysicalOperator op, IndexedFragmentNode iNode) } PhysicalOperator newOp = op.getNewWithChildren(children); newOp.setCost(op.getCost()); + newOp.setMaxAllocation(op.getMaxAllocation()); newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); return newOp; } @@ -128,6 +133,7 @@ public PhysicalOperator visitLateralJoin(LateralJoinPOP op, IndexedFragmentNode PhysicalOperator newOp = op.getNewWithChildren(children); newOp.setCost(op.getCost()); + newOp.setMaxAllocation(op.getMaxAllocation()); newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); ((LateralJoinPOP) newOp).setUnnestForLateralJoin(unnestForThisLateral); @@ -138,6 +144,7 @@ public PhysicalOperator visitLateralJoin(LateralJoinPOP op, IndexedFragmentNode public PhysicalOperator visitUnnest(UnnestPOP unnest, IndexedFragmentNode value) throws ExecutionSetupException { PhysicalOperator newOp = visitOp(unnest, value); value.addUnnest((UnnestPOP) newOp); + newOp.setMaxAllocation(unnest.getMaxAllocation()); return newOp; } @@ -157,6 +164,7 @@ public PhysicalOperator visitRowKeyJoin(RowKeyJoinPOP op, IndexedFragmentNode iN PhysicalOperator newOp = op.getNewWithChildren(children); newOp.setCost(op.getCost()); newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); + 
newOp.setMaxAllocation(op.getMaxAllocation());
     ((RowKeyJoinPOP)newOp).setSubScanForRowKeyJoin(subScanInLeftInput);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
index d3d759ca437..4593c55f6ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
@@ -49,11 +49,13 @@ public class MemoryCalculator extends AbstractOpWrapperVisitor
   private final Map<DrillNode, List<Pair<PhysicalOperator, Long>>> bufferedOperators;
   private final QueryContext queryContext;
+  private final long MINIMUM_MEMORY_FOR_BUFFER_OPERS;

-  public MemoryCalculator(PlanningSet planningSet, QueryContext context) {
+  public MemoryCalculator(PlanningSet planningSet, QueryContext context, long minMemory) {
     this.planningSet = planningSet;
     this.bufferedOperators = new HashMap<>();
     this.queryContext = context;
+    this.MINIMUM_MEMORY_FOR_BUFFER_OPERS = minMemory;
   }

   // Helper method to compute the minor fragment count per drillbit. This method returns
@@ -86,7 +88,7 @@ public Void visitSendingExchange(Exchange exchange, Wrapper fragment) throws Run
       getMinorFragCountPerDrillbit(fragment),
       // get the memory requirements for the sender operator.
       (x) -> exchange.getSenderMemory(receivingFragment.getWidth(), x.getValue()));
-    return visitOp(exchange, fragment);
+    return visit(exchange, fragment);
   }

   @Override
@@ -117,19 +119,26 @@ public Void visitReceivingExchange(Exchange exchange, Wrapper fragment) throws R
     return null;
   }

+  private Void visit(PhysicalOperator op, Wrapper fragment) {
+    for (PhysicalOperator child : op) {
+      child.accept(this, fragment);
+    }
+    return null;
+  }
+
   public List<Pair<PhysicalOperator, Long>> getBufferedOperators(DrillNode endpoint) {
     return this.bufferedOperators.getOrDefault(endpoint, new ArrayList<>());
   }

   @Override
   public Void visitOp(PhysicalOperator op, Wrapper fragment) {
-    long memoryCost = (int)Math.ceil(op.getCost().getMemoryCost());
+    long memoryCost = (long)Math.ceil(op.getCost().getMemoryCost());
     if (op.isBufferedOperator(queryContext)) {
       // If the operator is a buffered operator then get the memory estimates of the optimizer.
       // The memory estimates of the optimizer are for the whole operator spread across all the
       // minor fragments. Divide this memory estimation by fragment width to get the memory
       // requirement per minor fragment.
-      long memoryCostPerMinorFrag = (int)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size());
+      long memoryCostPerMinorFrag = Math.max((long)Math.ceil((double) memoryCost / fragment.getAssignedEndpoints().size()),
+                                             MINIMUM_MEMORY_FOR_BUFFER_OPERS);
       Map<DrillNode, Integer> drillbitEndpointMinorFragMap = getMinorFragCountPerDrillbit(fragment);
       Map<DrillNode, Pair<PhysicalOperator, Long>> bufferedOperatorsPerDrillbit =
         drillbitEndpointMinorFragMap.entrySet().stream().collect(Collectors.toMap((x) -> x.getKey(),
           (x) -> Pair.of(op,
-            memoryCostPerMinorFrag * x.getValue())));
+            memoryCostPerMinorFrag)));
       bufferedOperatorsPerDrillbit.entrySet().forEach((x) -> {
         bufferedOperators.putIfAbsent(x.getKey(), new ArrayList<>());
         bufferedOperators.get(x.getKey()).add(x.getValue());
@@ -153,10 +162,7 @@ public Void visitOp(PhysicalOperator op, Wrapper fragment) {
       getMinorFragCountPerDrillbit(fragment),
       (x) -> memoryCost * x.getValue());
     }
-    for (PhysicalOperator child : op) {
-      child.accept(this, fragment);
-    }
-    return null;
+    return visit(op, fragment);
   }
 }
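Concretely, with assumed figures: a buffered operator the optimizer costed at 100 MB, a fragment assigned to 2 drillbits, and a 30 MB minimum handed in by the resource manager:

    long memoryCost = 100L * 1024 * 1024;   // op.getCost().getMemoryCost()
    int drillbits = 2;                      // fragment.getAssignedEndpoints().size()
    long perMinorFrag = Math.max((long) Math.ceil((double) memoryCost / drillbits),
                                 30L * 1024 * 1024);   // 50 MB charged per drillbit

Note the second change in this hunk: each drillbit is now charged memoryCostPerMinorFrag once, instead of memoryCostPerMinorFrag * minorFragCount, i.e. the optimizer's estimate is treated as already covering all minor fragments of the operator on that node.
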
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ZKQueueParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ZKQueueParallelizer.java
index 6e529224b9d..98fb2b32343 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ZKQueueParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ZKQueueParallelizer.java
@@ -64,7 +64,6 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
     }
     endpointMap = collector.getNodeMap();
-    ZKQueueMemoryAllocationUtilities.planMemory(queryContext, this.resourceManager, endpointMap);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
index 3e1d6c79832..71fb70ebb18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
@@ -21,15 +21,18 @@
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder;
 import org.apache.drill.common.logical.PlanProperties.PlanType;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
 import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.SimpleCommandResult;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.direct.DirectGroupScan;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;

+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;

@@ -43,20 +46,24 @@ public static PhysicalPlan createDirectPlan(QueryContext context, boolean result
   @SuppressWarnings("unchecked")
   public static <T> PhysicalPlan createDirectPlan(QueryContext context, T obj){
-    return createDirectPlan(context.getCurrentEndpoint(), Collections.singletonList(obj), (Class<T>) obj.getClass());
+    return createDirectPlan(context, Collections.singletonList(obj), (Class<T>) obj.getClass());
   }

-  public static <T> PhysicalPlan createDirectPlan(DrillbitEndpoint endpoint, List<T> records, Class<T> clazz){
+  public static <T> PhysicalPlan createDirectPlan(QueryContext context, List<T> records, Class<T> clazz){
     PojoRecordReader<T> reader = new PojoRecordReader<>(clazz, records);
     DirectGroupScan scan = new DirectGroupScan(reader);
-    Screen screen = new Screen(scan, endpoint);
+    Screen screen = new Screen(scan, context.getCurrentEndpoint());

     PlanPropertiesBuilder propsBuilder = PlanProperties.builder();
     propsBuilder.type(PlanType.APACHE_DRILL_PHYSICAL);
     propsBuilder.version(1);
     propsBuilder.resultMode(ResultMode.EXEC);
     propsBuilder.generator(DirectPlan.class.getSimpleName(), "");
+    Collection<PhysicalOperator> pops = DefaultSqlHandler.getPops(screen);
+    for (PhysicalOperator pop : pops) {
+      pop.setCost(new PrelCostEstimates(context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE), 0));
+    }
     return new PhysicalPlan(propsBuilder.build(), DefaultSqlHandler.getPops(screen));
   }
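The cost seeding above exists because DistributedQueueParallelizer now reads a non-buffered operator's memory requirement from getCost().getMemoryCost() (see earlier in this patch); without it, the operators of a direct plan would advertise zero memory. The seeded estimate is one output batch per operator; assuming Drill's default output batch size of 16 MB:

    long batchSize = context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE); // 16777216 assumed
    pop.setCost(new PrelCostEstimates(batchSize, 0)); // same call as above: one output batch of memory
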
[Details: {}]", exceptionStringBuilder.toString()); // Reset the exceptionStringBuilder for next event exceptionStringBuilder.delete(0, exceptionStringBuilder.length()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index aad01c0eb16..4899715e674 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -425,6 +425,7 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep private void runPhysicalPlan(final PhysicalPlan plan, Pointer textPlan) throws ExecutionSetupException { validatePlan(plan); + queryRM.setCost(plan.totalCost()); final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM); if (enableRuntimeFilter) { runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext); @@ -433,7 +434,6 @@ private void runPhysicalPlan(final PhysicalPlan plan, Pointer textPlan) if (textPlan != null) { queryManager.setPlanText(textPlan.value); } - queryManager.setTotalCost(plan.totalCost()); work.applyPlan(drillbitContext.getPlanReader()); logWorkUnit(work); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java index c0cdefe21aa..96b06912dbb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java @@ -273,7 +273,6 @@ public boolean hasQueue() { @Override public void setCost(double cost) { - throw new UnsupportedOperationException("DistributedQueryRM doesn't support cost in double format"); } public void setCost(Map costOnAssignedEndpoints) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java index 4fb5a8f4417..d49cfd4f0ff 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java @@ -220,7 +220,7 @@ public void testAllocators() throws Exception { // Use some bogus operator type to create a new operator context. 
       def = new OpProfileDef(physicalOperator1.getOperatorId(),
                              UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
-                             OperatorUtilities.getChildCount(physicalOperator1));
+                             OperatorUtilities.getChildCount(physicalOperator1), physicalOperator1.getMaxAllocation());
       stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator());

       // Add a couple of Operator Contexts
@@ -234,7 +234,7 @@ public void testAllocators() throws Exception {
       OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3);

       def = new OpProfileDef(physicalOperator4.getOperatorId(),
                              UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
-                             OperatorUtilities.getChildCount(physicalOperator4));
+                             OperatorUtilities.getChildCount(physicalOperator4), physicalOperator4.getMaxAllocation());
       stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator());
       OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats);

       DrillBuf b22 = oContext22.getAllocator().buffer(2000000);
@@ -248,7 +248,7 @@ public void testAllocators() throws Exception {
       // New fragment starts an operator that allocates an amount within the limit
       def = new OpProfileDef(physicalOperator5.getOperatorId(),
                              UserBitShared.CoreOperatorType.UNION_VALUE,
-                             OperatorUtilities.getChildCount(physicalOperator5));
+                             OperatorUtilities.getChildCount(physicalOperator5), physicalOperator5.getMaxAllocation());
       stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator());
       OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
index 9c2d5d8feca..b212e76b8dd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -655,7 +655,7 @@ public void testMergeLimit() {
   @Test
   public void testMetrics() {
-    OperatorStats stats = new OperatorStats(100, 101, 0, fixture.allocator());
+    OperatorStats stats = new OperatorStats(100, 101, 0, fixture.allocator(), 0);
     SortMetrics metrics = new SortMetrics(stats);

     // Input stats
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
index cd6b0a9a280..577014eaafe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
@@ -27,7 +27,7 @@
 import org.apache.drill.exec.planner.fragment.PlanningSet;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
 import org.apache.drill.exec.planner.fragment.Wrapper;
-import org.apache.drill.exec.planner.fragment.common.DrillNode;
+import org.apache.drill.common.DrillNode;
 import org.apache.drill.exec.pop.PopUnitTestBase;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared;
@@ -44,9 +44,10 @@
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;

 import java.util.ArrayList;
 import java.util.HashMap;
@@ -59,6 +60,8 @@
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

@@ -90,7 +93,7 @@ private static final DrillbitEndpoint newDrillbitEndpoint(String address, int po
                                                    UserBitShared.QueryId.getDefaultInstance());

   private static Map<DrillbitEndpoint, String> onlineEndpoints;
-  private Map<DrillNode, NodeResources> resources;
+  private Map<DrillNode, NodeResources> totalResources;

   @AfterClass
   public static void close() throws Exception {
@@ -102,7 +105,16 @@ private QueryResourceManager mockResourceManager() throws QueueSelectionExceptio
     final QueryQueueConfig queueConfig = mock(QueryQueueConfig.class);

     when(queueConfig.getMaxQueryMemoryInMBPerNode()).thenReturn(10L);
+    when(queueConfig.getQueueTotalMemoryInMB(anyInt())).thenReturn(100L);
     when(mockRM.selectQueue(any(NodeResources.class))).thenReturn(queueConfig);
+    when(mockRM.minimumOperatorMemory()).thenReturn(40L);
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        totalResources = (Map<DrillNode, NodeResources>) invocation.getArguments()[0];
+        return null;
+      }
+    }).when(mockRM).setCost(any(Map.class));
     return mockRM;
   }
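The doAnswer stub above snapshots whatever map adjustMemory passes to rm.setCost, so the tests can assert against post-adjustment totals. Under the same assumptions, Mockito's ArgumentCaptor is an equivalent after-the-fact capture (shown for contrast; not what the patch uses, and it needs org.mockito.ArgumentCaptor plus a static import of Mockito.verify):

    @SuppressWarnings("unchecked")
    ArgumentCaptor<Map<DrillNode, NodeResources>> costCaptor =
        ArgumentCaptor.forClass((Class<Map<DrillNode, NodeResources>>) (Class<?>) Map.class);
    // after parallelizer.adjustMemory(...) has run:
    verify(mockRM).setCost(costCaptor.capture());
    Map<DrillNode, NodeResources> captured = costCaptor.getValue();
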
@@ -116,7 +128,7 @@ private final Wrapper mockWrapper(Wrapper rootFragment,
     List<Wrapper> mockdependencies = new ArrayList<>();

     for (Wrapper dependency : rootFragment.getFragmentDependencies()) {
-      mockdependencies.add(mockWrapper(dependency, resourceMap, endpoints, originalToMockWrapper));
+      mockdependencies.add(mockWrapper(dependency, getNodeResources(), endpoints, originalToMockWrapper));
     }

     when(mockWrapper.getNode()).thenReturn(rootFragment.getNode());
@@ -129,11 +141,9 @@ private final Wrapper mockWrapper(Wrapper rootFragment,
   }

   private final PlanningSet mockPlanningSet(PlanningSet planningSet,
-                                            Map<DrillNode, NodeResources> resourceMap,
                                             List<DrillbitEndpoint> endpoints) {
     Map<Wrapper, Wrapper> wrapperToMockWrapper = new HashMap<>();
-    Wrapper rootFragment = mockWrapper( planningSet.getRootWrapper(), resourceMap,
-                                        endpoints, wrapperToMockWrapper);
+    Wrapper rootFragment = mockWrapper(planningSet.getRootWrapper(), getNodeResources(), endpoints, wrapperToMockWrapper);
     PlanningSet mockPlanningSet = mock(PlanningSet.class);
     when(mockPlanningSet.getRootWrapper()).thenReturn(rootFragment);
     when(mockPlanningSet.get(any(Fragment.class))).thenAnswer(invocation -> {
@@ -196,10 +206,13 @@ private Fragment getRootFragmentFromPlan(DrillbitContext context,
   }

   private PlanningSet preparePlanningSet(List<DrillbitEndpoint> activeEndpoints, long slice_target,
-                                         Map<DrillNode, NodeResources> resources, String sql,
-                                         SimpleParallelizer parallelizer) throws Exception {
+                                         String sql, SimpleParallelizer parallelizer) throws Exception {
     Fragment rootFragment = getRootFragmentFromPlan(drillbitContext, getPlanForQuery(sql, 10, slice_target));
-    return mockPlanningSet(parallelizer.prepareFragmentTree(rootFragment), resources, activeEndpoints);
+    return mockPlanningSet(parallelizer.prepareFragmentTree(rootFragment), activeEndpoints);
+  }
+
+  private Map<DrillNode, NodeResources> getNodeResources() {
+    return onlineEndpoints.keySet().stream().collect(Collectors.toMap(x -> DrillNode.create(x), x -> NodeResources.create()));
   }

   @BeforeClass
@@ -207,21 +220,14 @@ public static void setupForAllTests() {
     onlineEndpoints = getEndpoints(2, new HashSet<>());
   }
during merge - resources = onlineEndpoints.keySet().stream().collect(Collectors.toMap(x -> DrillNode.create(x), - x -> NodeResources.create())); - } - @Test public void TestSingleMajorFragmentWithProjectAndScan() throws Exception { String sql = "SELECT * from cp.`tpch/nation.parquet`"; SimpleParallelizer parallelizer = new DistributedQueueParallelizer(false, queryContext, mockResourceManager()); - PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), DEFAULT_SLICE_TARGET, resources, sql, parallelizer); + PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), DEFAULT_SLICE_TARGET, sql, parallelizer); parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), onlineEndpoints); - assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 30)); + assertTrue("memory requirement is different", Iterables.all(totalResources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 30)); } @@ -230,21 +236,20 @@ public void TestSingleMajorFragmentWithGroupByProjectAndScan() throws Exception String sql = "SELECT dept_id, count(*) from cp.`tpch/lineitem.parquet` group by dept_id"; SimpleParallelizer parallelizer = new DistributedQueueParallelizer(false, queryContext, mockResourceManager()); - PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), DEFAULT_SLICE_TARGET, resources, sql, parallelizer); + PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), DEFAULT_SLICE_TARGET, sql, parallelizer); parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), onlineEndpoints); - assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 529570)); + assertTrue("memory requirement is different", Iterables.all(totalResources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 529570)); } @Test - public void TestTwoMajorFragmentWithSortyProjectAndScan() throws Exception { + public void TestTwoMajorFragmentWithSortProjectAndScan() throws Exception { String sql = "SELECT * from cp.`tpch/lineitem.parquet` order by dept_id"; SimpleParallelizer parallelizer = new DistributedQueueParallelizer(false, queryContext, mockResourceManager()); - PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), 2, resources, sql, - parallelizer); + PlanningSet planningSet = preparePlanningSet(new ArrayList<>(onlineEndpoints.keySet()), 2, sql, parallelizer); parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), onlineEndpoints); - assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 481490)); + assertTrue("memory requirement is different", Iterables.all(totalResources.entrySet(), (e) -> e.getValue().getMemoryInBytes() == 481460)); } @Test diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java index 01c06a4a664..37eb3556d14 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java @@ -77,7 +77,7 @@ public void testSimpleIterator() throws Throwable { RecordBatch singleBatch = exec.getIncoming(); PhysicalOperator dummyPop = operatorList.iterator().next(); OpProfileDef 
def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, - OperatorUtilities.getChildCount(dummyPop)); + OperatorUtilities.getChildCount(dummyPop), dummyPop.getMaxAllocation()); OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator()); RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false, null); int totalRecords = 0; @@ -133,7 +133,7 @@ public void testMarkResetIterator() throws Throwable { RecordBatch singleBatch = exec.getIncoming(); PhysicalOperator dummyPop = operatorList.iterator().next(); OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, - OperatorUtilities.getChildCount(dummyPop)); + OperatorUtilities.getChildCount(dummyPop), dummyPop.getMaxAllocation()); OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator()); RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, null); List vectors; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java index 5504382dc09..b5ca7351d4e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java @@ -67,7 +67,7 @@ public void testIOStats() throws Exception { InputStream is = null; Configuration conf = new Configuration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); - OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/); + OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/, 0 /*optimalMemoryAllocation*/); OperatorStats stats = new OperatorStats(profileDef, null /*allocator*/); // start wait time method in OperatorStats expects the OperatorStats state to be in "processing" diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java index 3d9190894ac..bc72bd8b781 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java @@ -438,7 +438,7 @@ public MockOperatorContext(FragmentContext fragContext, BufferAllocator allocator, PhysicalOperator config) { super(fragContext, allocator, config); - this.operatorStats = new OperatorStats(new OpProfileDef(0, 0, 100), allocator); + this.operatorStats = new OperatorStats(new OpProfileDef(0, 0, 100, 0), allocator); } @Override diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java index 932872c5163..4babfb18148 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java @@ -2377,6 +2377,8 @@ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.ex if(message.hasWaitNanos()) output.writeInt64(9, message.getWaitNanos(), false); + if(message.hasOptimalMemAllocation()) + output.writeInt64(10, message.getOptimalMemAllocation(), false); } public boolean 
isInitialized(org.apache.drill.exec.proto.UserBitShared.OperatorProfile message) { @@ -2442,6 +2444,9 @@ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.ex case 9: builder.setWaitNanos(input.readInt64()); break; + case 10: + builder.setOptimalMemAllocation(input.readInt64()); + break; default: input.handleUnknownField(number, this); } @@ -2490,6 +2495,7 @@ public static java.lang.String getFieldName(int number) case 7: return "peakLocalMemoryAllocated"; case 8: return "metric"; case 9: return "waitNanos"; + case 10: return "optimalMemAllocation"; default: return null; } } @@ -2509,6 +2515,7 @@ public static int getFieldNumber(java.lang.String name) fieldMap.put("peakLocalMemoryAllocated", 7); fieldMap.put("metric", 8); fieldMap.put("waitNanos", 9); + fieldMap.put("optimalMemAllocation", 10); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 5f30015ef4f..452d90e3bda 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -22363,6 +22363,15 @@ org.apache.drill.exec.proto.UserBitShared.MetricValueOrBuilder getMetricOrBuilde * optional int64 wait_nanos = 9; */ long getWaitNanos(); + + /** + * optional int64 optimal_mem_allocation = 10; + */ + boolean hasOptimalMemAllocation(); + /** + * optional int64 optimal_mem_allocation = 10; + */ + long getOptimalMemAllocation(); } /** * Protobuf type {@code exec.shared.OperatorProfile} @@ -22385,6 +22394,7 @@ private OperatorProfile() { peakLocalMemoryAllocated_ = 0L; metric_ = java.util.Collections.emptyList(); waitNanos_ = 0L; + optimalMemAllocation_ = 0L; } @java.lang.Override @@ -22459,6 +22469,11 @@ private OperatorProfile( waitNanos_ = input.readInt64(); break; } + case 80: { + bitField0_ |= 0x00000040; + optimalMemAllocation_ = input.readInt64(); + break; + } default: { if (!parseUnknownField( input, unknownFields, extensionRegistry, tag)) { @@ -22658,6 +22673,21 @@ public long getWaitNanos() { return waitNanos_; } + public static final int OPTIMAL_MEM_ALLOCATION_FIELD_NUMBER = 10; + private long optimalMemAllocation_; + /** + * optional int64 optimal_mem_allocation = 10; + */ + public boolean hasOptimalMemAllocation() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional int64 optimal_mem_allocation = 10; + */ + public long getOptimalMemAllocation() { + return optimalMemAllocation_; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -22696,6 +22726,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeInt64(9, waitNanos_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeInt64(10, optimalMemAllocation_); + } unknownFields.writeTo(output); } @@ -22737,6 +22770,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeInt64Size(9, waitNanos_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(10, optimalMemAllocation_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -22787,6 +22824,11 @@ public boolean equals(final java.lang.Object obj) { result = result && (getWaitNanos() == other.getWaitNanos()); } + result = result && (hasOptimalMemAllocation() == other.hasOptimalMemAllocation()); + if 
(hasOptimalMemAllocation()) { + result = result && (getOptimalMemAllocation() + == other.getOptimalMemAllocation()); + } result = result && unknownFields.equals(other.unknownFields); return result; } @@ -22834,6 +22876,11 @@ public int hashCode() { hash = (53 * hash) + com.google.protobuf.Internal.hashLong( getWaitNanos()); } + if (hasOptimalMemAllocation()) { + hash = (37 * hash) + OPTIMAL_MEM_ALLOCATION_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + getOptimalMemAllocation()); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -22993,6 +23040,8 @@ public Builder clear() { } waitNanos_ = 0L; bitField0_ = (bitField0_ & ~0x00000080); + optimalMemAllocation_ = 0L; + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -23063,6 +23112,10 @@ public org.apache.drill.exec.proto.UserBitShared.OperatorProfile buildPartial() to_bitField0_ |= 0x00000020; } result.waitNanos_ = waitNanos_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000040; + } + result.optimalMemAllocation_ = optimalMemAllocation_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -23182,6 +23235,9 @@ public Builder mergeFrom(org.apache.drill.exec.proto.UserBitShared.OperatorProfi if (other.hasWaitNanos()) { setWaitNanos(other.getWaitNanos()); } + if (other.hasOptimalMemAllocation()) { + setOptimalMemAllocation(other.getOptimalMemAllocation()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -23883,6 +23939,38 @@ public Builder clearWaitNanos() { onChanged(); return this; } + + private long optimalMemAllocation_ ; + /** + * optional int64 optimal_mem_allocation = 10; + */ + public boolean hasOptimalMemAllocation() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional int64 optimal_mem_allocation = 10; + */ + public long getOptimalMemAllocation() { + return optimalMemAllocation_; + } + /** + * optional int64 optimal_mem_allocation = 10; + */ + public Builder setOptimalMemAllocation(long value) { + bitField0_ |= 0x00000100; + optimalMemAllocation_ = value; + onChanged(); + return this; + } + /** + * optional int64 optimal_mem_allocation = 10; + */ + public Builder clearOptimalMemAllocation() { + bitField0_ = (bitField0_ & ~0x00000100); + optimalMemAllocation_ = 0L; + onChanged(); + return this; + } @java.lang.Override public final Builder setUnknownFields( final com.google.protobuf.UnknownFieldSet unknownFields) { @@ -27858,64 +27946,65 @@ public org.apache.drill.exec.proto.UserBitShared.SaslMessage getDefaultInstanceF "y_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n" + "\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022" + "\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 " + - "\001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profile" + + "\001(\003\"\237\002\n\017OperatorProfile\0221\n\rinput_profile" + "\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013op" + "erator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023" + "\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001" + "(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022" + "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" + - "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" + - "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" + - "mas\030\003 
\001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " + - "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030" + - "\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.sh" + - "ared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022functio" + - "n_signature\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmech" + - "anism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(" + - "\0162\027.exec.shared.SaslStatus*5\n\nRpcChannel" + - "\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020" + - "\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010" + - "PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_ST" + - "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" + - "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" + - "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" + - "\032\n\026CANCELLATION_REQUESTED\020\006*\374\t\n\020CoreOper" + - "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" + - "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" + - "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" + - "_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGIN" + - "G_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER" + - "\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022" + - "\032\n\026RANGE_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r\022" + - "\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMI" + - "NG_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERN" + - "AL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_S" + - "ORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIV" + - "E_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rM" + - "OCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DI" + - "RECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT" + - "_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_S" + - "CHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n" + - "\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!" 
+ - "\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAV" + - "RO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFK" + - "A_SUB_SCAN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATT" + - "EN\020(\022\020\n\014LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HI" + - "VE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN\020+" + - "\022\r\n\tJDBC_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017M" + - "APRDB_SUB_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013" + - "KUDU_WRITER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n" + - "\013JSON_WRITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022" + - "\n\016IMAGE_SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\020" + - "5\022\023\n\017PARTITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCA" + - "N\0207\022\022\n\016RUNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\0209" + - "\022\023\n\017SYSLOG_SUB_SCAN\020:\022\030\n\024STATISTICS_AGGR" + - "EGATE\020;\022\020\n\014UNPIVOT_MAPS\020<\022\024\n\020STATISTICS_" + - "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>*g\n\nSaslStatus" + - "\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SA" + - "SL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SA" + - "SL_FAILED\020\004B.\n\033org.apache.drill.exec.pro" + - "toB\rUserBitSharedH\001" + "e\022\022\n\nwait_nanos\030\t \001(\003\022\036\n\026optimal_mem_all" + + "ocation\030\n \001(\003\"B\n\rStreamProfile\022\017\n\007record" + + "s\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(" + + "\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nl" + + "ong_value\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n" + + "\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar" + + "\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022function_signat" + + "ure\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 " + + "\001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec" + + ".shared.SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_" + + "CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQue" + + "ryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL" + + "\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020" + + "\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAI" + + "TING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISH" + + "ED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCE" + + "LLATION_REQUESTED\020\006*\374\t\n\020CoreOperatorType" + + "\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020" + + "\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHAS" + + "H_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITI" + + "ON_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIV" + + "ER\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013\n\007PR" + + "OJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\032\n\026RANGE" + + "_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELEC" + + "TION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGRE" + + "GATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020" + + "\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n" + + "\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SC" + + 
"AN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_" + + "SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB" + + "_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCA" + + "N\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SU" + + "B_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCE" + + "R_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WIND" + + "OW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_S" + + "CAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFKA_SUB_SC" + + "AN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014" + + "LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HIVE_DRILL" + + "_NATIVE_PARQUET_ROW_GROUP_SCAN\020+\022\r\n\tJDBC" + + "_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017MAPRDB_SU" + + "B_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRI" + + "TER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WR" + + "ITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_" + + "SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PAR" + + "TITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016R" + + "UNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\0209\022\023\n\017SYSL" + + "OG_SUB_SCAN\020:\022\030\n\024STATISTICS_AGGREGATE\020;\022" + + "\020\n\014UNPIVOT_MAPS\020<\022\024\n\020STATISTICS_MERGE\020=\022" + + "\021\n\rLTSV_SUB_SCAN\020>*g\n\nSaslStatus\022\020\n\014SASL" + + "_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PR" + + "OGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILE" + + "D\020\004B.\n\033org.apache.drill.exec.protoB\rUser" + + "BitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. 
           InternalDescriptorAssigner() {
@@ -28033,7 +28122,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors(
       internal_static_exec_shared_OperatorProfile_fieldAccessorTable = new
         com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
           internal_static_exec_shared_OperatorProfile_descriptor,
-          new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "PeakLocalMemoryAllocated", "Metric", "WaitNanos", });
+          new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "PeakLocalMemoryAllocated", "Metric", "WaitNanos", "OptimalMemAllocation", });
       internal_static_exec_shared_StreamProfile_descriptor =
         getDescriptor().getMessageTypes().get(17);
       internal_static_exec_shared_StreamProfile_fieldAccessorTable = new
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
index d6275fa26c3..224214a69dd 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
@@ -57,6 +57,7 @@ public static OperatorProfile getDefaultInstance()
     private long peakLocalMemoryAllocated;
     private List<MetricValue> metric;
     private long waitNanos;
+    private long optimalMemAllocation;
 
     public OperatorProfile()
     {
@@ -169,6 +170,19 @@ public OperatorProfile setWaitNanos(long waitNanos)
         return this;
     }
 
+    // optimalMemAllocation
+
+    public long getOptimalMemAllocation()
+    {
+        return optimalMemAllocation;
+    }
+
+    public OperatorProfile setOptimalMemAllocation(long optimalMemAllocation)
+    {
+        this.optimalMemAllocation = optimalMemAllocation;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -253,6 +267,9 @@ public void mergeFrom(Input input, OperatorProfile message) throws IOException
                 case 9:
                     message.waitNanos = input.readInt64();
                     break;
+                case 10:
+                    message.optimalMemAllocation = input.readInt64();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }
@@ -299,6 +316,9 @@ public void writeTo(Output output, OperatorProfile message) throws IOException
         if(message.waitNanos != 0)
             output.writeInt64(9, message.waitNanos, false);
+
+        if(message.optimalMemAllocation != 0)
+            output.writeInt64(10, message.optimalMemAllocation, false);
     }
 
     public String getFieldName(int number)
@@ -313,6 +333,7 @@ public String getFieldName(int number)
             case 7: return "peakLocalMemoryAllocated";
             case 8: return "metric";
             case 9: return "waitNanos";
+            case 10: return "optimalMemAllocation";
             default: return null;
         }
     }
@@ -334,6 +355,7 @@ public int getFieldNumber(String name)
         __fieldMap.put("peakLocalMemoryAllocated", 7);
         __fieldMap.put("metric", 8);
         __fieldMap.put("waitNanos", 9);
+        __fieldMap.put("optimalMemAllocation", 10);
     }
 }
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 4d057d59e3a..46d28eb4146 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -268,6 +268,7 @@ message OperatorProfile {
   optional int64 peak_local_memory_allocated = 7;
   repeated MetricValue metric = 8;
   optional int64 wait_nanos = 9;
+  optional int64 optimal_mem_allocation = 10;
 }
 
 message StreamProfile {
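
Two reviewer notes on the protocol changes above (my annotations, not part of the patch).

First, the regenerated descriptor blob in UserBitShared.java is opaque, but the new field can be spot-checked by hand. The added fragment `\022\036\n\026optimal_mem_allocation\030\n \001(\003` (split across two string literals above) decodes, per the standard protobuf wire format for a FieldDescriptorProto, as:

```
\022\036  DescriptorProto.field (tag 2, length-delimited), 30 bytes follow
\n\026  FieldDescriptorProto.name (tag 1), 22 bytes: "optimal_mem_allocation"
\030\n  FieldDescriptorProto.number (tag 3) = 10
 \001   FieldDescriptorProto.label (tag 4) = 1 (LABEL_OPTIONAL); ' ' is 0x20
(\003   FieldDescriptorProto.type (tag 5) = 3 (TYPE_INT64); '(' is 0x28
```

which matches the `optional int64 optimal_mem_allocation = 10;` declaration added to UserBitShared.proto.

Second, a minimal sketch of the new protostuff bean accessors (class name and values are hypothetical; only the constructor and setters shown in this diff are used). Since each setter returns `this`, calls chain:

```java
import org.apache.drill.exec.proto.beans.OperatorProfile;

public class OptimalMemAllocationExample {
  public static void main(String[] args) {
    // Hypothetical planner estimate: 64 MiB, carried as a plain long of bytes.
    long optimal = 64L * 1024 * 1024;

    OperatorProfile profile = new OperatorProfile()
        .setWaitNanos(1_000L)              // existing field 9
        .setOptimalMemAllocation(optimal); // new field 10

    System.out.println(profile.getOptimalMemAllocation()); // 67108864
  }
}
```

Note that `writeTo()` skips field 10 when it is 0, so profiles that never set an estimate serialize exactly as before and remain readable by older clients.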