generated from databricks-industry-solutions/industry-solutions-blueprints
-
Notifications
You must be signed in to change notification settings - Fork 6
/
09_dlt_ioc_matching_historical.sql
71 lines (61 loc) · 2.52 KB
/
09_dlt_ioc_matching_historical.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
-- Databricks notebook source
-- MAGIC %md
-- MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/ioc-matching.
-- COMMAND ----------
-- MAGIC %md
-- MAGIC
-- MAGIC # DLT pipeline for continuous IOC matching in the presence of IOC updates
-- MAGIC
-- MAGIC ## Prerequisites
-- MAGIC
-- MAGIC * the `ioc_summary_all` view must be created using the command in the notebook `08_handling_ioc_updates.sql`
-- MAGIC * the `l7_logs_obs` view must be created using the command in the notebook `08_handling_ioc_updates.sql`
-- MAGIC
-- MAGIC ## Parametrization
-- MAGIC
-- MAGIC In production, these DLT queries will be parametrized with two time windows: (1) the historical window used for the search over the summary tables, and (2) the window between the last update of the summary tables and now, used for the search over the recent log data.
-- MAGIC
-- MAGIC ## Consumption of the results
-- MAGIC
-- MAGIC The `summary_table_iochits` and the `recent_history_iochits` can be merged into a single view via union-all for a better user experience.
-- COMMAND ----------
-- Streaming live table of IOC hits found through the summary data.
--
-- Active rows from the `ioc` stream are matched to `ioc_summary_all` on the
-- observable value. Raw log records from `v_logs_silver` are attached when they
-- share the summary row's source table and src/dst IP pair and their timestamp
-- lies inside that row's [first_seen, last_seen] span (both ends inclusive).
--
-- Output: one row per (obs_value, ioc_type, src_ip, dst_ip) combination.
--
-- NOTE(review): the ts_day bounds below are hard-coded literals; the notebook
-- introduction states they are meant to be parametrized in production.
CREATE STREAMING LIVE TABLE summary_table_iochits
AS
SELECT
    now()                       AS detection_ts,
    summ.obs_value              AS matched_ioc,
    ioc.ioc_type,
    min(summ.first_seen)        AS first_seen,
    max(summ.last_seen)         AS last_seen,
    collect_set(summ.src_table) AS src_tables,
    collect_set(logs.raw)       AS raw
FROM STREAM(ioc_matching_lipyeow_lim.ioc) AS ioc
INNER JOIN ioc_matching_lipyeow_lim.ioc_summary_all AS summ
    ON summ.obs_value = ioc.ioc_value
-- Left join: a summary hit is kept even when no raw log row lines up with it.
LEFT JOIN ioc_matching_lipyeow_lim.v_logs_silver AS logs
    ON logs.src_table = summ.src_table
    AND logs.src_ip = summ.src_ip
    AND logs.dst_ip = summ.dst_ip
    AND logs.ts >= summ.first_seen
    AND logs.ts <= summ.last_seen
WHERE ioc.active = TRUE
    -- Historical search window over the summaries, inclusive on both ends.
    AND summ.ts_day >= '2012-03-01T00:00:00+0000'
    AND summ.ts_day <= '2012-04-01T08:00:00+0000'
GROUP BY
    summ.obs_value,
    ioc.ioc_type,
    summ.src_ip,
    summ.dst_ip
;
-- COMMAND ----------
-- Streaming live table of IOC hits found in the recent log data.
--
-- Active rows from the `ioc` stream are joined to `l7_logs_obs` on the extracted
-- observable value. Only log rows whose timestamp is strictly after the cutoff
-- literal are considered, and hits are rolled up per IOC value, IOC type and
-- calendar day of the log timestamp.
--
-- NOTE(review): the cutoff below is a hard-coded literal; the notebook
-- introduction states it is meant to be parametrized in production.
CREATE STREAMING LIVE TABLE recent_history_iochits
AS
SELECT
    now()                           AS detection_ts,
    ioc.ioc_value                   AS matched_ioc,
    ioc.ioc_type,
    min(CAST(obs.ts AS TIMESTAMP))  AS first_seen,
    max(CAST(obs.ts AS TIMESTAMP))  AS last_seen,
    collect_set(obs.src_table)      AS src_tables,
    collect_set(obs.raw)            AS raw
FROM STREAM(ioc_matching_lipyeow_lim.ioc) AS ioc
INNER JOIN ioc_matching_lipyeow_lim.l7_logs_obs AS obs
    ON obs.extracted_obs = ioc.ioc_value
WHERE ioc.active = TRUE
    AND CAST(obs.ts AS TIMESTAMP) > '2012-03-01T00:00:00+0000'
GROUP BY
    detection_ts,  -- SELECT alias for now()
    ioc.ioc_value,
    ioc.ioc_type,
    date_trunc('DAY', CAST(obs.ts AS TIMESTAMP))
;
-- COMMAND ----------