Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronized to main repository #23

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions nebulous/ems-core/baguette-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
<name>EMS - Baguette Client</name>

<properties>
<atomix.version>3.1.12</atomix.version>

<!-- io.fabricat8 docker-maven-plugin properties -->
<docker-maven-plugin.version>0.43.2</docker-maven-plugin.version>
<docker.image.name>ems-client</docker.image.name>
<docker.user>emsuser</docker.user>
<docker.user.home>/opt/baguette-client</docker.user.home>
Expand Down Expand Up @@ -145,7 +141,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<version>${exec-maven-plugin.version}</version>
<executions>
<execution>
<goals>
Expand Down Expand Up @@ -196,7 +192,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<version>1.2.0</version>
<version>${properties-maven-plugin.version}</version>
<executions>
<execution>
<id>read-docker-image-properties</id>
Expand Down Expand Up @@ -228,7 +224,7 @@
<!-- Set docker image properties -->
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>3.1.0</version>
<version>${maven-antrun-plugin.version}</version>
<executions>
<execution>
<id>set-docker-properties</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package gr.iccs.imu.ems.baguette.client;

import gr.iccs.imu.ems.baguette.client.cluster.ClusterManagerProperties;
import gr.iccs.imu.ems.baguette.client.collector.generator.ClientGeneratorCollector;
import gr.iccs.imu.ems.baguette.client.collector.netdata.K8sNetdataCollector;
import gr.iccs.imu.ems.baguette.client.collector.prometheus.PrometheusCollector2;
import gr.iccs.imu.ems.baguette.client.plugin.recovery.SelfHealingPlugin;
Expand Down Expand Up @@ -48,7 +49,7 @@ public class BaguetteClient implements ApplicationRunner {
private final ConfigurableApplicationContext applicationContext;

private final List<Class<? extends IClientCollector>> DEFAULT_COLLECTORS_LIST = List.of(
K8sNetdataCollector.class, PrometheusCollector2.class
K8sNetdataCollector.class, PrometheusCollector2.class, ClientGeneratorCollector.class
);

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,8 @@ private void sendClientProperty(String propertyName, String propertyValue) {

@SneakyThrows
private void getStatistics(String inputUuid) {
Map<String,Object> statsMap = brokerCepService.getBrokerCepStatistics();
//Map<String,Object> statsMap = brokerCepService.getBrokerCepStatistics();
Map<String,Object> statsMap = _collectAndSendStatistics(false);
log.debug("Statistics: {}", statsMap);
if (out!=null) out.println("-INPUT:"+inputUuid+":"+SerializationUtil.serializeToString(statsMap));
}
Expand All @@ -1313,17 +1314,7 @@ private void collectStatistics() {
boolean result = systemResourceMonitor.runImmediatelyBlocking(-1); // >=0: timeout in millis; <0: wait forever
log.debug("Running system metrics collection... {}", result ? "done" : "cancel/timeout");

// Collect metrics
Map<String, Object> statsMap = brokerCepService.getBrokerCepStatistics();
log.debug("BCEP Statistics: {}", statsMap);
Map<String, Object> sysMap = systemResourceMonitor.getLatestMeasurements();
log.debug("System Statistics: {}", sysMap);

// Prepare and send response
Map<String, Object> clientStats = new HashMap<>();
if (statsMap!=null) clientStats.putAll(statsMap);
if (sysMap!=null) clientStats.putAll(sysMap);
if (out!=null) out.println("-STATS:" + SerializationUtil.serializeToString(statsMap));
_collectAndSendStatistics(true);
} catch (Exception ex) {
log.error("Exception while getting Statistics to server: ", ex);
}
Expand All @@ -1333,22 +1324,29 @@ private void collectStatistics() {
private void sendStatisticsStart() {
statsSendTask = taskScheduler.scheduleWithFixedDelay(() -> {
try {
Map<String, Object> statsMap = brokerCepService.getBrokerCepStatistics();
log.debug("BCEP Statistics: {}", statsMap);
Map<String, Object> sysMap = systemResourceMonitor.getLatestMeasurements();
log.debug("System Statistics: {}", sysMap);

Map<String, Object> clientStats = new HashMap<>();
if (statsMap!=null) clientStats.putAll(statsMap);
if (sysMap!=null) clientStats.putAll(sysMap);
if (out != null) out.println("-STATS:" + SerializationUtil.serializeToString(clientStats));
_collectAndSendStatistics(true);
} catch (Exception ex) {
log.error("Exception while sending Statistics to server: ", ex);
}
}, Duration.ofMillis(baguetteClient.getBaguetteClientProperties().getSendStatisticsDelay()));
log.info("Start sending STATS to server");
}

private Map<String, Object> _collectAndSendStatistics(boolean sendStats) throws IOException {
// Collect metrics
Map<String, Object> statsMap = brokerCepService.getBrokerCepStatistics();
log.debug("BCEP Statistics: {}", statsMap);
Map<String, Object> sysMap = systemResourceMonitor.getLatestMeasurements();
log.debug("System Statistics: {}", sysMap);

// Prepare and send response
Map<String, Object> clientStats = new HashMap<>();
if (statsMap!=null) clientStats.putAll(statsMap);
if (sysMap!=null) clientStats.putAll(sysMap);
if (sendStats && out!=null) out.println("-STATS:" + SerializationUtil.serializeToString(statsMap));
return clientStats;
}

@SneakyThrows
private void sendStatisticsStop() {
statsSendTask.cancel(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/

package gr.iccs.imu.ems.baguette.client.collector.generator;

import gr.iccs.imu.ems.baguette.client.IClientCollector;
import gr.iccs.imu.ems.baguette.client.collector.ClientCollectorContext;
import gr.iccs.imu.ems.common.collector.CollectorContext;
import gr.iccs.imu.ems.common.collector.generator.GeneratorCollectorProperties;
import gr.iccs.imu.ems.util.EventBus;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
* Generates measurements
*/
@Slf4j
@Component
public class ClientGeneratorCollector extends gr.iccs.imu.ems.common.collector.generator.GeneratorCollector implements IClientCollector {
private List<Map<String,Object>> configurations;

public ClientGeneratorCollector(@NonNull GeneratorCollectorProperties properties,
@NonNull CollectorContext collectorContext,
@NonNull TaskScheduler taskScheduler,
@NonNull EventBus<String, Object, Object> eventBus)
{
super("ClientGeneratorCollector", properties, collectorContext, taskScheduler, eventBus);
if (!(collectorContext instanceof ClientCollectorContext))
throw new IllegalArgumentException("Invalid CollectorContext provided. Expected: ClientCollectorContext, but got "+collectorContext.getClass().getName());
}

@Override
public void setConfiguration(Object config) {
log.info("Collectors::{}: setConfiguration: {}", collectorId, config);
}

@Override
public synchronized void activeGroupingChanged(String oldGrouping, String newGrouping) {
}
}
2 changes: 1 addition & 1 deletion nebulous/ems-core/baguette-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
<version>3.1.4</version>
<version>${glassfish.jersey.core.version}</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ public Map<String,Object> getBrokerCepStatistics() {
bcepStats.put("count-total-events-object", BrokerCepConsumer.getObjectEventCounter());
bcepStats.put("count-total-events-other", BrokerCepConsumer.getOtherEventCounter());
bcepStats.put("count-total-events-failures", BrokerCepConsumer.getEventFailuresCounter());
bcepStats.put("count-cep-events", CepService.getEventCounter());

bcepStats.put("latest-events", eventCache.asList());

Expand All @@ -395,6 +396,7 @@ public Map<String,Object> getBrokerCepStatistics() {
public void clearBrokerCepStatistics() {
BrokerCepStatementSubscriber.clearCounters();
BrokerCepConsumer.clearCounters();
CepService.clearCounters();
log.debug("BrokerCepService.clearBrokerCepStatistics(): broker-CEP statistics cleared");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/

package gr.iccs.imu.ems.brokercep;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import gr.iccs.imu.ems.brokercep.cep.CepService;
import gr.iccs.imu.ems.brokercep.properties.BrokerCepProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
@Service
@RequiredArgsConstructor
public class StatsPrinter implements InitializingBean, Runnable {
private final BrokerCepProperties properties;
private final TaskScheduler taskScheduler;
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();

@Override
public void afterPropertiesSet() throws Exception {
if (properties.isStatsPrinterEnabled()) {
taskScheduler.scheduleAtFixedRate(this,
Instant.now().plusSeconds(properties.getStatsPrinterInitDelay()),
Duration.ofSeconds(properties.getStatsPrinterRate()));
log.info("BCEP Statistics Printer enabled");
} else {
log.info("BCEP Statistics Printer disabled");
}
}

@Override
public void run() {
Map<String, Long> stats = new LinkedHashMap<>();
stats.put("timestamp", Instant.now().toEpochMilli());
stats.put("OUT.publish.success", BrokerCepStatementSubscriber.getLocalPublishSuccessCounter());
stats.put("OUT.publish.failure", BrokerCepStatementSubscriber.getLocalPublishFailureCounter());
stats.put("OUT.forward.success", BrokerCepStatementSubscriber.getForwardSuccessCounter());
stats.put("OUT.forward.failure", BrokerCepStatementSubscriber.getForwardFailureCounter());
stats.put("IN.receive.success", BrokerCepConsumer.getEventCounter());
stats.put("IN.receive.failure", BrokerCepConsumer.getEventFailuresCounter());
stats.put("CEP.incoming.events", CepService.getEventCounter());

if (properties.isStatsPrinterAsJson())
log.info("BCEP statistics:\n{}", gson.toJson(stats));
if (properties.isStatsPrinterAsCsv())
log.info("BCEP statistics:\n{}\n{}",
String.join(",", stats.keySet()),
stats.values().stream().map(l->Long.toString(l)).collect(Collectors.joining(",")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package gr.iccs.imu.ems.brokercep.cep;

import com.espertech.esper.client.*;
import com.google.gson.Gson;
import gr.iccs.imu.ems.brokercep.event.EventMap;
import gr.iccs.imu.ems.util.FunctionDefinition;
import gr.iccs.imu.ems.util.StrUtil;
Expand All @@ -22,14 +21,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Slf4j
@Service
@RequiredArgsConstructor
public class CepService implements InitializingBean {

private final Gson gson;
private final static AtomicLong eventCounter = new AtomicLong(0);

/**
* Esper service
Expand Down Expand Up @@ -127,6 +126,7 @@ public void handleEvent(Map<String, Object> event, String eventType) {
log.debug("CepService.handleEvent(): type={}, event={}", eventType, event.toString());
EventMap.checkEvent(event);
epService.getEPRuntime().sendEvent(event, eventType);
eventCounter.incrementAndGet();
}

/**
Expand All @@ -137,6 +137,7 @@ public void handleEvent(String event, String eventType) {
EventMap eventMap = EventMap.parseEventMap(event);
log.trace("CepService.handleEvent(): event-map={}", eventMap);
epService.getEPRuntime().sendEvent(eventMap, eventType);
eventCounter.incrementAndGet();
}

/**
Expand All @@ -146,6 +147,7 @@ public void handleEvent(Object event) {
log.debug("CepService.handleEvent(): event={}", event);
EventMap.checkEvent(StrUtil.castToMapStringObject(event));
epService.getEPRuntime().sendEvent(event);
eventCounter.incrementAndGet();
}

/**
Expand Down Expand Up @@ -221,4 +223,12 @@ public void clearConstants() {
log.debug("CepService.clearConstants(): Clear constants");
MathUtil.clearConstants();
}

public static long getEventCounter() {
return eventCounter.get();
}

public static synchronized void clearCounters() {
eventCounter.set(0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public void afterPropertiesSet() {
private boolean eventCacheEnabled = true;
private int eventCacheSize = EventCache.DEFAULT_EVENT_CACHE_SIZE;

private boolean statsPrinterEnabled;
private boolean statsPrinterAsJson = true;
private boolean statsPrinterAsCsv = false;
private long statsPrinterInitDelay = 30;
private long statsPrinterRate = 30;

@Data
public static class Usage {
private Memory memory = new Memory();
Expand Down
12 changes: 11 additions & 1 deletion nebulous/ems-core/broker-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@
<artifactId>activemq-client</artifactId>
<version>${activemq.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-amqp</artifactId>
<version>${activemq.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>${qpid-jms-client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
Expand Down Expand Up @@ -74,7 +84,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.1</version>
<version>${maven-assembly-plugin.version}</version>

<configuration>
<descriptorRefs>
Expand Down
Loading