Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription: register metrics to count the number of pipe events in sink queue and prefetching queue in subscription task #13575

Merged
merged 7 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,14 @@ public void executePrefetch(final String consumerGroupId, final String topicName
}
broker.executePrefetch(topicName);
}

public int getPipeEventCount(final String consumerGroupId, final String topicName) {
final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn(
"Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId);
return 0;
}
return broker.getPipeEventCount(topicName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,24 @@ public void executePrefetch(final String topicName) {
}
prefetchingQueue.executePrefetch();
}

public int getPipeEventCount(final String topicName) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist",
topicName,
brokerId);
return 0;
}
if (prefetchingQueue.isClosed()) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed",
topicName,
brokerId);
return 0;
}
return prefetchingQueue.getPipeEventCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -531,14 +531,26 @@ public static String generatePrefetchingQueueId(
return consumerGroupId + "_" + topicName;
}

public long getUncommittedEventCount() {
public long getSubscriptionUncommittedEventCount() {
return inFlightEvents.size();
}

public long getCurrentCommitId() {
return commitIdGenerator.get();
}

public int getPipeEventCount() {
return inputPendingQueue.size()
+ prefetchingQueue.stream()
.map(SubscriptionEvent::getPipeEventCount)
.reduce(Integer::sum)
.orElse(0)
+ +inFlightEvents.values().stream()
.map(SubscriptionEvent::getPipeEventCount)
.reduce(Integer::sum)
.orElse(0);
}

/////////////////////////////// close & termination ///////////////////////////////

public boolean isClosed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ public String getFileName() {
return pipeEvents.getTsFile().getName();
}

/////////////////////////////// APIs provided for metric framework ///////////////////////////////

public int getPipeEventCount() {
return pipeEvents.getPipeEventCount();
}

/////////////////////////////// object ///////////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public SubscriptionPipeTabletEventBatch(

@Override
public synchronized boolean onEvent(final Consumer<SubscriptionEvent> consumer) {
if (shouldEmit()) {
if (shouldEmit() && !enrichedEvents.isEmpty()) {
if (Objects.isNull(events)) {
events = generateSubscriptionEvents();
}
Expand Down Expand Up @@ -98,6 +98,8 @@ public synchronized void cleanUp() {
for (final EnrichedEvent enrichedEvent : enrichedEvents) {
enrichedEvent.clearReferenceCount(this.getClass().getName());
}
enrichedEvents.clear();
tablets.clear();
}

public synchronized void ack() {
Expand Down Expand Up @@ -221,4 +223,10 @@ private static String formatEnrichedEvents(
}
return eventMessageList.toString();
}

//////////////////////////// APIs provided for metric framework ////////////////////////////

public int getPipeEventCount() {
return enrichedEvents.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
public class SubscriptionPipeTsFileEventBatch extends SubscriptionPipeEventBatch {

private final PipeTabletEventTsFileBatch batch;
private final List<EnrichedEvent> enrichedEvents;

public SubscriptionPipeTsFileEventBatch(
final int regionId,
Expand All @@ -51,11 +52,12 @@ public SubscriptionPipeTsFileEventBatch(
final long maxBatchSizeInBytes) {
super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes);
this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs, maxBatchSizeInBytes);
this.enrichedEvents = new ArrayList<>();
}

@Override
public synchronized boolean onEvent(final Consumer<SubscriptionEvent> consumer) throws Exception {
if (batch.shouldEmit()) {
if (batch.shouldEmit() && !enrichedEvents.isEmpty()) {
if (Objects.isNull(events)) {
events = generateSubscriptionEvents();
}
Expand All @@ -74,6 +76,7 @@ public synchronized boolean onEvent(
throws Exception {
if (event instanceof TabletInsertionEvent) {
batch.onEvent((TabletInsertionEvent) event); // no exceptions will be thrown
enrichedEvents.add(event);
event.decreaseReferenceCount(
SubscriptionPipeTsFileEventBatch.class.getName(),
false); // missing releaseLastEvent decreases reference count
Expand All @@ -85,6 +88,7 @@ public synchronized boolean onEvent(
public synchronized void cleanUp() {
// close batch, it includes clearing the reference count of events
batch.close();
enrichedEvents.clear();
}

public synchronized void ack() {
Expand Down Expand Up @@ -128,4 +132,10 @@ protected Map<String, String> coreReportMessage() {
coreReportMessage.put("batch", batch.toString());
return coreReportMessage;
}

//////////////////////////// APIs provided for metric framework ////////////////////////////

public int getPipeEventCount() {
return enrichedEvents.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,17 @@ public void ack() {}
@Override
public void cleanUp() {}

/////////////////////////////// stringify ///////////////////////////////

@Override
public String toString() {
return "SubscriptionEmptyPipeEvent";
}

//////////////////////////// APIs provided for metric framework ////////////////////////////

@Override
public int getPipeEventCount() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public interface SubscriptionPipeEvents {
void ack();

void cleanUp();

//////////////////////////// APIs provided for metric framework ////////////////////////////

int getPipeEventCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,17 @@ public void cleanUp() {
batch.cleanUp();
}

/////////////////////////////// stringify ///////////////////////////////

@Override
public String toString() {
return "SubscriptionPipeTabletBatchEvents{batch=" + batch + "}";
}

//////////////////////////// APIs provided for metric framework ////////////////////////////

@Override
public int getPipeEventCount() {
return batch.getPipeEventCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class SubscriptionPipeTsFileBatchEvents implements SubscriptionPipeEvents
private final SubscriptionPipeTsFileEventBatch batch;
private final File tsFile;
private final AtomicInteger referenceCount; // shared between the same batch
private final int count; // snapshot the initial reference count, used for event count calculation

public SubscriptionPipeTsFileBatchEvents(
final SubscriptionPipeTsFileEventBatch batch,
Expand All @@ -37,6 +38,7 @@ public SubscriptionPipeTsFileBatchEvents(
this.batch = batch;
this.tsFile = tsFile;
this.referenceCount = referenceCount;
this.count = Math.max(1, referenceCount.get());
}

@Override
Expand All @@ -58,6 +60,8 @@ public void cleanUp() {
}
}

/////////////////////////////// stringify ///////////////////////////////

@Override
public String toString() {
return "SubscriptionPipeTsFileBatchEvents{batch="
Expand All @@ -68,4 +72,13 @@ public String toString() {
+ referenceCount
+ "}";
}

//////////////////////////// APIs provided for metric framework ////////////////////////////

@Override
public int getPipeEventCount() {
// Since multiple events will share the same batch, equal division is performed here.
// If it is not exact, round up to remain pessimistic.
return (batch.getPipeEventCount() + count - 1) / count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,19 @@ public void cleanUp() {
tsFileInsertionEvent.clearReferenceCount(this.getClass().getName());
}

/////////////////////////////// stringify ///////////////////////////////

@Override
public String toString() {
return "SubscriptionPipeTsFilePlainEvent{tsFileInsertionEvent="
+ tsFileInsertionEvent.coreReportMessage()
+ "}";
}

//////////////////////////// APIs provided for metric framework ////////////////////////////

@Override
public int getPipeEventCount() {
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void createAutoGauge(final String id) {
Metric.SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT.toString(),
MetricLevel.IMPORTANT,
queue,
SubscriptionPrefetchingQueue::getUncommittedEventCount,
SubscriptionPrefetchingQueue::getSubscriptionUncommittedEventCount,
Tag.NAME.toString(),
queue.getPrefetchingQueueId());
// current commit id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionConnectorSubtask extends PipeConnectorSubtask {

private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConnectorSubtask.class);

private final String topicName;
private final String consumerGroupId;

Expand Down Expand Up @@ -72,4 +77,13 @@ public String getConsumerGroupId() {
public UnboundedBlockingPendingQueue<Event> getInputPendingQueue() {
return inputPendingQueue;
}

//////////////////////////// APIs provided for metric framework ////////////////////////////

@Override
public int getEventCount(final String pipeName) {
// count the number of pipe events in sink queue and prefetching queue, note that can safely
// ignore lastEvent
return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, topicName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeRealtimePriorityBlockingQueue;
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
Expand Down Expand Up @@ -157,6 +158,8 @@ public synchronized String register(
final PipeConnectorSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
lifeCycle.register();
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.register(lifeCycle.getSubtask(), environment.getPipeName(), environment.getCreationTime());

return attributeSortedString;
}
Expand Down
Loading