Skip to content

Commit 8351c07

Browse files
authored
[feat][broker] PIP-264: Add OpenTelemetry managed cursor metrics (apache#23000)
1 parent dd1b579 commit 8351c07

File tree

7 files changed

+321
-12
lines changed

7 files changed

+321
-12
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java

+8
Original file line numberDiff line numberDiff line change
@@ -877,4 +877,12 @@ default boolean periodicRollover() {
877877
return false;
878878
}
879879

880+
/**
881+
* Get the attributes associated with the cursor.
882+
*
883+
* @return the attributes associated with the cursor
884+
*/
885+
default ManagedCursorAttributes getManagedCursorAttributes() {
886+
return new ManagedCursorAttributes(this);
887+
}
880888
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger;
20+
21+
import io.opentelemetry.api.common.Attributes;
22+
import lombok.Getter;
23+
import org.apache.pulsar.common.naming.TopicName;
24+
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
25+
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedCursorOperationStatus;
26+
27+
@Getter
28+
public class ManagedCursorAttributes {
29+
30+
private final Attributes attributes;
31+
private final Attributes attributesOperationSucceed;
32+
private final Attributes attributesOperationFailure;
33+
34+
public ManagedCursorAttributes(ManagedCursor cursor) {
35+
var mlName = cursor.getManagedLedger().getName();
36+
var topicName = TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName));
37+
attributes = Attributes.of(
38+
OpenTelemetryAttributes.ML_CURSOR_NAME, cursor.getName(),
39+
OpenTelemetryAttributes.ML_LEDGER_NAME, mlName,
40+
OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace()
41+
);
42+
attributesOperationSucceed = Attributes.builder()
43+
.putAll(attributes)
44+
.putAll(ManagedCursorOperationStatus.SUCCESS.attributes)
45+
.build();
46+
attributesOperationFailure = Attributes.builder()
47+
.putAll(attributes)
48+
.putAll(ManagedCursorOperationStatus.FAILURE.attributes)
49+
.build();
50+
}
51+
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

+14
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
7878
import org.apache.bookkeeper.mledger.Entry;
7979
import org.apache.bookkeeper.mledger.ManagedCursor;
80+
import org.apache.bookkeeper.mledger.ManagedCursorAttributes;
8081
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
8182
import org.apache.bookkeeper.mledger.ManagedLedger;
8283
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -286,6 +287,11 @@ public enum State {
286287

287288
protected final ManagedCursorMXBean mbean;
288289

290+
private volatile ManagedCursorAttributes managedCursorAttributes;
291+
private static final AtomicReferenceFieldUpdater<ManagedCursorImpl, ManagedCursorAttributes> ATTRIBUTES_UPDATER =
292+
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, ManagedCursorAttributes.class,
293+
"managedCursorAttributes");
294+
289295
@SuppressWarnings("checkstyle:javadoctype")
290296
public interface VoidCallback {
291297
void operationComplete();
@@ -3719,4 +3725,12 @@ public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) thro
37193725
}
37203726
return newNonDurableCursor;
37213727
}
3728+
3729+
@Override
3730+
public ManagedCursorAttributes getManagedCursorAttributes() {
3731+
if (managedCursorAttributes != null) {
3732+
return managedCursorAttributes;
3733+
}
3734+
return ATTRIBUTES_UPDATER.updateAndGet(this, old -> old != null ? old : new ManagedCursorAttributes(this));
3735+
}
37223736
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

+3
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
122122

123123
private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
124124
private final OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats;
125+
private final OpenTelemetryManagedCursorStats openTelemetryManagedCursorStats;
125126

126127
//indicate whether shutdown() is called.
127128
private volatile boolean closed;
@@ -231,6 +232,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
231232

232233
openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
233234
openTelemetryManagedLedgerStats = new OpenTelemetryManagedLedgerStats(openTelemetry, this);
235+
openTelemetryManagedCursorStats = new OpenTelemetryManagedCursorStats(openTelemetry, this);
234236
}
235237

