
Commit 61ff414

razvansbernauer and Sebastian Bernauer authored

waterlevel data: update docs and nifi flow (#319)
* move nifi flow json definition to config map (and fix bootstrap.servers)
* disable sni check for nifi
* docs: add kafka section back in
* fix pre-comit lint
* Update docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc
* docs: re-add kafka section
* move water level flow to config map and update bootstrap.servers
* re-add topics image

Co-authored-by: Sebastian Bernauer <sebastian.bernauer@stackable.de>
1 parent e3baa1c commit 61ff414

File tree

4 files changed, +224 -8 lines changed

demos/nifi-kafka-druid-water-level-data/IngestWaterLevelsToKafka.json

Lines changed: 0 additions & 1 deletion
This file was deleted.

demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml

Lines changed: 15 additions & 7 deletions
Large diffs are not rendered by default.
Binary image, 63.4 KB (the re-added topics image; not rendered)

docs/modules/demos/pages/nifi-kafka-druid-water-level-data.adoc

Lines changed: 209 additions & 0 deletions
@@ -92,6 +92,215 @@ $ stackablectl stacklet list
include::partial$instance-hint.adoc[]

== Inspect the data in Kafka

Kafka is an event streaming platform that streams data in near real-time. All messages put into and read from Kafka
are organized in dedicated queues called topics. The test data is put into the topics `stations` and `measurements`. The
records are produced (written) by the test data generator and consumed (read) by Druid afterwards, in the same order in
which they were created.

To interact with Kafka, you will use the client scripts shipped with the Kafka image. Kafka uses mutual TLS, so clients
that want to connect to Kafka must present a valid TLS certificate. The easiest way to obtain one is to shell into the
`kafka-broker-default-0` Pod, as we will do in the following sections for demonstration purposes. For a production setup,
you should instead spin up a dedicated Pod, provisioned with a certificate, that acts as a Kafka client, rather than
shelling into the Kafka Pod.
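
If you prefer, you can also open an interactive shell in the broker container once and run the client scripts from
there, instead of issuing the one-off `kubectl exec` calls shown below (a minimal sketch, assuming the container image
provides `bash`):

[source,console]
----
$ kubectl exec -it kafka-broker-default-0 -c kafka -- bash
$ cd /stackable/kafka/bin
----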

=== List the available Topics

You can execute a command on the Kafka broker to list the available topics as follows:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-topics.sh \
  --describe \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties
...
Topic: measurements TopicId: w9qYb3GaTvCMZj4G8pkPPQ PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=100000000,retention.bytes=900000000
Topic: measurements Partition: 0 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 1 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 2 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 3 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 4 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 5 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 6 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 7 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations TopicId: QkKmvOagQkG4QbeS0IZ_Tg PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=100000000,retention.bytes=900000000
Topic: stations Partition: 0 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 1 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 2 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 3 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 4 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 5 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 6 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 7 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
----

You can see that Kafka consists of a single broker and that the topics `stations` and `measurements` have been created
with eight partitions each.
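
If you only need the topic names without the partition details, the same tool can simply list them (a minimal sketch
using the same broker Pod and client configuration):

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties
----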

=== Show Sample Records

To see some records sent to Kafka, run the following commands. You can change the number of records to print via the
`--max-messages` parameter.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic stations \
  --offset earliest \
  --partition 0 \
  --max-messages 2
----

Below is an example of the output of two records:

[source,json]
----
{
  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
  "number": 48900237,
  "shortname": "EITZE",
  "longname": "EITZE",
  "km": 9.56,
  "agency": "VERDEN",
  "longitude": 9.2767694354,
  "latitude": 52.9040654474,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  }
}
{
  "uuid": "5aaed954-de4e-4528-8f65-f3f530bc8325",
  "number": 48900204,
  "shortname": "RETHEM",
  "longname": "RETHEM",
  "km": 34.22,
  "agency": "VERDEN",
  "longitude": 9.3828408101,
  "latitude": 52.7890975921,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  }
}
----
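
If you also want to see each record's metadata, the console consumer's default formatter can print it alongside the
value. The following sketch (same broker Pod and topic as above) additionally prints the record timestamp, partition and
offset via `--property` flags:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic stations \
  --offset earliest \
  --partition 0 \
  --max-messages 2 \
  --property print.timestamp=true \
  --property print.partition=true \
  --property print.offset=true
----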

Similarly, consume a few records from the `measurements` topic:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic measurements \
  --offset earliest \
  --partition 0 \
  --max-messages 3
----

Below is an example of the output of three records:

[source,json]
----
{
  "timestamp": 1658151900000,
  "value": 221,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
  "timestamp": 1658152800000,
  "value": 220,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
  "timestamp": 1658153700000,
  "value": 220,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
----

The records in the two topics contain only the data that is needed. Each measurement record references its measuring
station via the `station_uuid` field. The relationship is illustrated below.

image::nifi-kafka-druid-water-level-data/topics.png[]

The data is split into two topics for improved performance. A more straightforward solution would be to use a single
topic and produce records like the following:

[source,json]
----
{
  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
  "number": 48900237,
  "shortname": "EITZE",
  "longname": "EITZE",
  "km": 9.56,
  "agency": "VERDEN",
  "longitude": 9.2767694354,
  "latitude": 52.9040654474,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  },
  "timestamp": 1658151900000,
  "value": 221
}
----

Notice the last two attributes, which differ from the `stations` records shown previously. The obvious downside of this
approach is that every measurement (of which there are many millions) has to carry all the data known about the station
where it was measured. This often means transmitting and storing duplicated information, e.g. the longitude of a
station, resulting in increased network traffic and storage usage. The solution is to send the station data and the
measurement data separately, each containing only the fields it needs. This process is called data normalization. The
downside is that, when analyzing the data, you need to combine the records from multiple tables in Druid (`stations`
and `measurements`).

If you are interested in how many records have been produced to a Kafka topic so far, use the following command. It
prints the latest offset of each partition of the `measurements` topic. Since partition offsets start at zero, the
latest offset of a partition equals the number of records produced to it.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-get-offsets.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties \
  --topic measurements
...
measurements:0:1366665
measurements:1:1364930
measurements:2:1395607
measurements:3:1390762
measurements:4:1368829
measurements:5:1362539
measurements:6:1344362
measurements:7:1369651
----

Summing the offsets of the eight partitions, roughly 11 million (10,963,345) records have been produced to the topic so
far.
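
Note that, because the topics are configured with `retention.bytes=900000000` (visible in the topic description above),
Kafka eventually deletes old segments, so not all of these records are necessarily still stored. To see how far each
partition has already been truncated, you can query the earliest offsets instead by passing `--time -2` to the same tool
(a minimal sketch):

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-get-offsets.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties \
  --topic measurements \
  --time -2
----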

To inspect the most recent records, use the following command. With `--offset latest`, the consumer starts at the end of
partition `0` of the `measurements` topic and prints the next three records as they arrive:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic measurements \
  --offset latest \
  --partition 0 \
  --max-messages 3
...
{"timestamp":"2025-10-21T11:00:00+02:00","value":369.54,"station_uuid":"5cdc6555-87d7-4fcd-834d-cbbe24c9d08b"}
{"timestamp":"2025-10-21T11:15:00+02:00","value":369.54,"station_uuid":"5cdc6555-87d7-4fcd-834d-cbbe24c9d08b"}
{"timestamp":"2025-10-21T11:00:00+02:00","value":8.0,"station_uuid":"7deedc21-2878-40cc-ab47-f6da0d9002f1"}
----
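
The records are printed as single-line JSON. If you have `jq` installed on your local machine (an assumption; it is not
part of the demo), you can pipe the consumer output through it to pretty-print the records, for example:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic measurements \
  --offset latest \
  --partition 0 \
  --max-messages 3 \
  | jq .
----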

== NiFi

NiFi fetches water-level data from the internet and ingests it into Kafka in real time. This demo includes a workflow
