Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add node metrics to monitor metrics #9531

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions buildSrc/src/main/kotlin/common-conventions.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
* limitations under the License.
*/

import org.owasp.dependencycheck.gradle.extension.AnalyzerExtension

plugins { id("org.owasp.dependencycheck") }

repositories { mavenCentral() }
Expand All @@ -29,11 +27,8 @@ dependencyCheck {

failBuildOnCVSS = 8f
suppressionFile = resources.resolve("suppressions.xml").toString()
analyzers(
closureOf<AnalyzerExtension> {
experimentalEnabled = true
golangModEnabled =
false // Too many vulnerabilities in transitive dependencies currently
}
)
analyzers {
experimentalEnabled = true
golangModEnabled = false // Too many vulnerabilities in transitive dependencies currently
}
}
11 changes: 4 additions & 7 deletions buildSrc/src/main/kotlin/go-conventions.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

import org.gradle.internal.io.NullOutputStream
import org.owasp.dependencycheck.gradle.extension.AnalyzerExtension
import plugin.go.Go
import plugin.go.GoExtension
import plugin.go.GoPlugin
Expand Down Expand Up @@ -90,12 +89,10 @@ listOf(tasks.dependencyCheckAggregate, tasks.dependencyCheckAnalyze).forEach {
dependsOn("setup")
doFirst {
dependencyCheck {
analyzers(
closureOf<AnalyzerExtension> {
val go = project.extensions.getByName<GoExtension>("go")
pathToGo = go.goBin.toString()
}
)
analyzers {
val go = project.extensions.getByName<GoExtension>("go")
pathToGo = go.goBin.toString()
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions buildSrc/src/main/kotlin/java-conventions.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ tasks.javadoc { options.encoding = "UTF-8" }

tasks.withType<Test> {
finalizedBy(tasks.jacocoTestReport)
jvmArgs = listOf("-XX:+EnableDynamicAgentLoading") // Allow byte buddy for Mockito
maxHeapSize = "4096m"
minHeapSize = "1024m"
systemProperty("user.timezone", "UTC")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hedera.mirror.monitor;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.hedera.hashgraph.sdk.AccountId;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
Expand All @@ -36,12 +37,15 @@ public class NodeProperties {
private String accountId;

@Getter(lazy = true)
@JsonIgnore
@ToString.Exclude
private final List<AccountId> accountIds = List.of(AccountId.fromString(getAccountId()));

@NotBlank
private String host;

private Long nodeId;

@Min(0)
@Max(65535)
private int port = 50211;
Expand All @@ -58,4 +62,12 @@ public String getEndpoint() {
}
return host + ":" + port;
}

public long getNodeId() {
if (nodeId == null) {
var nodeAccountId = AccountId.fromString(accountId);
return nodeAccountId.num - 3;
edwin-greene marked this conversation as resolved.
Show resolved Hide resolved
}
return nodeId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.security.SecureRandom;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -55,6 +56,7 @@ public class NodeSupplier {
private final RestApiClient restApiClient;

private final CopyOnWriteArrayList<NodeProperties> nodes = new CopyOnWriteArrayList<>();
private final Map<String, NodeProperties> nodeMap = new ConcurrentHashMap<>();
private final SecureRandom secureRandom = new SecureRandom();

@PostConstruct
Expand Down Expand Up @@ -93,6 +95,10 @@ public NodeProperties get() {
return nodes.get(nodeIndex);
}

public NodeProperties get(String accountId) {
return nodeMap.get(accountId);
}

public synchronized Flux<NodeProperties> refresh() {
boolean empty = nodes.isEmpty();
Retry retrySpec = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1L))
Expand All @@ -114,6 +120,7 @@ public synchronized Flux<NodeProperties> refresh() {
.doOnNext(n -> {
if (empty) {
nodes.addIfAbsent(n);
nodeMap.put(n.getAccountId(), n);
}
}); // Populate on startup before validation
}
Expand All @@ -140,6 +147,7 @@ private Flux<NodeProperties> toNodeProperties(NetworkNode networkNode) {
var nodeProperties = new NodeProperties();
nodeProperties.setAccountId(networkNode.getNodeAccountId());
nodeProperties.setHost(host);
nodeProperties.setNodeId(networkNode.getNodeId());
nodeProperties.setPort(serviceEndpoint.getPort());
return nodeProperties;
}));
Expand All @@ -166,6 +174,7 @@ private Client toClient(Map<String, AccountId> nodes) {
boolean validateNode(NodeProperties node) {
if (!monitorProperties.getNodeValidation().isEnabled()) {
nodes.addIfAbsent(node);
nodeMap.put(node.getAccountId(), node);
log.info("Adding node {} without validation", node.getAccountId());
return true;
}
Expand All @@ -186,6 +195,7 @@ boolean validateNode(NodeProperties node) {
if (receiptStatus == SUCCESS) {
log.info("Validated node {} successfully", nodeAccountId);
nodes.addIfAbsent(node);
nodeMap.put(node.getAccountId(), node);
return true;
}

Expand All @@ -196,6 +206,7 @@ boolean validateNode(NodeProperties node) {
log.warn("Unable to validate node {}: ", node, e);
}

nodeMap.remove(node.getAccountId());
nodes.remove(node);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hedera.mirror.monitor.publish;

import com.hedera.hashgraph.sdk.AccountId;
import com.hedera.mirror.monitor.NodeProperties;
import com.hedera.mirror.monitor.converter.DurationToStringSerializer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
Expand Down Expand Up @@ -47,6 +48,7 @@
private final Map<Tags, Timer> handleTimers = new ConcurrentHashMap<>();
private final Map<Tags, Timer> submitTimers = new ConcurrentHashMap<>();
private final MeterRegistry meterRegistry;
private final NodeSupplier nodeSupplier;
private final PublishProperties publishProperties;

public void onSuccess(PublishResponse response) {
Expand All @@ -61,11 +63,18 @@

private void recordMetric(PublishRequest request, PublishResponse response, String status) {
try {
String node = Optional.ofNullable(request.getTransaction().getNodeAccountIds())
String nodeAccount = Optional.ofNullable(request.getTransaction().getNodeAccountIds())
.filter(l -> !l.isEmpty())
.map(l -> l.get(0))
.map(AccountId::toString)
.orElse(UNKNOWN);

var node = nodeSupplier.get(nodeAccount);
if (node == null) {
log.warn("Unable to find node {}", nodeAccount);
return;

Check warning on line 75 in hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java

View check run for this annotation

Codecov / codecov/patch

hedera-mirror-monitor/src/main/java/com/hedera/mirror/monitor/publish/PublishMetrics.java#L74-L75

Added lines #L74 - L75 were not covered by tests
}

long startTime = request.getTimestamp().toEpochMilli();
long endTime = response != null ? response.getTimestamp().toEpochMilli() : System.currentTimeMillis();
Tags tags = new Tags(node, request.getScenario(), status);
Expand All @@ -90,7 +99,9 @@
return TimeGauge.builder(METRIC_DURATION, tags.getScenario(), unit, s -> s.getElapsed()
.toNanos())
.description("The amount of time this scenario has been publishing transactions")
.tag(Tags.TAG_NODE, tags.getNode())
.tag(Tags.TAG_HOST, String.valueOf(tags.getNode().getHost()))
.tag(Tags.TAG_NODE, String.valueOf(tags.getNode().getNodeId()))
.tag(Tags.TAG_PORT, String.valueOf(tags.getNode().getPort()))
.tag(Tags.TAG_SCENARIO, tags.getScenario().getName())
.tag(Tags.TAG_TYPE, tags.getType())
.register(meterRegistry);
Expand All @@ -99,7 +110,9 @@
private Timer newHandleMetric(Tags tags) {
return Timer.builder(METRIC_HANDLE)
.description("The time it takes from submit to being handled by the main nodes")
.tag(Tags.TAG_NODE, tags.getNode())
.tag(Tags.TAG_HOST, String.valueOf(tags.getNode().getHost()))
.tag(Tags.TAG_NODE, String.valueOf(tags.getNode().getNodeId()))
.tag(Tags.TAG_PORT, String.valueOf(tags.getNode().getPort()))
.tag(Tags.TAG_SCENARIO, tags.getScenario().getName())
.tag(Tags.TAG_STATUS, tags.getStatus())
.tag(Tags.TAG_TYPE, tags.getType())
Expand All @@ -109,7 +122,9 @@
private Timer newSubmitMetric(Tags tags) {
return Timer.builder(METRIC_SUBMIT)
.description("The time it takes to submit a transaction")
.tag(Tags.TAG_NODE, tags.getNode())
.tag(Tags.TAG_HOST, String.valueOf(tags.getNode().getHost()))
.tag(Tags.TAG_NODE, String.valueOf(tags.getNode().getNodeId()))
.tag(Tags.TAG_PORT, String.valueOf(tags.getNode().getPort()))
.tag(Tags.TAG_SCENARIO, tags.getScenario().getName())
.tag(Tags.TAG_STATUS, tags.getStatus())
.tag(Tags.TAG_TYPE, tags.getType())
Expand Down Expand Up @@ -146,12 +161,14 @@

@Value
class Tags {
static final String TAG_HOST = "host";
static final String TAG_NODE = "node";
static final String TAG_PORT = "port";
static final String TAG_SCENARIO = "scenario";
static final String TAG_STATUS = "status";
static final String TAG_TYPE = "type";

private final String node;
private final NodeProperties node;
private final PublishScenario scenario;
private final String status;

Expand Down
1 change: 1 addition & 0 deletions hedera-mirror-monitor/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ management:
metrics:
tags:
application: ${spring.application.name}
environment: ${hedera.mirror.monitor.network}
prometheus:
metrics:
export:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ void getNoValidNodes() {
.hasMessageContaining("No valid nodes available");
}

@Test
void getByAccountId() {
monitorProperties.getNodeValidation().setEnabled(false);
nodeSupplier.validateNode(node);
assertThat(nodeSupplier.get(node.getAccountId())).isEqualTo(node);
}

@Test
void getByAccountIdMissing() {
assertThat(nodeSupplier.get("0.0.100000")).isNull();
}

@Test
void init() {
cryptoServiceStub.addQuery(Mono.just(receipt(SUCCESS)));
Expand Down Expand Up @@ -266,6 +278,7 @@ void validationRecovers() {
assertThatThrownBy(() -> nodeSupplier.get())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("No valid nodes available");
assertThat(nodeSupplier.get(node.getAccountId())).isNull();

// When it recovers
cryptoServiceStub.addQuery(Mono.just(receipt(SUCCESS)));
Expand All @@ -274,6 +287,7 @@ void validationRecovers() {

// Then it is marked as healthy
assertThat(nodeSupplier.get()).isEqualTo(node);
assertThat(nodeSupplier.get(node.getAccountId())).isEqualTo(node);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.platform.commons.util.ReflectionUtils.getDeclaredConstructor;
import static org.mockito.Mockito.doReturn;

import com.hedera.hashgraph.sdk.AccountId;
import com.hedera.hashgraph.sdk.PrecheckStatusException;
Expand All @@ -26,6 +27,8 @@
import com.hedera.hashgraph.sdk.TransactionId;
import com.hedera.hashgraph.sdk.TransactionReceipt;
import com.hedera.hashgraph.sdk.proto.ResponseCodeEnum;
import com.hedera.mirror.monitor.NodeProperties;
import com.hedera.mirror.monitor.publish.PublishMetrics.Tags;
import com.hedera.mirror.monitor.publish.transaction.TransactionType;
import io.grpc.Status;
import io.micrometer.core.instrument.Gauge;
Expand All @@ -35,7 +38,6 @@
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.lang.reflect.Constructor;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.SneakyThrows;
Expand All @@ -44,30 +46,41 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;

@ExtendWith(OutputCaptureExtension.class)
@ExtendWith({MockitoExtension.class, OutputCaptureExtension.class})
class PublishMetricsTest {

private static final String NODE_ACCOUNT_ID = "0.0.3";
private static final String SCENARIO_NAME = "test";

@Mock
private NodeSupplier nodeSupplier;

private MeterRegistry meterRegistry;
private PublishMetrics publishMetrics;
private PublishProperties publishProperties;
private PublishScenario publishScenario;
private NodeProperties node;

@BeforeEach
void setup() {
meterRegistry = new SimpleMeterRegistry();
publishProperties = new PublishProperties();
publishMetrics = new PublishMetrics(meterRegistry, publishProperties);
publishMetrics = new PublishMetrics(meterRegistry, nodeSupplier, publishProperties);

PublishScenarioProperties publishScenarioProperties = new PublishScenarioProperties();
publishScenarioProperties.setName(SCENARIO_NAME);
publishScenarioProperties.setType(TransactionType.CONSENSUS_SUBMIT_MESSAGE);
publishScenario = new PublishScenario(publishScenarioProperties);

node = new NodeProperties();
node.setAccountId("0.0.3");
node.setHost("127.0.0.1");
node.setNodeId(0L);
doReturn(node).when(nodeSupplier).get(node.getAccountId());
}

@Test
Expand Down Expand Up @@ -211,18 +224,19 @@ private <T extends Meter> ObjectAssert<T> assertMetric(Iterable<T> meters) {
return assertThat(meters)
.hasSize(1)
.first()
.returns(NODE_ACCOUNT_ID, t -> t.getId().getTag(PublishMetrics.Tags.TAG_NODE))
.returns(SCENARIO_NAME, t -> t.getId().getTag(PublishMetrics.Tags.TAG_SCENARIO))
.returns(String.valueOf(node.getNodeId()), t -> t.getId().getTag(Tags.TAG_NODE))
.returns(node.getHost(), t -> t.getId().getTag(Tags.TAG_HOST))
.returns(String.valueOf(node.getPort()), t -> t.getId().getTag(Tags.TAG_PORT))
.returns(SCENARIO_NAME, t -> t.getId().getTag(Tags.TAG_SCENARIO))
.returns(TransactionType.CONSENSUS_SUBMIT_MESSAGE.toString(), t -> t.getId()
.getTag(PublishMetrics.Tags.TAG_TYPE));
.getTag(Tags.TAG_TYPE));
}

private PublishRequest request() {
List<AccountId> nodeAccountIds = List.of(AccountId.fromString(NODE_ACCOUNT_ID));
return PublishRequest.builder()
.scenario(publishScenario)
.timestamp(Instant.now().minusSeconds(5L))
.transaction(new TopicMessageSubmitTransaction().setNodeAccountIds(nodeAccountIds))
.transaction(new TopicMessageSubmitTransaction().setNodeAccountIds(node.getAccountIds()))
.build();
}

Expand Down
Loading