diff --git a/common/pom.xml b/common/pom.xml index aec42750a1..e12db315b5 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -60,6 +60,10 @@ io.prometheus simpleclient_servlet + + io.prometheus + simpleclient_pushgateway + com.fasterxml.jackson.core jackson-databind diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java index ec28417fb0..1d794fb3bc 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java @@ -199,6 +199,12 @@ public class RssBaseConf extends RssConf { .defaultValue(false) .withDescription("Whether enable test mode for the shuffle server."); + public static final ConfigOption RSS_METRICS_REPORTER_CLASS = ConfigOptions + .key("rss.metrics.reporter.class") + .stringType() + .noDefaultValue() + .withDescription("The class of metrics reporter."); + public boolean loadCommonConf(Map properties) { if (properties == null) { return false; diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/AbstractMetricReporter.java b/common/src/main/java/org/apache/uniffle/common/metrics/AbstractMetricReporter.java new file mode 100644 index 0000000000..58f035c55a --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/metrics/AbstractMetricReporter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.metrics; + +import java.util.ArrayList; +import java.util.List; + +import io.prometheus.client.CollectorRegistry; + +import org.apache.uniffle.common.config.RssConf; + +public abstract class AbstractMetricReporter implements MetricReporter { + protected final RssConf conf; + protected final String instanceId; + protected List registryList = new ArrayList<>(); + + public AbstractMetricReporter(RssConf conf, String instanceId) { + this.conf = conf; + this.instanceId = instanceId; + } + + @Override + public void addCollectorRegistry(CollectorRegistry registry) { + registryList.add(registry); + } +} diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporter.java b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporter.java new file mode 100644 index 0000000000..81ca52bf99 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.metrics; + +import io.prometheus.client.CollectorRegistry; + +public interface MetricReporter { + + void start(); + + void stop(); + + void addCollectorRegistry(CollectorRegistry registry); +} diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java new file mode 100644 index 0000000000..d66576dfca --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.metrics; + +import java.lang.reflect.Constructor; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.config.RssConf; + +public class MetricReporterFactory { + + public static MetricReporter getMetricReporter(RssConf conf, String instanceId) throws Exception { + String name = conf.get(RssBaseConf.RSS_METRICS_REPORTER_CLASS); + if (StringUtils.isEmpty(name)) { + return null; + } + Class klass = Class.forName(name); + Constructor constructor; + constructor = klass.getConstructor(conf.getClass(), instanceId.getClass()); + return (AbstractMetricReporter) constructor.newInstance(conf, instanceId); + } +} diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java new file mode 100644 index 0000000000..8795d5b654 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.metrics.prometheus; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.metrics.AbstractMetricReporter; +import org.apache.uniffle.common.util.ThreadUtils; + +public class PrometheusPushGatewayMetricReporter extends AbstractMetricReporter { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayMetricReporter.class); + static final String PUSHGATEWAY_ADDR = "rss.metrics.prometheus.pushgateway.addr"; + static final String GROUPING_KEY = "rss.metrics.prometheus.pushgateway.groupingkey"; + static final String JOB_NAME = "rss.metrics.prometheus.pushgateway.jobname"; + static final String REPORT_INTEVAL = "rss.metrics.prometheus.pushgateway.report.interval.seconds"; + private ScheduledExecutorService scheduledExecutorService; + private PushGateway pushGateway; + + public PrometheusPushGatewayMetricReporter(RssConf conf, String instanceId) { + super(conf, instanceId); + } + + @Override + public void start() { + if (pushGateway == null) { + String address = conf.getString(PUSHGATEWAY_ADDR, null); + if (StringUtils.isEmpty(address)) { + throw new RuntimeException(PUSHGATEWAY_ADDR + " should not be empty!"); + } + pushGateway = new PushGateway(address); + } + String jobName = conf.getString(JOB_NAME, null); + if (StringUtils.isEmpty(jobName)) { + throw new RuntimeException(JOB_NAME + " should not be empty!"); + } + Map groupingKey = parseGroupingKey(conf.getString(GROUPING_KEY, "")); + groupingKey.put("instance", instanceId); + int reportInterval = conf.getInteger(REPORT_INTEVAL, 10); + scheduledExecutorService = Executors.newScheduledThreadPool(1, + ThreadUtils.getThreadFactory("PrometheusPushGatewayMetricReporter-%d")); + scheduledExecutorService.scheduleWithFixedDelay(() -> { + for (CollectorRegistry registry : registryList) { + try { + pushGateway.push(registry, jobName, groupingKey); + } catch (Throwable e) { + LOG.error("Failed to send metrics to push gateway.", e); + } + } + }, 0, reportInterval, TimeUnit.SECONDS); + } + + @Override + public void stop() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + } + + @VisibleForTesting + void setPushGateway(PushGateway pushGateway) { + this.pushGateway = pushGateway; + } + + static Map parseGroupingKey(final String groupingKeyConfig) { + Map groupingKey = new HashMap<>(); + if (!groupingKeyConfig.isEmpty()) { + String[] kvs = groupingKeyConfig.split(";"); + for (String kv : kvs) { + int idx = kv.indexOf("="); + if (idx < 0) { + LOG.warn("Invalid prometheusPushGateway groupingKey:{}, will be ignored", kv); + continue; + } + + String labelKey = kv.substring(0, idx); + String labelValue = kv.substring(idx + 1); + if (StringUtils.isEmpty(labelKey) + || StringUtils.isEmpty(labelValue)) { + LOG.warn( + "Invalid groupingKey {labelKey:{}, labelValue:{}} must not be empty", + labelKey, + labelValue); + continue; + } + groupingKey.put(labelKey, labelValue); + } + + return groupingKey; + } + + return groupingKey; + } +} diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporterTest.java b/common/src/test/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporterTest.java new file mode 100644 index 0000000000..56788b1e23 --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporterTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.metrics.prometheus; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Counter; +import io.prometheus.client.exporter.PushGateway; +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.metrics.MetricReporter; +import org.apache.uniffle.common.metrics.MetricReporterFactory; +import org.apache.uniffle.common.metrics.MetricsManager; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PrometheusPushGatewayMetricReporterTest { + + @Test + public void testParseGroupingKey() { + Map groupingKey = + PrometheusPushGatewayMetricReporter.parseGroupingKey("k1=v1;k2=v2"); + assertNotNull(groupingKey); + assertEquals("v1", groupingKey.get("k1")); + assertEquals("v2", groupingKey.get("k2")); + } + + @Test + public void testParseIncompleteGroupingKey() { + Map groupingKey = + PrometheusPushGatewayMetricReporter.parseGroupingKey("k1="); + assertTrue(groupingKey.isEmpty()); + + groupingKey = PrometheusPushGatewayMetricReporter.parseGroupingKey("=v1"); + assertTrue(groupingKey.isEmpty()); + + groupingKey = PrometheusPushGatewayMetricReporter.parseGroupingKey("k1"); + assertTrue(groupingKey.isEmpty()); + } + + @Test + public void test() throws Exception { + RssConf conf = new RssConf(); + conf.setString(RssBaseConf.RSS_METRICS_REPORTER_CLASS, + PrometheusPushGatewayMetricReporter.class.getCanonicalName()); + conf.setString(PrometheusPushGatewayMetricReporter.PUSHGATEWAY_ADDR, ""); + conf.setString(PrometheusPushGatewayMetricReporter.GROUPING_KEY, "a=1;b=2"); + String jobName = "jobname"; + conf.setString(PrometheusPushGatewayMetricReporter.JOB_NAME, jobName); + String instanceId = "127.0.0.1-19999"; + MetricReporter metricReporter = MetricReporterFactory.getMetricReporter(conf, instanceId); + assertTrue(metricReporter instanceof PrometheusPushGatewayMetricReporter); + MetricsManager metricsManager = new MetricsManager(); + CollectorRegistry collectorRegistry = metricsManager.getCollectorRegistry(); + metricReporter.addCollectorRegistry(collectorRegistry); + CountDownLatch countDownLatch = new CountDownLatch(1); + Counter counter1 = metricsManager.addCounter("counter1"); + counter1.inc(); + PushGateway pushGateway = new CustomPushGateway((registry, job, groupingKey) -> { + countDownLatch.countDown(); + assertEquals(jobName, job); + assertEquals(3, groupingKey.size()); + assertEquals(instanceId, groupingKey.get("instance")); + assertEquals(1, counter1.get()); + }); + ((PrometheusPushGatewayMetricReporter) metricReporter).setPushGateway(pushGateway); + metricReporter.start(); + countDownLatch.await(20, TimeUnit.SECONDS); + metricReporter.stop(); + } + + class CustomPushGateway extends PushGateway { + + private final CustomCallback> callback; + + CustomPushGateway(CustomCallback> callback) { + super("localhost"); + this.callback = callback; + } + + @Override + public void push(CollectorRegistry registry, String job, Map groupingKey) throws IOException { + callback.apply(registry, job, groupingKey); + } + } + + @FunctionalInterface + interface CustomCallback { + void apply(P1 p1, P2 p2, P3 p3); + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index f93c4335cb..e1a126befc 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -26,9 +26,12 @@ import org.apache.uniffle.common.Arguments; import org.apache.uniffle.common.metrics.GRPCMetrics; import org.apache.uniffle.common.metrics.JvmMetrics; +import org.apache.uniffle.common.metrics.MetricReporter; +import org.apache.uniffle.common.metrics.MetricReporterFactory; import org.apache.uniffle.common.rpc.ServerInterface; import org.apache.uniffle.common.security.SecurityConfig; import org.apache.uniffle.common.security.SecurityContextFactory; +import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.common.web.CommonMetricsServlet; import org.apache.uniffle.common.web.JettyServer; import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics; @@ -58,6 +61,8 @@ public class CoordinatorServer { private AccessManager accessManager; private ApplicationManager applicationManager; private GRPCMetrics grpcMetrics; + private MetricReporter metricReporter; + private String id; public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception { this.coordinatorConf = coordinatorConf; @@ -117,11 +122,22 @@ public void stopServer() throws Exception { if (clientConfManager != null) { clientConfManager.close(); } + if (metricReporter != null) { + metricReporter.stop(); + LOG.info("Metric Reporter Stopped!"); + } SecurityContextFactory.get().getSecurityContext().close(); server.stop(); } private void initialization() throws Exception { + String ip = RssUtils.getHostIp(); + if (ip == null) { + throw new RuntimeException("Couldn't acquire host Ip"); + } + int port = coordinatorConf.getInteger(CoordinatorConf.RPC_SERVER_PORT); + id = ip + "-" + port; + LOG.info("Start to initialize coordinator {}", id); jettyServer = new JettyServer(coordinatorConf); // register metrics first to avoid NPE problem when add dynamic metrics registerMetrics(); @@ -153,7 +169,7 @@ private void initialization() throws Exception { server = coordinatorFactory.getServer(); } - private void registerMetrics() { + private void registerMetrics() throws Exception { LOG.info("Register metrics"); CollectorRegistry coordinatorCollectorRegistry = new CollectorRegistry(true); CoordinatorMetrics.register(coordinatorCollectorRegistry); @@ -182,6 +198,13 @@ private void registerMetrics() { jettyServer.addServlet( new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true), "/prometheus/metrics/jvm"); + + metricReporter = MetricReporterFactory.getMetricReporter(coordinatorConf, id); + if (metricReporter != null) { + metricReporter.addCollectorRegistry(CoordinatorMetrics.getCollectorRegistry()); + metricReporter.addCollectorRegistry(grpcMetrics.getCollectorRegistry()); + metricReporter.addCollectorRegistry(JvmMetrics.getCollectorRegistry()); + } } public ClusterManager getClusterManager() { diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md index da02a57950..ddc7f4ddbf 100644 --- a/docs/coordinator_guide.md +++ b/docs/coordinator_guide.md @@ -105,6 +105,7 @@ This document will introduce how to deploy Uniffle coordinators. |rss.coordinator.quota.update.interval|60000|Update interval for the default number of submitted apps per user.| |rss.coordinator.quota.default.path|-|A configuration file for the number of apps for a user-defined user.| |rss.coordinator.quota.default.app.num|5|Default number of apps at user level.| +|rss.metrics.reporter.class|-|The class of metrics reporter.| ### AccessClusterLoadChecker settings |Property Name|Default| Description| @@ -118,3 +119,13 @@ AccessCandidatesChecker is one of the built-in access checker, which will allow |---|---|---| |rss.coordinator.access.candidates.updateIntervalSec|120|Accessed candidates update interval in seconds, which is only valid when AccessCandidatesChecker is enabled.| |rss.coordinator.access.candidates.path|-|Accessed candidates file path, the file can be stored on HDFS| + +### PrometheusPushGatewayMetricReporter settings +PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, which will allow user pushes metrics to a [Prometheus Pushgateway](https://github.com/prometheus/pushgateway), which can be scraped by Prometheus. + +|Property Name|Default| Description | +|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL including scheme, host name, and port. | +|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). | +|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. | +|rss.metrics.prometheus.pushgateway.report.interval.seconds|10| The interval in seconds for the reporter to report metrics. | diff --git a/docs/server_guide.md b/docs/server_guide.md index 17cb7875fe..357aa94dce 100644 --- a/docs/server_guide.md +++ b/docs/server_guide.md @@ -84,4 +84,16 @@ This document will introduce how to deploy Uniffle shuffle servers. |rss.server.disk.capacity|-1|Disk capacity that shuffle server can use. If it's negative, it will use the default disk whole space| |rss.server.multistorage.fallback.strategy.class|-|The fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support `org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy` and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy` will be used.| |rss.server.leak.shuffledata.check.interval|3600000|The interval of leak shuffle data check (ms)| -|rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition.| \ No newline at end of file +|rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition.| +|rss.metrics.reporter.class|-|The class of metrics reporter.| + + +### PrometheusPushGatewayMetricReporter settings +PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, which will allow user pushes metrics to a [Prometheus Pushgateway](https://github.com/prometheus/pushgateway), which can be scraped by Prometheus. + +|Property Name|Default| Description | +|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL including scheme, host name, and port. | +|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). | +|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. | +|rss.metrics.prometheus.pushgateway.report.interval.seconds|10| The interval in seconds for the reporter to report metrics. | diff --git a/pom.xml b/pom.xml index dc4f34a6ed..18189d7945 100644 --- a/pom.xml +++ b/pom.xml @@ -501,7 +501,11 @@ simpleclient_servlet ${prometheus.simpleclient.version} - + + io.prometheus + simpleclient_pushgateway + ${prometheus.simpleclient.version} + org.awaitility awaitility diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index 56e1a68a55..e4d09b51aa 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -34,6 +34,8 @@ import org.apache.uniffle.common.Arguments; import org.apache.uniffle.common.metrics.GRPCMetrics; import org.apache.uniffle.common.metrics.JvmMetrics; +import org.apache.uniffle.common.metrics.MetricReporter; +import org.apache.uniffle.common.metrics.MetricReporterFactory; import org.apache.uniffle.common.rpc.ServerInterface; import org.apache.uniffle.common.security.SecurityConfig; import org.apache.uniffle.common.security.SecurityContextFactory; @@ -75,6 +77,7 @@ public class ShuffleServer { private Set tags = Sets.newHashSet(); private AtomicBoolean isHealthy = new AtomicBoolean(true); private GRPCMetrics grpcMetrics; + private MetricReporter metricReporter; public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception { this.shuffleServerConf = shuffleServerConf; @@ -140,6 +143,10 @@ public void stopServer() throws Exception { healthCheck.stop(); LOG.info("HealthCheck stopped!"); } + if (metricReporter != null) { + metricReporter.stop(); + LOG.info("Metric Reporter Stopped!"); + } SecurityContextFactory.get().getSecurityContext().close(); server.stop(); LOG.info("RPC Server Stopped!"); @@ -207,7 +214,7 @@ private void initServerTags() { LOG.info("Server tags: {}", tags); } - private void registerMetrics() { + private void registerMetrics() throws Exception { LOG.info("Register metrics"); CollectorRegistry shuffleServerCollectorRegistry = new CollectorRegistry(true); ShuffleServerMetrics.register(shuffleServerCollectorRegistry); @@ -236,6 +243,13 @@ private void registerMetrics() { jettyServer.addServlet( new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true), "/prometheus/metrics/jvm"); + + metricReporter = MetricReporterFactory.getMetricReporter(shuffleServerConf, id); + if (metricReporter != null) { + metricReporter.addCollectorRegistry(ShuffleServerMetrics.getCollectorRegistry()); + metricReporter.addCollectorRegistry(grpcMetrics.getCollectorRegistry()); + metricReporter.addCollectorRegistry(JvmMetrics.getCollectorRegistry()); + } } /**