The entire stack runs as Docker containers.
- Run start.sh
- (Optional) Use ngrok tcp 27017 to expose MongoDB to external dashboards
Data Generator (generator)
- Generates fake sensor data and publishes it to a Kafka topic
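As a rough illustration of what the generator does, the sketch below builds fake readings and hands them to a publish callback. The field names `timestamp` and `value` follow the Drill queries in this document; the `temp=` prefix, the value range, and the helper names `make_reading` and `publish` are assumptions made for illustration, not the actual generator code.

```python
import json
import random
import time


def make_reading(now=None, rng=random):
    """Build one fake sensor reading (hypothetical payload layout)."""
    ts = int(now if now is not None else time.time())  # epoch seconds
    measured = rng.uniform(0.0, 99.0)
    # Assumption: the last 6 characters of `value` hold the number,
    # because the Drill queries parse it with right(value, 6).
    return {"timestamp": ts, "value": f"temp={measured:06.2f}"}


def publish(readings, send):
    """Serialize each reading as JSON bytes and pass it to `send`.

    `send` is any callable taking bytes; returns the number of messages sent.
    """
    count = 0
    for reading in readings:
        send(json.dumps(reading).encode("utf-8"))
        count += 1
    return count
```

In a real deployment `send` would wrap a Kafka producer's send call for the chosen topic; passing it in as a callable keeps the sketch independent of any particular client library.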
Kafka Broker (broker)
- Receives messages on a specific Kafka topic and forwards them to the other services
MongoDB-Sink (connect)
- Listens to a specific Kafka topic and stores the messages in MongoDB
MongoDB (mongo)
- Stores the received messages and the aggregated values produced by the scheduled task
Apache Drill (drill)
- Queries the database with SQL
- Exposes REST APIs to execute queries
Scheduled Aggregator (sched_aggregator)
- Runs scheduled aggregation tasks through Drill and stores the output in MongoDB
- Computes the mean value for each minute
- The task is scheduled to run every 5 minutes
- Only the last 5 minutes are evaluated
- Drill logs show that the scheduled aggregation query runs successfully every 5 minutes
- Dashboards are built with Qlik, which connects to MongoDB to fetch up-to-date values
- Access MongoDB via the MongoDB Shell (mongosh)
mongosh "mongodb://localhost:27017/m_db" --username admin1 --password admin1
- Select all measurements stored in the db
SELECT * FROM mongo.m_db.measure;
- Evaluate the 1-minute mean of the measurements
SELECT nearestDate(TO_TIMESTAMP(CAST(macro.macro_val.`timestamp` AS int)), 'MINUTE') AS `minute_timestamp`,
AVG(CAST(`right`(macro.macro_val.`value`,6) AS float) ) AS `measure_value`
FROM (SELECT value as macro_val FROM mongo.m_db.measure) macro
GROUP BY minute_timestamp;
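To make the grouping concrete, here is a minimal Python sketch of the same calculation on in-memory records. It assumes that `nearestDate(..., 'MINUTE')` truncates each timestamp down to the start of its minute, that `timestamp` holds epoch seconds, and that the last 6 characters of `value` are the number. The function name `minute_means` and the UTC conversion are illustrative choices, not part of the project.

```python
from collections import defaultdict
from datetime import datetime, timezone


def minute_means(measurements):
    """Group readings by minute and average the parsed values."""
    buckets = defaultdict(list)
    for m in measurements:
        ts = int(m["timestamp"])
        minute_start = ts - ts % 60  # truncate epoch seconds to the minute
        # Mirror right(value, 6): keep only the last 6 characters.
        buckets[minute_start].append(float(m["value"][-6:]))
    return {
        datetime.fromtimestamp(start, tz=timezone.utc): sum(vals) / len(vals)
        for start, vals in sorted(buckets.items())
    }
```

Two readings at seconds 120 and 150 fall into the same minute bucket and are averaged together, while a reading at second 185 lands in the next one.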
- Evaluate the 1-minute mean of the measurements for the last 5 minutes (in the aggregator code the interval is filled in from the batch_time variable)
SELECT nearestDate(TO_TIMESTAMP(CAST(macro.macro_val.`timestamp` AS int)), 'MINUTE') AS `minute_timestamp`,
AVG(CAST(`right`(macro.macro_val.`value`,6) AS float) ) AS `measure_value`
FROM (SELECT value as macro_val FROM mongo.m_db.measure) macro
WHERE nearestDate(TO_TIMESTAMP(CAST(macro.macro_val.`timestamp` AS int)), 'MINUTE') >= DATE_SUB(CURRENT_TIMESTAMP, interval '5' minute)
GROUP BY minute_timestamp;
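The scheduled aggregator submits the windowed query above through Drill's REST API. The sketch below shows one way this could look with only the Python standard library. The URL (Drill's default web port 8047 on localhost), the helper names, and the assumption that the JSON response carries a `rows` list are illustrative and may differ from the actual aggregator code; only `batch_time` is taken from the original query.

```python
import json
import urllib.request

# Assumption: Drill's REST endpoint on its default web port.
DRILL_URL = "http://localhost:8047/query.json"


def build_query(batch_time):
    """Return the 1-minute mean query limited to the last `batch_time` minutes."""
    minutes = int(batch_time)  # reject non-numeric input before formatting
    bucket = (
        "nearestDate(TO_TIMESTAMP(CAST(macro.macro_val.`timestamp` AS int)), "
        "'MINUTE')"
    )
    return (
        f"SELECT {bucket} AS `minute_timestamp`, "
        "AVG(CAST(`right`(macro.macro_val.`value`,6) AS float)) AS `measure_value` "
        "FROM (SELECT value as macro_val FROM mongo.m_db.measure) macro "
        f"WHERE {bucket} >= DATE_SUB(CURRENT_TIMESTAMP, interval '{minutes}' minute) "
        "GROUP BY minute_timestamp"
    )


def build_request(batch_time, url=DRILL_URL):
    """Wrap the query in the JSON body expected by Drill's query endpoint."""
    body = json.dumps({"queryType": "SQL", "query": build_query(batch_time)})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_once(batch_time=5):
    """Send the query to Drill and return the result rows (needs a running Drill)."""
    with urllib.request.urlopen(build_request(batch_time)) as resp:
        return json.loads(resp.read())["rows"]
```

A scheduler would call `run_once(5)` every five minutes, for example in a loop with `time.sleep(300)`, and write the returned rows to MongoDB; that part is omitted here.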