-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate.sql
68 lines (59 loc) · 2.37 KB
/
create.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
-- CREATE SOURCES
-- Raw clickstream: every message on the 'pageviews' topic is ingested as an
-- opaque byte payload; UTF-8/JSON decoding is done downstream in pageview_stg.
CREATE SOURCE json_pageviews
    FROM KAFKA BROKER 'redpanda:9092'
    TOPIC 'pageviews'
    FORMAT BYTES;
-- Debezium change stream for purchases (topic name suggests the MySQL
-- shop.purchases table), Avro-encoded with schemas from the registry.
CREATE SOURCE purchases
    FROM KAFKA BROKER 'redpanda:9092'
    TOPIC 'mysql.shop.purchases'
    FORMAT AVRO
        USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;
-- Debezium change stream for items (topic name suggests the MySQL
-- shop.items table), Avro-encoded with schemas from the registry.
CREATE SOURCE items
    FROM KAFKA BROKER 'redpanda:9092'
    TOPIC 'mysql.shop.items'
    FORMAT AVRO
        USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;
-- Debezium change stream for users (topic name suggests the MySQL
-- shop.users table), Avro-encoded with schemas from the registry.
CREATE SOURCE users
    FROM KAFKA BROKER 'redpanda:9092'
    TOPIC 'mysql.shop.users'
    FORMAT AVRO
        USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;
-- Debezium change stream for vendors (topic name suggests the MySQL
-- shop.vendors table), Avro-encoded with schemas from the registry.
CREATE SOURCE vendors
    FROM KAFKA BROKER 'redpanda:9092'
    TOPIC 'mysql.shop.vendors'
    FORMAT AVRO
        USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;
-- CREATE NON-MATERIALIZED VIEW TO PARSE JSON PAGEVIEWS
-- Staging view over json_pageviews. Decodes each byte payload as UTF-8 JSON,
-- pulls out the pageview fields, then derives:
--   pageview_type  - 'products' or 'profiles', taken from the URL path
--   target_id      - the numeric id following that path segment
--   received_at_ts - received_at (epoch seconds, as a double) as a timestamp
-- Non-matching URLs yield NULL pageview_type / target_id.
CREATE VIEW pageview_stg AS
WITH
-- Step 1: bytes -> text.
decoded_text AS (
    SELECT convert_from(data, 'utf8') AS data
    FROM json_pageviews
),
-- Step 2: text -> jsonb.
decoded_json AS (
    SELECT CAST(data AS jsonb) AS data
    FROM decoded_text
),
-- Step 3: jsonb -> typed columns.
pageview_fields AS (
    SELECT
        (data->'user_id')::INT AS user_id,
        data->>'url' AS url,
        data->>'channel' AS channel,
        (data->>'received_at')::double AS received_at
    FROM decoded_json
)
SELECT
    user_id,
    url,
    channel,
    received_at,
    regexp_match(url, '/(products|profiles)/')[1] AS pageview_type,
    (regexp_match(url, '/(?:products|profiles)/(\d+)')[1])::INT AS target_id,
    to_timestamp(received_at) AS received_at_ts
FROM pageview_fields;
-- CREATE AN ANALYTICAL AGGREGATION FOR VENDORS
-- Per-vendor, per-minute sales and product-pageview metrics over a fixed
-- reporting window. Every (vendor, minute) pair in the window produces a row
-- as long as the vendor has at least one item, even if nothing happened.
--
-- Purchases and pageviews are each pre-aggregated per (item, minute) BEFORE
-- being joined. LEFT JOINing both raw fact streams onto the same item/minute
-- would multiply them against each other: e.g. 2 purchases and 3 product views
-- of one item in one minute would produce 6 joined rows, inflating items_sold,
-- orders, revenue and pageviews.
CREATE MATERIALIZED VIEW agg_vendors_minute AS
WITH
-- Reporting window: one row per minute from 2022-05-19 00:00:00 through
-- 2022-05-20 00:00:00 (generate_series includes the stop value when the step
-- lands on it). Hard-coded; adjust the bounds to cover other days.
minute_series AS (
    SELECT generate_series('2022-05-19 00:00:00', '2022-05-20 00:00:00', '1 MINUTE') AS m
),
-- Purchase totals per item per minute.
purchases_per_item_minute AS (
    SELECT
        item_id,
        date_trunc('minute', created_at) AS minute_bucket,
        SUM(quantity) AS items_sold,
        COUNT(id) AS orders,
        SUM(purchase_price) AS revenue
    FROM purchases
    GROUP BY item_id, date_trunc('minute', created_at)
),
-- Product-page views per item per minute (profile pageviews are excluded).
pageviews_per_item_minute AS (
    SELECT
        target_id AS item_id,
        date_trunc('minute', received_at_ts) AS minute_bucket,
        COUNT(url) AS pageviews
    FROM pageview_stg
    WHERE pageview_type = 'products'
    GROUP BY target_id, date_trunc('minute', received_at_ts)
)
SELECT
    vendors.id AS vendor_id,
    vendors.name AS vendor_name,
    -- Intentionally unaliased so the output column keeps its existing
    -- (expression-derived) name for downstream consumers.
    minute_series.m::timestamp,
    -- items_sold / revenue stay NULL when no purchase matched (SUM over no
    -- rows); the two counts fall back to 0 instead.
    SUM(p.items_sold) AS items_sold,
    COALESCE(SUM(p.orders), 0) AS orders,
    SUM(p.revenue) AS revenue,
    COALESCE(SUM(pv.pageviews), 0) AS pageviews
FROM vendors
INNER JOIN items
    ON items.vendor_id = vendors.id
CROSS JOIN minute_series
LEFT JOIN purchases_per_item_minute AS p
    ON p.item_id = items.id
    AND p.minute_bucket = minute_series.m
LEFT JOIN pageviews_per_item_minute AS pv
    ON pv.item_id = items.id
    AND pv.minute_bucket = minute_series.m
GROUP BY vendors.id, vendors.name, minute_series.m::timestamp;