Showing 13 changed files with 374 additions and 138 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# kafka-data transformer (KSQLDB) | ||
|
||
Reference: https://docs.ksqldb.io/ | ||
|
||
The KSQLDB Kafka data transformer registers consumers and producers with Kafka that transform data in a topic and | ||
publish the results to another topic. | ||
|
||
The provided KSQLDB _questionnaire_response_observations.sql_ and _questionnaire_app_event_observations.sql_ SQL files | ||
transform, respectively, the _questionnaire_response_ and _questionnaire_app_event_ topics and publish the data to the | ||
_ksql_observations_ topic. The _ksql_observations_ topic is consumed by the Kafka-JDBC-connector used by the | ||
RADAR-base Data Dashboard backend service (see: [20-data-dashboard.yaml](../../helmfile.d/20-dashboard.yaml)). | ||
|
||
When other topics need to be transformed, new SQL files can be added to this directory. These new files should be | ||
referenced in the _kafka_data_transformer -> ksql -> queries_ section of the `etc/base.yaml.gotmpl` file. New KSQLDB SQL | ||
files should transform their input into the following format of the _ksql_observations_ topic: | ||
|
||
``` | ||
TOPIC KEY: | ||
PROJECT: the project identifier | ||
SOURCE: the source identifier | ||
SUBJECT: the subject/study participant identifier | ||
TOPIC VALUE: | ||
TOPIC: the topic identifier | ||
CATEGORY: the category identifier (optional) | ||
VARIABLE: the variable identifier | ||
DATE: the date of the observation | ||
END_DATE: the end date of the observation (optional) | ||
TYPE: the type of the observation (STRING, STRING_JSON, INTEGER, DOUBLE) | ||
VALUE_TEXTUAL: the textual value of the observation (optional, must be set when VALUE_NUMERIC is NULL) | ||
VALUE_NUMERIC: the numeric value of the observation (optional, must be set when VALUE_TEXTUAL is NULL) | ||
``` | ||
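
As a minimal sketch of the rules in the format above (not part of this repository), the following Python function checks a candidate topic value before it is published. The function name `validate_observation` and the dict-based record are hypothetical; the field names and the allowed `TYPE` values are taken from the listing above. It reads the two "must be set when ... is NULL" notes as "at least one of the two value fields is set", which is an interpretation rather than a documented rule.

```python
# Hypothetical helper, not part of the repository: checks a ksql_observations
# topic value (as a plain dict) against the format described above.
ALLOWED_TYPES = {"STRING", "STRING_JSON", "INTEGER", "DOUBLE"}
REQUIRED_FIELDS = ("TOPIC", "VARIABLE", "DATE", "TYPE")  # fields not marked optional


def validate_observation(value: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []
    for field in REQUIRED_FIELDS:
        if value.get(field) is None:
            problems.append(f"{field} is required")
    obs_type = value.get("TYPE")
    if obs_type is not None and obs_type not in ALLOWED_TYPES:
        problems.append(f"TYPE must be one of {sorted(ALLOWED_TYPES)}")
    # Assumption: at least one of the two value fields has to be non-NULL.
    if value.get("VALUE_TEXTUAL") is None and value.get("VALUE_NUMERIC") is None:
        problems.append("either VALUE_TEXTUAL or VALUE_NUMERIC must be set")
    return problems
```

A check like this could be run against sample records while a new SQL file is being developed; it does not replace the Avro schema enforced by the stream itself.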
|
||
New messages are added to the _ksql_observations_ topic by inserting into the _observations_ stream (see [_base_observations_stream.sql](_base_observations_stream.sql)): | ||
|
||
``` | ||
INSERT INTO observations | ||
SELECT | ||
... | ||
PARTITION BY q.projectId, q.userId, q.sourceId | ||
EMIT CHANGES; | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
SET 'auto.offset.reset' = 'earliest'; | ||
|
||
-- Register the 'ksql_observations' topic (it is created if it does not exist). | ||
CREATE STREAM observations ( | ||
PROJECT VARCHAR KEY, -- 'KEY' means that this field is part of the kafka message key | ||
SUBJECT VARCHAR KEY, | ||
SOURCE VARCHAR KEY, | ||
`TOPIC` VARCHAR, | ||
CATEGORY VARCHAR, | ||
VARIABLE VARCHAR, | ||
DATE TIMESTAMP, | ||
END_DATE TIMESTAMP, | ||
TYPE VARCHAR, | ||
VALUE_NUMERIC DOUBLE, | ||
VALUE_TEXTUAL VARCHAR | ||
) WITH ( | ||
kafka_topic = 'ksql_observations', | ||
partitions = 3, | ||
format = 'avro' | ||
); |
Empty file.
30 changes: 30 additions & 0 deletions
etc/cp-ksql-server/questionnaire_app_event_observations.sql
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
CREATE STREAM questionnaire_app_event ( | ||
projectId VARCHAR KEY, -- 'KEY' means that this field is part of the kafka message key | ||
userId VARCHAR KEY, | ||
sourceId VARCHAR KEY, | ||
questionnaireName VARCHAR, | ||
eventType VARCHAR, | ||
time DOUBLE, | ||
metadata MAP<VARCHAR, VARCHAR> | ||
) WITH ( | ||
kafka_topic = 'questionnaire_app_event', | ||
partitions = 3, | ||
format = 'avro' | ||
); | ||
|
||
INSERT INTO observations | ||
SELECT | ||
q.projectId AS PROJECT, | ||
q.userId AS SUBJECT, | ||
q.sourceId AS SOURCE, | ||
'questionnaire_app_event' as `TOPIC`, | ||
CAST(NULL as VARCHAR) as CATEGORY, | ||
q.questionnaireName as VARIABLE, | ||
FROM_UNIXTIME(CAST(q.time * 1000 AS BIGINT)) as DATE, | ||
CAST(NULL as TIMESTAMP) as END_DATE, | ||
'STRING_JSON' as TYPE, | ||
CAST(NULL as DOUBLE) as VALUE_NUMERIC, | ||
TO_JSON_STRING(q.metadata) as VALUE_TEXTUAL | ||
FROM questionnaire_app_event q | ||
PARTITION BY q.projectId, q.userId, q.sourceId -- this sets the fields in the kafka message key | ||
EMIT CHANGES; |
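
The mapping performed by the `INSERT INTO observations` statement above can be modelled in a few lines of Python. This is an illustrative sketch only: the function name `app_event_to_observation` is hypothetical, the `time` field is assumed to hold seconds since the Unix epoch (implied by the multiplication by 1000 before `FROM_UNIXTIME`), and the cast to `BIGINT` is assumed to truncate the fractional milliseconds. The JSON produced here may differ in key order or spacing from what `TO_JSON_STRING` emits.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def app_event_to_observation(event: dict) -> dict:
    """Model of the questionnaire_app_event -> observations value mapping."""
    # time is a double in seconds; convert to whole milliseconds (assumed truncation).
    millis = int(event["time"] * 1000)
    return {
        "TOPIC": "questionnaire_app_event",
        "CATEGORY": None,
        "VARIABLE": event["questionnaireName"],
        "DATE": EPOCH + timedelta(milliseconds=millis),
        "END_DATE": None,
        "TYPE": "STRING_JSON",
        "VALUE_NUMERIC": None,
        # The whole metadata map is stored as one JSON string.
        "VALUE_TEXTUAL": json.dumps(event["metadata"], separators=(",", ":")),
    }
```

Storing the metadata map as a single `STRING_JSON` value keeps one observation per app event, at the cost of requiring consumers to parse the JSON themselves.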
82 changes: 82 additions & 0 deletions
etc/cp-ksql-server/questionnaire_response_observations.sql
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
CREATE STREAM questionnaire_response ( | ||
projectId VARCHAR KEY, -- 'KEY' means that this field is part of the kafka message key | ||
userId VARCHAR KEY, | ||
sourceId VARCHAR KEY, | ||
time DOUBLE, | ||
timeCompleted DOUBLE, | ||
timeNotification DOUBLE, | ||
name VARCHAR, | ||
version VARCHAR, | ||
answers ARRAY<STRUCT<questionId VARCHAR, value STRUCT<int INT, string VARCHAR, double DOUBLE>, startTime DOUBLE, endTime DOUBLE>> | ||
) WITH ( | ||
kafka_topic = 'questionnaire_response', | ||
partitions = 3, | ||
format = 'avro' | ||
); | ||
|
||
CREATE STREAM questionnaire_response_exploded | ||
AS SELECT | ||
EXPLODE(TRANSFORM(q.answers, a => a->questionId)) as VARIABLE, | ||
FROM_UNIXTIME(CAST(q.time * 1000 AS BIGINT)) as DATE, | ||
q.projectId, | ||
q.userId, | ||
q.sourceId, | ||
'questionnaire_response' as `TOPIC`, | ||
q.name as CATEGORY, | ||
CAST(NULL as TIMESTAMP) as END_DATE, | ||
-- WARNING!!! The cast from VARCHAR (string) to DOUBLE will throw a Java exception if the string is not a number. | ||
-- This does not mean that the message will be lost. The value will be present in the VALUE_TEXTUAL_OPTIONAL field. | ||
EXPLODE(TRANSFORM(q.answers, a => COALESCE(a->value->double, CAST(a->value->int as DOUBLE), CAST(a->value->string as DOUBLE)))) as VALUE_NUMERIC, | ||
EXPLODE(TRANSFORM(q.answers, a => CASE | ||
WHEN a->value->int IS NOT NULL THEN 'INTEGER' | ||
WHEN a->value->double IS NOT NULL THEN 'DOUBLE' | ||
ELSE NULL | ||
END)) as TYPE, | ||
-- Note: When the cast to double succeeds for the string value, VALUE_TEXTUAL_OPTIONAL will also be set. | ||
EXPLODE(TRANSFORM(q.answers, a => a->value->string)) as VALUE_TEXTUAL_OPTIONAL | ||
FROM questionnaire_response q | ||
EMIT CHANGES; | ||
|
||
INSERT INTO observations | ||
SELECT | ||
q.projectId as PROJECT, | ||
q.sourceId as SOURCE, | ||
q.userId as SUBJECT, | ||
`TOPIC`, CATEGORY, VARIABLE, DATE, END_DATE, | ||
CASE | ||
WHEN TYPE IS NULL AND VALUE_NUMERIC IS NOT NULL THEN 'DOUBLE' -- must have been derived from a string cast | ||
WHEN TYPE IS NULL AND VALUE_NUMERIC IS NULL THEN 'STRING' | ||
ELSE TYPE -- keep the original type when TYPE is not NULL | ||
END as TYPE, | ||
VALUE_NUMERIC, | ||
CASE | ||
WHEN VALUE_NUMERIC IS NOT NULL THEN NULL -- When cast to double has worked for the string value, set VALUE_TEXTUAL to NULL. | ||
ELSE VALUE_TEXTUAL_OPTIONAL | ||
END as VALUE_TEXTUAL | ||
FROM questionnaire_response_exploded q | ||
PARTITION BY q.projectId, q.userId, q.sourceId -- this sets the fields in the kafka message key | ||
EMIT CHANGES; | ||
|
||
-- TODO: exploding the 'select:' questions is not yet fully designed. | ||
-- The code is kept here for future reference. | ||
-- Multi-select questionnaire questions are stored as a single 'value' string with the | ||
-- names of the selected options separated by commas. Multi-select questions are prefixed | ||
-- by 'select:' in the questionId. | ||
-- When 'questionId' is like 'select:%', create a new stream with the select options. | ||
-- The options in the value field are split on commas and added as separate VARIABLE records. | ||
-- The VALUE_NUMERIC is set to 1 and VALUE_TEXTUAL is set to NULL. | ||
-- INSERT INTO observations | ||
-- SELECT | ||
-- EXPLODE(SPLIT(VALUE_TEXTUAL, ',')) as VARIABLE, | ||
-- PROJECT, SOURCE, SUBJECT, `TOPIC`, CATEGORY, DATE, END_DATE, | ||
-- 'INTEGER' as TYPE, | ||
-- CAST(1 as DOUBLE) VALUE_NUMERIC, | ||
-- CAST(NULL as VARCHAR) as VALUE_TEXTUAL | ||
-- FROM questionnaire_response_observations | ||
-- WHERE | ||
-- VARIABLE IS NOT NULL | ||
-- AND VARIABLE LIKE 'select:%' | ||
-- AND VALUE_TEXTUAL IS NOT NULL | ||
-- AND VALUE_TEXTUAL != '' | ||
-- PARTITION BY SUBJECT, PROJECT, SOURCE | ||
-- EMIT CHANGES; |
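
The two-step type resolution in this file (the `questionnaire_response_exploded` stream followed by the `INSERT INTO observations` statement) can be hard to follow from the nested `CASE` expressions. The Python sketch below models it for a single answer. The function names `resolve_answer` and `_to_double` are hypothetical, and Python's `float()` is used as a stand-in for the ksqlDB string-to-DOUBLE cast, so the two may disagree on edge cases such as `"nan"` or surrounding whitespace.

```python
from typing import Optional, Tuple


def _to_double(text: Optional[str]) -> Optional[float]:
    """Stand-in for CAST(string AS DOUBLE): NULL when the string is not a number."""
    if text is None:
        return None
    try:
        return float(text)
    except ValueError:
        return None


def resolve_answer(
    int_value: Optional[int],
    double_value: Optional[float],
    string_value: Optional[str],
) -> Tuple[str, Optional[float], Optional[str]]:
    """Return (TYPE, VALUE_NUMERIC, VALUE_TEXTUAL) for one questionnaire answer."""
    # Step 1 (exploded stream): COALESCE(double, int as double, string as double).
    if double_value is not None:
        numeric = double_value
    elif int_value is not None:
        numeric = float(int_value)
    else:
        numeric = _to_double(string_value)
    if int_value is not None:
        obs_type = "INTEGER"
    elif double_value is not None:
        obs_type = "DOUBLE"
    else:
        obs_type = None
    # Step 2 (INSERT INTO observations): fill in TYPE and drop redundant text.
    if obs_type is None:
        obs_type = "DOUBLE" if numeric is not None else "STRING"
    textual = None if numeric is not None else string_value
    return obs_type, numeric, textual
```

For example, `resolve_answer(None, None, "4")` yields a `DOUBLE` observation with no textual value, while `resolve_answer(None, None, "yes")` yields a `STRING` observation. In this model an answer with no value at all resolves to `STRING` with both value fields empty, which may be worth filtering out.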
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
ingress: | ||
enabled: true | ||
annotations: | ||
cert-manager.io/cluster-issuer: letsencrypt-prod | ||
nginx.ingress.kubernetes.io/enable-cors: "true" | ||
className: nginx | ||
hosts: | ||
- host: localhost | ||
tls: | ||
secretName: radar-base-data-dashboard | ||
hosts: | ||
- localhost | ||
path: /api | ||
jdbc: | ||
url: jdbc:postgresql://timescaledb-postgresql-hl:5432/data-dashboard | ||
dialect: org.hibernate.dialect.PostgreSQLDialect | ||
user: postgres | ||
password: secret | ||
managementPortal: | ||
url: http://management-portal:8080/managementportal | ||
clientId: radar_data_dashboard_backend | ||
clientSecret: secret | ||
jwtResourceName: res_DataDashboardAPI |