
Commit e3baa1c

razvansbernauer and Sebastian Bernauer authored
earthquake-data: update docs and nifi flows (#318)
* move nifi flow json definition to config map (and fix bootstrap.servers)
* disable sni check for nifi
* docs: add kafka section back in
* fix pre-commit lint
* Update docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc

Co-authored-by: Sebastian Bernauer <sebastian.bernauer@stackable.de>
1 parent 404459c commit e3baa1c

File tree: 4 files changed, +126 -9 lines changed


demos/nifi-kafka-druid-earthquake-data/IngestEarthquakesToKafka.json

Lines changed: 0 additions & 1 deletion
This file was deleted.

demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml

Lines changed: 15 additions & 6 deletions
Large diffs are not rendered by default.
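The rendered diff for this file is collapsed, but per the commit message the change moves the NiFi flow JSON definition out of the (now deleted) `IngestEarthquakesToKafka.json` repo file and into a ConfigMap shipped alongside the ingestion job, fixing `bootstrap.servers` along the way. A minimal sketch of that shape is below; the ConfigMap name and the JSON body are illustrative assumptions, not the commit's actual content.

[source,yaml]
----
# Illustrative sketch only; the real manifest lives in create-nifi-ingestion-job.yaml.
apiVersion: v1
kind: ConfigMap
metadata:
  name: ingest-earthquakes-to-kafka  # assumed name
data:
  # The flow definition that used to be a standalone repo file now
  # travels with the manifest and can be mounted into the NiFi job.
  IngestEarthquakesToKafka.json: |
    {"flowContents": {"name": "IngestEarthquakesToKafka"}}
----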

docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc

Lines changed: 106 additions & 1 deletion
@@ -27,7 +27,7 @@ Additionally, we have to use the FQDN service names (including the namespace), s
 To run this demo, your system needs at least:
 
 * 9 {k8s-cpu}[cpu units] (core/hyperthread)
-* 42GiB memory
+* 42GiB memory (minimum of 16GiB per node)
 * 75GiB disk storage
 
 == Overview
@@ -86,6 +86,111 @@ $ stackablectl stacklet list
 
 include::partial$instance-hint.adoc[]
 
+== Inspect the data in Kafka
+
+Kafka is an event streaming platform to stream the data in near real-time.
+All the messages put in and read from Kafka are structured in dedicated queues called topics.
+The test data will be put into a topic called `earthquakes`.
+The records are produced (written) by the test data generator and consumed (read) by Druid afterwards in the same order they were created.
+
+Kafka uses mutual TLS, so clients wanting to connect to Kafka must present a valid TLS certificate.
+The easiest way to obtain this is to shell into the `kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes.
+For a production setup, you should spin up a dedicated Pod provisioned with a certificate acting as a Kafka client instead of shell-ing into the Kafka Pod.
+
+=== List the available Topics
+
+You can execute a command on the Kafka broker to list the available topics as follows:
+
+// In the following commands the kcat-prober container instead of the kafka container is used to send requests to Kafka.
+// This is necessary because kcat cannot use key- and truststore files with empty passwords, which are mounted here to the kafka container.
+// However, the kcat-prober container has TLS certificates mounted, which can be used by kcat to connect to Kafka.
+[source,console]
+----
+$ kubectl exec kafka-broker-default-0 -c kafka -- \
+  /stackable/kafka/bin/kafka-topics.sh \
+  --describe \
+  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
+  --command-config /stackable/config/client.properties
+...
+Topic: earthquakes  TopicId: ND51v_XcQPK4Ilm7A35Pag  PartitionCount: 8  ReplicationFactor: 1  Configs: min.insync.replicas=1,segment.bytes=100000000,retention.bytes=900000000
+  Topic: earthquakes  Partition: 0  Leader: 1243966388  Replicas: 1243966388  Isr: 1243966388  Elr:  LastKnownElr:
+  Topic: earthquakes  Partition: 1  Leader: 1243966388  Replicas: 1243966388  Isr: 1243966388  Elr:  LastKnownElr:
+  Topic: earthquakes  Partition: 2  Leader: 1243966388  Replicas: 1243966388  Isr: 1243966388  Elr:  LastKnownElr:
+  Topic: earthquakes  Partition: 3  Leader: 1243966388  Replicas: 1243966388  Isr: 1243966388  Elr:  LastKnownElr:
+  Topic: earthquakes  Partition: 4  Leader: 1243966388  Replicas: 1243966388  Isr: 1243966388  Elr:  LastKnownElr:
+  Topic: earthquakes  Partition: 5  Leader: 1243966388  Replicas: 1243966388  Isr: 1243966388  Elr:  LastKnownElr:
+  Topic: earthquakes  Partition: 6  Leader: 1243966388  Replicas: 1243966388  Isr: 1243966388  Elr:  LastKnownElr:
+  Topic: earthquakes  Partition: 7  Leader: 1243966388  Replicas: 1243966388  Isr: 1243966388  Elr:  LastKnownElr:
+----
+
+You can see that Kafka consists of one broker, and the topic `earthquakes` with eight partitions has been created.
+To see some records sent to Kafka, run the following command.
+You can change the number of records to print via the `--max-messages` parameter.
+
+[source,console]
+----
+$ kubectl exec kafka-broker-default-0 -c kafka -- \
+  /stackable/kafka/bin/kafka-console-consumer.sh \
+  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
+  --consumer.config /stackable/config/client.properties \
+  --topic earthquakes \
+  --offset earliest \
+  --partition 0 \
+  --max-messages 1
+----
+
+Below is an example of the output of one record:
+
+[source,json]
+----
+{
+  "time": "1950-02-07T10:37:29.240Z",
+  "latitude": 45.949,
+  "longitude": 151.59,
+  "depth": 35.0,
+  "mag": 5.94,
+  "magType": "mw",
+  "nst": null,
+  "gap": null,
+  "dmin": null,
+  "rms": null,
+  "net": "iscgem",
+  "id": "iscgem895202",
+  "updated": "2022-04-26T18:23:38.377Z",
+  "place": "Kuril Islands",
+  "type": "earthquake",
+  "horizontalError": null,
+  "depthError": 12.6,
+  "magError": 0.55,
+  "magNst": null,
+  "status": "reviewed",
+  "locationSource": "iscgem",
+  "magSource": "iscgem"
+}
+----
+
+If you are interested in how many records have been produced to the Kafka topic so far, use the following command.
+
+[source,console]
+----
+$ kubectl exec kafka-broker-default-0 -c kafka -- \
+  /stackable/kafka/bin/kafka-get-offsets.sh \
+  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
+  --command-config /stackable/config/client.properties \
+  --topic earthquakes
+...
+earthquakes:0:757379
+earthquakes:1:759282
+earthquakes:2:761924
+earthquakes:3:761339
+earthquakes:4:759059
+earthquakes:5:767695
+earthquakes:6:771457
+earthquakes:7:768301
+----
+
+If you calculate `765,000` records * `8` partitions, you end up with ~6,120,000 records.
+
 == NiFi
 
 NiFi is used to fetch earthquake data from the internet and ingest it into Kafka.
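A note on the mutual-TLS guidance in the added section: for production it recommends a dedicated client Pod rather than shelling into the broker. Below is a minimal sketch of what such a Pod could look like; the image, the mount path, and the SecretClass name `tls` are assumptions based on common Stackable secret-operator conventions, not part of this commit.

[source,yaml]
----
# Sketch of a dedicated Kafka client Pod; all names and paths here are assumptions.
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
spec:
  containers:
    - name: kcat
      image: edenhill/kcat:1.7.1  # any image that ships kcat will do
      command: ["sleep", "infinity"]
      volumeMounts:
        - name: tls
          mountPath: /stackable/tls  # certificates provisioned below land here
  volumes:
    - name: tls
      ephemeral:
        volumeClaimTemplate:
          metadata:
            annotations:
              secrets.stackable.tech/class: tls  # assumed SecretClass name
              secrets.stackable.tech/scope: pod
          spec:
            storageClassName: secrets.stackable.tech
            accessModes: ["ReadWriteOnce"]
            resources:
              requests:
                storage: "1"
----

With such a Pod running, a command along the lines of `kubectl exec kafka-client -- kcat -b kafka-broker-default-headless.default.svc.cluster.local:9093 -X security.protocol=SSL -X ssl.ca.location=/stackable/tls/ca.crt -X ssl.certificate.location=/stackable/tls/tls.crt -X ssl.key.location=/stackable/tls/tls.key -C -t earthquakes -c 1` should read a record without ever entering the broker Pod.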
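The added section estimates the total record count as 765,000 * 8. Since `kafka-get-offsets.sh` prints one `topic:partition:offset` line per partition, you can also compute the exact sum by piping the same command through `awk`; for the offsets shown in the snapshot above, that sum is 6,106,436.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
    /stackable/kafka/bin/kafka-get-offsets.sh \
    --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
    --command-config /stackable/config/client.properties \
    --topic earthquakes \
  | awk -F: '{sum += $3} END {print sum}'
6106436
----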

stacks/nifi-kafka-druid-superset-s3/nifi.yaml

Lines changed: 5 additions & 1 deletion
@@ -21,7 +21,7 @@ spec:
           min: "500m"
           max: "4"
         memory:
-          limit: '6Gi'
+          limit: "6Gi"
         storage:
           contentRepo:
             capacity: "10Gi"
@@ -33,6 +33,10 @@ spec:
             capacity: "4Gi"
           stateRepo:
             capacity: "1Gi"
+    configOverrides:
+      nifi.properties:
+        nifi.web.https.sni.required: "false"
+        nifi.web.https.sni.host.check: "false"
     roleGroups:
       default:
         replicas: 1
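To check that the two SNI overrides actually end up in NiFi's rendered configuration, you can grep the generated `nifi.properties` inside the NiFi Pod. The Pod name and file path below follow the usual Stackable naming (role `node`, role group `default`) but are assumptions, not output taken from this commit.

[source,console]
----
$ kubectl exec nifi-node-default-0 -- \
    grep 'nifi.web.https.sni' /stackable/nifi/conf/nifi.properties
nifi.web.https.sni.required=false
nifi.web.https.sni.host.check=false
----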
