Skip to content

Commit 37f0a2d

Browse files
authored
[monitoring][broker][metadata] add metadata store metrics (apache#17041)
1 parent cda2ea7 commit 37f0a2d

File tree

35 files changed

+417
-45
lines changed

35 files changed

+417
-45
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java

+1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad
9898
int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout"));
9999
store = MetadataStoreExtended.create(url,
100100
MetadataStoreConfig.builder()
101+
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
101102
.sessionTimeoutMillis(zkTimeout)
102103
.build());
103104
} catch (MetadataStoreException e) {

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,17 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
8888
this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
8989
}
9090

91-
public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
91+
public static MetadataStoreExtended createLocalMetadataStore(String serverUrls, int sessionTimeoutMs)
9292
throws MetadataStoreException {
9393
return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
94-
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build());
94+
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false)
95+
.metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
96+
}
97+
98+
public static MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs)
99+
throws MetadataStoreException {
100+
return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
101+
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false)
102+
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
95103
}
96104
}

pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,8 @@ private static void initializeCluster(Arguments arguments) throws Exception {
248248
arguments.metadataStoreUrl, arguments.configurationMetadataStore);
249249

250250
MetadataStoreExtended localStore =
251-
initMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
252-
MetadataStoreExtended configStore = initMetadataStore(arguments.configurationMetadataStore,
251+
initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
252+
MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore,
253253
arguments.zkSessionTimeoutMillis);
254254

255255
final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl
@@ -389,9 +389,22 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam
389389
}
390390
}
391391

392-
public static MetadataStoreExtended initMetadataStore(String connection, int sessionTimeout) throws Exception {
392+
public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception {
393393
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
394394
.sessionTimeoutMillis(sessionTimeout)
395+
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
396+
.build());
397+
if (store instanceof MetadataStoreLifecycle) {
398+
((MetadataStoreLifecycle) store).initializeCluster().get();
399+
}
400+
return store;
401+
}
402+
403+
public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout)
404+
throws Exception {
405+
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
406+
.sessionTimeoutMillis(sessionTimeout)
407+
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
395408
.build());
396409
if (store instanceof MetadataStoreLifecycle) {
397410
((MetadataStoreLifecycle) store).initializeCluster().get();

pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public static void main(String[] args) throws Exception {
9999

100100
@Cleanup
101101
MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeper,
102-
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
102+
MetadataStoreConfig.builder()
103+
.sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
104+
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
105+
.build());
103106

104107
if (arguments.bkMetadataServiceUri != null) {
105108
@Cleanup
@@ -121,7 +124,8 @@ public static void main(String[] args) throws Exception {
121124
// Should it be done by REST API before broker is down?
122125
@Cleanup
123126
MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore,
124-
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
127+
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
128+
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
125129
deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join();
126130
}
127131

pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public static int doMain(String[] args) throws Exception {
8383
}
8484

8585
try (MetadataStore configStore = PulsarClusterMetadataSetup
86-
.initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
86+
.initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
8787
PulsarResources pulsarResources = new PulsarResources(null, configStore);
8888
for (String namespace : arguments.namespaces) {
8989
NamespaceName namespaceName = null;

pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public static void main(String[] args) throws Exception {
9191
}
9292

9393
try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup
94-
.initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
94+
.initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
9595
PulsarResources pulsarResources = new PulsarResources(null, configStore);
9696
// Create system tenant
9797
PulsarClusterMetadataSetup

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+2
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
362362
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
363363
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
364364
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
365+
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
365366
.synchronizer(synchronizer)
366367
.build());
367368
}
@@ -1045,6 +1046,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
10451046
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
10461047
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
10471048
.synchronizer(synchronizer)
1049+
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
10481050
.build());
10491051
}
10501052

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java

