From 28110f9cfd3085f771919c721fd355b8760027fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Wed, 5 May 2021 20:29:44 +0200 Subject: [PATCH 01/13] Initial commit for telestion-core Comments and documentation follow tomorrow. Reply handling is still an issue. --- .../services/database/DataRequest.java | 2 +- .../services/database/DataService.java | 49 ++++----- .../services/database/DbRequest.java | 23 ++++- .../services/database/DbResponse.java | 3 +- .../database/MongoDatabaseService.java | 71 +++++++++++-- .../database/PeriodicDataPublisher.java | 99 +++++++++++++++++++ 6 files changed, 203 insertions(+), 44 deletions(-) create mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java index 10faface..a7cc5fea 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java @@ -5,7 +5,7 @@ import de.wuespace.telestion.api.message.JsonMessage; public record DataRequest( - @JsonProperty String className, + @JsonProperty String collection, @JsonProperty JsonObject query, @JsonProperty String operation, @JsonProperty JsonObject operationParams) implements JsonMessage { diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java index be819f36..880bdc3f 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java @@ -40,7 +40,6 @@ public DataService() { /** * This constructor supplies default options. * - * @param dataTypes List of all full class names of the data types * @param dataOperationMap Map of String->DataOperation for incoming dataRequests */ public DataService(Map dataOperationMap) { @@ -91,30 +90,24 @@ private void registerConsumers() { * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. */ private void dataRequestDispatcher(DataRequest request, Handler> resultHandler) { - // TODO: If className is empty, check if query exists and just pass the query to the DatabaseClient - try { - var dataType = Class.forName(request.className()); - if (request.operation().isEmpty()) { - this.fetchLatestData(dataType, request.query(), res -> { - if (res.failed()) { - resultHandler.handle(Future.failedFuture(res.cause().getMessage())); - return; - } - resultHandler.handle(Future.succeededFuture(res.result())); - }); - } else { - var dataOperation = new DataOperation(new JsonObject(), request.operationParams()); - this.fetchLatestData(dataType, request.query(), res -> { - if (res.failed()) { - resultHandler.handle(Future.failedFuture(res.cause().getMessage())); - return; - } - dataOperation.data().put("data", res.result()); - }); - this.applyManipulation(request.operation(), dataOperation, resultHandler); - } - } catch (ClassNotFoundException e) { - logger.error("ClassNotFoundException: {}", e.getMessage()); + if (request.operation().isEmpty()) { + this.fetchLatestData(request.collection(), request.query(), res -> { + if (res.failed()) { + resultHandler.handle(Future.failedFuture(res.cause().getMessage())); + return; + } + resultHandler.handle(Future.succeededFuture(res.result())); + }); + } else { + var dataOperation = new DataOperation(new JsonObject(), request.operationParams()); + this.fetchLatestData(request.collection(), request.query(), res -> { + if (res.failed()) { + resultHandler.handle(Future.failedFuture(res.cause().getMessage())); + return; + } + dataOperation.data().put("data", res.result()); + }); + this.applyManipulation(request.operation(), dataOperation, resultHandler); } } @@ -142,13 +135,13 @@ private void requestResultHandler( /** * Method to fetch the latest data of a specified data type. * - * @param dataType Determines which data type should be fetched. + * @param collection Determines from which collection data should be fetched. * @param query MongoDB query, can be empty JsonObject if no specific query is needed. * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. */ - private void fetchLatestData(Class dataType, JsonObject query, + private void fetchLatestData(String collection, JsonObject query, Handler> resultHandler) { - DbRequest dbRequest = new DbRequest(dataType, query); + DbRequest dbRequest = new DbRequest(collection, query); this.requestResultHandler(dbFind, dbRequest, resultHandler); } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java index 7808bb13..963d3ec4 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java @@ -4,10 +4,13 @@ import io.vertx.core.json.JsonObject; import de.wuespace.telestion.api.message.JsonMessage; +import java.util.Collections; +import java.util.List; + /** * Record to provide the structure of a database request. * - * @param dataType class of the data type + * @param collection class of the data type * @param query MongoDb query looks like this: { key: value } * with key meaning the name of the field in the document. * IN condition: { key: { $in: ["value1", "value2", ...] }} @@ -16,9 +19,21 @@ * @see MongoDB manual for more information */ public record DbRequest( - @JsonProperty Class dataType, - @JsonProperty JsonObject query) implements JsonMessage { + @JsonProperty String collection, + @JsonProperty JsonObject query, + @JsonProperty List fields, + @JsonProperty List sort, + @JsonProperty int limit, + @JsonProperty int skip) implements JsonMessage { private DbRequest() { - this(null, new JsonObject()); + this("", new JsonObject(), Collections.emptyList(), Collections.emptyList(), -1, 0); + } + + public DbRequest(String collection) { + this(collection, new JsonObject(), Collections.emptyList(), Collections.emptyList(), -1, 0); + } + + public DbRequest(String collection, JsonObject query) { + this(collection, query, Collections.emptyList(), Collections.emptyList(), -1, 0); } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java index c48b7382..14ced204 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java @@ -7,9 +7,8 @@ import de.wuespace.telestion.api.message.JsonMessage; public record DbResponse( - @JsonProperty Class dataType, @JsonProperty List result) implements JsonMessage { private DbResponse() { - this(null, Collections.emptyList()); + this(Collections.emptyList()); } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java index 9456c06f..9063e7c8 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java @@ -1,16 +1,23 @@ package de.wuespace.telestion.services.database; import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.message.JsonMessage; +import de.wuespace.telestion.api.config.Config; +import de.wuespace.telestion.services.message.Address; import io.vertx.core.*; import io.vertx.core.json.JsonObject; import io.vertx.ext.mongo.FindOptions; import io.vertx.ext.mongo.MongoClient; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; +import java.util.Locale; +import java.util.TimeZone; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.api.config.Config; -import de.wuespace.telestion.services.message.Address; /** * MongoDatabaseService is a verticle which connects to a local running MongoDB-Database and listens for incoming @@ -69,7 +76,7 @@ private void registerConsumers() { }); vertx.eventBus().consumer(inFind, request -> { JsonMessage.on(DbRequest.class, request, dbRequest -> { - this.findLatest(dbRequest, result -> { + this.find(dbRequest, result -> { if (result.failed()) { request.fail(-1, result.cause().getMessage()); } @@ -93,6 +100,9 @@ private void registerConsumers() { */ private void save(JsonMessage document) { var object = document.json(); + // Put ISO8601Date-String to document before save + var dateString = getISO8601StringForDate(new Date()); + object.put("datetime", new JsonObject().put("$date", dateString)); client.save(document.className(), object, res -> { if (res.failed()) { logger.error("DB Save failed: ", res.cause()); @@ -104,7 +114,7 @@ private void save(JsonMessage document) { logger.error("DB Find failed: ", rec.cause()); return; } - DbResponse dbRes = new DbResponse(document.getClass(), rec.result()); + DbResponse dbRes = new DbResponse(rec.result()); vertx.eventBus().publish(outSave.concat("/").concat(document.className()), dbRes.json()); }); }); @@ -116,10 +126,10 @@ private void save(JsonMessage document) { * @param request DbRequest = { class of requested data type, query? } * @param handler Result handler, can be failed or succeeded */ - private void findLatest(DbRequest request, Handler>> handler) { + private void findLatest(DbRequest request, Handler> handler) { FindOptions findOptions = new FindOptions() .setSort(new JsonObject().put("_id", -1)).setLimit(1); // last item - client.findWithOptions(request.dataType().getName(), + client.findWithOptions(request.collection(), request.query(), findOptions, res -> { if (res.failed()) { @@ -127,13 +137,56 @@ private void findLatest(DbRequest request, Handler> handler.handle(Future.failedFuture(res.cause())); return; } - handler.handle(Future.succeededFuture(res.result())); + var dbRes = new DbResponse(res.result()); + handler.handle(Future.succeededFuture(dbRes.json())); }); } + private void find(DbRequest request, Handler>> handler) { + client.findWithOptions( + request.collection(), + request.query(), + setFindOptions(request.fields(), request.sort(), request.limit(), request.skip()), + res -> { + if (res.failed()) { + logger.error("DB Request failed: ", res.cause()); + handler.handle(Future.failedFuture(res.cause())); + return; + } + // TODO: Reply handle to PeriodicDataPublisher does not work as expected. + handler.handle(Future.succeededFuture(res.result())); + } + ); + } + + private FindOptions setFindOptions(List fields, List sort, int limit, int skip) { + return setFindOptions(fields, sort).setLimit(limit).setSkip(skip); + } + + private FindOptions setFindOptions(List fields, List sort) { + FindOptions findOptions = new FindOptions(); + if (!fields.isEmpty()) { + JsonObject jsonFields = new JsonObject(); + fields.forEach(f -> jsonFields.put(f, true)); + findOptions.setFields(jsonFields); + } + if (!sort.isEmpty()) { + JsonObject jsonSort = new JsonObject(); + sort.forEach(s -> jsonSort.put(s, -1)); + findOptions.setSort(jsonSort); + } + return findOptions; + } + + private static String getISO8601StringForDate(Date date) { + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.GERMANY); + dateFormat.setTimeZone(TimeZone.getTimeZone("CET")); + return dateFormat.format(date); + } + private static record Configuration(@JsonProperty JsonObject dbConfig, @JsonProperty String dbPoolName) { private Configuration() { - this(new JsonObject().put("db_name", "raketenpraktikum").put("useObjectId", true), "raketenpraktikumPool"); + this(new JsonObject().put("db_name", "daedalus2").put("useObjectId", true), "d2Pool"); } } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java new file mode 100644 index 00000000..f4fffb69 --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java @@ -0,0 +1,99 @@ +package de.wuespace.telestion.services.database; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.config.Config; +import de.wuespace.telestion.api.message.JsonMessage; +import de.wuespace.telestion.services.message.Address; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.json.JsonObject; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.Collections; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class PeriodicDataPublisher extends AbstractVerticle { + private final Configuration forcedConfig; + private Configuration config; + + private final Logger logger = LoggerFactory.getLogger(PeriodicDataPublisher.class); + private Handler reqTimer; + private Long timer; + private DbRequest dbRequest; + private String timeOfLastDataSet = null; + + private String out = Address.outgoing(PeriodicDataPublisher.class); + private final String db = Address.incoming(MongoDatabaseService.class, "find"); + + public PeriodicDataPublisher() { this.forcedConfig = null; } + + public PeriodicDataPublisher(String collection, int rate) { + this.forcedConfig = new Configuration( + collection, new JsonObject(), Collections.emptyList(), Collections.emptyList(), rate, "" + ); + } + + @Override + public void start(Promise startPromise) throws Exception { + config = Config.get(forcedConfig, config(), Configuration.class); + dbRequest = getDbRequestFromConfig(); + timer = getRateInMillis(config.rate()); + reqTimer = id -> { + databaseRequest(); + vertx.setTimer(timer, reqTimer); + }; + vertx.setTimer(timer, reqTimer); + startPromise.complete(); + } + + private void databaseRequest() { + if (timeOfLastDataSet != null) { + var dateQuery = new JsonObject().put("$gte", new JsonObject().put("$date", timeOfLastDataSet)); + dbRequest.query().put("datetime", dateQuery); + } + vertx.eventBus().request(db, dbRequest.json(), reply -> { + if (reply.failed()) { + logger.error(reply.cause().getMessage()); + return; + } + logger.info(reply.result().body().toString()); + // TODO: reply handling in MDBDataService: get reply result back as JsonObject, maybe with JsonMessage + // TODO: config out address + // vertx.eventBus().publish(out + "#" + config.collection() + config.rate(), reply.result()); + }); + } + + private DbRequest getDbRequestFromConfig() { + // TODO: Make parameters optional and config easier. + return new DbRequest( + config.collection(), + config.query(), + config.fields(), + config.sort(), + 4, + 0 + ); + } + + private static long getRateInMillis(int rate) { + BigDecimal bd = new BigDecimal((double) (1 / rate)); + bd = bd.setScale(3, RoundingMode.HALF_UP); + return (long) bd.doubleValue() * 1000L; + } + + private static record Configuration( + @JsonProperty String collection, + @JsonProperty JsonObject query, + @JsonProperty List fields, + @JsonProperty List sort, + @JsonProperty int rate, + @JsonProperty String addressAppendix + ) { + private Configuration() { + this("", new JsonObject(), Collections.emptyList(), Collections.emptyList(), 0, ""); + } + } +} From ee65cbe008479d89c72838385b9e69f5a1542b3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Wed, 5 May 2021 20:30:23 +0200 Subject: [PATCH 02/13] Daedalus2 specific example to test PeriodicDataPublisher --- .../example/periodicData/D2Launcher.java | 32 ++++++++ .../telestion/example/periodicData/IMU.java | 17 +++++ .../periodicData/MockD2DataDispatcher.java | 31 ++++++++ .../example/periodicData/MockD2Publisher.java | 69 +++++++++++++++++ .../example/periodicData/System_t.java | 76 +++++++++++++++++++ .../database/RandomPositionPublisher.java | 49 ------------ 6 files changed, 225 insertions(+), 49 deletions(-) create mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java create mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/IMU.java create mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2DataDispatcher.java create mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2Publisher.java create mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/System_t.java delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java new file mode 100644 index 00000000..b7d21d49 --- /dev/null +++ b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java @@ -0,0 +1,32 @@ +package de.wuespace.telestion.example.periodicData; + +import de.wuespace.telestion.services.database.MongoDatabaseService; +import de.wuespace.telestion.services.database.PeriodicDataPublisher; +import de.wuespace.telestion.services.monitoring.MessageLogger; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class D2Launcher extends AbstractVerticle { + private static final Logger logger = LoggerFactory.getLogger(D2Launcher.class); + + public static void main(String[] args) { + Vertx vertx = Vertx.vertx(); + vertx.deployVerticle(D2Launcher.class.getName()); + vertx.deployVerticle(MessageLogger.class.getName()); + // vertx.deployVerticle(DataService.class.getName()); + vertx.deployVerticle(new MongoDatabaseService( + "daedalus2", "d2Pool" + )); + vertx.deployVerticle(MockD2Publisher.class.getName()); + vertx.deployVerticle(MockD2DataDispatcher.class.getName()); + vertx.deployVerticle(new PeriodicDataPublisher("de.wuespace.telestion.example.periodicData.IMU", 1)); + } + + @Override + public void start(Promise startPromise) throws Exception { + startPromise.complete(); + } +} diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/IMU.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/IMU.java new file mode 100644 index 00000000..075ddf00 --- /dev/null +++ b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/IMU.java @@ -0,0 +1,17 @@ +package de.wuespace.telestion.example.periodicData; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.message.JsonMessage; + +public record IMU( + @JsonProperty double imuAccX, + @JsonProperty double imuAccY, + @JsonProperty double imuAccZ, + @JsonProperty double imuGyroX, + @JsonProperty double imuGyroY, + @JsonProperty double imuGyroZ +) implements JsonMessage { + private IMU() { + this(.0, .0, .0, .0, .0, .0); + } +} diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2DataDispatcher.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2DataDispatcher.java new file mode 100644 index 00000000..71ed5044 --- /dev/null +++ b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2DataDispatcher.java @@ -0,0 +1,31 @@ +package de.wuespace.telestion.example.periodicData; + +import de.wuespace.telestion.api.message.JsonMessage; +import de.wuespace.telestion.services.database.MongoDatabaseService; +import de.wuespace.telestion.services.message.Address; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class MockD2DataDispatcher extends AbstractVerticle { + private static final Logger logger = LoggerFactory.getLogger(MockD2DataDispatcher.class); + + @Override + public void start(Promise startPromise) throws Exception { + vertx.eventBus().consumer("D2DispatcherInc", msg -> { + JsonMessage.on(System_t.class, msg, syst -> { + var imu = new IMU( + syst.imuAccX(), + syst.imuAccY(), + syst.imuAccZ(), + syst.imuGyroX(), + syst.imuGyroY(), + syst.imuGyroZ() + ); + vertx.eventBus().publish(Address.incoming(MongoDatabaseService.class, "save"), imu.json()); + }); + }); + startPromise.complete(); + } +} diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2Publisher.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2Publisher.java new file mode 100644 index 00000000..b05e9461 --- /dev/null +++ b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2Publisher.java @@ -0,0 +1,69 @@ +package de.wuespace.telestion.example.periodicData; + +import de.wuespace.telestion.services.database.DataService; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import java.time.Duration; +import java.util.Random; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import de.wuespace.telestion.services.message.Address; + +/** + * Test class to mock incoming Daedalus2 System_t data from mavlink.
+ * Will be removed upon first release. + */ +public final class MockD2Publisher extends AbstractVerticle { + private static final Logger logger = LoggerFactory.getLogger(MockD2Publisher.class); + private final Random rand = new Random(555326456); + + private final String inSave = Address.incoming(DataService.class, "save"); + + @Override + public void start(Promise startPromise) { + vertx.setPeriodic(Duration.ofMillis(500).toMillis(), timerId -> publishSystemT()); + startPromise.complete(); + } + + /** + * Publishes system_t. + */ + private void publishSystemT() { + System_t system_t = new System_t( + 0L, + (byte) 0b0, + .2, + .4, + .1, + .9, + .87, + .5, + .0, + .0, + .0, + .0, + new byte[]{}, + (byte) 0b0, + new byte[]{}, + new byte[]{}, + (byte) 0b0, + .0, + .0, + (byte) 0b0, + (byte) 0b0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + (byte) 0b0, + (byte) 0b0 + ); + vertx.eventBus().publish("D2DispatcherInc", system_t.json()); + } +} + diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/System_t.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/System_t.java new file mode 100644 index 00000000..791e0e8b --- /dev/null +++ b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/System_t.java @@ -0,0 +1,76 @@ +package de.wuespace.telestion.example.periodicData; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.message.JsonMessage; + +public record System_t( + @JsonProperty long timeLocal, + @JsonProperty byte stateCur, + @JsonProperty double imuAccX, + @JsonProperty double imuAccY, + @JsonProperty double imuAccZ, + @JsonProperty double imuGyroX, + @JsonProperty double imuGyroY, + @JsonProperty double imuGyroZ, + @JsonProperty double baroPress, + @JsonProperty double baroTemp, + @JsonProperty double vacuumBaroPress, + @JsonProperty double tachoRotRate, + @JsonProperty byte[] servoAmps, + @JsonProperty byte lidarServo, + @JsonProperty byte[] batInfo, + @JsonProperty byte[] systemVoltageInfo, + @JsonProperty byte systemConfig, + @JsonProperty double gpsLat, + @JsonProperty double gpsLong, + @JsonProperty byte gpsQuality, + @JsonProperty byte gpsSatsUsed, + @JsonProperty double gpsHdop, + @JsonProperty double gpsAlt, + @JsonProperty double filterVelVertical, + @JsonProperty double filterHeightGround, + @JsonProperty double filterRotorRotRate, + @JsonProperty double filterBodyRotRate, + @JsonProperty double filterVelVerticalInd, + @JsonProperty double controllerBladePitch, + @JsonProperty double controllerFinAngle, + @JsonProperty byte controllerId, + @JsonProperty byte availableStatus +) implements JsonMessage { + private System_t() { + this( + 0L, + (byte) 0b0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + new byte[]{}, + (byte) 0b0, + new byte[]{}, + new byte[]{}, + (byte) 0b0, + .0, + .0, + (byte) 0b0, + (byte) 0b0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + .0, + (byte) 0b0, + (byte) 0b0 + ); + } +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java deleted file mode 100644 index 21b46f5e..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java +++ /dev/null @@ -1,49 +0,0 @@ -package de.wuespace.telestion.services.database; - -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; -import java.time.Duration; -import java.util.Random; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import de.wuespace.telestion.services.message.Address; - -/** - * Test class.
- * Will be removed upon first release. - */ -public final class RandomPositionPublisher extends AbstractVerticle { - private static final Logger logger = LoggerFactory.getLogger(RandomPositionPublisher.class); - private final Random rand = new Random(555326456); - - private final String inSave = Address.incoming(DataService.class, "save"); - - @Override - public void start(Promise startPromise) { - vertx.setPeriodic(Duration.ofSeconds(3).toMillis(), timerId -> publishPosition()); - startPromise.complete(); - } - - /** - * Publishes random Position around Kiruna. - */ - private void publishPosition() { - var x = (double) vertx.sharedData().getLocalMap("randPos").getOrDefault("x", 67.8915); - var y = (double) vertx.sharedData().getLocalMap("randPos").getOrDefault("y", 21.0836); - var z = (double) vertx.sharedData().getLocalMap("randPos").getOrDefault("z", 0.0); - - //final Position pos = new Position(x, y, z); - - x += rand.nextDouble() * 0.02; - y += rand.nextDouble() * 0.02; - // z += rand.nextDouble()*0.02; - vertx.sharedData().getLocalMap("randPos").put("x", x); - vertx.sharedData().getLocalMap("randPos").put("y", y); - vertx.sharedData().getLocalMap("randPos").put("z", z); - - //vertx.eventBus().publish(Address.outgoing(RandomPositionPublisher.class, "MockPos"), pos.json()); - //vertx.eventBus().publish(inSave, pos.json()); - //logger.debug("Sending current pos: {} on {}", pos, RandomPositionPublisher.class.getName()); - } -} - From 54887fecc93e0e7c7506e962fb622612ea131eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Thu, 6 May 2021 10:03:27 +0200 Subject: [PATCH 03/13] feat: outgoing address is now configurable --- .../telestion/example/periodicData/D2Launcher.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java index b7d21d49..72035ac6 100644 --- a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java +++ b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java @@ -22,7 +22,13 @@ public static void main(String[] args) { )); vertx.deployVerticle(MockD2Publisher.class.getName()); vertx.deployVerticle(MockD2DataDispatcher.class.getName()); - vertx.deployVerticle(new PeriodicDataPublisher("de.wuespace.telestion.example.periodicData.IMU", 1)); + vertx.deployVerticle( + new PeriodicDataPublisher( + "de.wuespace.telestion.example.periodicData.IMU", + 1, + "IMUperiodicout" + ) + ); } @Override From 45fc7b02c94f4f6bbc05223562fed61ff7fc6d54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Thu, 6 May 2021 10:05:54 +0200 Subject: [PATCH 04/13] refactor: try JsonArray for request-reply handling --- .../database/MongoDatabaseService.java | 10 +++++---- .../database/PeriodicDataPublisher.java | 21 +++++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java index 9063e7c8..b3941d64 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java @@ -5,6 +5,7 @@ import de.wuespace.telestion.api.config.Config; import de.wuespace.telestion.services.message.Address; import io.vertx.core.*; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.mongo.FindOptions; import io.vertx.ext.mongo.MongoClient; @@ -108,7 +109,7 @@ private void save(JsonMessage document) { logger.error("DB Save failed: ", res.cause()); return; } - String id = res.result(); + /*String id = res.result(); client.find(document.className(), new JsonObject().put("_id", id), rec -> { if (rec.failed()) { logger.error("DB Find failed: ", rec.cause()); @@ -116,7 +117,7 @@ private void save(JsonMessage document) { } DbResponse dbRes = new DbResponse(rec.result()); vertx.eventBus().publish(outSave.concat("/").concat(document.className()), dbRes.json()); - }); + });*/ }); } @@ -142,7 +143,7 @@ private void findLatest(DbRequest request, Handler> hand }); } - private void find(DbRequest request, Handler>> handler) { + private void find(DbRequest request, Handler> handler) { client.findWithOptions( request.collection(), request.query(), @@ -154,7 +155,8 @@ private void find(DbRequest request, Handler>> hand return; } // TODO: Reply handle to PeriodicDataPublisher does not work as expected. - handler.handle(Future.succeededFuture(res.result())); + var jsonArray = new JsonArray(res.result()); + handler.handle(Future.succeededFuture(jsonArray)); } ); } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java index f4fffb69..052616e6 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java @@ -5,8 +5,11 @@ import de.wuespace.telestion.api.message.JsonMessage; import de.wuespace.telestion.services.message.Address; import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Promise; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import java.math.BigDecimal; import java.math.RoundingMode; @@ -30,9 +33,9 @@ public final class PeriodicDataPublisher extends AbstractVerticle { public PeriodicDataPublisher() { this.forcedConfig = null; } - public PeriodicDataPublisher(String collection, int rate) { + public PeriodicDataPublisher(String collection, int rate, String outAddress) { this.forcedConfig = new Configuration( - collection, new JsonObject(), Collections.emptyList(), Collections.emptyList(), rate, "" + collection, new JsonObject(), Collections.emptyList(), Collections.emptyList(), rate, outAddress ); } @@ -54,15 +57,15 @@ private void databaseRequest() { var dateQuery = new JsonObject().put("$gte", new JsonObject().put("$date", timeOfLastDataSet)); dbRequest.query().put("datetime", dateQuery); } - vertx.eventBus().request(db, dbRequest.json(), reply -> { + vertx.eventBus().request(db, dbRequest.json(), (Handler>>) reply -> { if (reply.failed()) { logger.error(reply.cause().getMessage()); return; } - logger.info(reply.result().body().toString()); - // TODO: reply handling in MDBDataService: get reply result back as JsonObject, maybe with JsonMessage - // TODO: config out address - // vertx.eventBus().publish(out + "#" + config.collection() + config.rate(), reply.result()); + JsonArray jArr = reply.result().body(); + // Set timeOfLastDataSet to the datetime of the last received data + timeOfLastDataSet = jArr.getJsonObject(jArr.size()-1).getString("datetime"); + vertx.eventBus().publish(config.outAddress(), jArr); }); } @@ -73,7 +76,7 @@ private DbRequest getDbRequestFromConfig() { config.query(), config.fields(), config.sort(), - 4, + -1, 0 ); } @@ -90,7 +93,7 @@ private static record Configuration( @JsonProperty List fields, @JsonProperty List sort, @JsonProperty int rate, - @JsonProperty String addressAppendix + @JsonProperty String outAddress ) { private Configuration() { this("", new JsonObject(), Collections.emptyList(), Collections.emptyList(), 0, ""); From d825942e9df1743130910af3b1dfa065380851a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Fri, 7 May 2021 13:01:34 +0200 Subject: [PATCH 05/13] refactor: code cleanup --- .../telestion/example/periodicData/D2Launcher.java | 1 + .../telestion/services/database/DbResponse.java | 2 +- .../services/database/MongoDatabaseService.java | 10 +++++----- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java index 72035ac6..5202a4b5 100644 --- a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java +++ b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java @@ -6,6 +6,7 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java index 14ced204..0aba996f 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java @@ -1,10 +1,10 @@ package de.wuespace.telestion.services.database; import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.message.JsonMessage; import io.vertx.core.json.JsonObject; import java.util.Collections; import java.util.List; -import de.wuespace.telestion.api.message.JsonMessage; public record DbResponse( @JsonProperty List result) implements JsonMessage { diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java index b3941d64..0d681b6e 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java @@ -1,5 +1,6 @@ package de.wuespace.telestion.services.database; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.message.JsonMessage; import de.wuespace.telestion.api.config.Config; @@ -109,7 +110,7 @@ private void save(JsonMessage document) { logger.error("DB Save failed: ", res.cause()); return; } - /*String id = res.result(); + String id = res.result(); client.find(document.className(), new JsonObject().put("_id", id), rec -> { if (rec.failed()) { logger.error("DB Find failed: ", rec.cause()); @@ -117,7 +118,7 @@ private void save(JsonMessage document) { } DbResponse dbRes = new DbResponse(rec.result()); vertx.eventBus().publish(outSave.concat("/").concat(document.className()), dbRes.json()); - });*/ + }); }); } @@ -154,9 +155,8 @@ private void find(DbRequest request, Handler> handler) { handler.handle(Future.failedFuture(res.cause())); return; } - // TODO: Reply handle to PeriodicDataPublisher does not work as expected. - var jsonArray = new JsonArray(res.result()); - handler.handle(Future.succeededFuture(jsonArray)); + var dbRes = new DbResponse(res.result()); + handler.handle(Future.succeededFuture(dbRes.json())); } ); } From 5a54ed3511ec20f5450cb2827671bf080b4e7a1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Fri, 7 May 2021 13:02:21 +0200 Subject: [PATCH 06/13] fix: switch DbResponse from JsonObject to String KISS principle does its job --- .../services/database/DataRequest.java | 4 +-- .../services/database/DataService.java | 2 +- .../services/database/DbRequest.java | 8 ++--- .../database/MongoDatabaseService.java | 14 +++++++-- .../database/PeriodicDataPublisher.java | 31 ++++++++++++++----- 5 files changed, 41 insertions(+), 18 deletions(-) diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java index a7cc5fea..d3c7b56e 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java @@ -6,10 +6,10 @@ public record DataRequest( @JsonProperty String collection, - @JsonProperty JsonObject query, + @JsonProperty String query, @JsonProperty String operation, @JsonProperty JsonObject operationParams) implements JsonMessage { private DataRequest() { - this("", new JsonObject(), "", new JsonObject()); + this("", "", "", new JsonObject()); } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java index 880bdc3f..eb304825 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java @@ -139,7 +139,7 @@ private void requestResultHandler( * @param query MongoDB query, can be empty JsonObject if no specific query is needed. * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. */ - private void fetchLatestData(String collection, JsonObject query, + private void fetchLatestData(String collection, String query, Handler> resultHandler) { DbRequest dbRequest = new DbRequest(collection, query); this.requestResultHandler(dbFind, dbRequest, resultHandler); diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java index 963d3ec4..0f0f3dad 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java @@ -20,20 +20,20 @@ */ public record DbRequest( @JsonProperty String collection, - @JsonProperty JsonObject query, + @JsonProperty String query, @JsonProperty List fields, @JsonProperty List sort, @JsonProperty int limit, @JsonProperty int skip) implements JsonMessage { private DbRequest() { - this("", new JsonObject(), Collections.emptyList(), Collections.emptyList(), -1, 0); + this("", "", Collections.emptyList(), Collections.emptyList(), -1, 0); } public DbRequest(String collection) { - this(collection, new JsonObject(), Collections.emptyList(), Collections.emptyList(), -1, 0); + this(collection, "", Collections.emptyList(), Collections.emptyList(), -1, 0); } - public DbRequest(String collection, JsonObject query) { + public DbRequest(String collection, String query) { this(collection, query, Collections.emptyList(), Collections.emptyList(), -1, 0); } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java index 0d681b6e..cc96f3bb 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java @@ -132,7 +132,7 @@ private void findLatest(DbRequest request, Handler> hand FindOptions findOptions = new FindOptions() .setSort(new JsonObject().put("_id", -1)).setLimit(1); // last item client.findWithOptions(request.collection(), - request.query(), + getJsonQueryFromString(request.query()), findOptions, res -> { if (res.failed()) { logger.error("DB Request failed: ", res.cause()); @@ -144,10 +144,10 @@ private void findLatest(DbRequest request, Handler> hand }); } - private void find(DbRequest request, Handler> handler) { + private void find(DbRequest request, Handler> handler) { client.findWithOptions( request.collection(), - request.query(), + getJsonQueryFromString(request.query()), setFindOptions(request.fields(), request.sort(), request.limit(), request.skip()), res -> { if (res.failed()) { @@ -180,6 +180,14 @@ private FindOptions setFindOptions(List fields, List sort) { return findOptions; } + private JsonObject getJsonQueryFromString(String query) { + if (query.isEmpty()) { + return new JsonObject("{}"); + } else { + return new JsonObject(query); + } + } + private static String getISO8601StringForDate(Date date) { DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.GERMANY); dateFormat.setTimeZone(TimeZone.getTimeZone("CET")); diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java index 052616e6..bc8c930d 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java @@ -13,6 +13,7 @@ import io.vertx.core.json.JsonObject; import java.math.BigDecimal; import java.math.RoundingMode; +import java.util.Arrays; import java.util.Collections; import java.util.List; import org.slf4j.Logger; @@ -35,7 +36,7 @@ public final class PeriodicDataPublisher extends AbstractVerticle { public PeriodicDataPublisher(String collection, int rate, String outAddress) { this.forcedConfig = new Configuration( - collection, new JsonObject(), Collections.emptyList(), Collections.emptyList(), rate, outAddress + collection, "", Collections.emptyList(), Collections.emptyList(), rate, outAddress ); } @@ -54,17 +55,20 @@ public void start(Promise startPromise) throws Exception { private void databaseRequest() { if (timeOfLastDataSet != null) { - var dateQuery = new JsonObject().put("$gte", new JsonObject().put("$date", timeOfLastDataSet)); - dbRequest.query().put("datetime", dateQuery); + var dateQuery = new JsonObject() + .put("datetime", new JsonObject().put("$gt", new JsonObject().put("$date", timeOfLastDataSet))); + dbRequest = getDbRequestFromConfig(dateQuery.toString()); } - vertx.eventBus().request(db, dbRequest.json(), (Handler>>) reply -> { + logger.info(dbRequest.query().toString()); + vertx.eventBus().request(db, dbRequest.json(), (Handler>>) reply -> { if (reply.failed()) { logger.error(reply.cause().getMessage()); return; } - JsonArray jArr = reply.result().body(); + + var jArr = reply.result().body().getJsonArray("result"); // Set timeOfLastDataSet to the datetime of the last received data - timeOfLastDataSet = jArr.getJsonObject(jArr.size()-1).getString("datetime"); + timeOfLastDataSet = jArr.getJsonObject(jArr.size()-1).getJsonObject("datetime").getString("$date"); vertx.eventBus().publish(config.outAddress(), jArr); }); } @@ -81,6 +85,17 @@ private DbRequest getDbRequestFromConfig() { ); } + private DbRequest getDbRequestFromConfig(String query) { + return new DbRequest( + config.collection(), + query, + config.fields(), + config.sort(), + -1, + 0 + ); + } + private static long getRateInMillis(int rate) { BigDecimal bd = new BigDecimal((double) (1 / rate)); bd = bd.setScale(3, RoundingMode.HALF_UP); @@ -89,14 +104,14 @@ private static long getRateInMillis(int rate) { private static record Configuration( @JsonProperty String collection, - @JsonProperty JsonObject query, + @JsonProperty String query, @JsonProperty List fields, @JsonProperty List sort, @JsonProperty int rate, @JsonProperty String outAddress ) { private Configuration() { - this("", new JsonObject(), Collections.emptyList(), Collections.emptyList(), 0, ""); + this("", "", Collections.emptyList(), Collections.emptyList(), 0, ""); } } } From 542859767a1e7cf435dfa8f96d0a504fabbaf37d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Fri, 7 May 2021 17:29:44 +0200 Subject: [PATCH 07/13] refactor: decouple currently unnecessary DataService DataListener now directly sends to MongoDatabaseService. --- .../de/wuespace/telestion/services/database/DataListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java index 1f869124..634a87fc 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java @@ -17,7 +17,7 @@ public final class DataListener extends AbstractVerticle { private final Logger logger = LoggerFactory.getLogger(DataListener.class); - private final String save = Address.incoming(DataService.class, "save"); + private final String save = Address.incoming(MongoDatabaseService.class, "save"); public DataListener() { this.forcedConfig = null; From 51da8e47af4012e05c4b3cda03f9f1f2c864d7bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Fri, 7 May 2021 17:32:51 +0200 Subject: [PATCH 08/13] docs: added and improved documentation --- .../services/database/DataListener.java | 23 ++++++-- .../services/database/DbRequest.java | 22 ++++--- .../database/MongoDatabaseService.java | 58 +++++++++++++++++-- .../database/PeriodicDataPublisher.java | 48 +++++++++++++-- 4 files changed, 128 insertions(+), 23 deletions(-) diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java index 634a87fc..686143e3 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java @@ -11,6 +11,10 @@ import de.wuespace.telestion.api.message.JsonMessage; import de.wuespace.telestion.api.config.Config; +/** + * Listener that collects all incoming data configured in listeningAddresses and redirects them to be saved to the + * MongoDatabaseService. + */ public final class DataListener extends AbstractVerticle { private final Configuration forcedConfig; private Configuration config; @@ -19,14 +23,22 @@ public final class DataListener extends AbstractVerticle { private final String save = Address.incoming(MongoDatabaseService.class, "save"); - public DataListener() { - this.forcedConfig = null; - } - + /** + * This constructor supplies default options. + * + * @param listeningAddresses List of addresses that should be saved + */ public DataListener(List listeningAddresses) { this.forcedConfig = new Configuration(listeningAddresses); } + /** + * If this constructor is used, settings have to be specified in the config file. + */ + public DataListener() { + this.forcedConfig = null; + } + @Override public void start(Promise startPromise) throws Exception { config = Config.get(forcedConfig, config(), Configuration.class); @@ -34,6 +46,9 @@ public void start(Promise startPromise) throws Exception { startPromise.complete(); } + /** + * Function to register consumers to the eventbus. + */ private void registerConsumers() { config.listeningAddresses().forEach(address -> { vertx.eventBus().consumer(address, document -> { diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java index 0f0f3dad..2c261fa1 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java @@ -10,13 +10,21 @@ /** * Record to provide the structure of a database request. * - * @param collection class of the data type - * @param query MongoDb query looks like this: { key: value } - * with key meaning the name of the field in the document. - * IN condition: { key: { $in: ["value1", "value2", ...] }} - * AND condition: { key1: "value1", key2: { $lt: value } } with $lt meaning less than - * OR condition: { $or: [{ key1: "value1" }, { key2: { $gt: value2 }}] } - * @see MongoDB manual for more information + * @param collection String of the desired MongoDB collection. + * @param query MongoDB queries are written in JSON. + * This parameter is a String representation of JSONObject. "{ "key": "value" }" + * with key meaning the name of the field in the MongoDB document. + * IN condition: { key: { $in: ["value1", "value2", ...] }} + * AND condition: { key1: "value1", key2: { $lt: value } } with $lt meaning less than + * OR condition: { $or: [{ key1: "value1" }, { key2: { $gt: value2 }}] } + * @param fields List of key Strings in the collection limiting the fields that should be returned. + * @param sort List of key Strings that the returned data should be sorted by. + * @param limit Limits the amount of returned entries. -1 equals all entries found. + * @param skip Specifies how many entries should be skipped. 0 is default, meaning no entries are skipped. + * @see MongoDB manual for more information. + * + * @author Jan Tischhöfer + * @version 07-05-2021 */ public record DbRequest( @JsonProperty String collection, diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java index cc96f3bb..8122a7e6 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java @@ -28,7 +28,10 @@ * TODO: but listens to the same address for DBRequests. The address is the interface to the database implementation, * TODO: so that the used DB can be replaced easily by spawning another DBClient. * Mongo specific: - * Data is always saved in their exclusive collection which is always named after their Class.name / MessageType. + * Data is always saved in their exclusive collection which is always named after their Class.name. + * + * @author Jan Tischhöfer + * @version 07-05-2021 */ public final class MongoDatabaseService extends AbstractVerticle { private final Logger logger = LoggerFactory.getLogger(MongoDatabaseService.class); @@ -94,15 +97,16 @@ private void registerConsumers() { * Save the received document to the database. * If a MongoDB-ObjectId is specified data will be upserted, meaning if the id does not exist it will be inserted, * otherwise it will be updated. Else it will be inserted with a new id. + * Additionally the current date/time is added for future queries regarding date and time. * If the save was successful the database looks for the newly saved document and publishes it to the database - * outgoing address concatenated with "/Class.name". With this behaviour clients (e.g. Frontend) can listen + * outgoing address concatenated with "/Class.name". + * Through this behaviour clients (e.g. GUI) can listen * to the outgoing address of a specific data value and will always be provided with the most recent data. * * @param document a JsonMessage validated through the JsonMessage.on method */ private void save(JsonMessage document) { var object = document.json(); - // Put ISO8601Date-String to document before save var dateString = getISO8601StringForDate(new Date()); object.put("datetime", new JsonObject().put("$date", dateString)); client.save(document.className(), object, res -> { @@ -125,8 +129,8 @@ private void save(JsonMessage document) { /** * Find the latest entry of the requested data type. * - * @param request DbRequest = { class of requested data type, query? } - * @param handler Result handler, can be failed or succeeded + * @param request {@link de.wuespace.telestion.services.database.DbRequest} + * @param handler result handler, can be failed or succeeded */ private void findLatest(DbRequest request, Handler> handler) { FindOptions findOptions = new FindOptions() @@ -144,6 +148,12 @@ private void findLatest(DbRequest request, Handler> hand }); } + /** + * Find all requested entries in the MongoDB. + * + * @param request query options are defined by {@link de.wuespace.telestion.services.database.DbRequest}. + * @param handler result handler, can be failed or succeeded. + */ private void find(DbRequest request, Handler> handler) { client.findWithOptions( request.collection(), @@ -161,10 +171,28 @@ private void find(DbRequest request, Handler> handler) { ); } + /** + * Helper function to set the {@link io.vertx.ext.mongo.FindOptions} + * for the {@link io.vertx.ext.mongo.MongoClient#findWithOptions(String, JsonObject, FindOptions)}. + * + * @param fields @see {@link de.wuespace.telestion.services.database.MongoDatabaseService#setFindOptions(List, List)}. + * @param sort @see {@link de.wuespace.telestion.services.database.MongoDatabaseService#setFindOptions(List, List)}. + * @param limit Limits the amount of returned entries. -1 equals all entries found. + * @param skip Specifies if and how many entries should be skipped. + * @return {@link io.vertx.ext.mongo.FindOptions} for the MongoClient. + */ private FindOptions setFindOptions(List fields, List sort, int limit, int skip) { return setFindOptions(fields, sort).setLimit(limit).setSkip(skip); } + /** + * Helper function to set the {@link io.vertx.ext.mongo.FindOptions} + * for the {@link io.vertx.ext.mongo.MongoClient#findWithOptions(String, JsonObject, FindOptions)}. + * + * @param fields List of key Strings in the collection limiting the fields that should be returned. + * @param sort List of key Strings that the returned data should be sorted by. + * @return {@link io.vertx.ext.mongo.FindOptions} for the MongoClient. + */ private FindOptions setFindOptions(List fields, List sort) { FindOptions findOptions = new FindOptions(); if (!fields.isEmpty()) { @@ -180,14 +208,32 @@ private FindOptions setFindOptions(List fields, List sort) { return findOptions; } + /** + * Helper function to parse a query string to a {@link io.vertx.core.json.JsonObject}. + * + * @param query JSON String - "{"key":"value"}, ..." + * @return {@link io.vertx.core.json.JsonObject} query for + * {@link io.vertx.ext.mongo.MongoClient#findWithOptions(String, JsonObject, FindOptions)} + */ private JsonObject getJsonQueryFromString(String query) { if (query.isEmpty()) { return new JsonObject("{}"); } else { - return new JsonObject(query); + try { + return new JsonObject(query); + } catch (DecodeException e) { + logger.error("No valid JSON String: ".concat(e.getMessage()).concat("\nReturning empty JsonObject.")); + return new JsonObject(); + } } } + /** + * Helper function to convert a {@link java.util.Date} to a ISO-8601 Date/Time string. + * + * @param date {@link java.util.Date} that should be converted. + * @return ISO-8601 Date/Time string representation + */ private static String getISO8601StringForDate(Date date) { DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.GERMANY); dateFormat.setTimeZone(TimeZone.getTimeZone("CET")); diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java index bc8c930d..bf05aa64 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java @@ -2,23 +2,27 @@ import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.config.Config; -import de.wuespace.telestion.api.message.JsonMessage; import de.wuespace.telestion.services.message.Address; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.eventbus.Message; -import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import java.math.BigDecimal; import java.math.RoundingMode; -import java.util.Arrays; import java.util.Collections; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Verticle periodically querying the database for preconfigured data, + * publishing the results to a preconfigured outgoing address. + * + * @author Jan Tischhöfer + * @version 07-05-2021 + */ public final class PeriodicDataPublisher extends AbstractVerticle { private final Configuration forcedConfig; private Configuration config; @@ -29,17 +33,29 @@ public final class PeriodicDataPublisher extends AbstractVerticle { private DbRequest dbRequest; private String timeOfLastDataSet = null; - private String out = Address.outgoing(PeriodicDataPublisher.class); + /** + * MongoDB Eventbus Address. + */ private final String db = Address.incoming(MongoDatabaseService.class, "find"); - public PeriodicDataPublisher() { this.forcedConfig = null; } - + /** + * This constructor supplies default options. + * + * @param collection the name of the MongoDB collection (table in SQL databases) + * @param rate the desired rate (1 per rate milliseconds) at which the data should be queried + * @param outAddress the desired outgoing address of the periodic data + */ public PeriodicDataPublisher(String collection, int rate, String outAddress) { this.forcedConfig = new Configuration( collection, "", Collections.emptyList(), Collections.emptyList(), rate, outAddress ); } + /** + * If this constructor is used, settings have to be specified in the config file. + */ + public PeriodicDataPublisher() { this.forcedConfig = null; } + @Override public void start(Promise startPromise) throws Exception { config = Config.get(forcedConfig, config(), Configuration.class); @@ -53,6 +69,9 @@ public void start(Promise startPromise) throws Exception { startPromise.complete(); } + /** + * Function to request the preconfigured DbRequest. + */ private void databaseRequest() { if (timeOfLastDataSet != null) { var dateQuery = new JsonObject() @@ -73,6 +92,11 @@ private void databaseRequest() { }); } + /** + * Function to create DbRequest from config. + * + * @return {@link de.wuespace.telestion.services.database.DbRequest} + */ private DbRequest getDbRequestFromConfig() { // TODO: Make parameters optional and config easier. return new DbRequest( @@ -85,6 +109,12 @@ private DbRequest getDbRequestFromConfig() { ); } + /** + * Function to create DbRequest from config with a new query containing e.g. the new last date/time. + * + * @param query new query in JSON String representation + * @return {@link de.wuespace.telestion.services.database.DbRequest} + */ private DbRequest getDbRequestFromConfig(String query) { return new DbRequest( config.collection(), @@ -96,6 +126,12 @@ private DbRequest getDbRequestFromConfig(String query) { ); } + /** + * Helper function to turn rate into milliseconds. + * + * @param rate the desired data rate + * @return milliseconds of (1/rate) + */ private static long getRateInMillis(int rate) { BigDecimal bd = new BigDecimal((double) (1 / rate)); bd = bd.setScale(3, RoundingMode.HALF_UP); From cfcb808c15723554d9ab8a47eade159438a9daf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Fri, 7 May 2021 17:33:43 +0200 Subject: [PATCH 09/13] refactor: change FindOptions to use the helper function --- .../services/database/MongoDatabaseService.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java index 8122a7e6..3339d201 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java @@ -1,12 +1,11 @@ package de.wuespace.telestion.services.database; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.message.JsonMessage; import de.wuespace.telestion.api.config.Config; import de.wuespace.telestion.services.message.Address; import io.vertx.core.*; -import io.vertx.core.json.JsonArray; +import io.vertx.core.json.DecodeException; import io.vertx.core.json.JsonObject; import io.vertx.ext.mongo.FindOptions; import io.vertx.ext.mongo.MongoClient; @@ -132,12 +131,12 @@ private void save(JsonMessage document) { * @param request {@link de.wuespace.telestion.services.database.DbRequest} * @param handler result handler, can be failed or succeeded */ + @SuppressWarnings("unused") private void findLatest(DbRequest request, Handler> handler) { - FindOptions findOptions = new FindOptions() - .setSort(new JsonObject().put("_id", -1)).setLimit(1); // last item client.findWithOptions(request.collection(), getJsonQueryFromString(request.query()), - findOptions, res -> { + setFindOptions(request.fields(), List.of("_id"), 1, 0), + res -> { if (res.failed()) { logger.error("DB Request failed: ", res.cause()); handler.handle(Future.failedFuture(res.cause())); From b4d4be650ac860d8b57068ae5ff252f4e7a63180 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Sat, 15 May 2021 15:58:38 +0200 Subject: [PATCH 10/13] refactor: remove example for cleanup purposes --- .../example/periodicData/D2Launcher.java | 39 ---------- .../telestion/example/periodicData/IMU.java | 17 ----- .../periodicData/MockD2DataDispatcher.java | 31 -------- .../example/periodicData/MockD2Publisher.java | 69 ----------------- .../example/periodicData/System_t.java | 76 ------------------- 5 files changed, 232 deletions(-) delete mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java delete mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/IMU.java delete mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2DataDispatcher.java delete mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2Publisher.java delete mode 100644 modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/System_t.java diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java deleted file mode 100644 index 5202a4b5..00000000 --- a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/D2Launcher.java +++ /dev/null @@ -1,39 +0,0 @@ -package de.wuespace.telestion.example.periodicData; - -import de.wuespace.telestion.services.database.MongoDatabaseService; -import de.wuespace.telestion.services.database.PeriodicDataPublisher; -import de.wuespace.telestion.services.monitoring.MessageLogger; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; -import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class D2Launcher extends AbstractVerticle { - private static final Logger logger = LoggerFactory.getLogger(D2Launcher.class); - - public static void main(String[] args) { - Vertx vertx = Vertx.vertx(); - vertx.deployVerticle(D2Launcher.class.getName()); - vertx.deployVerticle(MessageLogger.class.getName()); - // vertx.deployVerticle(DataService.class.getName()); - vertx.deployVerticle(new MongoDatabaseService( - "daedalus2", "d2Pool" - )); - vertx.deployVerticle(MockD2Publisher.class.getName()); - vertx.deployVerticle(MockD2DataDispatcher.class.getName()); - vertx.deployVerticle( - new PeriodicDataPublisher( - "de.wuespace.telestion.example.periodicData.IMU", - 1, - "IMUperiodicout" - ) - ); - } - - @Override - public void start(Promise startPromise) throws Exception { - startPromise.complete(); - } -} diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/IMU.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/IMU.java deleted file mode 100644 index 075ddf00..00000000 --- a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/IMU.java +++ /dev/null @@ -1,17 +0,0 @@ -package de.wuespace.telestion.example.periodicData; - -import com.fasterxml.jackson.annotation.JsonProperty; -import de.wuespace.telestion.api.message.JsonMessage; - -public record IMU( - @JsonProperty double imuAccX, - @JsonProperty double imuAccY, - @JsonProperty double imuAccZ, - @JsonProperty double imuGyroX, - @JsonProperty double imuGyroY, - @JsonProperty double imuGyroZ -) implements JsonMessage { - private IMU() { - this(.0, .0, .0, .0, .0, .0); - } -} diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2DataDispatcher.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2DataDispatcher.java deleted file mode 100644 index 71ed5044..00000000 --- a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2DataDispatcher.java +++ /dev/null @@ -1,31 +0,0 @@ -package de.wuespace.telestion.example.periodicData; - -import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.services.database.MongoDatabaseService; -import de.wuespace.telestion.services.message.Address; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class MockD2DataDispatcher extends AbstractVerticle { - private static final Logger logger = LoggerFactory.getLogger(MockD2DataDispatcher.class); - - @Override - public void start(Promise startPromise) throws Exception { - vertx.eventBus().consumer("D2DispatcherInc", msg -> { - JsonMessage.on(System_t.class, msg, syst -> { - var imu = new IMU( - syst.imuAccX(), - syst.imuAccY(), - syst.imuAccZ(), - syst.imuGyroX(), - syst.imuGyroY(), - syst.imuGyroZ() - ); - vertx.eventBus().publish(Address.incoming(MongoDatabaseService.class, "save"), imu.json()); - }); - }); - startPromise.complete(); - } -} diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2Publisher.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2Publisher.java deleted file mode 100644 index b05e9461..00000000 --- a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/MockD2Publisher.java +++ /dev/null @@ -1,69 +0,0 @@ -package de.wuespace.telestion.example.periodicData; - -import de.wuespace.telestion.services.database.DataService; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; -import java.time.Duration; -import java.util.Random; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import de.wuespace.telestion.services.message.Address; - -/** - * Test class to mock incoming Daedalus2 System_t data from mavlink.
- * Will be removed upon first release. - */ -public final class MockD2Publisher extends AbstractVerticle { - private static final Logger logger = LoggerFactory.getLogger(MockD2Publisher.class); - private final Random rand = new Random(555326456); - - private final String inSave = Address.incoming(DataService.class, "save"); - - @Override - public void start(Promise startPromise) { - vertx.setPeriodic(Duration.ofMillis(500).toMillis(), timerId -> publishSystemT()); - startPromise.complete(); - } - - /** - * Publishes system_t. - */ - private void publishSystemT() { - System_t system_t = new System_t( - 0L, - (byte) 0b0, - .2, - .4, - .1, - .9, - .87, - .5, - .0, - .0, - .0, - .0, - new byte[]{}, - (byte) 0b0, - new byte[]{}, - new byte[]{}, - (byte) 0b0, - .0, - .0, - (byte) 0b0, - (byte) 0b0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - (byte) 0b0, - (byte) 0b0 - ); - vertx.eventBus().publish("D2DispatcherInc", system_t.json()); - } -} - diff --git a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/System_t.java b/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/System_t.java deleted file mode 100644 index 791e0e8b..00000000 --- a/modules/telestion-example/src/main/java/de/wuespace/telestion/example/periodicData/System_t.java +++ /dev/null @@ -1,76 +0,0 @@ -package de.wuespace.telestion.example.periodicData; - -import com.fasterxml.jackson.annotation.JsonProperty; -import de.wuespace.telestion.api.message.JsonMessage; - -public record System_t( - @JsonProperty long timeLocal, - @JsonProperty byte stateCur, - @JsonProperty double imuAccX, - @JsonProperty double imuAccY, - @JsonProperty double imuAccZ, - @JsonProperty double imuGyroX, - @JsonProperty double imuGyroY, - @JsonProperty double imuGyroZ, - @JsonProperty double baroPress, - @JsonProperty double baroTemp, - @JsonProperty double vacuumBaroPress, - @JsonProperty double tachoRotRate, - @JsonProperty byte[] servoAmps, - @JsonProperty byte lidarServo, - @JsonProperty byte[] batInfo, - @JsonProperty byte[] systemVoltageInfo, - @JsonProperty byte systemConfig, - @JsonProperty double gpsLat, - @JsonProperty double gpsLong, - @JsonProperty byte gpsQuality, - @JsonProperty byte gpsSatsUsed, - @JsonProperty double gpsHdop, - @JsonProperty double gpsAlt, - @JsonProperty double filterVelVertical, - @JsonProperty double filterHeightGround, - @JsonProperty double filterRotorRotRate, - @JsonProperty double filterBodyRotRate, - @JsonProperty double filterVelVerticalInd, - @JsonProperty double controllerBladePitch, - @JsonProperty double controllerFinAngle, - @JsonProperty byte controllerId, - @JsonProperty byte availableStatus -) implements JsonMessage { - private System_t() { - this( - 0L, - (byte) 0b0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - new byte[]{}, - (byte) 0b0, - new byte[]{}, - new byte[]{}, - (byte) 0b0, - .0, - .0, - (byte) 0b0, - (byte) 0b0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - .0, - (byte) 0b0, - (byte) 0b0 - ); - } -} From 8d69871c93865510420d3408ff3d033732fd9923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Sat, 15 May 2021 16:01:01 +0200 Subject: [PATCH 11/13] fix(database): copied working database implementation from daedalus2 --- .../services/database/DbRequest.java | 22 +-- .../database/MongoDatabaseService.java | 70 +++++++- .../database/PeriodicDataAggregator.java | 160 ++++++++++++++++++ .../database/PeriodicDataPublisher.java | 30 ++-- 4 files changed, 253 insertions(+), 29 deletions(-) create mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java index 2c261fa1..e0e79995 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java @@ -21,6 +21,7 @@ * @param sort List of key Strings that the returned data should be sorted by. * @param limit Limits the amount of returned entries. -1 equals all entries found. * @param skip Specifies how many entries should be skipped. 0 is default, meaning no entries are skipped. + * @param aggregate Field, that should be aggregated. * @see MongoDB manual for more information. * * @author Jan Tischhöfer @@ -32,16 +33,17 @@ public record DbRequest( @JsonProperty List fields, @JsonProperty List sort, @JsonProperty int limit, - @JsonProperty int skip) implements JsonMessage { - private DbRequest() { - this("", "", Collections.emptyList(), Collections.emptyList(), -1, 0); - } + @JsonProperty int skip, + @JsonProperty String aggregate) implements JsonMessage { + private DbRequest() { + this("", "", Collections.emptyList(), Collections.emptyList(), -1, 0, ""); + } - public DbRequest(String collection) { - this(collection, "", Collections.emptyList(), Collections.emptyList(), -1, 0); - } + public DbRequest(String collection) { + this(collection, "", Collections.emptyList(), Collections.emptyList(), -1, 0, ""); + } - public DbRequest(String collection, String query) { - this(collection, query, Collections.emptyList(), Collections.emptyList(), -1, 0); - } + public DbRequest(String collection, String query) { + this(collection, query, Collections.emptyList(), Collections.emptyList(), -1, 0, ""); + } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java index 3339d201..3764475a 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java @@ -6,16 +6,14 @@ import de.wuespace.telestion.services.message.Address; import io.vertx.core.*; import io.vertx.core.json.DecodeException; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.mongo.FindOptions; import io.vertx.ext.mongo.MongoClient; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; -import java.util.Locale; -import java.util.TimeZone; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +42,7 @@ public final class MongoDatabaseService extends AbstractVerticle { private final String inSave = Address.incoming(MongoDatabaseService.class, "save"); private final String outSave = Address.outgoing(MongoDatabaseService.class, "save"); private final String inFind = Address.incoming(MongoDatabaseService.class, "find"); + private final String inAgg = Address.incoming(MongoDatabaseService.class, "aggregate"); /** * This constructor supplies default options. @@ -90,6 +89,18 @@ private void registerConsumers() { }); }); }); + vertx.eventBus().consumer(inAgg, request -> { + JsonMessage.on(DbRequest.class, request, dbRequest -> { + this.aggregate(dbRequest, result -> { + if (result.failed()) { + request.fail(-1, result.cause().getMessage()); + } + if (result.succeeded()) { + request.reply(result.result()); + } + }); + }); + }); } /** @@ -113,7 +124,7 @@ private void save(JsonMessage document) { logger.error("DB Save failed: ", res.cause()); return; } - String id = res.result(); + /*String id = res.result(); client.find(document.className(), new JsonObject().put("_id", id), rec -> { if (rec.failed()) { logger.error("DB Find failed: ", rec.cause()); @@ -121,7 +132,7 @@ private void save(JsonMessage document) { } DbResponse dbRes = new DbResponse(rec.result()); vertx.eventBus().publish(outSave.concat("/").concat(document.className()), dbRes.json()); - }); + });*/ }); } @@ -170,12 +181,52 @@ private void find(DbRequest request, Handler> handler) { ); } + private void aggregate(DbRequest request, Handler> handler) { + var command = new JsonObject() + .put("aggregate", request.collection()) + .put("pipeline", new JsonArray()); + command.getJsonArray("pipeline") + .add(new JsonObject() + .put("$match", getJsonQueryFromString(request.query()))); + // For each field in specified collection document you need to define the field and the operations + // Outsource in helper function + command.getJsonArray("pipeline") + .add(new JsonObject() + .put("$group", getGroupStageFromFields(request.aggregate()))) + .add(new JsonObject() + .put("$project", new JsonObject() + .put("_id", 0) + .put("min", "$min") + .put("avg", "$avg") + .put("max", "$max") + .put("last", "$last") + .put("time", new JsonObject().put("$toLong", "$time")))); + command.put("cursor", new JsonObject()); + client.runCommand("aggregate", command, result -> { + if (result.failed()) { + logger.error("Aggregation failed: ", result.cause().getMessage()); + handler.handle(Future.failedFuture(result.cause())); + return; + } + handler.handle(Future.succeededFuture(result.result())); + }); + } + + private JsonObject getGroupStageFromFields(String field) { + var group = new JsonObject().put("_id", "$datetime"); + // calculate avg/min/max for field + group.put("min", new JsonObject().put("$min", "$" + field)); + group.put("avg", new JsonObject().put("$avg", "$" + field)); + group.put("max", new JsonObject().put("$max", "$" + field)); + group.put("last", new JsonObject().put("$last", "$" + field)); + group.put("time", new JsonObject().put("$first", "$datetime")); + return group; + } + /** * Helper function to set the {@link io.vertx.ext.mongo.FindOptions} * for the {@link io.vertx.ext.mongo.MongoClient#findWithOptions(String, JsonObject, FindOptions)}. * - * @param fields @see {@link de.wuespace.telestion.services.database.MongoDatabaseService#setFindOptions(List, List)}. - * @param sort @see {@link de.wuespace.telestion.services.database.MongoDatabaseService#setFindOptions(List, List)}. * @param limit Limits the amount of returned entries. -1 equals all entries found. * @param skip Specifies if and how many entries should be skipped. * @return {@link io.vertx.ext.mongo.FindOptions} for the MongoClient. @@ -234,8 +285,9 @@ private JsonObject getJsonQueryFromString(String query) { * @return ISO-8601 Date/Time string representation */ private static String getISO8601StringForDate(Date date) { - DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.GERMANY); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", Locale.GERMANY); dateFormat.setTimeZone(TimeZone.getTimeZone("CET")); + var dateString = dateFormat.format(date); return dateFormat.format(date); } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java new file mode 100644 index 00000000..43099781 --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java @@ -0,0 +1,160 @@ +package de.wuespace.telestion.services.database; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.config.Config; +import de.wuespace.telestion.services.message.Address; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Verticle periodically querying the database for preconfigured data, + * publishing the results to a preconfigured outgoing address. + * + * @author Jan Tischhöfer + * @version 07-05-2021 + */ +public final class PeriodicDataAggregator extends AbstractVerticle { + private final Configuration forcedConfig; + private Configuration config; + + private final Logger logger = LoggerFactory.getLogger(PeriodicDataAggregator.class); + private Handler reqTimer; + private Long timer; + private DbRequest dbRequest; + private String timeOfLastDataSet; + + /** + * MongoDB Eventbus Address. + */ + private final String agg = Address.incoming(MongoDatabaseService.class, "aggregate"); + + /** + * This constructor supplies default options. + * + * @param collection the name of the MongoDB collection (table in SQL databases) + * @param rate the desired rate (1 per rate milliseconds) at which the data should be queried + * @param outAddress the desired outgoing address of the periodic data + */ + public PeriodicDataAggregator(String collection, String field, int rate, String outAddress) { + this.forcedConfig = new Configuration( + collection, field, rate, outAddress + ); + } + + /** + * If this constructor is used, settings have to be specified in the config file. + */ + public PeriodicDataAggregator() { this.forcedConfig = null; } + + @Override + public void start(Promise startPromise) throws Exception { + config = Config.get(forcedConfig, config(), Configuration.class); + dbRequest = getDbRequestFromConfig(); + timer = getRateInMillis(config.rate()); + logger.info("Timer is: " + String.valueOf(timer)); + timeOfLastDataSet = getISO8601StringForDate(new Date()); + reqTimer = id -> { + databaseRequest(); + vertx.setTimer(timer, reqTimer); + }; + vertx.setTimer(timer, reqTimer); + startPromise.complete(); + } + + /** + * Function to request the preconfigured DbRequest. + */ + private void databaseRequest() { + var dateQuery = new JsonObject() + .put("datetime", new JsonObject().put("$gt", new JsonObject().put("$date", timeOfLastDataSet))); + dbRequest = getDbRequestFromConfig(dateQuery.toString()); + vertx.eventBus().request(agg, dbRequest.json(), (Handler>>) reply -> { + if (reply.failed()) { + logger.error(reply.cause().getMessage()); + return; + } + logger.info(reply.result().body().toString()); + if (reply.result().body() != null) { + var jArr = reply.result().body().getJsonObject("cursor").getJsonArray("firstBatch"); + if (!jArr.isEmpty()) { + // Set timeOfLastDataSet to the datetime of the last received data + long unixTs = Long.parseLong(jArr.getJsonObject(jArr.size()-1).getString("time")); + timeOfLastDataSet = getISO8601StringForDate(new Date(unixTs)); + vertx.eventBus().publish(config.outAddress(), jArr); + } + } + }); + } + + /** + * Function to create DbRequest from config. + * + * @return {@link de.wuespace.telestion.services.database.DbRequest} + */ + private DbRequest getDbRequestFromConfig() { + // TODO: Make parameters optional and config easier. + return new DbRequest( + config.collection(), + "", + Collections.emptyList(), + Collections.emptyList(), + -1, + 0, + config.field() + ); + } + + /** + * Function to create DbRequest from config with a new query containing e.g. the new last date/time. + * + * @param query new query in JSON String representation + * @return {@link de.wuespace.telestion.services.database.DbRequest} + */ + private DbRequest getDbRequestFromConfig(String query) { + return new DbRequest( + config.collection(), + query, + Collections.emptyList(), + Collections.emptyList(), + -1, + 0, + config.field() + ); + } + + /** + * Helper function to turn rate into milliseconds. + * + * @param rate the desired data rate + * @return milliseconds of (1/rate) + */ + private static long getRateInMillis(int rate) { + return (long) ((1.0 / rate) * 1000.5); + } + + private static String getISO8601StringForDate(Date date) { + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.GERMANY); + dateFormat.setTimeZone(TimeZone.getTimeZone("CET")); + return dateFormat.format(date); + } + + private static record Configuration( + @JsonProperty String collection, + @JsonProperty String field, + @JsonProperty int rate, + @JsonProperty String outAddress + ) { + private Configuration() { + this("", "", 0, ""); + } + } +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java index bf05aa64..20acd89e 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java @@ -37,6 +37,7 @@ public final class PeriodicDataPublisher extends AbstractVerticle { * MongoDB Eventbus Address. */ private final String db = Address.incoming(MongoDatabaseService.class, "find"); + private final String agg = Address.incoming(MongoDatabaseService.class, "aggregate"); /** * This constructor supplies default options. @@ -45,9 +46,15 @@ public final class PeriodicDataPublisher extends AbstractVerticle { * @param rate the desired rate (1 per rate milliseconds) at which the data should be queried * @param outAddress the desired outgoing address of the periodic data */ - public PeriodicDataPublisher(String collection, int rate, String outAddress) { + public PeriodicDataPublisher(String collection, int rate, String outAddress, String aggregate) { this.forcedConfig = new Configuration( - collection, "", Collections.emptyList(), Collections.emptyList(), rate, outAddress + collection, + "", + Collections.emptyList(), + Collections.emptyList(), + rate, + outAddress, + aggregate ); } @@ -78,17 +85,17 @@ private void databaseRequest() { .put("datetime", new JsonObject().put("$gt", new JsonObject().put("$date", timeOfLastDataSet))); dbRequest = getDbRequestFromConfig(dateQuery.toString()); } - logger.info(dbRequest.query().toString()); - vertx.eventBus().request(db, dbRequest.json(), (Handler>>) reply -> { + vertx.eventBus().request(agg, dbRequest.json(), (Handler>>) reply -> { if (reply.failed()) { logger.error(reply.cause().getMessage()); return; } + logger.info(reply.result().body().toString()); var jArr = reply.result().body().getJsonArray("result"); // Set timeOfLastDataSet to the datetime of the last received data - timeOfLastDataSet = jArr.getJsonObject(jArr.size()-1).getJsonObject("datetime").getString("$date"); - vertx.eventBus().publish(config.outAddress(), jArr); + // timeOfLastDataSet = jArr.getJsonObject(jArr.size()-1).getJsonObject("datetime").getString("$date"); + // vertx.eventBus().publish(config.outAddress(), jArr); }); } @@ -105,7 +112,8 @@ private DbRequest getDbRequestFromConfig() { config.fields(), config.sort(), -1, - 0 + 0, + config.aggregate() ); } @@ -122,7 +130,8 @@ private DbRequest getDbRequestFromConfig(String query) { config.fields(), config.sort(), -1, - 0 + 0, + config.aggregate() ); } @@ -144,10 +153,11 @@ private static record Configuration( @JsonProperty List fields, @JsonProperty List sort, @JsonProperty int rate, - @JsonProperty String outAddress + @JsonProperty String outAddress, + @JsonProperty String aggregate ) { private Configuration() { - this("", "", Collections.emptyList(), Collections.emptyList(), 0, ""); + this("", "", Collections.emptyList(), Collections.emptyList(), 0, "", ""); } } } From fa397b34d0c73c97f88b94d6153e4b060d5083ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Sat, 15 May 2021 16:18:35 +0200 Subject: [PATCH 12/13] refactor: todo comment for later fixes --- .../telestion/services/database/PeriodicDataAggregator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java index 43099781..59fd039a 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java @@ -142,6 +142,7 @@ private static long getRateInMillis(int rate) { } private static String getISO8601StringForDate(Date date) { + // TODO: fix time zone issues, maybe use newer date implementation DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.GERMANY); dateFormat.setTimeZone(TimeZone.getTimeZone("CET")); return dateFormat.format(date); From 2b814875c9bcefd4119a4bc0b46a44df44b463f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Tischh=C3=B6fer?= Date: Sat, 15 May 2021 16:19:34 +0200 Subject: [PATCH 13/13] refactor: private constructor of MDBService should use null values --- .../telestion/services/database/MongoDatabaseService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java index 3764475a..674432f8 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java @@ -293,7 +293,7 @@ private static String getISO8601StringForDate(Date date) { private static record Configuration(@JsonProperty JsonObject dbConfig, @JsonProperty String dbPoolName) { private Configuration() { - this(new JsonObject().put("db_name", "daedalus2").put("useObjectId", true), "d2Pool"); + this(new JsonObject(), ""); } } }