This repository has been archived by the owner on Oct 11, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
sagfcp
committed
Mar 29, 2020
0 parents
commit 17625b3
Showing
8 changed files
with
391 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>com.softwareag</groupId> | ||
<artifactId>measurement-aggregator</artifactId> | ||
<version>1.0-SNAPSHOT</version> | ||
|
||
<properties> | ||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
<c8y.version>1005.6.0</c8y.version> | ||
<maven.compiler.source>1.8</maven.compiler.source> | ||
<maven.compiler.target>1.8</maven.compiler.target> | ||
</properties> | ||
|
||
<repositories> | ||
<repository> | ||
<id>cumulocity</id> | ||
<layout>default</layout> | ||
<url>http://download.cumulocity.com/maven/repository</url> | ||
</repository> | ||
</repositories> | ||
<pluginRepositories> | ||
<pluginRepository> | ||
<id>cumulocity</id> | ||
<layout>default</layout> | ||
<url>http://download.cumulocity.com/maven/repository</url> | ||
</pluginRepository> | ||
</pluginRepositories> | ||
|
||
|
||
<dependencyManagement> | ||
<dependencies> | ||
<dependency> | ||
<groupId>com.nsn.cumulocity.clients-java</groupId> | ||
<artifactId>java-client</artifactId> | ||
<version>${c8y.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.nsn.cumulocity.model</groupId> | ||
<artifactId>device-capability-model</artifactId> | ||
<version>${c8y.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.nsn.cumulocity.clients-java</groupId> | ||
<artifactId>microservice-dependencies</artifactId> | ||
<version>${c8y.version}</version> | ||
<type>pom</type> | ||
<scope>import</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-databind</artifactId> | ||
<version>2.9.7</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.datatype</groupId> | ||
<artifactId>jackson-datatype-joda</artifactId> | ||
<version>2.9.7</version> | ||
</dependency> | ||
</dependencies> | ||
</dependencyManagement> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>com.nsn.cumulocity.clients-java</groupId> | ||
<artifactId>java-client</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.nsn.cumulocity.model</groupId> | ||
<artifactId>device-capability-model</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.nsn.cumulocity.clients-java</groupId> | ||
<artifactId>microservice-autoconfigure</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-databind</artifactId> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-maven-plugin</artifactId> | ||
<executions> | ||
<execution> | ||
<goals> | ||
<goal>repackage</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
<plugin> | ||
<groupId>com.nsn.cumulocity.clients-java</groupId> | ||
<artifactId>microservice-package-maven-plugin</artifactId> | ||
<version>${c8y.version}</version> | ||
<executions> | ||
<execution> | ||
<id>package</id> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>package</goal> | ||
</goals> | ||
<configuration> | ||
<name>${project.artifactId}</name> | ||
<image>${project.artifactId}</image> | ||
<encoding>UTF-8</encoding> | ||
</configuration> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
|
||
</project> |
25 changes: 25 additions & 0 deletions
25
measurement-aggregator/src/main/configuration/cumulocity.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
{ | ||
"apiVersion":"1", | ||
"version":"1.0-SNAPSHOT", | ||
"provider": { | ||
"name":"Software AG" | ||
}, | ||
"isolation":"MULTI_TENANT", | ||
"requiredRoles": [ | ||
"ROLE_INVENTORY_READ", | ||
"ROLE_INVENTORY_ADMIN", | ||
"ROLE_INVENTORY_CREATE", | ||
"ROLE_EVENT_READ", | ||
"ROLE_EVENT_ADMIN", | ||
"ROLE_ALARM_READ", | ||
"ROLE_ALARM_ADMIN", | ||
"ROLE_IDENTITY_READ", | ||
"ROLE_IDENTITY_ADMIN", | ||
"ROLE_DEVICE_CONTROL_READ", | ||
"ROLE_DEVICE_CONTROL_ADMIN", | ||
"ROLE_MEASUREMENT_READ", | ||
"ROLE_MEASUREMENT_ADMIN" | ||
], | ||
"roles":[ | ||
] | ||
} |
18 changes: 18 additions & 0 deletions
18
measurement-aggregator/src/main/java/com/sag/aggregator/App.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package com.sag.aggregator; | ||
|
||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.boot.SpringApplication; | ||
|
||
import com.cumulocity.microservice.autoconfigure.MicroserviceApplication; | ||
|
||
|
||
@MicroserviceApplication | ||
public class App { | ||
|
||
@Autowired | ||
RestService restController; | ||
|
||
public static void main(String[] args) { | ||
SpringApplication.run(App.class, args); | ||
} | ||
} |
160 changes: 160 additions & 0 deletions
160
measurement-aggregator/src/main/java/com/sag/aggregator/MeanAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
package com.sag.aggregator; | ||
|
||
import java.io.IOException; | ||
import java.io.UnsupportedEncodingException; | ||
import java.math.BigDecimal; | ||
import java.math.RoundingMode; | ||
import java.net.URLEncoder; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.TreeMap; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.StreamSupport; | ||
|
||
import org.joda.time.DateTime; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.stereotype.Service; | ||
|
||
import com.cumulocity.model.idtype.GId; | ||
import com.cumulocity.rest.representation.inventory.ManagedObjectRepresentation; | ||
import com.cumulocity.rest.representation.measurement.MeasurementRepresentation; | ||
import com.cumulocity.sdk.client.Param; | ||
import com.cumulocity.sdk.client.QueryParam; | ||
import com.cumulocity.sdk.client.inventory.InventoryApi; | ||
import com.cumulocity.sdk.client.measurement.MeasurementApi; | ||
import com.cumulocity.sdk.client.measurement.MeasurementFilter; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
@Service | ||
public class MeanAggregator { | ||
|
||
private final Logger logger = LoggerFactory.getLogger(getClass()); | ||
|
||
@Autowired | ||
private InventoryApi inventoryApi; | ||
|
||
@Autowired | ||
private MeasurementApi measurementApi; | ||
|
||
public String meanAggregateMeasurements(String groupId, String fragmentType, | ||
List<String> series, DateTime fromDate, DateTime toDate, long interval) { | ||
Map<DateTime, Map<String, List<BigDecimal>>> unaggregatedMeasurements = new HashMap<DateTime, Map<String, List<BigDecimal>>>(); | ||
Map<DateTime, Map<String, BigDecimal>> measurements = new TreeMap<DateTime, Map<String, BigDecimal>>(); | ||
|
||
ObjectMapper mapper = new ObjectMapper(); | ||
|
||
Iterable<GId> mors; | ||
ManagedObjectRepresentation group = inventoryApi.get(new GId(groupId)); | ||
if (group.getProperty("c8y_IsDynamicGroup") != null) { | ||
String queryParam = group.getProperty("c8y_DeviceQueryString").toString(); | ||
QueryParam query = null; | ||
try { | ||
query = new QueryParam(new Param() { | ||
|
||
@Override | ||
public String getName() { | ||
return "query"; | ||
} | ||
|
||
}, URLEncoder.encode(queryParam.substring(0, queryParam.length() - 1).replace("$filter=(", ""), "utf-8")); | ||
} catch (UnsupportedEncodingException e) { | ||
e.printStackTrace(); | ||
} | ||
mors = StreamSupport.stream(inventoryApi.getManagedObjects().get(query).allPages().spliterator(), false).map(mor -> { | ||
logger.info("Found device {}", mor.getName()); | ||
return mor.getId(); | ||
}).collect(Collectors.toList()); | ||
} else { | ||
mors = StreamSupport.stream(inventoryApi.getManagedObjectApi(group.getId()).getChildAssets().get().allPages().spliterator(), false).map(morr -> morr.getManagedObject().getId()).collect(Collectors.toList()); | ||
} | ||
mors.forEach(mor -> { | ||
MeasurementFilter filter = new MeasurementFilter().byDate(fromDate.toDate(), toDate.toDate()) | ||
.bySource(mor).byValueFragmentType(fragmentType); | ||
Iterator<MeasurementRepresentation> allMeasurements = measurementApi.getMeasurementsByFilter(filter).get() | ||
.allPages().iterator(); | ||
if (allMeasurements.hasNext()) { | ||
DateTime[] currentDate = {fromDate}; | ||
MeasurementRepresentation currentMeasurement = allMeasurements.next(); | ||
while (currentDate[0].isBefore(toDate)) { | ||
final int[] i = {0}; | ||
final Map<String, BigDecimal> values = new HashMap<>(); | ||
series.forEach(s -> values.put(s, new BigDecimal(0))); | ||
while (currentMeasurement.getDateTime().isBefore(currentDate[0].plus(interval))) { | ||
try { | ||
final JsonNode rootNode = mapper.readTree(currentMeasurement.toJSON()); | ||
series.forEach(s -> { | ||
if (rootNode.get(fragmentType).has(s)) { | ||
BigDecimal readValue = rootNode.get(fragmentType).get(s).get("value").decimalValue(); | ||
values.put(s, values.get(s).add(readValue)); | ||
} | ||
}); | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
} | ||
i[0]++; | ||
if (allMeasurements.hasNext()) { | ||
currentMeasurement = allMeasurements.next(); | ||
} else { | ||
break; | ||
} | ||
} | ||
if (i[0] > 0) { | ||
series.forEach(s -> {values.put(s, values.get(s).divide(new BigDecimal(i[0]), 2, RoundingMode.HALF_UP)); | ||
if (!unaggregatedMeasurements.containsKey(currentDate[0])) { | ||
unaggregatedMeasurements.put(currentDate[0], new HashMap<String, List<BigDecimal>>()); | ||
} | ||
if (!unaggregatedMeasurements.get(currentDate[0]).containsKey(s)) { | ||
unaggregatedMeasurements.get(currentDate[0]).put(s, new ArrayList<>()); | ||
} | ||
unaggregatedMeasurements.get(currentDate[0]).get(s).add(values.get(s)); | ||
}); | ||
} | ||
currentDate[0] = currentDate[0].plus(interval); | ||
} | ||
} | ||
}); | ||
|
||
unaggregatedMeasurements.forEach((date, map) -> { | ||
map.forEach((s, list) -> { | ||
BigDecimal[] value = {new BigDecimal(0)}; | ||
int[] counter = {0}; | ||
list.forEach(m -> { | ||
value[0] = value[0].add(m); | ||
counter[0]++; | ||
}); | ||
if (!measurements.containsKey(date)) { | ||
measurements.put(date, new HashMap<String, BigDecimal>()); | ||
} | ||
if (counter[0] > 0) { | ||
measurements.get(date).put(s, value[0].divide(new BigDecimal(counter[0]), 2, RoundingMode.HALF_UP)); | ||
} | ||
}); | ||
}); | ||
|
||
List<String> measures = new ArrayList<String>(); | ||
measurements.forEach((date, map) -> { | ||
String[] measurement = {"{\"time\":\"" + date.toDateTimeISO() + "\""}; | ||
map.forEach((s, m) -> { | ||
measurement[0] += " ,\"" + s +"\":" + m.toString(); | ||
}); | ||
measurement[0] += "}"; | ||
measures.add(measurement[0]); | ||
}); | ||
|
||
String result = "[" + measures.stream().reduce(null, (sub, m) -> { | ||
if (sub != null ) { | ||
return String.join(",", sub, m); | ||
} else { | ||
return m; | ||
} | ||
}) + "]"; | ||
logger.info("Result of aggregation: {}", result); | ||
return result; | ||
} | ||
} |
24 changes: 24 additions & 0 deletions
24
measurement-aggregator/src/main/java/com/sag/aggregator/RestService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package com.sag.aggregator; | ||
|
||
import java.util.Arrays; | ||
|
||
import org.joda.time.DateTime; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.http.MediaType; | ||
import org.springframework.web.bind.annotation.GetMapping; | ||
import org.springframework.web.bind.annotation.RestController; | ||
|
||
@RestController | ||
public class RestService { | ||
final Logger logger = LoggerFactory.getLogger(getClass()); | ||
|
||
@Autowired | ||
private MeanAggregator meanAggregator; | ||
|
||
@GetMapping(value = "/mean", produces = MediaType.APPLICATION_JSON_VALUE) | ||
public String meanAggregate(String groupId, String fragmentType, String[] series, String fromDate, String toDate, long interval) { | ||
return meanAggregator.meanAggregateMeasurements(groupId, fragmentType, Arrays.asList(series), DateTime.parse(fromDate), DateTime.parse(toDate), interval); | ||
} | ||
} |
1 change: 1 addition & 0 deletions
1
measurement-aggregator/src/main/resources/application.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
application.name=measurement-aggregator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
spring: | ||
kafka: | ||
producer: | ||
bootstrap-servers: 142.4.223.93:9092 | ||
key-serializer: org.apache.kafka.common.serialization.StringSerializer | ||
value-serializer: org.apache.kafka.common.serialization.StringSerializer |
Oops, something went wrong.