+1
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public static void main(String[] args) throws Exception {
143143
MetadataStoreExtended store = MetadataStoreExtended.create(brokerConfig.getMetadataStoreUrl(),
144144
MetadataStoreConfig.builder()
145145
.sessionTimeoutMillis((int) brokerConfig.getMetadataStoreSessionTimeoutMillis())
146+
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
146147
.build());
147148

148149
@Cleanup

pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -381,21 +381,27 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
381381
}
382382

383383
protected MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
384-
return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder().synchronizer(synchronizer).build());
384+
return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder()
385+
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
386+
.synchronizer(synchronizer).build());
385387
}
386388

387389
protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
388-
return new ZKMetadataStore(mockZooKeeper);
390+
return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder()
391+
.metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
389392
}
390393

391394
protected MetadataStoreExtended createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
392395
return new ZKMetadataStore(mockZooKeeperGlobal,
393-
MetadataStoreConfig.builder().synchronizer(synchronizer).build());
396+
MetadataStoreConfig.builder()
397+
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
398+
.synchronizer(synchronizer).build());
394399

395400
}
396401

397402
protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
398-
return new ZKMetadataStore(mockZooKeeperGlobal);
403+
return new ZKMetadataStore(mockZooKeeperGlobal, MetadataStoreConfig.builder()
404+
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
399405
}
400406

