Skip to content

Commit

Permalink
Metrics for monitoring endpoints (#24)
Browse files Browse the repository at this point in the history
* Add metrics for avg latency by partition, avg offset difference, data depth by partition, estimated data depth

* Metrics: change histograms to gauges, change offset-dif to depth-bytes

* Metrics: fix latency calculation

* Metrics: add testing for the metrics

* Metrics: fix thread safety issues and remove unnecessary function in MetricsConfig

* Improve metric unit tests

* Logging reconnectInterval fixed, move OutputFake to test folder
  • Loading branch information
51-code authored Aug 16, 2024
1 parent fa85d46 commit 79b1032
Show file tree
Hide file tree
Showing 11 changed files with 845 additions and 164 deletions.
189 changes: 189 additions & 0 deletions src/main/java/com/teragrep/aer_01/DefaultOutput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Teragrep Azure Eventhub Reader
* Copyright (C) 2023 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.aer_01;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.teragrep.aer_01.config.RelpConfig;
import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.RelpConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.TimeoutException;

import static com.codahale.metrics.MetricRegistry.name;

// TODO unify, this is a copy from cfe_35 which is a copy from rlo_10 with FIXES
final class DefaultOutput implements Output {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultOutput.class);

private final RelpConnection relpConnection;
private final String relpAddress;
private final int relpPort;
private final int reconnectInterval;

// metrics
private final Counter records;
private final Counter bytes;
private final Counter resends;
private final Counter connects;
private final Counter retriedConnects;
private final Timer sendLatency;
private final Timer connectLatency;


DefaultOutput(
String name,
RelpConfig relpConfig,
MetricRegistry metricRegistry) {
this.relpAddress = relpConfig.destinationAddress;
this.relpPort = relpConfig.destinationPort;
this.reconnectInterval = relpConfig.reconnectInterval;

this.relpConnection = new RelpConnection();
this.relpConnection.setConnectionTimeout(relpConfig.connectionTimeout);
this.relpConnection.setReadTimeout(relpConfig.readTimeout);
this.relpConnection.setWriteTimeout(relpConfig.writeTimeout);

this.records = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "records"));
this.bytes = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "bytes"));
this.resends = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "resends"));
this.connects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "connects"));
this.retriedConnects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "retriedConnects"));
this.sendLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "sendLatency"));
this.connectLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "connectLatency"));

connect();
}

private void connect() {
boolean connected = false;
while (!connected) {
try (final Timer.Context context = connectLatency.time()) {
connected = this.relpConnection.connect(relpAddress, relpPort);
connects.inc();
} catch (IOException | TimeoutException e) {
LOGGER.error("Exception while connecting to <[{}]>:<[{}]>", relpAddress, relpPort, e);
} catch (UnresolvedAddressException e) {
LOGGER.error("Can't resolve address of target <[{}]>", relpAddress, e);
}

if (!connected) {
try {
Thread.sleep(reconnectInterval);
retriedConnects.inc();
} catch (InterruptedException e) {
LOGGER.warn("Sleep interrupted while waiting for reconnectInterval <[{}]> on <[{}]>:<[{}]>", reconnectInterval, relpAddress, relpPort, e);
}
}
}
}


@Override
public void accept(byte[] syslogMessage) {
try (final Timer.Context context = sendLatency.time()) {
RelpBatch batch = new RelpBatch();
batch.insert(syslogMessage);

boolean allSent = false;
while (!allSent) {
try {
this.relpConnection.commit(batch);

// metrics
// NOTICE these if batch size changes
records.inc(1);
bytes.inc(syslogMessage.length);

} catch (IllegalStateException | IOException | TimeoutException e) {
LOGGER.error("Exception while committing a batch to <[{}]>:<[{}]>", relpAddress, relpPort, e);
}
// Check if everything has been sent, retry and reconnect if not.
if (!batch.verifyTransactionAll()) {
batch.retryAllFailed();

// metrics
// NOTICE this if batch size changes
resends.inc(1);
relpConnection.tearDown();
try {
Thread.sleep(reconnectInterval);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
connect();
} else {
allSent = true;
}
}
}
}

@Override
public String toString() {
return "DefaultOutput{" +
"relpAddress='" + relpAddress + '\'' +
", relpPort=" + relpPort +
'}';
}

public void close() {
try {
relpConnection.disconnect();
}
catch (IOException | TimeoutException e) {
LOGGER.warn("Exception while disconnecting from <[{}]>:<[{}]>", relpAddress, relpPort, e);
}
finally {
relpConnection.tearDown();
}
}
}
112 changes: 95 additions & 17 deletions src/main/java/com/teragrep/aer_01/EventContextConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,44 +46,80 @@
package com.teragrep.aer_01;

import com.azure.messaging.eventhubs.models.EventContext;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.*;
import com.codahale.metrics.jmx.JmxReporter;
import com.teragrep.aer_01.config.RelpConfig;
import com.teragrep.aer_01.config.SyslogConfig;
import com.teragrep.aer_01.config.source.Sourceable;
import com.teragrep.rlo_14.Facility;
import com.teragrep.rlo_14.SDElement;
import com.teragrep.rlo_14.Severity;
import com.teragrep.rlo_14.SyslogMessage;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.MetricsServlet;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static com.codahale.metrics.MetricRegistry.name;

