diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index 7f4f187ebd78..13770fdbf7c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -203,4 +203,14 @@ public void executePrefetch(final String consumerGroupId, final String topicName } broker.executePrefetch(topicName); } + + public int getPipeEventCount(final String consumerGroupId, final String topicName) { + final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); + if (Objects.isNull(broker)) { + LOGGER.warn( + "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); + return 0; + } + return broker.getPipeEventCount(topicName); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index f6fc757f5ceb..420b571103a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -368,4 +368,24 @@ public void executePrefetch(final String topicName) { } prefetchingQueue.executePrefetch(); } + + public int getPipeEventCount(final String topicName) { + final SubscriptionPrefetchingQueue prefetchingQueue = + topicNameToPrefetchingQueue.get(topicName); + if (Objects.isNull(prefetchingQueue)) { + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist", + topicName, + brokerId); + return 0; + } + if (prefetchingQueue.isClosed()) { + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", + topicName, + brokerId); + return 0; + } + return prefetchingQueue.getPipeEventCount(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index e23f5fb05275..ac5b3fad279c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -531,7 +531,7 @@ public static String generatePrefetchingQueueId( return consumerGroupId + "_" + topicName; } - public long getUncommittedEventCount() { + public long getSubscriptionUncommittedEventCount() { return inFlightEvents.size(); } @@ -539,6 +539,18 @@ public long getCurrentCommitId() { return commitIdGenerator.get(); } + public int getPipeEventCount() { + return inputPendingQueue.size() + + prefetchingQueue.stream() + .map(SubscriptionEvent::getPipeEventCount) + .reduce(Integer::sum) + .orElse(0) + + +inFlightEvents.values().stream() + .map(SubscriptionEvent::getPipeEventCount) + .reduce(Integer::sum) + .orElse(0); + } + /////////////////////////////// close & termination /////////////////////////////// public boolean isClosed() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index 2f7902f1c6b1..ad274023f6a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -415,6 +415,12 @@ public String getFileName() { return pipeEvents.getTsFile().getName(); } + /////////////////////////////// APIs provided for metric framework /////////////////////////////// + + public int getPipeEventCount() { + return pipeEvents.getPipeEventCount(); + } + /////////////////////////////// object /////////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index 54363ac8c7cb..871b1c12587f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -70,7 +70,7 @@ public SubscriptionPipeTabletEventBatch( @Override public synchronized boolean onEvent(final Consumer consumer) { - if (shouldEmit()) { + if (shouldEmit() && !enrichedEvents.isEmpty()) { if (Objects.isNull(events)) { events = generateSubscriptionEvents(); } @@ -98,6 +98,8 @@ public synchronized void cleanUp() { for (final EnrichedEvent enrichedEvent : enrichedEvents) { enrichedEvent.clearReferenceCount(this.getClass().getName()); } + enrichedEvents.clear(); + tablets.clear(); } public synchronized void ack() { @@ -221,4 +223,10 @@ private static String formatEnrichedEvents( } return eventMessageList.toString(); } + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + public int getPipeEventCount() { + return enrichedEvents.size(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index 2b5e29f7f6c8..e38887e19f2f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -43,6 +43,7 @@ public class SubscriptionPipeTsFileEventBatch extends SubscriptionPipeEventBatch { private final PipeTabletEventTsFileBatch batch; + private final List enrichedEvents; public SubscriptionPipeTsFileEventBatch( final int regionId, @@ -51,11 +52,12 @@ public SubscriptionPipeTsFileEventBatch( final long maxBatchSizeInBytes) { super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes); this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs, maxBatchSizeInBytes); + this.enrichedEvents = new ArrayList<>(); } @Override public synchronized boolean onEvent(final Consumer consumer) throws Exception { - if (batch.shouldEmit()) { + if (batch.shouldEmit() && !enrichedEvents.isEmpty()) { if (Objects.isNull(events)) { events = generateSubscriptionEvents(); } @@ -74,6 +76,7 @@ public synchronized boolean onEvent( throws Exception { if (event instanceof TabletInsertionEvent) { batch.onEvent((TabletInsertionEvent) event); // no exceptions will be thrown + enrichedEvents.add(event); event.decreaseReferenceCount( SubscriptionPipeTsFileEventBatch.class.getName(), false); // missing releaseLastEvent decreases reference count @@ -85,6 +88,7 @@ public synchronized boolean onEvent( public synchronized void cleanUp() { // close batch, it includes clearing the reference count of events batch.close(); + enrichedEvents.clear(); } public synchronized void ack() { @@ -128,4 +132,10 @@ protected Map coreReportMessage() { coreReportMessage.put("batch", batch.toString()); return coreReportMessage; } + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + public int getPipeEventCount() { + return enrichedEvents.size(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java index d0556cd15b7f..3e74e03d8f4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java @@ -34,8 +34,17 @@ public void ack() {} @Override public void cleanUp() {} + /////////////////////////////// stringify /////////////////////////////// + @Override public String toString() { return "SubscriptionEmptyPipeEvent"; } + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + @Override + public int getPipeEventCount() { + return 0; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java index 813690dd00b0..489c4cf8120c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java @@ -31,4 +31,8 @@ public interface SubscriptionPipeEvents { void ack(); void cleanUp(); + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + int getPipeEventCount(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java index fb9e31f77c2b..226367405eba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java @@ -46,8 +46,17 @@ public void cleanUp() { batch.cleanUp(); } + /////////////////////////////// stringify /////////////////////////////// + @Override public String toString() { return "SubscriptionPipeTabletBatchEvents{batch=" + batch + "}"; } + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + @Override + public int getPipeEventCount() { + return batch.getPipeEventCount(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java index aa9a3593a93b..5cae21f5ac8a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java @@ -29,6 +29,7 @@ public class SubscriptionPipeTsFileBatchEvents implements SubscriptionPipeEvents private final SubscriptionPipeTsFileEventBatch batch; private final File tsFile; private final AtomicInteger referenceCount; // shared between the same batch + private final int count; // snapshot the initial reference count, used for event count calculation public SubscriptionPipeTsFileBatchEvents( final SubscriptionPipeTsFileEventBatch batch, @@ -37,6 +38,7 @@ public SubscriptionPipeTsFileBatchEvents( this.batch = batch; this.tsFile = tsFile; this.referenceCount = referenceCount; + this.count = Math.max(1, referenceCount.get()); } @Override @@ -58,6 +60,8 @@ public void cleanUp() { } } + /////////////////////////////// stringify /////////////////////////////// + @Override public String toString() { return "SubscriptionPipeTsFileBatchEvents{batch=" @@ -68,4 +72,13 @@ public String toString() { + referenceCount + "}"; } + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + @Override + public int getPipeEventCount() { + // Since multiple events will share the same batch, equal division is performed here. + // If it is not exact, round up to remain pessimistic. + return (batch.getPipeEventCount() + count - 1) / count; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java index c210635a0615..111006fa6d32 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java @@ -47,10 +47,19 @@ public void cleanUp() { tsFileInsertionEvent.clearReferenceCount(this.getClass().getName()); } + /////////////////////////////// stringify /////////////////////////////// + @Override public String toString() { return "SubscriptionPipeTsFilePlainEvent{tsFileInsertionEvent=" + tsFileInsertionEvent.coreReportMessage() + "}"; } + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + @Override + public int getPipeEventCount() { + return 1; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java index d38de0d709e1..4e712fb29722 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java @@ -92,7 +92,7 @@ private void createAutoGauge(final String id) { Metric.SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT.toString(), MetricLevel.IMPORTANT, queue, - SubscriptionPrefetchingQueue::getUncommittedEventCount, + SubscriptionPrefetchingQueue::getSubscriptionUncommittedEventCount, Tag.NAME.toString(), queue.getPrefetchingQueueId()); // current commit id diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java index 8524e77d040e..3e540254d8e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java @@ -25,8 +25,13 @@ import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.event.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class SubscriptionConnectorSubtask extends PipeConnectorSubtask { + private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConnectorSubtask.class); + private final String topicName; private final String consumerGroupId; @@ -72,4 +77,13 @@ public String getConsumerGroupId() { public UnboundedBlockingPendingQueue getInputPendingQueue() { return inputPendingQueue; } + + //////////////////////////// APIs provided for metric framework //////////////////////////// + + @Override + public int getEventCount(final String pipeName) { + // count the number of pipe events in sink queue and prefetching queue, note that can safely + // ignore lastEvent + return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, topicName); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java index dfce62faac8b..e0707ebf5473 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask; import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtaskLifeCycle; import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeRealtimePriorityBlockingQueue; +import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -157,6 +158,8 @@ public synchronized String register( final PipeConnectorSubtaskLifeCycle lifeCycle = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); lifeCycle.register(); + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .register(lifeCycle.getSubtask(), environment.getPipeName(), environment.getCreationTime()); return attributeSortedString; }