401407
private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.stats;
20+
21+
import com.google.common.collect.Multimap;
22+
import java.io.ByteArrayOutputStream;
23+
import java.util.Collection;
24+
import java.util.UUID;
25+
import java.util.concurrent.TimeUnit;
26+
import lombok.Cleanup;
27+
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
28+
import org.apache.pulsar.broker.service.BrokerTestBase;
29+
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
30+
import org.apache.pulsar.client.api.Consumer;
31+
import org.apache.pulsar.client.api.Message;
32+
import org.apache.pulsar.client.api.Producer;
33+
import org.apache.pulsar.client.api.Schema;
34+
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
35+
import org.testng.Assert;
36+
import org.testng.annotations.AfterMethod;
37+
import org.testng.annotations.BeforeMethod;
38+
import org.testng.annotations.Test;
39+
40+
41+
@Test(groups = "broker")
42+
public class MetadataStoreStatsTest extends BrokerTestBase {
43+
44+
@BeforeMethod(alwaysRun = true)
45+
@Override
46+
protected void setup() throws Exception {
47+
conf.setTopicLevelPoliciesEnabled(false);
48+
conf.setSystemTopicEnabled(false);
49+
super.baseSetup();
50+
AuthenticationProviderToken.resetMetrics();
51+
}
52+
53+
@AfterMethod(alwaysRun = true)
54+
@Override
55+
protected void cleanup() throws Exception {
56+
super.internalCleanup();
57+
resetConfig();
58+
}
59+
60+
@Test
61+
public void testMetadataStoreStats() throws Exception {
62+
String ns = "prop/ns-abc1";
63+
admin.namespaces().createNamespace(ns);
64+
65+
String topic = "persistent://prop/ns-abc1/metadata-store-" + UUID.randomUUID();
66+
String subName = "my-sub1";
67+
68+
@Cleanup
69+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
70+
.topic(topic).create();
71+
@Cleanup
72+
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
73+
.topic(topic).subscriptionName(subName).subscribe();
74+
75+
for (int i = 0; i < 100; i++) {
76+
producer.newMessage().value(UUID.randomUUID().toString()).send();
77+
}
78+
79+
for (;;) {
80+
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
81+
if (message == null) {
82+
break;
83+
}
84+
consumer.acknowledge(message);
85+
}
86+
87+
ByteArrayOutputStream output = new ByteArrayOutputStream();
88+
PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
89+
String metricsStr = output.toString();
90+
Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
91+
92+
Collection<PrometheusMetricsTest.Metric> opsLatency = metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum");
93+
Collection<PrometheusMetricsTest.Metric> putBytes = metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");
94+
95+
Assert.assertTrue(opsLatency.size() > 1);
96+
Assert.assertTrue(putBytes.size() > 1);
97+
98+
for (PrometheusMetricsTest.Metric m : opsLatency) {
99+
Assert.assertEquals(m.tags.get("cluster"), "test");
100+
String metadataStoreName = m.tags.get("name");
101+
Assert.assertNotNull(metadataStoreName);
102+
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
103+
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
104+
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
105+
Assert.assertNotNull(m.tags.get("status"));
106+
107+
if (m.tags.get("status").equals("success")) {
108+
if (m.tags.get("type").equals("get")) {
109+
Assert.assertTrue(m.value >= 0);
110+
} else if (m.tags.get("type").equals("del")) {
111+
Assert.assertTrue(m.value >= 0);
112+
} else if (m.tags.get("type").equals("put")) {
113+
Assert.assertTrue(m.value >= 0);
114+
} else {
115+
Assert.fail();
116+
}
117+
} else {
118+
if (m.tags.get("type").equals("get")) {
119+
Assert.assertTrue(m.value >= 0);
120+
} else if (m.tags.get("type").equals("del")) {
121+
Assert.assertTrue(m.value >= 0);
122+
} else if (m.tags.get("type").equals("put")) {
123+
Assert.assertTrue(m.value >= 0);
124+
} else {
125+
Assert.fail();
126+
}
127+
}
128+
}
129+
for (PrometheusMetricsTest.Metric m : putBytes) {
130+
Assert.assertEquals(m.tags.get("cluster"), "test");
131+
String metadataStoreName = m.tags.get("name");
132+
Assert.assertNotNull(metadataStoreName);
133+
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
134+
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
135+
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
136+
Assert.assertTrue(m.value > 0);
137+
}
138+
}
139+
140+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception {
251251
PulsarClusterMetadataSetup.main(args);
252252

253253
try (MetadataStoreExtended localStore = PulsarClusterMetadataSetup
254-
.initMetadataStore(zkConnection, 30000)) {
254+
.initLocalMetadataStore(zkConnection, 30000)) {
255255
// expected not exist
256256
assertFalse(localStore.exists("/ledgers").get());
257257

@@ -268,7 +268,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception {
268268

269269
PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs);
270270
try (MetadataStoreExtended bookkeeperMetadataServiceUriStore = PulsarClusterMetadataSetup
271-
.initMetadataStore(zkConnection, 30000)) {
271+
.initLocalMetadataStore(zkConnection, 30000)) {
272272
// expected not exist
273273
assertFalse(bookkeeperMetadataServiceUriStore.exists("/ledgers").get());
274274
}

pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void setup() throws Exception {
8383
}
8484

8585
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
86-
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
86+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
8787
proxyServer = new ProxyServer(config);
8888
WebSocketServiceStarter.start(proxyServer, service);
8989
log.info("Proxy Server Started");

pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ protected void setup() throws Exception {
6464
config.setWebServicePort(Optional.of(0));
6565
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
6666
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
67-
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
67+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
6868
service.start();
6969
}
7070

pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void configTest(int numIoThreads, int connectionsPerBroker) throws Except
6767
config.setServiceUrl("http://localhost:8080");
6868
config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
6969
WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
70-
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
70+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
7171
service.start();
7272

7373
PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();

pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void setup() throws Exception {
7979
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
8080
config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName());
8181
WebSocketService service = spy(new WebSocketService(config));
82-
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
82+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
8383
proxyServer = new ProxyServer(config);
8484
WebSocketServiceStarter.start(proxyServer, service);
8585
log.info("Proxy Server Started");

pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void setup() throws Exception {
101101
config.setClusterName("test");
102102
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
103103
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
104-
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
104+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
105105
proxyServer = new ProxyServer(config);
106106
WebSocketServiceStarter.start(proxyServer, service);
107107
log.info("Proxy Server Started");

pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void setup() throws Exception {
7474
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
7575
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
7676
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
77-
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
77+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
7878
proxyServer = new ProxyServer(config);
7979
WebSocketServiceStarter.start(proxyServer, service);
8080
log.info("Proxy Server Started");

0 commit comments

Comments
 (0)