This code file accompanies the slides and recording available online at: https://www.confluent.io/online-talks/
Don’t forget to check out the #ksql channel on our Community Slack group
——Robin Moffatt @rmoff, 28 Feb 2018
Built using Confluent Platform 4.0 and KSQL 0.5.
confluent destroy
confluent start
Robin@asgard02 ~/c/confluent-4.0.0> confluent start
Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Robin@asgard02 ~/c/confluent-4.0.0>
kafkacat -P -b localhost:9092 -t mysql_users -K ":"
Paste the data to stdin, one record per line:
1:{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
3:{"uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}
4:{"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}
5:{"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}
6:{"uid":6,"name":"Hojjat","locale":"en_US","address_city":"Palo Alto","elite":"S"}
7:{"uid":7,"name":"Damian","locale":"en_US","address_city":"London","elite":"S"}
8:{"uid":8,"name":"Apurva","locale":"en_US","address_city":"Palo Alto","elite":"S"}
9:{"uid":9,"name":"Neha","locale":"en_US","address_city":"Palo Alto","elite":"P"}
10:{"uid":10,"name":"Mattias","locale":"DE","address_city":"Palo Alto","elite":"S"}
11:{"uid":11,"name":"Gwen","locale":"en_US","address_city":"Palo Alto","elite":"P"}
12:{"uid":12,"name":"Guozhang","locale":"en_US","address_city":"Palo Alto","elite":"S"}
13:{"uid":13,"name":"Lu","locale":"en_US","address_city":"Palo Alto","elite":"P"}
14:{"uid":14,"name":"Robin","locale":"en_US","address_city":"Ilkley","elite":"G"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"San Francisco","elite":"G"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"P"}
Exit: Ctrl-D
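To make the -K ":" option concrete: each input line is split at the first occurrence of the delimiter, so the text before it becomes the message key and the rest becomes the value (the JSON itself contains further colons, which stay in the value). The sketch below mimics that split in Python; split_keyed_line is a hypothetical helper for illustration, not part of kafkacat.

```python
import json

def split_keyed_line(line, delimiter=":"):
    """Split 'key<delimiter>value' at the first delimiter only."""
    key, value = line.split(delimiter, 1)
    return key, value

line = '2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}'
key, value = split_keyed_line(line)
record = json.loads(value)  # the value is still intact JSON
print(key, record["name"], record["address_city"])
```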
kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t mysql_users
[...]
Key: 4
Value: {"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}
Key: 5
Value: {"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}
[...]
Note: Create blank window in
confluent status
Robin@asgard02 ~/c/confluent-4.0.0> confluent status
Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500
Robin@asgard02 ~/c/confluent-4.0.0> ~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500
1 --> ([ 1 | 10 | 3 | 9297 | 1519302621843 | 'iOS' | 'your team here rocks!' ])
2 --> ([ 2 | 10 | 1 | 3415 | 1519302622767 | 'android' | 'Surprisingly good, maybe you are getting your mojo back at long last!' ])
3 --> ([ 3 | 17 | 2 | 5197 | 1519302623090 | 'web' | 'meh' ])
4 --> ([ 4 | 9 | 3 | 9389 | 1519302623414 | 'android' | 'is this as good as it gets? really ?' ])
~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties
$ ~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2018 Confluent Inc.
Server 0.5 listening on http://localhost:8090
To access the KSQL CLI, run:
ksql-cli remote http://localhost:8090
To access the UI, point your browser at:
http://localhost:8090/index.html
Connect to KSQL server running locally
~/git/ksql/bin/ksql-cli remote http://localhost:8090/
Robin@asgard02 ~> ~/git/ksql/bin/ksql-cli remote http://localhost:8090/
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2018 Confluent Inc.
CLI v0.5, Server v0.5 located at http://localhost:8090/
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
show topics;
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
-----------------------------------------------------------------------------------------------
_schemas | false | 1 | 1 | 0 | 0
connect-configs | false | 1 | 1 | 0 | 0
connect-offsets | false | 25 | 1 | 0 | 0
connect-statuses | false | 5 | 1 | 0 | 0
ksql__commands | true | 1 | 1 | 0 | 0
ratings | false | 1 | 1 | 0 | 0
users | false | 1 | 1 | 0 | 0
-----------------------------------------------------------------------------------------------
Tip: You don’t need to know the format of the data. PRINT detects it and shows the column names and values.
PRINT 'ratings';
Explain the timestamp/key/message concept: each line of PRINT output shows the message timestamp, then the key, then the value.
ksql> PRINT 'ratings';
Format:AVRO
22/02/18 12:55:04 GMT, 5312, {"rating_id": 5312, "user_id": 4, "stars": 4, "route_id": 2440, "rating_time": 1519304104965, "channel": "web", "message": "Surprisingly good, maybe you are getting your mojo back at long last!"}
22/02/18 12:55:05 GMT, 5313, {"rating_id": 5313, "user_id": 3, "stars": 4, "route_id": 6975, "rating_time": 1519304105213, "channel": "web", "message": "why is it so difficult to keep the bathrooms clean ?"}
PRINT 'mysql_users' FROM BEGINNING;
Process from beginning of topic
SET 'auto.offset.reset' = 'earliest';
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');
Why’s it a stream? Because it’s a continuous, unbounded series of events.
ksql> CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');
Message
---------------
Table created
---------------
DESCRIBE ratings;
Note:
- System columns for timestamp and key
- All the other columns have been picked up automagically; we have not had to specify them
ksql> DESCRIBE ratings;
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
RATING_ID | BIGINT
USER_ID | INTEGER
STARS | INTEGER
ROUTE_ID | INTEGER
RATING_TIME | BIGINT
CHANNEL | VARCHAR(STRING)
MESSAGE | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
SELECT * FROM ratings;
This is a continuous query!
ksql> SELECT * FROM ratings;
1519402268942 | 1 | 1 | 13 | 1 | 3700 | 1519402267832 | ios | airport refurb looks great, will fly outta here more!
1519402269200 | 2 | 2 | 12 | 2 | 9907 | 1519402269200 | android | (expletive deleted)
1519402269694 | 3 | 3 | 2 | 1 | 5421 | 1519402269694 | android | is this as good as it gets? really ?
1519402269857 | 4 | 4 | 18 | 2 | 1462 | 1519402269856 | android | your team here rocks!
Cancel the datagen task and note that the query stops returning rows.
Restart the datagen task and the query continues to return data.
SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;
Note the use of LIMIT so that we just see a sample of the stream of data.
ksql> SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;
1519402272247 | 13 | 13 | 9 | 1 | 2545 | 1519402272247 | iOS | more peanuts please
1519402272750 | 14 | 14 | 0 | 2 | 4419 | 1519402272749 | iOS | airport refurb looks great, will fly outta here more!
1519402273755 | 18 | 18 | 15 | 1 | 5306 | 1519402273755 | iOS | Surprisingly good, maybe you are getting your mojo back at long last!
1519402278686 | 37 | 37 | 17 | 1 | 725 | 1519402278686 | iOS | meh
1519402279186 | 39 | 39 | 10 | 1 | 6304 | 1519402279186 | iOS | (expletive deleted)
LIMIT reached for the partition.
Query terminated
ksql>
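As a rough mental model of the filtered, limited query above (not how KSQL executes it), the ratings stream can be treated as an endless generator: the WHERE clause is a lazy filter and LIMIT simply stops reading after enough rows. The generator and its field values are made up for illustration. The case-sensitive comparison is consistent with the output above, where only iOS rows are returned even though the stream also carries ios.

```python
from itertools import count, islice

CHANNELS = ["iOS", "ios", "android", "web"]

def ratings():
    """Endless, deterministic stand-in for the ratings topic (illustrative data)."""
    for rating_id in count(1):
        yield {
            "rating_id": rating_id,
            "stars": rating_id % 5 + 1,
            "channel": CHANNELS[rating_id % len(CHANNELS)],
        }

def poor_ios(stream):
    """WHERE STARS < 3 AND CHANNEL = 'iOS'; string comparison is case-sensitive."""
    return (r for r in stream if r["stars"] < 3 and r["channel"] == "iOS")

# LIMIT 5: stop consuming the endless stream after five matching rows.
sample = list(islice(poor_ios(ratings()), 5))
for row in sample:
    print(row)
```

Without the islice call the loop would never finish, which is the same reason an unlimited SELECT keeps running until it is cancelled.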
Let’s take the poor ratings from people with iOS devices, and create a new stream from them!
CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
ksql> CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
Message
----------------------------
Stream created and running
----------------------------
DESCRIBE POOR_RATINGS;
ksql> DESCRIBE POOR_RATINGS;
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
RATING_ID | BIGINT
USER_ID | INTEGER
STARS | INTEGER
ROUTE_ID | INTEGER
RATING_TIME | BIGINT
CHANNEL | VARCHAR(STRING)
MESSAGE | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
DESCRIBE EXTENDED POOR_RATINGS;
ksql> DESCRIBE EXTENDED POOR_RATINGS;
Type : STREAM
Key field :
Timestamp field : Not set - using <ROWTIME>
Key format : STRING
Value format : AVRO
Kafka output topic : POOR_RATINGS (partitions: 4, replication: 1)
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
RATING_ID | BIGINT
USER_ID | INTEGER
STARS | INTEGER
ROUTE_ID | INTEGER
RATING_TIME | BIGINT
CHANNEL | VARCHAR(STRING)
MESSAGE | VARCHAR(STRING)
-----------------------------------------
Queries that write into this STREAM
-----------------------------------
id:CSAS_POOR_RATINGS - CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0.33 total-messages: 33 last-message: 27/02/18 11:28:58 GMT
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic POOR_RATINGS)
ksql>
SELECT * FROM POOR_RATINGS;
ksql> SELECT * FROM POOR_RATINGS;
1519730883770 | 1608 | 1608 | 18 | 2 | 829 | 1519730883770 | iOS | your team here rocks!
1519730881132 | 1596 | 1596 | 6 | 2 | 3185 | 1519730881132 | iOS | thank you for the most friendly, helpful experience today at your new lounge
1519730886192 | 1615 | 1615 | 19 | 1 | 9409 | 1519730886192 | iOS | more peanuts please
1519730880186 | 1591 | 1591 | 7 | 1 | 9830 | 1519730880186 | iOS | is this as good as it gets? really ?
1519730894709 | 1655 | 1655 | 7 | 2 | 6036 | 1519730894709 | iOS | meh
From the command line, use the standard Kafka tools to interact with the new stream — it’s just a Kafka topic!
kafka-topics --zookeeper localhost:2181 --list
Robin@asgard02 ~> kafka-topics --zookeeper localhost:2181 --list
POOR_RATINGS
__consumer_offsets
_schemas
connect-configs
connect-offsets
connect-statuses
ksql__commands
mysql_users
ratings
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic POOR_RATINGS --from-beginning | jq '.'
Robin@asgard02 ~> kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic POOR_RATINGS --from-beginning | jq '.'
{
"RATING_ID": {
"long": 1615
},
"USER_ID": {
"int": 19
},
"STARS": {
"int": 1
},
"ROUTE_ID": {
"int": 9409
},
"RATING_TIME": {
"long": 1519730886192
},
"CHANNEL": {
"string": "iOS"
},
"MESSAGE": {
"string": "more peanuts please"
}
}
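The {"long": 1615} wrappers in this output come from Avro's JSON encoding, which writes a non-null value of a union type as a single-entry object keyed by the type name. The sketch below flattens such a record for easier reading; unwrap_unions is a hypothetical helper, and it assumes no field is a genuine single-entry map or record, which it would wrongly unwrap.

```python
import json

def unwrap_unions(record):
    """Flatten Avro JSON-encoded union values: {"long": 1615} -> 1615."""
    flat = {}
    for field, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            # Single-entry dict: the key is the Avro type name, the value is the datum.
            (value,) = value.values()
        flat[field] = value
    return flat

raw = (
    '{"RATING_ID":{"long":1615},"STARS":{"int":1},'
    '"CHANNEL":{"string":"iOS"},"MESSAGE":{"string":"more peanuts please"}}'
)
print(unwrap_unions(json.loads(raw)))
```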
Remember our Users data? Let’s bring that into play, and use it to enrich the inbound stream of ratings data.
Let’s check the data first, using the very handy PRINT command:
PRINT 'mysql_users' FROM BEGINNING;
ksql> PRINT 'mysql_users' FROM BEGINNING;
Format:JSON
{"ROWTIME":1519640382207,"ROWKEY":"1","uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"ROWTIME":1519640382207,"ROWKEY":"2","uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
{"ROWTIME":1519640382207,"ROWKEY":"3","uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}
Now, create a TABLE over the Kafka topic. Why’s it a table? Because for each key (user id), we want to know its value (name, address, etc.).
CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');
Note that we’ve specified the KEY here, which must match the key of the Kafka message too.
ksql> CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');
Message
---------------
Table created
---------------
ksql>
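The table semantics can be sketched in a few lines: a table is what you get by replaying the topic and keeping only the latest value for each key. The events below are a subset of the mysql_users records produced earlier (Nick, key 2, appears three times); materialize is a hypothetical helper, not a KSQL API.

```python
# (key, value) pairs in the order they were written to the topic.
changelog = [
    ("2", {"name": "Nick", "address_city": "Palo Alto", "elite": "G"}),
    ("14", {"name": "Robin", "address_city": "Ilkley", "elite": "G"}),
    ("2", {"name": "Nick", "address_city": "San Francisco", "elite": "G"}),
    ("2", {"name": "Nick", "address_city": "Palo Alto", "elite": "P"}),
]

def materialize(events):
    """Fold a changelog into a table: later values overwrite earlier ones per key."""
    table = {}
    for key, value in events:
        table[key] = value
    return table

users = materialize(changelog)
print(len(changelog), "events ->", len(users), "rows")
print(users["2"])
```

A stream over the same topic would keep all four events; the table keeps two rows, with Nick's most recent city and elite status.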
Now let’s join our ratings data, which includes user ID, to our user information:
Basics to start with — rating message plus the user’s name.
A couple of things to note:
- We’re aliasing the table and stream names to make column names unambiguous
- I’m using the backslash line continuation character
SELECT R.MESSAGE, U.NAME \
FROM RATINGS R LEFT JOIN USERS U \
ON R.USER_ID = U.UID \
LIMIT 5;
ksql> SELECT R.MESSAGE, U.NAME \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID \
> LIMIT 5;
airport refurb looks great, will fly outta here more! | Damian
Exceeded all my expectations. Thank you ! | Neil
(expletive deleted) | Lu
why is it so difficult to keep the bathrooms clean ? | Apurva
thank you for the most friendly, helpful experience today at your new lounge | Cliff
LIMIT reached for the partition.
Query terminated
ksql>
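Conceptually, the stream-table join above looks up each rating's user in the current state of the users table as the rating arrives. A minimal sketch, with the table as a plain dict and left_join as a hypothetical helper: because it is a LEFT JOIN, ratings whose user_id has no matching user still come through, with a null name. The datagen output earlier includes user IDs such as 18 that are not in the users data.

```python
users = {
    5: {"name": "Neil", "address_city": "London", "elite": "G"},
    7: {"name": "Damian", "address_city": "London", "elite": "S"},
}

ratings = [
    {"user_id": 7, "message": "airport refurb looks great, will fly outta here more!"},
    {"user_id": 18, "message": "your team here rocks!"},  # no such user in the table
    {"user_id": 5, "message": "Exceeded all my expectations. Thank you !"},
]

def left_join(stream, table):
    """Emit one row per stream event; unmatched events get name=None."""
    for event in stream:
        user = table.get(event["user_id"])
        yield {"message": event["message"], "name": user["name"] if user else None}

joined = list(left_join(ratings, users))
# Equivalent of adding WHERE U.NAME IS NOT NULL to drop unmatched ratings.
matched = [row for row in joined if row["name"] is not None]
for row in joined:
    print(row)
```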
Now let’s pull the full set of data, including a reformat of the timestamp into something human readable:
Note the IS NOT NULL clause to filter out any ratings with no corresponding user data.
SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
FROM RATINGS R LEFT JOIN USERS U \
ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
ksql> SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
> R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
18196 | 1 | 1790 | 2018-02-26 12:42:38 | iOS-test | Surprisingly good, maybe you are getting your mojo back at long last! | Robin | Ilkley | G
18197 | 4 | 2862 | 2018-02-26 12:42:38 | iOS-test | your team here rocks! | Jeremy | Austin | P
18198 | 1 | 6954 | 2018-02-26 12:42:39 | ios | Exceeded all my expectations. Thank you ! | Nick | Palo Alto | P
18199 | 1 | 2092 | 2018-02-26 12:42:39 | ios | (expletive deleted) | Neil | London | G
18200 | 2 | 6042 | 2018-02-26 12:42:39 | ios | more peanuts please | Neil | London | G
18202 | 2 | 1133 | 2018-02-26 12:42:39 | iOS-test | thank you for the most friendly, helpful experience today at your new lounge | Neha | Palo Alto | P
18205 | 3 | 4086 | 2018-02-26 12:42:40 | ios | meh | Neil | London | G
18206 | 1 | 9217 | 2018-02-26 12:42:41 | web | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
18207 | 3 | 2655 | 2018-02-26 12:42:41 | android | more peanuts please | Neha | Palo Alto | P
18209 | 3 | 2086 | 2018-02-26 12:42:42 | iOS-test | meh | Lu | Palo Alto | P
18210 | 2 | 1629 | 2018-02-26 12:42:42 | web | airport refurb looks great, will fly outta here more! | Neha | Palo Alto | P
^CQuery terminated
Let’s persist this as an enriched stream:
CREATE STREAM RATINGS_FULL AS \
SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
FROM RATINGS R LEFT JOIN USERS U \
ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
ksql> CREATE STREAM RATINGS_FULL AS \
> SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
> R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
Message
----------------------------
Stream created and running
----------------------------
ksql>
Which of our Premier customers are not happy?
SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
ksql> SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
1519649077925 | 2 | 18684 | 1 | 3895 | 2018-02-26 12:44:37 | ios | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
1519649078280 | 13 | 18685 | 1 | 4088 | 2018-02-26 12:44:38 | iOS | more peanuts please | Lu | Palo Alto | P
1519649079664 | 1 | 18691 | 2 | 8933 | 2018-02-26 12:44:39 | iOS-test | more peanuts please | Cliff | St Louis | P
1519649084065 | 13 | 18709 | 1 | 4366 | 2018-02-26 12:44:44 | iOS-test | airport refurb looks great, will fly outta here more! | Lu | Palo Alto | P
1519649084603 | 3 | 18712 | 2 | 744 | 2018-02-26 12:44:44 | ios | worst. flight. ever. #neveragain | Jeremy | Austin | P
^CQuery terminated
CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
ksql> CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
Message
----------------------------
Stream created and running
----------------------------
ksql>
SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;
ksql> SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;
1 | why is it so difficult to keep the bathrooms clean ? | Nick
1 | is this as good as it gets? really ? | Jeremy
1 | thank you for the most friendly, helpful experience today at your new lounge | Jeremy
1 | your team here rocks! | Lu
1 | thank you for the most friendly, helpful experience today at your new lounge | Lu
1 | why is it so difficult to keep the bathrooms clean ? | Jeremy
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic UNHAPPY_VIPS --from-beginning
Robin@asgard02 ~> kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic UNHAPPY_VIPS --from-beginning
{"RATING_ID":{"long":25},"STARS":{"int":1},"ROUTE_ID":{"int":1548},"KSQL_COL_3":{"string":"2018-02-27 11:21:21"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"thank you for the most friendly, helpful experience today at your new lounge"},"NAME":{"string":"Neha"},"ADDRESS_CITY":{"string":"Palo Alto"},"ELITE":{"string":"P"}}
{"RATING_ID":{"long":31},"STARS":{"int":2},"ROUTE_ID":{"int":451},"KSQL_COL_3":{"string":"2018-02-27 11:21:22"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"Exceeded all my expectations. Thank you !"},"NAME":{"string":"Cliff"},"ADDRESS_CITY":{"string":"St Louis"},"ELITE":{"string":"P"}}
Explain windowing:
- Tumbling (e.g. every 5 minutes: 00:00, 00:05, 00:10)
- Hopping (e.g. every 5 minutes, advancing 1 minute: 00:00-00:05, 00:01-00:06)
- Session (sets a timeout for the given key, after which any new data is treated as a new session)
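The three window types can be sketched as functions that assign an event timestamp (in milliseconds) to windows. This illustrates the concepts only and is not KSQL's implementation; the function names are hypothetical, and the choice that a pause strictly longer than the gap starts a new session is an assumption.

```python
MINUTE = 60_000  # milliseconds

def tumbling_window(ts, size):
    """Start of the single fixed-size window containing ts."""
    return ts - ts % size

def hopping_windows(ts, size, advance):
    """Starts of every overlapping window [start, start + size) containing ts."""
    start = ts - ts % advance
    starts = []
    while start >= 0 and start > ts - size:
        starts.append(start)
        start -= advance
    return sorted(starts)

def sessions(timestamps, gap):
    """Group timestamps for one key; a pause longer than gap starts a new session."""
    groups = []
    for ts in sorted(timestamps):
        if groups and ts - groups[-1][-1] <= gap:
            groups[-1].append(ts)
        else:
            groups.append([ts])
    return groups

ts = 7 * MINUTE + 30_000  # an event at 00:07:30
print(tumbling_window(ts, 5 * MINUTE) // MINUTE)
print([s // MINUTE for s in hopping_windows(ts, 5 * MINUTE, MINUTE)])
print(sessions([0, 10_000, 20_000, 400_000], gap=5 * MINUTE))
```

With 5-minute tumbling windows the event at 00:07:30 falls into exactly one window (starting 00:05); with 5-minute windows hopping every minute it falls into five (starting 00:03 through 00:07).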
Show count of ratings per City, per minute
SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
GROUP BY ADDRESS_CITY;
ksql> SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
> FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
> GROUP BY ADDRESS_CITY;
Geneva | 11
Geneva | 11
Geneva | 14
Geneva | 10
Geneva | 19
Let’s persist that into a TABLE. So far, every object we’ve created from a query has been a STREAM. A table gives the state of a given key at a given point in time; here, that is the total count for each city in each one-minute window.
CREATE TABLE RATINGS_BY_CITY AS \
SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
GROUP BY ADDRESS_CITY;
ksql> CREATE TABLE RATINGS_BY_CITY AS \
> SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
> FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
> GROUP BY ADDRESS_CITY;
Message
---------------------------
Table created and running
---------------------------
ksql>
DESCRIBE RATINGS_BY_CITY;
Note that the city is denoted as (key)
ksql> DESCRIBE RATINGS_BY_CITY;
Field | Type
------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ADDRESS_CITY | VARCHAR(STRING) (key)
RATING_COUNT | BIGINT
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
Point out the system columns, ROWTIME and ROWKEY.
SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;
ksql> SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;
1519730460000 | Geneva : Window{start=1519730460000 end=-} | Geneva | 11
1519730460000 | Ilkley : Window{start=1519730460000 end=-} | Ilkley | 11
1519730520000 | Geneva : Window{start=1519730520000 end=-} | Geneva | 11
1519730460000 | St Louis : Window{start=1519730460000 end=-} | St Louis | 9
1519730580000 | Geneva : Window{start=1519730580000 end=-} | Geneva | 14
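In the output above, ROWKEY for a windowed table combines the grouping key with the window start, and ROWTIME carries the same window start. A small sketch of pulling the two apart, assuming the exact text format printed by this CLI version (other versions may render the key differently); parse_windowed_rowkey is a hypothetical helper.

```python
import re

# Matches keys as printed above, e.g. "Geneva : Window{start=1519730460000 end=-}".
ROWKEY_PATTERN = re.compile(
    r"^(?P<key>.+) : Window\{start=(?P<start>\d+) end=(?P<end>[^}]*)\}$"
)

def parse_windowed_rowkey(rowkey):
    """Return (grouping key, window start in epoch milliseconds)."""
    match = ROWKEY_PATTERN.match(rowkey)
    if match is None:
        raise ValueError(f"not a windowed ROWKEY: {rowkey!r}")
    return match["key"], int(match["start"])

print(parse_windowed_rowkey("St Louis : Window{start=1519730460000 end=-}"))
```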
KSQL comes with a bunch of functions, both scalar and aggregate (like COUNT, which we saw previously).
Let’s convert the ROWTIME epoch value to a more readable one:
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
2018-02-27 11:21:00 | London | 25
2018-02-27 11:21:00 | Palo Alto | 71
2018-02-27 11:22:00 | Palo Alto | 89
2018-02-27 11:22:00 | London | 24
2018-02-27 11:23:00 | London | 25
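The conversion is easy to check by hand. ROWTIME is milliseconds since the Unix epoch, so the window start 1519730460000 seen in the earlier LIMIT 5 query should format as 2018-02-27 11:21:00. The sketch below does the equivalent in Python; it assumes UTC, which is consistent with the GMT timestamps in the output shown here, and timestamp_to_string is a hypothetical stand-in rather than the KSQL function itself.

```python
from datetime import datetime, timezone

def timestamp_to_string(epoch_millis):
    """Format epoch milliseconds as 'yyyy-MM-dd HH:mm:ss' in UTC."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")

print(timestamp_to_string(1519730460000))  # 2018-02-27 11:21:00
```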
This table is a first-class object in KSQL that we can query and filter like any other:
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';
2018-02-27 11:21:00 | Ilkley | 11
2018-02-27 11:22:00 | Ilkley | 6
2018-02-27 11:23:00 | Ilkley | 13
2018-02-27 11:24:00 | Ilkley | 14
2018-02-27 11:25:00 | Ilkley | 15
Note how aggregates update within the current window.
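Why a window's count keeps changing can be sketched as follows: every incoming event increments the count for its (city, window start) pair and the new total is emitted as an update, so the same window shows up repeatedly until time moves past it. This is an illustration with made-up events, not KSQL's engine, which may batch updates so that not every intermediate count is visible; windowed_counts is a hypothetical helper.

```python
from collections import Counter

MINUTE = 60_000  # milliseconds

def windowed_counts(events, size=MINUTE):
    """Yield (city, window_start, running_count) after every event."""
    counts = Counter()
    for ts, city in events:
        window_start = ts - ts % size  # tumbling window assignment
        counts[(city, window_start)] += 1
        yield city, window_start, counts[(city, window_start)]

events = [
    (1_519_730_460_500, "Ilkley"),
    (1_519_730_470_000, "Ilkley"),
    (1_519_730_475_000, "Geneva"),
    (1_519_730_521_000, "Ilkley"),  # falls into the next one-minute window
]
updates = list(windowed_counts(events))
for update in updates:
    print(update)
```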
Let’s step out of KSQL for a moment. When you create a STREAM or TABLE in KSQL, it is backed by a Kafka topic. Let’s check this out:
kafka-topics --zookeeper localhost:2181 --list|grep RATINGS
Robin@asgard02 ~/c/confluent-4.0.0> kafka-topics --zookeeper localhost:2181 --list|grep RATINGS
RATINGS_BY_CITY
ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-changelog
ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-repartition
[...]
And it’s just a Kafka topic that we can consume from like any other:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY --from-beginning --max-messages 5 | jq '.'
Robin@asgard02 ~> kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY2 --from-beginning -max-messages 5| jq '.'
{
"ADDRESS_CITY": {
"string": "London"
},
"RATING_COUNT": {
"long": 25
}
}
{
"ADDRESS_CITY": {
"string": "Palo Alto"
},
"RATING_COUNT": {
"long": 71
}
}
Where’s the window? Well, that’s a system column that’s part of the message key. If we wanted to expose it further, we could do a CREATE TABLE based on the TIMESTAMPTOSTRING function, which exposes the system column:
CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
ksql> CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
Message
---------------------------
Table created and running
---------------------------
ksql>
DESCRIBE RATINGS_BY_CITY_TS;
ksql> describe RATINGS_BY_CITY_TS;
Field | Type
---------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
WINDOW_START_TS | VARCHAR(STRING)
ADDRESS_CITY | VARCHAR(STRING) (key)
RATING_COUNT | BIGINT
---------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;
ksql> SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;
1519730460000 | Geneva : Window{start=1519730460000 end=-} | 2018-02-27 11:21:00 | Geneva | 11
1519730520000 | Geneva : Window{start=1519730520000 end=-} | 2018-02-27 11:22:00 | Geneva | 11
1519730580000 | Geneva : Window{start=1519730580000 end=-} | 2018-02-27 11:23:00 | Geneva | 14
1519730640000 | Geneva : Window{start=1519730640000 end=-} | 2018-02-27 11:24:00 | Geneva | 10
1519730700000 | Geneva : Window{start=1519730700000 end=-} | 2018-02-27 11:25:00 | Geneva | 19
LIMIT reached for the partition.
Query terminated
ksql>
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
Robin@asgard02 ~> kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
{
"WINDOW_START_TS": {
"string": "2018-02-27 11:21:00"
},
"ADDRESS_CITY": {
"string": "Geneva"
},
"RATING_COUNT": {
"long": 11
}
}
SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;