Skip to content

Commit

Permalink
[improve] [broker] Part 2 of PIP-370: add metrics "pulsar_replication…
Browse files Browse the repository at this point in the history
…_disconnected_count" (apache#23213)

(cherry picked from commit 09a16c2)
(cherry picked from commit fbf5268)
  • Loading branch information
poorbarcode authored and nikhil-ctds committed Aug 28, 2024
1 parent 9805e4b commit 0fb19ec
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ void updateStats(TopicStats stats) {
replStats.replicationBacklog += as.replicationBacklog;
replStats.msgRateExpired += as.msgRateExpired;
replStats.connectedCount += as.connectedCount;
replStats.disconnectedCount += as.disconnectedCount;
replStats.replicationDelayInSeconds += as.replicationDelayInSeconds;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class AggregatedReplicationStats {
/** The count of replication-subscriber up and running to replicate to remote cluster. */
public long connectedCount;

/** The count of replication-subscriber that failed to start to replicate to remote cluster. */
public long disconnectedCount;

/** Time in seconds from the time a message was produced to the time when it is about to be replicated. */
public long replicationDelayInSeconds;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,11 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
aggReplStats.replicationBacklog += replStats.replicationBacklog;
aggReplStats.msgRateExpired += replStats.msgRateExpired;
aggReplStats.connectedCount += replStats.connected ? 1 : 0;
if (replStats.connected) {
aggReplStats.connectedCount += 1;
} else {
aggReplStats.disconnectedCount += 1;
}
aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds;
});

Expand Down Expand Up @@ -497,6 +501,8 @@ private static void printNamespaceStats(PrometheusMetricStreams stream, Aggregat
replStats -> replStats.replicationBacklog, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_connected_count", stats,
replStats -> replStats.connectedCount, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_disconnected_count", stats,
replStats -> replStats.disconnectedCount, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_rate_expired", stats,
replStats -> replStats.msgRateExpired, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_delay_in_seconds", stats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_connected_count", replStats.connectedCount,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_disconnected_count", replStats.disconnectedCount,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_rate_expired", replStats.msgRateExpired,
cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_delay_in_seconds", replStats.replicationDelayInSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -78,8 +80,8 @@
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand All @@ -88,6 +90,8 @@
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.glassfish.jersey.client.JerseyClient;
import org.glassfish.jersey.client.JerseyClientBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -1128,4 +1132,121 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro
admin1.namespaces().deleteNamespace(ns);
admin2.namespaces().deleteNamespace(ns);
}

@Test
public void testReplicationCountMetrics() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
// 1.Create topic, does not enable replication now.
admin1.topics().createNonPartitionedTopic(topicName);
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();

// We inject an error to make the internal producer fail to connect.
final AtomicInteger createProducerCounter = new AtomicInteger();
final AtomicBoolean failedCreateProducer = new AtomicBoolean(true);
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch to determine create producer successfully or not.
if (failedCreateProducer.get()) {
log.info("Retry create replicator.producer count: {}", createProducerCounter);
// Release producer and fail callback.
originalProducer.closeAsync();
throw new RuntimeException("mock error");
}
return originalProducer;
}
return originalProducer;
});

// 2.Enable replication.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));

// Verify: metrics.
// Cluster level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
// Namespace level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
// Topic level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
JerseyClient httpClient = JerseyClientBuilder.createClient();
Awaitility.await().untilAsserted(() -> {
int topicConnected = 0;
int topicDisconnected = 0;

String response = httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
if (!metricMap.containsKey("pulsar_replication_disconnected_count")) {
fail("Expected 1 disconnected replicator.");
}
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_connected_count")) {
if (cluster1.equals(metric.tags.get("cluster"))
&& nonReplicatedNamespace.equals(metric.tags.get("namespace"))
&& topicName.equals(metric.tags.get("topic"))) {
topicConnected += Double.valueOf(metric.value).intValue();
}
}
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_disconnected_count")) {
if (cluster1.equals(metric.tags.get("cluster"))
&& nonReplicatedNamespace.equals(metric.tags.get("namespace"))
&& topicName.equals(metric.tags.get("topic"))) {
topicDisconnected += Double.valueOf(metric.value).intValue();
}
}
log.info("{}, {},", topicConnected, topicDisconnected);
assertEquals(topicConnected, 0);
assertEquals(topicDisconnected, 1);
});

// Let replicator connect successfully.
failedCreateProducer.set(false);
// Verify: metrics.
// Cluster level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
// Namespace level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
// Topic level:
// - pulsar_replication_connected_count
// - pulsar_replication_disconnected_count
Awaitility.await().atMost(Duration.ofSeconds(130)).untilAsserted(() -> {
int topicConnected = 0;
int topicDisconnected = 0;

String response = httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
if (!metricMap.containsKey("pulsar_replication_disconnected_count")) {
fail("Expected 1 disconnected replicator.");
}
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_connected_count")) {
if (cluster1.equals(metric.tags.get("cluster"))
&& nonReplicatedNamespace.equals(metric.tags.get("namespace"))
&& topicName.equals(metric.tags.get("topic"))) {
topicConnected += Double.valueOf(metric.value).intValue();
}
}
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_replication_disconnected_count")) {
if (cluster1.equals(metric.tags.get("cluster"))
&& nonReplicatedNamespace.equals(metric.tags.get("namespace"))
&& topicName.equals(metric.tags.get("topic"))) {
topicDisconnected += Double.valueOf(metric.value).intValue();
}
}
log.info("{}, {}", topicConnected, topicDisconnected);
assertEquals(topicConnected, 1);
assertEquals(topicDisconnected, 0);
});

// cleanup.
taskToClearInjection.run();
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
waitReplicatorStopped(topicName);
admin1.topics().delete(topicName, false);
admin2.topics().delete(topicName, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,10 @@ public void testConfigReplicationStartAt() throws Exception {
public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
super.testDifferentTopicCreationRule(replicationMode);
}

@Test(enabled = false)
@Override
public void testReplicationCountMetrics() throws Exception {
super.testReplicationCountMetrics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void testSimpleAggregation() {
replStats2.msgThroughputOut = 1536.0;
replStats2.replicationBacklog = 99;
replStats2.connectedCount = 1;
replStats2.disconnectedCount = 2;
replStats2.msgRateExpired = 3.0;
replStats2.replicationDelayInSeconds = 20;
topicStats2.replicationStats.put(namespace, replStats2);
Expand Down Expand Up @@ -148,6 +149,7 @@ public void testSimpleAggregation() {
assertEquals(nsReplStats.msgThroughputOut, 1792.0);
assertEquals(nsReplStats.replicationBacklog, 100);
assertEquals(nsReplStats.connectedCount, 1);
assertEquals(nsReplStats.disconnectedCount, 2);
assertEquals(nsReplStats.msgRateExpired, 6.0);
assertEquals(nsReplStats.replicationDelayInSeconds, 40);

Expand Down

0 comments on commit 0fb19ec

Please sign in to comment.