From c2e2226bd20ccdcc4a2228e7138ea7d01ac59a2a Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 13 Dec 2022 02:16:41 +0800 Subject: [PATCH] [CONNECTOR] add sink metrics --- ...urceMetrics.java => ConnectorMetrics.java} | 20 ++- .../metrics/connector/SinkTaskInfo.java | 95 +++++++++++ .../metrics/connector/SourceTaskInfo.java | 4 + .../{SourceTaskError.java => TaskError.java} | 2 +- .../astraea/connector/perf/PerfSinkTest.java | 66 ++++++++ .../connector/perf/PerfSourceTest.java | 7 +- .../main/java/org/astraea/gui/Context.java | 29 ++-- .../org/astraea/gui/tab/ConnectorNode.java | 149 ++++++++++++------ 8 files changed, 303 insertions(+), 69 deletions(-) rename common/src/main/java/org/astraea/common/metrics/connector/{SourceMetrics.java => ConnectorMetrics.java} (75%) create mode 100644 common/src/main/java/org/astraea/common/metrics/connector/SinkTaskInfo.java rename common/src/main/java/org/astraea/common/metrics/connector/{SourceTaskError.java => TaskError.java} (97%) diff --git a/common/src/main/java/org/astraea/common/metrics/connector/SourceMetrics.java b/common/src/main/java/org/astraea/common/metrics/connector/ConnectorMetrics.java similarity index 75% rename from common/src/main/java/org/astraea/common/metrics/connector/SourceMetrics.java rename to common/src/main/java/org/astraea/common/metrics/connector/ConnectorMetrics.java index 1619d1891e..3814305599 100644 --- a/common/src/main/java/org/astraea/common/metrics/connector/SourceMetrics.java +++ b/common/src/main/java/org/astraea/common/metrics/connector/ConnectorMetrics.java @@ -21,7 +21,7 @@ import org.astraea.common.metrics.BeanQuery; import org.astraea.common.metrics.MBeanClient; -public class SourceMetrics { +public class ConnectorMetrics { public static List sourceTaskInfo(MBeanClient client) { return client @@ -37,7 +37,21 @@ public static List sourceTaskInfo(MBeanClient client) { .collect(Collectors.toUnmodifiableList()); } - public static List sourceTaskError(MBeanClient client) { + public static List sinkTaskInfo(MBeanClient client) { + return client + .queryBeans( + BeanQuery.builder() + .domainName("kafka.connect") + .property("type", "sink-task-metrics") + .property("connector", "*") + .property("task", "*") + .build()) + .stream() + .map(b -> (SinkTaskInfo) () -> b) + .collect(Collectors.toUnmodifiableList()); + } + + public static List taskError(MBeanClient client) { return client .queryBeans( BeanQuery.builder() @@ -47,7 +61,7 @@ public static List sourceTaskError(MBeanClient client) { .property("task", "*") .build()) .stream() - .map(b -> (SourceTaskError) () -> b) + .map(b -> (TaskError) () -> b) .collect(Collectors.toUnmodifiableList()); } } diff --git a/common/src/main/java/org/astraea/common/metrics/connector/SinkTaskInfo.java b/common/src/main/java/org/astraea/common/metrics/connector/SinkTaskInfo.java new file mode 100644 index 0000000000..ee1aae9cb8 --- /dev/null +++ b/common/src/main/java/org/astraea/common/metrics/connector/SinkTaskInfo.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics.connector; + +import org.astraea.common.metrics.HasBeanObject; + +@FunctionalInterface +public interface SinkTaskInfo extends HasBeanObject { + + default String connectorName() { + return beanObject().properties().get("connector"); + } + + default int taskId() { + return Integer.parseInt(beanObject().properties().get("task")); + } + + default String taskType() { + return beanObject().properties().get("type"); + } + + default double offsetCommitCompletionRate() { + return (double) beanObject().attributes().get("offset-commit-completion-rate"); + } + + default double offsetCommitCompletionTotal() { + return (double) beanObject().attributes().get("offset-commit-completion-total"); + } + + default double offsetCommitSeqNo() { + return (double) beanObject().attributes().get("offset-commit-seq-no"); + } + + default double offsetCommitSkipRate() { + return (double) beanObject().attributes().get("offset-commit-skip-rate"); + } + + default double offsetCommitSkipTotal() { + return (double) beanObject().attributes().get("offset-commit-skip-total"); + } + + default double partitionCount() { + return (double) beanObject().attributes().get("partition-count"); + } + + default double putBatchAvgTimeMs() { + return (double) beanObject().attributes().get("put-batch-avg-time-ms"); + } + + default double putBatchMaxTimeMs() { + return (double) beanObject().attributes().get("put-batch-max-time-ms"); + } + + default double sinkRecordActiveCount() { + return (double) beanObject().attributes().get("sink-record-active-count"); + } + + default double sinkRecordActiveCountAvg() { + return (double) beanObject().attributes().get("sink-record-active-count-avg"); + } + + default double sinkRecordActiveCountMax() { + return (double) beanObject().attributes().get("sink-record-active-count-max"); + } + + default double sinkRecordReadRate() { + return (double) beanObject().attributes().get("sink-record-read-rate"); + } + + default double sinkRecordReadTotal() { + return (double) beanObject().attributes().get("sink-record-read-total"); + } + + default double sinkRecordSendRate() { + return (double) beanObject().attributes().get("sink-record-send-rate"); + } + + default double sinkRecordSendTotal() { + return (double) beanObject().attributes().get("sink-record-send-total"); + } +} diff --git a/common/src/main/java/org/astraea/common/metrics/connector/SourceTaskInfo.java b/common/src/main/java/org/astraea/common/metrics/connector/SourceTaskInfo.java index c1d91b1cc7..729256481d 100644 --- a/common/src/main/java/org/astraea/common/metrics/connector/SourceTaskInfo.java +++ b/common/src/main/java/org/astraea/common/metrics/connector/SourceTaskInfo.java @@ -29,6 +29,10 @@ default int taskId() { return Integer.parseInt(beanObject().properties().get("task")); } + default String taskType() { + return beanObject().properties().get("type"); + } + default double pollBatchAvgTimeMs() { return (double) beanObject().attributes().get("poll-batch-avg-time-ms"); } diff --git a/common/src/main/java/org/astraea/common/metrics/connector/SourceTaskError.java b/common/src/main/java/org/astraea/common/metrics/connector/TaskError.java similarity index 97% rename from common/src/main/java/org/astraea/common/metrics/connector/SourceTaskError.java rename to common/src/main/java/org/astraea/common/metrics/connector/TaskError.java index cc4edeb35f..4ebdfec186 100644 --- a/common/src/main/java/org/astraea/common/metrics/connector/SourceTaskError.java +++ b/common/src/main/java/org/astraea/common/metrics/connector/TaskError.java @@ -19,7 +19,7 @@ import org.astraea.common.metrics.HasBeanObject; @FunctionalInterface -public interface SourceTaskError extends HasBeanObject { +public interface TaskError extends HasBeanObject { default String connectorName() { return beanObject().properties().get("connector"); diff --git a/connector/src/test/java/org/astraea/connector/perf/PerfSinkTest.java b/connector/src/test/java/org/astraea/connector/perf/PerfSinkTest.java index 980477cf3e..e2638f1456 100644 --- a/connector/src/test/java/org/astraea/connector/perf/PerfSinkTest.java +++ b/connector/src/test/java/org/astraea/connector/perf/PerfSinkTest.java @@ -16,6 +16,7 @@ */ package org.astraea.connector.perf; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -23,6 +24,8 @@ import org.astraea.common.Utils; import org.astraea.common.connector.ConnectorClient; import org.astraea.common.consumer.Record; +import org.astraea.common.metrics.MBeanClient; +import org.astraea.common.metrics.connector.ConnectorMetrics; import org.astraea.it.RequireSingleWorkerCluster; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -98,4 +101,67 @@ void testTask() { // the frequency is 1s so the elapsed time of executing put should be greater than 500ms Assertions.assertTrue(System.currentTimeMillis() - now > 500); } + + @Test + void testMetrics() { + var name = Utils.randomString(); + var topicName = Utils.randomString(); + var client = ConnectorClient.builder().url(workerUrl()).build(); + client + .createConnector( + name, + Map.of( + ConnectorClient.CONNECTOR_CLASS_KEY, + PerfSink.class.getName(), + ConnectorClient.TASK_MAX_KEY, + "1", + ConnectorClient.TOPICS_KEY, + topicName)) + .toCompletableFuture() + .join(); + Utils.sleep(Duration.ofSeconds(3)); + + var m0 = + ConnectorMetrics.sinkTaskInfo(MBeanClient.local()).stream() + .filter(m -> m.connectorName().equals(name)) + .collect(Collectors.toList()); + Assertions.assertNotEquals(0, m0.size()); + m0.forEach( + m -> { + Assertions.assertNotNull(m.taskType()); + // it is hard to check the commit metrics, so we check non-null + Assertions.assertDoesNotThrow(m::offsetCommitSeqNo); + Assertions.assertDoesNotThrow(m::offsetCommitCompletionRate); + Assertions.assertDoesNotThrow(m::offsetCommitCompletionTotal); + Assertions.assertDoesNotThrow(m::offsetCommitSkipTotal); + Assertions.assertDoesNotThrow(m::offsetCommitSkipRate); + Assertions.assertDoesNotThrow(m::partitionCount); + Assertions.assertDoesNotThrow(m::putBatchMaxTimeMs); + Assertions.assertDoesNotThrow(m::putBatchAvgTimeMs); + Assertions.assertDoesNotThrow(m::sinkRecordReadRate); + Assertions.assertDoesNotThrow(m::sinkRecordActiveCount); + Assertions.assertDoesNotThrow(m::sinkRecordActiveCountAvg); + Assertions.assertDoesNotThrow(m::sinkRecordSendRate); + Assertions.assertDoesNotThrow(m::sinkRecordReadTotal); + Assertions.assertDoesNotThrow(m::sinkRecordActiveCountMax); + Assertions.assertDoesNotThrow(m::sinkRecordSendTotal); + }); + + var m1 = + ConnectorMetrics.taskError(MBeanClient.local()).stream() + .filter(m -> m.connectorName().equals(name)) + .collect(Collectors.toList()); + Assertions.assertNotEquals(0, m1.size()); + m1.forEach( + m -> { + Assertions.assertEquals(0, m.lastErrorTimestamp()); + Assertions.assertEquals(0D, m.deadletterqueueProduceFailures()); + Assertions.assertEquals(0D, m.deadletterqueueProduceRequests()); + Assertions.assertEquals(0D, m.totalErrorsLogged()); + Assertions.assertEquals(0D, m.totalRecordErrors()); + Assertions.assertEquals(0D, m.totalRetries()); + Assertions.assertEquals(0D, m.totalRecordFailures()); + Assertions.assertEquals(0D, m.totalRecordsSkipped()); + }); + } } diff --git a/connector/src/test/java/org/astraea/connector/perf/PerfSourceTest.java b/connector/src/test/java/org/astraea/connector/perf/PerfSourceTest.java index efb3900ea0..11ffe2b16f 100644 --- a/connector/src/test/java/org/astraea/connector/perf/PerfSourceTest.java +++ b/connector/src/test/java/org/astraea/connector/perf/PerfSourceTest.java @@ -25,7 +25,7 @@ import org.astraea.common.admin.Replica; import org.astraea.common.connector.ConnectorClient; import org.astraea.common.metrics.MBeanClient; -import org.astraea.common.metrics.connector.SourceMetrics; +import org.astraea.common.metrics.connector.ConnectorMetrics; import org.astraea.it.RequireSingleWorkerCluster; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -211,12 +211,13 @@ void testMetrics() { Utils.sleep(Duration.ofSeconds(3)); var m0 = - SourceMetrics.sourceTaskInfo(MBeanClient.local()).stream() + ConnectorMetrics.sourceTaskInfo(MBeanClient.local()).stream() .filter(m -> m.connectorName().equals(name)) .collect(Collectors.toList()); Assertions.assertNotEquals(0, m0.size()); m0.forEach( m -> { + Assertions.assertNotNull(m.taskType()); Assertions.assertNotEquals(0D, m.pollBatchAvgTimeMs()); Assertions.assertNotEquals(0D, m.pollBatchMaxTimeMs()); Assertions.assertNotEquals(0D, m.sourceRecordActiveCountMax()); @@ -229,7 +230,7 @@ void testMetrics() { }); var m1 = - SourceMetrics.sourceTaskError(MBeanClient.local()).stream() + ConnectorMetrics.taskError(MBeanClient.local()).stream() .filter(m -> m.connectorName().equals(name)) .collect(Collectors.toList()); Assertions.assertNotEquals(0, m1.size()); diff --git a/gui/src/main/java/org/astraea/gui/Context.java b/gui/src/main/java/org/astraea/gui/Context.java index b358a441a8..ead99959c9 100644 --- a/gui/src/main/java/org/astraea/gui/Context.java +++ b/gui/src/main/java/org/astraea/gui/Context.java @@ -72,16 +72,24 @@ public void workerJmxPort(int workerJmxPort) { workerClients = new Clients<>(workerJmxPort); } + @SuppressWarnings("resource") public Map addBrokerClients(List nodeInfos) { if (brokerClients == null) return Map.of(); - nodeInfos.forEach(n -> brokerClients.add(n.id(), n.host())); - return brokerClients.clients(); + nodeInfos.forEach( + n -> + brokerClients.clients.computeIfAbsent( + n.id(), ignored -> MBeanClient.jndi(n.host(), brokerClients.jmxPort))); + return Map.copyOf(brokerClients.clients); } + @SuppressWarnings("resource") public Map addWorkerClients(Set hostnames) { if (workerClients == null) return Map.of(); - hostnames.forEach(n -> workerClients.add(n, n)); - return workerClients.clients(); + hostnames.forEach( + n -> + workerClients.clients.computeIfAbsent( + n, ignored -> MBeanClient.jndi(n, workerClients.jmxPort))); + return Map.copyOf(workerClients.clients); } public Admin admin() { @@ -98,12 +106,12 @@ public ConnectorClient connectorClient() { public Map brokerClients() { if (brokerClients == null) return Map.of(); - return brokerClients.clients(); + return Map.copyOf(brokerClients.clients); } public Map workerClients() { if (workerClients == null) return Map.of(); - return workerClients.clients(); + return Map.copyOf(workerClients.clients); } private static class Clients> { @@ -113,14 +121,5 @@ private static class Clients> { Clients(int jmxPort) { this.jmxPort = jmxPort; } - - void add(T identity, String hostname) { - var previous = clients.put(identity, MBeanClient.jndi(hostname, jmxPort)); - if (previous != null) previous.close(); - } - - Map clients() { - return Map.copyOf(clients); - } } } diff --git a/gui/src/main/java/org/astraea/gui/tab/ConnectorNode.java b/gui/src/main/java/org/astraea/gui/tab/ConnectorNode.java index 8bd36f37e2..98ebb49fe8 100644 --- a/gui/src/main/java/org/astraea/gui/tab/ConnectorNode.java +++ b/gui/src/main/java/org/astraea/gui/tab/ConnectorNode.java @@ -19,8 +19,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; import javafx.geometry.Side; import javafx.scene.Node; @@ -28,9 +26,7 @@ import org.astraea.common.MapUtils; import org.astraea.common.connector.ConnectorClient; import org.astraea.common.connector.WorkerStatus; -import org.astraea.common.metrics.connector.SourceMetrics; -import org.astraea.common.metrics.connector.SourceTaskError; -import org.astraea.common.metrics.connector.SourceTaskInfo; +import org.astraea.common.metrics.connector.ConnectorMetrics; import org.astraea.gui.Context; import org.astraea.gui.pane.PaneBuilder; import org.astraea.gui.pane.Slide; @@ -121,7 +117,6 @@ private static Node taskNode(Context context) { .connectorStatus(name) .toCompletableFuture()) .collect(Collectors.toList()))), - // try to update metrics client of workers context .connectorClient() .activeWorkers() @@ -133,18 +128,38 @@ private static Node taskNode(Context context) { .collect(Collectors.toSet()))) .thenApply( ignored -> - Map.entry( - context.workerClients().values().stream() - .flatMap(c -> SourceMetrics.sourceTaskInfo(c).stream()) - .collect( - Collectors.toMap( - SourceTaskInfo::taskId, Function.identity())), - context.workerClients().values().stream() - .flatMap(c -> SourceMetrics.sourceTaskError(c).stream()) - .collect( - Collectors.toMap( - SourceTaskError::taskId, Function.identity())))), - (connectorStatuses, taskInfoAndErrors) -> + context.workerClients().values().stream() + .flatMap(c -> ConnectorMetrics.sourceTaskInfo(c).stream()) + .collect(Collectors.toList())), + context + .connectorClient() + .activeWorkers() + .thenApply( + workers -> + context.addWorkerClients( + workers.stream() + .map(WorkerStatus::hostname) + .collect(Collectors.toSet()))) + .thenApply( + ignored -> + context.workerClients().values().stream() + .flatMap(c -> ConnectorMetrics.sinkTaskInfo(c).stream()) + .collect(Collectors.toList())), + context + .connectorClient() + .activeWorkers() + .thenApply( + workers -> + context.addWorkerClients( + workers.stream() + .map(WorkerStatus::hostname) + .collect(Collectors.toSet()))) + .thenApply( + ignored -> + context.workerClients().values().stream() + .flatMap(c -> ConnectorMetrics.taskError(c).stream()) + .collect(Collectors.toList())), + (connectorStatuses, sourceTaskInfos, sinkTaskInfos, taskErrors) -> connectorStatuses.stream() .flatMap( connectorStatus -> @@ -157,35 +172,75 @@ private static Node taskNode(Context context) { map.put("worker id", task.workerId()); map.put("state", task.state()); task.error().ifPresent(e -> map.put("error", e)); - var info = taskInfoAndErrors.getKey().get(task.id()); - if (info != null) { - map.put( - "poll rate (records/s)", - info.sourceRecordPollRate()); - map.put("records", info.sourceRecordPollTotal()); - map.put( - "write rate (records/s)", - info.sourceRecordWriteRate()); - map.put( - "written records", - info.sourceRecordWriteTotal()); - map.put( - "poll batch time (avg)", - info.pollBatchAvgTimeMs()); - map.put( - "poll batch time (max)", - info.pollBatchMaxTimeMs()); - } - var error = - taskInfoAndErrors.getValue().get(task.id()); - if (error != null) { - map.put("retries", error.totalRetries()); - map.put( - "failure records", error.totalRecordFailures()); - map.put( - "skipped records", error.totalRecordsSkipped()); - map.put("error records", error.totalRecordErrors()); - } + sourceTaskInfos.stream() + .filter(t -> t.taskId() == task.id()) + .filter( + t -> + t.connectorName() + .equals(connectorStatus.name())) + .forEach( + info -> { + map.put( + "poll rate (records/s)", + info.sourceRecordPollRate()); + map.put( + "records", + info.sourceRecordPollTotal()); + map.put( + "write rate (records/s)", + info.sourceRecordWriteRate()); + map.put( + "written records", + info.sourceRecordWriteTotal()); + map.put( + "poll batch time (avg)", + info.pollBatchAvgTimeMs()); + map.put( + "poll batch time (max)", + info.pollBatchMaxTimeMs()); + }); + sinkTaskInfos.stream() + .filter(t -> t.taskId() == task.id()) + .filter( + t -> + t.connectorName() + .equals(connectorStatus.name())) + .forEach( + info -> { + map.put( + "partitions", info.partitionCount()); + map.put( + "put batch time (avg)", + info.putBatchAvgTimeMs()); + map.put( + "put batch time (max)", + info.putBatchMaxTimeMs()); + map.put( + "read rate (records/s)", + info.sinkRecordReadRate()); + map.put( + "read records", + info.sinkRecordReadTotal()); + }); + taskErrors.stream() + .filter(t -> t.taskId() == task.id()) + .filter( + t -> + t.connectorName() + .equals(connectorStatus.name())) + .forEach( + error -> { + map.put("retries", error.totalRetries()); + map.put( + "failure records", + error.totalRecordFailures()); + map.put( + "skipped records", + error.totalRecordsSkipped()); + map.put( + "error records", + error.totalRecordErrors()); + }); return map; })) .collect(Collectors.toList())))