236238
static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -622,6 +624,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
622624
}));
623625
}).thenAcceptAsync(__ -> {
624626
//wait for tasks in scheduledExecutor executed.
627+
openTelemetryManagedCursorStats.close();
625628
openTelemetryManagedLedgerStats.close();
626629
openTelemetryCacheStats.close();
627630
scheduledExecutor.shutdownNow();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.impl;
20+
21+
import com.google.common.collect.Streams;
22+
import io.opentelemetry.api.OpenTelemetry;
23+
import io.opentelemetry.api.metrics.BatchCallback;
24+
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
25+
import org.apache.bookkeeper.mledger.ManagedCursor;
26+
import org.apache.pulsar.opentelemetry.Constants;
27+
28+
public class OpenTelemetryManagedCursorStats implements AutoCloseable {
29+
30+
// Replaces ['pulsar_ml_cursor_persistLedgerSucceed', 'pulsar_ml_cursor_persistLedgerErrors']
31+
public static final String PERSIST_OPERATION_COUNTER = "pulsar.broker.managed_ledger.persist.operation.count";
32+
private final ObservableLongMeasurement persistOperationCounter;
33+
34+
// Replaces ['pulsar_ml_cursor_persistZookeeperSucceed', 'pulsar_ml_cursor_persistZookeeperErrors']
35+
public static final String PERSIST_OPERATION_METADATA_STORE_COUNTER =
36+
"pulsar.broker.managed_ledger.persist.mds.operation.count";
37+
private final ObservableLongMeasurement persistOperationMetadataStoreCounter;
38+
39+
// Replaces pulsar_ml_cursor_nonContiguousDeletedMessagesRange
40+
public static final String NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER =
41+
"pulsar.broker.managed_ledger.message_range.count";
42+
private final ObservableLongMeasurement nonContiguousMessageRangeCounter;
43+
44+
// Replaces pulsar_ml_cursor_writeLedgerSize
45+
public static final String OUTGOING_BYTE_COUNTER = "pulsar.broker.managed_ledger.cursor.outgoing.size";
46+
private final ObservableLongMeasurement outgoingByteCounter;
47+
48+
// Replaces pulsar_ml_cursor_writeLedgerLogicalSize
49+
public static final String OUTGOING_BYTE_LOGICAL_COUNTER =
50+
"pulsar.broker.managed_ledger.cursor.outgoing.logical.size";
51+
private final ObservableLongMeasurement outgoingByteLogicalCounter;
52+
53+
// Replaces pulsar_ml_cursor_readLedgerSize
54+
public static final String INCOMING_BYTE_COUNTER = "pulsar.broker.managed_ledger.cursor.incoming.size";
55+
private final ObservableLongMeasurement incomingByteCounter;
56+
57+
private final BatchCallback batchCallback;
58+
59+
public OpenTelemetryManagedCursorStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
60+
var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
61+
62+
persistOperationCounter = meter
63+
.counterBuilder(PERSIST_OPERATION_COUNTER)
64+
.setUnit("{operation}")
65+
.setDescription("The number of acknowledgment operations on the ledger.")
66+
.buildObserver();
67+
68+
persistOperationMetadataStoreCounter = meter
69+
.counterBuilder(PERSIST_OPERATION_METADATA_STORE_COUNTER)
70+
.setUnit("{operation}")
71+
.setDescription("The number of acknowledgment operations in the metadata store.")
72+
.buildObserver();
73+
74+
nonContiguousMessageRangeCounter = meter
75+
.upDownCounterBuilder(NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER)
76+
.setUnit("{range}")
77+
.setDescription("The number of non-contiguous deleted messages ranges.")
78+
.buildObserver();
79+
80+
outgoingByteCounter = meter
81+
.counterBuilder(OUTGOING_BYTE_COUNTER)
82+
.setUnit("{By}")
83+
.setDescription("The total amount of data written to the ledger.")
84+
.buildObserver();
85+
86+
outgoingByteLogicalCounter = meter
87+
.counterBuilder(OUTGOING_BYTE_LOGICAL_COUNTER)
88+
.setUnit("{By}")
89+
.setDescription("The total amount of data written to the ledger, not including replicas.")
90+
.buildObserver();
91+
92+
incomingByteCounter = meter
93+
.counterBuilder(INCOMING_BYTE_COUNTER)
94+
.setUnit("{By}")
95+
.setDescription("The total amount of data read from the ledger.")
96+
.buildObserver();
97+
98+
batchCallback = meter.batchCallback(() -> factory.getManagedLedgers()
99+
.values()
100+
.stream()
101+
.map(ManagedLedgerImpl::getCursors)
102+
.flatMap(Streams::stream)
103+
.forEach(this::recordMetrics),
104+
persistOperationCounter,
105+
persistOperationMetadataStoreCounter,
106+
nonContiguousMessageRangeCounter,
107+
outgoingByteCounter,
108+
outgoingByteLogicalCounter,
109+
incomingByteCounter);
110+
}
111+
112+
@Override
113+
public void close() {
114+
batchCallback.close();
115+
}
116+
117+
private void recordMetrics(ManagedCursor cursor) {
118+
var stats = cursor.getStats();
119+
var cursorAttributesSet = cursor.getManagedCursorAttributes();
120+
var attributes = cursorAttributesSet.getAttributes();
121+
var attributesSucceed = cursorAttributesSet.getAttributesOperationSucceed();
122+
var attributesFailed = cursorAttributesSet.getAttributesOperationFailure();
123+
124+
persistOperationCounter.record(stats.getPersistLedgerSucceed(), attributesSucceed);
125+
persistOperationCounter.record(stats.getPersistLedgerErrors(), attributesFailed);
126+
127+
persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperSucceed(), attributesSucceed);
128+
persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperErrors(), attributesFailed);
129+
130+
nonContiguousMessageRangeCounter.record(cursor.getTotalNonContiguousDeletedMessagesRange(), attributes);
131+
132+
outgoingByteCounter.record(stats.getWriteCursorLedgerSize(), attributes);
133+
outgoingByteLogicalCounter.record(stats.getWriteCursorLedgerLogicalSize(), attributes);
134+
incomingByteCounter.record(stats.getReadCursorLedgerSize(), attributes);
135+
}
136+
}

0 commit comments

Comments
 (0)