diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java index 1f869124..686143e3 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java @@ -11,22 +11,34 @@ import de.wuespace.telestion.api.message.JsonMessage; import de.wuespace.telestion.api.config.Config; +/** + * Listener that collects all incoming data configured in listeningAddresses and redirects them to be saved to the + * MongoDatabaseService. + */ public final class DataListener extends AbstractVerticle { private final Configuration forcedConfig; private Configuration config; private final Logger logger = LoggerFactory.getLogger(DataListener.class); - private final String save = Address.incoming(DataService.class, "save"); - - public DataListener() { - this.forcedConfig = null; - } + private final String save = Address.incoming(MongoDatabaseService.class, "save"); + /** + * This constructor supplies default options. + * + * @param listeningAddresses List of addresses that should be saved + */ public DataListener(List listeningAddresses) { this.forcedConfig = new Configuration(listeningAddresses); } + /** + * If this constructor is used, settings have to be specified in the config file. + */ + public DataListener() { + this.forcedConfig = null; + } + @Override public void start(Promise startPromise) throws Exception { config = Config.get(forcedConfig, config(), Configuration.class); @@ -34,6 +46,9 @@ public void start(Promise startPromise) throws Exception { startPromise.complete(); } + /** + * Function to register consumers to the eventbus. + */ private void registerConsumers() { config.listeningAddresses().forEach(address -> { vertx.eventBus().consumer(address, document -> { diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java index 10faface..d3c7b56e 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java @@ -5,11 +5,11 @@ import de.wuespace.telestion.api.message.JsonMessage; public record DataRequest( - @JsonProperty String className, - @JsonProperty JsonObject query, + @JsonProperty String collection, + @JsonProperty String query, @JsonProperty String operation, @JsonProperty JsonObject operationParams) implements JsonMessage { private DataRequest() { - this("", new JsonObject(), "", new JsonObject()); + this("", "", "", new JsonObject()); } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java index be819f36..eb304825 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java @@ -40,7 +40,6 @@ public DataService() { /** * This constructor supplies default options. * - * @param dataTypes List of all full class names of the data types * @param dataOperationMap Map of String->DataOperation for incoming dataRequests */ public DataService(Map dataOperationMap) { @@ -91,30 +90,24 @@ private void registerConsumers() { * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. */ private void dataRequestDispatcher(DataRequest request, Handler> resultHandler) { - // TODO: If className is empty, check if query exists and just pass the query to the DatabaseClient - try { - var dataType = Class.forName(request.className()); - if (request.operation().isEmpty()) { - this.fetchLatestData(dataType, request.query(), res -> { - if (res.failed()) { - resultHandler.handle(Future.failedFuture(res.cause().getMessage())); - return; - } - resultHandler.handle(Future.succeededFuture(res.result())); - }); - } else { - var dataOperation = new DataOperation(new JsonObject(), request.operationParams()); - this.fetchLatestData(dataType, request.query(), res -> { - if (res.failed()) { - resultHandler.handle(Future.failedFuture(res.cause().getMessage())); - return; - } - dataOperation.data().put("data", res.result()); - }); - this.applyManipulation(request.operation(), dataOperation, resultHandler); - } - } catch (ClassNotFoundException e) { - logger.error("ClassNotFoundException: {}", e.getMessage()); + if (request.operation().isEmpty()) { + this.fetchLatestData(request.collection(), request.query(), res -> { + if (res.failed()) { + resultHandler.handle(Future.failedFuture(res.cause().getMessage())); + return; + } + resultHandler.handle(Future.succeededFuture(res.result())); + }); + } else { + var dataOperation = new DataOperation(new JsonObject(), request.operationParams()); + this.fetchLatestData(request.collection(), request.query(), res -> { + if (res.failed()) { + resultHandler.handle(Future.failedFuture(res.cause().getMessage())); + return; + } + dataOperation.data().put("data", res.result()); + }); + this.applyManipulation(request.operation(), dataOperation, resultHandler); } } @@ -142,13 +135,13 @@ private void requestResultHandler( /** * Method to fetch the latest data of a specified data type. * - * @param dataType Determines which data type should be fetched. + * @param collection Determines from which collection data should be fetched. * @param query MongoDB query, can be empty JsonObject if no specific query is needed. * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. */ - private void fetchLatestData(Class dataType, JsonObject query, + private void fetchLatestData(String collection, String query, Handler> resultHandler) { - DbRequest dbRequest = new DbRequest(dataType, query); + DbRequest dbRequest = new DbRequest(collection, query); this.requestResultHandler(dbFind, dbRequest, resultHandler); } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java index 7808bb13..e0e79995 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java @@ -4,21 +4,46 @@ import io.vertx.core.json.JsonObject; import de.wuespace.telestion.api.message.JsonMessage; +import java.util.Collections; +import java.util.List; + /** * Record to provide the structure of a database request. * - * @param dataType class of the data type - * @param query MongoDb query looks like this: { key: value } - * with key meaning the name of the field in the document. - * IN condition: { key: { $in: ["value1", "value2", ...] }} - * AND condition: { key1: "value1", key2: { $lt: value } } with $lt meaning less than - * OR condition: { $or: [{ key1: "value1" }, { key2: { $gt: value2 }}] } - * @see MongoDB manual for more information + * @param collection String of the desired MongoDB collection. + * @param query MongoDB queries are written in JSON. + * This parameter is a String representation of JSONObject. "{ "key": "value" }" + * with key meaning the name of the field in the MongoDB document. + * IN condition: { key: { $in: ["value1", "value2", ...] }} + * AND condition: { key1: "value1", key2: { $lt: value } } with $lt meaning less than + * OR condition: { $or: [{ key1: "value1" }, { key2: { $gt: value2 }}] } + * @param fields List of key Strings in the collection limiting the fields that should be returned. + * @param sort List of key Strings that the returned data should be sorted by. + * @param limit Limits the amount of returned entries. -1 equals all entries found. + * @param skip Specifies how many entries should be skipped. 0 is default, meaning no entries are skipped. + * @param aggregate Field, that should be aggregated. + * @see MongoDB manual for more information. + * + * @author Jan Tischhöfer + * @version 07-05-2021 */ public record DbRequest( - @JsonProperty Class dataType, - @JsonProperty JsonObject query) implements JsonMessage { - private DbRequest() { - this(null, new JsonObject()); - } + @JsonProperty String collection, + @JsonProperty String query, + @JsonProperty List fields, + @JsonProperty List sort, + @JsonProperty int limit, + @JsonProperty int skip, + @JsonProperty String aggregate) implements JsonMessage { + private DbRequest() { + this("", "", Collections.emptyList(), Collections.emptyList(), -1, 0, ""); + } + + public DbRequest(String collection) { + this(collection, "", Collections.emptyList(), Collections.emptyList(), -1, 0, ""); + } + + public DbRequest(String collection, String query) { + this(collection, query, Collections.emptyList(), Collections.emptyList(), -1, 0, ""); + } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java index c48b7382..0aba996f 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java @@ -1,15 +1,14 @@ package de.wuespace.telestion.services.database; import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.message.JsonMessage; import io.vertx.core.json.JsonObject; import java.util.Collections; import java.util.List; -import de.wuespace.telestion.api.message.JsonMessage; public record DbResponse( - @JsonProperty Class dataType, @JsonProperty List result) implements JsonMessage { private DbResponse() { - this(null, Collections.emptyList()); + this(Collections.emptyList()); } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java index 9456c06f..674432f8 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java @@ -1,16 +1,22 @@ package de.wuespace.telestion.services.database; import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.message.JsonMessage; +import de.wuespace.telestion.api.config.Config; +import de.wuespace.telestion.services.message.Address; import io.vertx.core.*; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.mongo.FindOptions; import io.vertx.ext.mongo.MongoClient; -import java.util.List; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.api.config.Config; -import de.wuespace.telestion.services.message.Address; /** * MongoDatabaseService is a verticle which connects to a local running MongoDB-Database and listens for incoming @@ -19,7 +25,10 @@ * TODO: but listens to the same address for DBRequests. The address is the interface to the database implementation, * TODO: so that the used DB can be replaced easily by spawning another DBClient. * Mongo specific: - * Data is always saved in their exclusive collection which is always named after their Class.name / MessageType. + * Data is always saved in their exclusive collection which is always named after their Class.name. + * + * @author Jan Tischhöfer + * @version 07-05-2021 */ public final class MongoDatabaseService extends AbstractVerticle { private final Logger logger = LoggerFactory.getLogger(MongoDatabaseService.class); @@ -33,6 +42,7 @@ public final class MongoDatabaseService extends AbstractVerticle { private final String inSave = Address.incoming(MongoDatabaseService.class, "save"); private final String outSave = Address.outgoing(MongoDatabaseService.class, "save"); private final String inFind = Address.incoming(MongoDatabaseService.class, "find"); + private final String inAgg = Address.incoming(MongoDatabaseService.class, "aggregate"); /** * This constructor supplies default options. @@ -69,7 +79,19 @@ private void registerConsumers() { }); vertx.eventBus().consumer(inFind, request -> { JsonMessage.on(DbRequest.class, request, dbRequest -> { - this.findLatest(dbRequest, result -> { + this.find(dbRequest, result -> { + if (result.failed()) { + request.fail(-1, result.cause().getMessage()); + } + if (result.succeeded()) { + request.reply(result.result()); + } + }); + }); + }); + vertx.eventBus().consumer(inAgg, request -> { + JsonMessage.on(DbRequest.class, request, dbRequest -> { + this.aggregate(dbRequest, result -> { if (result.failed()) { request.fail(-1, result.cause().getMessage()); } @@ -85,55 +107,193 @@ private void registerConsumers() { * Save the received document to the database. * If a MongoDB-ObjectId is specified data will be upserted, meaning if the id does not exist it will be inserted, * otherwise it will be updated. Else it will be inserted with a new id. + * Additionally the current date/time is added for future queries regarding date and time. * If the save was successful the database looks for the newly saved document and publishes it to the database - * outgoing address concatenated with "/Class.name". With this behaviour clients (e.g. Frontend) can listen + * outgoing address concatenated with "/Class.name". + * Through this behaviour clients (e.g. GUI) can listen * to the outgoing address of a specific data value and will always be provided with the most recent data. * * @param document a JsonMessage validated through the JsonMessage.on method */ private void save(JsonMessage document) { var object = document.json(); + var dateString = getISO8601StringForDate(new Date()); + object.put("datetime", new JsonObject().put("$date", dateString)); client.save(document.className(), object, res -> { if (res.failed()) { logger.error("DB Save failed: ", res.cause()); return; } - String id = res.result(); + /*String id = res.result(); client.find(document.className(), new JsonObject().put("_id", id), rec -> { if (rec.failed()) { logger.error("DB Find failed: ", rec.cause()); return; } - DbResponse dbRes = new DbResponse(document.getClass(), rec.result()); + DbResponse dbRes = new DbResponse(rec.result()); vertx.eventBus().publish(outSave.concat("/").concat(document.className()), dbRes.json()); - }); + });*/ }); } /** * Find the latest entry of the requested data type. * - * @param request DbRequest = { class of requested data type, query? } - * @param handler Result handler, can be failed or succeeded + * @param request {@link de.wuespace.telestion.services.database.DbRequest} + * @param handler result handler, can be failed or succeeded */ - private void findLatest(DbRequest request, Handler>> handler) { - FindOptions findOptions = new FindOptions() - .setSort(new JsonObject().put("_id", -1)).setLimit(1); // last item - client.findWithOptions(request.dataType().getName(), - request.query(), - findOptions, res -> { + @SuppressWarnings("unused") + private void findLatest(DbRequest request, Handler> handler) { + client.findWithOptions(request.collection(), + getJsonQueryFromString(request.query()), + setFindOptions(request.fields(), List.of("_id"), 1, 0), + res -> { if (res.failed()) { logger.error("DB Request failed: ", res.cause()); handler.handle(Future.failedFuture(res.cause())); return; } - handler.handle(Future.succeededFuture(res.result())); + var dbRes = new DbResponse(res.result()); + handler.handle(Future.succeededFuture(dbRes.json())); }); } + /** + * Find all requested entries in the MongoDB. + * + * @param request query options are defined by {@link de.wuespace.telestion.services.database.DbRequest}. + * @param handler result handler, can be failed or succeeded. + */ + private void find(DbRequest request, Handler> handler) { + client.findWithOptions( + request.collection(), + getJsonQueryFromString(request.query()), + setFindOptions(request.fields(), request.sort(), request.limit(), request.skip()), + res -> { + if (res.failed()) { + logger.error("DB Request failed: ", res.cause()); + handler.handle(Future.failedFuture(res.cause())); + return; + } + var dbRes = new DbResponse(res.result()); + handler.handle(Future.succeededFuture(dbRes.json())); + } + ); + } + + private void aggregate(DbRequest request, Handler> handler) { + var command = new JsonObject() + .put("aggregate", request.collection()) + .put("pipeline", new JsonArray()); + command.getJsonArray("pipeline") + .add(new JsonObject() + .put("$match", getJsonQueryFromString(request.query()))); + // For each field in specified collection document you need to define the field and the operations + // Outsource in helper function + command.getJsonArray("pipeline") + .add(new JsonObject() + .put("$group", getGroupStageFromFields(request.aggregate()))) + .add(new JsonObject() + .put("$project", new JsonObject() + .put("_id", 0) + .put("min", "$min") + .put("avg", "$avg") + .put("max", "$max") + .put("last", "$last") + .put("time", new JsonObject().put("$toLong", "$time")))); + command.put("cursor", new JsonObject()); + client.runCommand("aggregate", command, result -> { + if (result.failed()) { + logger.error("Aggregation failed: ", result.cause().getMessage()); + handler.handle(Future.failedFuture(result.cause())); + return; + } + handler.handle(Future.succeededFuture(result.result())); + }); + } + + private JsonObject getGroupStageFromFields(String field) { + var group = new JsonObject().put("_id", "$datetime"); + // calculate avg/min/max for field + group.put("min", new JsonObject().put("$min", "$" + field)); + group.put("avg", new JsonObject().put("$avg", "$" + field)); + group.put("max", new JsonObject().put("$max", "$" + field)); + group.put("last", new JsonObject().put("$last", "$" + field)); + group.put("time", new JsonObject().put("$first", "$datetime")); + return group; + } + + /** + * Helper function to set the {@link io.vertx.ext.mongo.FindOptions} + * for the {@link io.vertx.ext.mongo.MongoClient#findWithOptions(String, JsonObject, FindOptions)}. + * + * @param limit Limits the amount of returned entries. -1 equals all entries found. + * @param skip Specifies if and how many entries should be skipped. + * @return {@link io.vertx.ext.mongo.FindOptions} for the MongoClient. + */ + private FindOptions setFindOptions(List fields, List sort, int limit, int skip) { + return setFindOptions(fields, sort).setLimit(limit).setSkip(skip); + } + + /** + * Helper function to set the {@link io.vertx.ext.mongo.FindOptions} + * for the {@link io.vertx.ext.mongo.MongoClient#findWithOptions(String, JsonObject, FindOptions)}. + * + * @param fields List of key Strings in the collection limiting the fields that should be returned. + * @param sort List of key Strings that the returned data should be sorted by. + * @return {@link io.vertx.ext.mongo.FindOptions} for the MongoClient. + */ + private FindOptions setFindOptions(List fields, List sort) { + FindOptions findOptions = new FindOptions(); + if (!fields.isEmpty()) { + JsonObject jsonFields = new JsonObject(); + fields.forEach(f -> jsonFields.put(f, true)); + findOptions.setFields(jsonFields); + } + if (!sort.isEmpty()) { + JsonObject jsonSort = new JsonObject(); + sort.forEach(s -> jsonSort.put(s, -1)); + findOptions.setSort(jsonSort); + } + return findOptions; + } + + /** + * Helper function to parse a query string to a {@link io.vertx.core.json.JsonObject}. + * + * @param query JSON String - "{"key":"value"}, ..." + * @return {@link io.vertx.core.json.JsonObject} query for + * {@link io.vertx.ext.mongo.MongoClient#findWithOptions(String, JsonObject, FindOptions)} + */ + private JsonObject getJsonQueryFromString(String query) { + if (query.isEmpty()) { + return new JsonObject("{}"); + } else { + try { + return new JsonObject(query); + } catch (DecodeException e) { + logger.error("No valid JSON String: ".concat(e.getMessage()).concat("\nReturning empty JsonObject.")); + return new JsonObject(); + } + } + } + + /** + * Helper function to convert a {@link java.util.Date} to a ISO-8601 Date/Time string. + * + * @param date {@link java.util.Date} that should be converted. + * @return ISO-8601 Date/Time string representation + */ + private static String getISO8601StringForDate(Date date) { + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", Locale.GERMANY); + dateFormat.setTimeZone(TimeZone.getTimeZone("CET")); + var dateString = dateFormat.format(date); + return dateFormat.format(date); + } + private static record Configuration(@JsonProperty JsonObject dbConfig, @JsonProperty String dbPoolName) { private Configuration() { - this(new JsonObject().put("db_name", "raketenpraktikum").put("useObjectId", true), "raketenpraktikumPool"); + this(new JsonObject(), ""); } } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java new file mode 100644 index 00000000..59fd039a --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataAggregator.java @@ -0,0 +1,161 @@ +package de.wuespace.telestion.services.database; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.config.Config; +import de.wuespace.telestion.services.message.Address; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Verticle periodically querying the database for preconfigured data, + * publishing the results to a preconfigured outgoing address. + * + * @author Jan Tischhöfer + * @version 07-05-2021 + */ +public final class PeriodicDataAggregator extends AbstractVerticle { + private final Configuration forcedConfig; + private Configuration config; + + private final Logger logger = LoggerFactory.getLogger(PeriodicDataAggregator.class); + private Handler reqTimer; + private Long timer; + private DbRequest dbRequest; + private String timeOfLastDataSet; + + /** + * MongoDB Eventbus Address. + */ + private final String agg = Address.incoming(MongoDatabaseService.class, "aggregate"); + + /** + * This constructor supplies default options. + * + * @param collection the name of the MongoDB collection (table in SQL databases) + * @param rate the desired rate (1 per rate milliseconds) at which the data should be queried + * @param outAddress the desired outgoing address of the periodic data + */ + public PeriodicDataAggregator(String collection, String field, int rate, String outAddress) { + this.forcedConfig = new Configuration( + collection, field, rate, outAddress + ); + } + + /** + * If this constructor is used, settings have to be specified in the config file. + */ + public PeriodicDataAggregator() { this.forcedConfig = null; } + + @Override + public void start(Promise startPromise) throws Exception { + config = Config.get(forcedConfig, config(), Configuration.class); + dbRequest = getDbRequestFromConfig(); + timer = getRateInMillis(config.rate()); + logger.info("Timer is: " + String.valueOf(timer)); + timeOfLastDataSet = getISO8601StringForDate(new Date()); + reqTimer = id -> { + databaseRequest(); + vertx.setTimer(timer, reqTimer); + }; + vertx.setTimer(timer, reqTimer); + startPromise.complete(); + } + + /** + * Function to request the preconfigured DbRequest. + */ + private void databaseRequest() { + var dateQuery = new JsonObject() + .put("datetime", new JsonObject().put("$gt", new JsonObject().put("$date", timeOfLastDataSet))); + dbRequest = getDbRequestFromConfig(dateQuery.toString()); + vertx.eventBus().request(agg, dbRequest.json(), (Handler>>) reply -> { + if (reply.failed()) { + logger.error(reply.cause().getMessage()); + return; + } + logger.info(reply.result().body().toString()); + if (reply.result().body() != null) { + var jArr = reply.result().body().getJsonObject("cursor").getJsonArray("firstBatch"); + if (!jArr.isEmpty()) { + // Set timeOfLastDataSet to the datetime of the last received data + long unixTs = Long.parseLong(jArr.getJsonObject(jArr.size()-1).getString("time")); + timeOfLastDataSet = getISO8601StringForDate(new Date(unixTs)); + vertx.eventBus().publish(config.outAddress(), jArr); + } + } + }); + } + + /** + * Function to create DbRequest from config. + * + * @return {@link de.wuespace.telestion.services.database.DbRequest} + */ + private DbRequest getDbRequestFromConfig() { + // TODO: Make parameters optional and config easier. + return new DbRequest( + config.collection(), + "", + Collections.emptyList(), + Collections.emptyList(), + -1, + 0, + config.field() + ); + } + + /** + * Function to create DbRequest from config with a new query containing e.g. the new last date/time. + * + * @param query new query in JSON String representation + * @return {@link de.wuespace.telestion.services.database.DbRequest} + */ + private DbRequest getDbRequestFromConfig(String query) { + return new DbRequest( + config.collection(), + query, + Collections.emptyList(), + Collections.emptyList(), + -1, + 0, + config.field() + ); + } + + /** + * Helper function to turn rate into milliseconds. + * + * @param rate the desired data rate + * @return milliseconds of (1/rate) + */ + private static long getRateInMillis(int rate) { + return (long) ((1.0 / rate) * 1000.5); + } + + private static String getISO8601StringForDate(Date date) { + // TODO: fix time zone issues, maybe use newer date implementation + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.GERMANY); + dateFormat.setTimeZone(TimeZone.getTimeZone("CET")); + return dateFormat.format(date); + } + + private static record Configuration( + @JsonProperty String collection, + @JsonProperty String field, + @JsonProperty int rate, + @JsonProperty String outAddress + ) { + private Configuration() { + this("", "", 0, ""); + } + } +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java new file mode 100644 index 00000000..20acd89e --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/PeriodicDataPublisher.java @@ -0,0 +1,163 @@ +package de.wuespace.telestion.services.database; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.config.Config; +import de.wuespace.telestion.services.message.Address; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.Collections; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Verticle periodically querying the database for preconfigured data, + * publishing the results to a preconfigured outgoing address. + * + * @author Jan Tischhöfer + * @version 07-05-2021 + */ +public final class PeriodicDataPublisher extends AbstractVerticle { + private final Configuration forcedConfig; + private Configuration config; + + private final Logger logger = LoggerFactory.getLogger(PeriodicDataPublisher.class); + private Handler reqTimer; + private Long timer; + private DbRequest dbRequest; + private String timeOfLastDataSet = null; + + /** + * MongoDB Eventbus Address. + */ + private final String db = Address.incoming(MongoDatabaseService.class, "find"); + private final String agg = Address.incoming(MongoDatabaseService.class, "aggregate"); + + /** + * This constructor supplies default options. + * + * @param collection the name of the MongoDB collection (table in SQL databases) + * @param rate the desired rate (1 per rate milliseconds) at which the data should be queried + * @param outAddress the desired outgoing address of the periodic data + */ + public PeriodicDataPublisher(String collection, int rate, String outAddress, String aggregate) { + this.forcedConfig = new Configuration( + collection, + "", + Collections.emptyList(), + Collections.emptyList(), + rate, + outAddress, + aggregate + ); + } + + /** + * If this constructor is used, settings have to be specified in the config file. + */ + public PeriodicDataPublisher() { this.forcedConfig = null; } + + @Override + public void start(Promise startPromise) throws Exception { + config = Config.get(forcedConfig, config(), Configuration.class); + dbRequest = getDbRequestFromConfig(); + timer = getRateInMillis(config.rate()); + reqTimer = id -> { + databaseRequest(); + vertx.setTimer(timer, reqTimer); + }; + vertx.setTimer(timer, reqTimer); + startPromise.complete(); + } + + /** + * Function to request the preconfigured DbRequest. + */ + private void databaseRequest() { + if (timeOfLastDataSet != null) { + var dateQuery = new JsonObject() + .put("datetime", new JsonObject().put("$gt", new JsonObject().put("$date", timeOfLastDataSet))); + dbRequest = getDbRequestFromConfig(dateQuery.toString()); + } + vertx.eventBus().request(agg, dbRequest.json(), (Handler>>) reply -> { + if (reply.failed()) { + logger.error(reply.cause().getMessage()); + return; + } + + logger.info(reply.result().body().toString()); + var jArr = reply.result().body().getJsonArray("result"); + // Set timeOfLastDataSet to the datetime of the last received data + // timeOfLastDataSet = jArr.getJsonObject(jArr.size()-1).getJsonObject("datetime").getString("$date"); + // vertx.eventBus().publish(config.outAddress(), jArr); + }); + } + + /** + * Function to create DbRequest from config. + * + * @return {@link de.wuespace.telestion.services.database.DbRequest} + */ + private DbRequest getDbRequestFromConfig() { + // TODO: Make parameters optional and config easier. + return new DbRequest( + config.collection(), + config.query(), + config.fields(), + config.sort(), + -1, + 0, + config.aggregate() + ); + } + + /** + * Function to create DbRequest from config with a new query containing e.g. the new last date/time. + * + * @param query new query in JSON String representation + * @return {@link de.wuespace.telestion.services.database.DbRequest} + */ + private DbRequest getDbRequestFromConfig(String query) { + return new DbRequest( + config.collection(), + query, + config.fields(), + config.sort(), + -1, + 0, + config.aggregate() + ); + } + + /** + * Helper function to turn rate into milliseconds. + * + * @param rate the desired data rate + * @return milliseconds of (1/rate) + */ + private static long getRateInMillis(int rate) { + BigDecimal bd = new BigDecimal((double) (1 / rate)); + bd = bd.setScale(3, RoundingMode.HALF_UP); + return (long) bd.doubleValue() * 1000L; + } + + private static record Configuration( + @JsonProperty String collection, + @JsonProperty String query, + @JsonProperty List fields, + @JsonProperty List sort, + @JsonProperty int rate, + @JsonProperty String outAddress, + @JsonProperty String aggregate + ) { + private Configuration() { + this("", "", Collections.emptyList(), Collections.emptyList(), 0, "", ""); + } + } +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java deleted file mode 100644 index 21b46f5e..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java +++ /dev/null @@ -1,49 +0,0 @@ -package de.wuespace.telestion.services.database; - -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; -import java.time.Duration; -import java.util.Random; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import de.wuespace.telestion.services.message.Address; - -/** - * Test class.
- * Will be removed upon first release. - */ -public final class RandomPositionPublisher extends AbstractVerticle { - private static final Logger logger = LoggerFactory.getLogger(RandomPositionPublisher.class); - private final Random rand = new Random(555326456); - - private final String inSave = Address.incoming(DataService.class, "save"); - - @Override - public void start(Promise startPromise) { - vertx.setPeriodic(Duration.ofSeconds(3).toMillis(), timerId -> publishPosition()); - startPromise.complete(); - } - - /** - * Publishes random Position around Kiruna. - */ - private void publishPosition() { - var x = (double) vertx.sharedData().getLocalMap("randPos").getOrDefault("x", 67.8915); - var y = (double) vertx.sharedData().getLocalMap("randPos").getOrDefault("y", 21.0836); - var z = (double) vertx.sharedData().getLocalMap("randPos").getOrDefault("z", 0.0); - - //final Position pos = new Position(x, y, z); - - x += rand.nextDouble() * 0.02; - y += rand.nextDouble() * 0.02; - // z += rand.nextDouble()*0.02; - vertx.sharedData().getLocalMap("randPos").put("x", x); - vertx.sharedData().getLocalMap("randPos").put("y", y); - vertx.sharedData().getLocalMap("randPos").put("z", z); - - //vertx.eventBus().publish(Address.outgoing(RandomPositionPublisher.class, "MockPos"), pos.json()); - //vertx.eventBus().publish(inSave, pos.json()); - //logger.debug("Sending current pos: {} on {}", pos, RandomPositionPublisher.class.getName()); - } -} -