final class EventContextConsumer implements AutoCloseable, Consumer<EventContext> {

private final Output output;
private final String realHostName;
private final SyslogConfig syslogConfig;
EventContextConsumer(Sourceable configSource) {
RelpConfig relpConfig = new RelpConfig(configSource);

this.output = new Output(
"defaultOutput",
relpConfig.destinationAddress,
relpConfig.destinationPort,
relpConfig.connectionTimeout,
relpConfig.readTimeout,
relpConfig.writeTimeout,
relpConfig.reconnectInterval,
new MetricRegistry()
private final MetricRegistry metricRegistry;
private final JmxReporter jmxReporter;
private final Slf4jReporter slf4jReporter;
private final Server jettyServer;

// metrics
private final AtomicLong records = new AtomicLong();
private final AtomicLong allSize = new AtomicLong();

EventContextConsumer(Sourceable configSource, int prometheusPort) {
this(configSource, new MetricRegistry(), prometheusPort);
}

EventContextConsumer(Sourceable configSource, MetricRegistry metricRegistry, int prometheusPort) {
this(
configSource,
new DefaultOutput("defaultOutput", new RelpConfig(configSource), metricRegistry),
metricRegistry,
prometheusPort
);
}

EventContextConsumer(Sourceable configSource, Output output, MetricRegistry metricRegistry, int prometheusPort) {
this.metricRegistry = metricRegistry;
this.output = output;
this.realHostName = getRealHostName();
this.syslogConfig = new SyslogConfig(configSource);

this.jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
this.slf4jReporter = Slf4jReporter
.forRegistry(metricRegistry)
.outputTo(LoggerFactory.getLogger(EventContextConsumer.class))
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
jettyServer = new Server(prometheusPort);
startMetrics();

metricRegistry.register(name(EventContextConsumer.class, "estimated-data-depth"),
(Gauge<Double>) () -> (allSize.get() / records.doubleValue()) / records.doubleValue());
}

private String getRealHostName() {
Expand All @@ -96,8 +132,50 @@ private String getRealHostName() {
return hostname;
}

private void startMetrics() {
this.jmxReporter.start();
this.slf4jReporter.start(1, TimeUnit.MINUTES);

// prometheus-exporter
CollectorRegistry.defaultRegistry.register(new DropwizardExports(metricRegistry));

ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
jettyServer.setHandler(context);

MetricsServlet metricsServlet = new MetricsServlet();
ServletHolder servletHolder = new ServletHolder(metricsServlet);
context.addServlet(servletHolder, "/metrics");

// Start the webserver.
try {
jettyServer.start();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void accept(EventContext eventContext) {
int messageLength = eventContext.getEventData().getBody().length;
String partitionId = eventContext.getPartitionContext().getPartitionId();

records.incrementAndGet();
allSize.addAndGet(messageLength);

metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", partitionId), () -> new Gauge<Long>() {
@Override
public Long getValue() {
return Instant.now().getEpochSecond() - eventContext.getEventData().getEnqueuedTime().getEpochSecond();
}
});
metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", partitionId), () -> new Gauge<Long>() {
@Override
public Long getValue() {
return eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset();
}
});

String eventUuid = eventContext.getEventData().getMessageId();

Expand Down Expand Up @@ -136,12 +214,9 @@ public void accept(EventContext eventContext) {
// TODO metrics about these vs last retrieved, these are tracked per partition!:
eventContext.getLastEnqueuedEventProperties().getEnqueuedTime();
eventContext.getLastEnqueuedEventProperties().getSequenceNumber();
eventContext.getLastEnqueuedEventProperties().getOffset();
eventContext.getLastEnqueuedEventProperties().getRetrievalTime(); // null if not retrieved
// TODO compare these to above
eventContext.getEventData().getOffset();
eventContext.getEventData().getEnqueuedTime();
eventContext.getEventData().getPartitionKey();
eventContext.getEventData().getProperties();
*/
Expand All @@ -166,7 +241,10 @@ public void accept(EventContext eventContext) {
}

@Override
public void close() {
public void close() throws Exception {
output.close();
slf4jReporter.close();
jmxReporter.close();
jettyServer.stop();
}
}
8 changes: 4 additions & 4 deletions src/main/java/com/teragrep/aer_01/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.teragrep.aer_01.config.AzureConfig;
import com.teragrep.aer_01.config.MetricsConfig;
import com.teragrep.aer_01.config.source.EnvironmentSource;
import com.teragrep.aer_01.config.source.PropertySource;
import com.teragrep.aer_01.config.source.Sourceable;

import java.io.IOException;

// https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send?tabs=passwordless%2Croles-azure-portal

public final class Main {

public static void main(String[] args) throws IOException, InterruptedException {
public static void main(String[] args) throws Exception {
final Sourceable configSource = getConfigSource();
final int prometheusPort = new MetricsConfig(configSource).prometheusPort;

try (final EventContextConsumer PARTITION_PROCESSOR = new EventContextConsumer(configSource)) {
try (final EventContextConsumer PARTITION_PROCESSOR = new EventContextConsumer(configSource, prometheusPort)) {
AzureConfig azureConfig = new AzureConfig(configSource);
final ErrorContextConsumer ERROR_HANDLER = new ErrorContextConsumer();

Expand Down
Loading

0 comments on commit 79b1032

Please sign in to comment.