# Databricks notebook source
# MAGIC %md # 00. Setup Auth to storage accounts
# MAGIC Note: The service principal details below are used both to generate logs from operations on storage account A (e.g. reads from a Delta table) AND to read the resulting logs in storage account B
# COMMAND ----------
# # Uncomment and set SP access details, if not already set on the cluster
# APPLICATION_ID = "xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx"
# DIRECTORY_ID = "xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx"
# APP_KEY = dbutils.secrets.get(scope = "<SCOPE>", key = "adls-app-key")
# spark.conf.set("fs.azure.account.auth.type", "OAuth")
# spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
# spark.conf.set("fs.azure.account.oauth2.client.id", APPLICATION_ID)
# spark.conf.set("fs.azure.account.oauth2.client.secret", APP_KEY)
# spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/"+DIRECTORY_ID+"/oauth2/token")
# COMMAND ----------
# MAGIC %md # 01. Confirm lookup tables exist and are as expected
# MAGIC
# MAGIC ## HMS Details
# MAGIC Prior to running this notebook, you should have run the following script to dump HMS information to a Delta table: https://github.com/himanshuguptadb/Unity_Catalog/tree/master/External%20Metastore%20Inventory
# MAGIC
# MAGIC The code below assumes a schema containing a table called `ExternaL_Metastore_Inventory`
# MAGIC
# MAGIC
# MAGIC
# COMMAND ----------
# If all lookup tables are in the same schema, set the variable once here; if not, set it per table in the following cells
schema = "hms_table_lineage_v0"
# COMMAND ----------
# Schema+Table names where HMS details are stored
ExternaL_Metastore_Inventory_schema = schema
ExternaL_Metastore_Inventory_table = "ExternaL_Metastore_Inventory"
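# COMMAND ----------
# Optional check (a small sketch, not part of the original flow): confirm the HMS inventory table
# produced by the External Metastore Inventory script exists before querying it.
existing_tables = [t.name.lower() for t in spark.catalog.listTables(ExternaL_Metastore_Inventory_schema)]
assert ExternaL_Metastore_Inventory_table.lower() in existing_tables, (
    f"Expected table {ExternaL_Metastore_Inventory_schema}.{ExternaL_Metastore_Inventory_table} was not found"
)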
# COMMAND ----------
# Validate we can parse Location paths from HMS
# Results should show 3 new columns with expected storage_account name, storage_container name, and table_path (including sub-directories)
display(
spark.sql(f"""
SELECT
regexp_extract(Location, "@(.*?)\\.dfs") as storage_account,
regexp_extract(Location, "://(.*?)@") as storage_container,
regexp_extract(Location, "windows\\.net/(.*)") as table_path,
*
FROM {ExternaL_Metastore_Inventory_schema}.{ExternaL_Metastore_Inventory_table}
""")
)
# COMMAND ----------
# MAGIC %md ## Job Run details
# MAGIC Prior to running this notebook, you should have run a script that hits the Databricks `jobs/runs/list` API to get the cluster ID for each job run.
# MAGIC
# MAGIC The code below assumes you have a table called `runs` containing results from the Databricks `jobs/runs/list` API (a minimal sketch of such a dump is included a couple of cells below)
# COMMAND ----------
# Schema+Table names where job runs are available
Job_Run_schema = schema
Job_Run_table = "runs"
# COMMAND ----------
# Validate we can parse ClusterIds from Job Runs
# Results should show (at minimum) a mapping of JobId to ClusterId
display(
spark.sql(f"""
SELECT
job_id,
cluster_instance.cluster_id,
start_time,
run_name,
run_id,
state.result_state,
creator_user_name,
task,
trigger,
format,
workspace
FROM {Job_Run_schema}.{Job_Run_table}
WHERE state.result_state == "SUCCESS"
""")
)
# COMMAND ----------
# MAGIC %md ## ADLS Access logs
# MAGIC Prior to running this script, you should have run notebook `1` to ingest ADLS logs as a Delta table
# MAGIC
# MAGIC The code below assumes you have two tables: one for read logs and one for write logs
# COMMAND ----------
# Schema+Table names where ADLS Access logs are stored
ADLS_Logs_schema = schema
ADLS_read_logs = "read_logs"
ADLS_write_logs = "write_logs"
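# COMMAND ----------
# Optional peek (a small sketch): print the read-log schema to confirm the columns the parsing
# below relies on are present (time, category, operationName, uri, properties.clientRequestId,
# properties.userAgentHeader).
spark.table(f"{ADLS_Logs_schema}.{ADLS_read_logs}").printSchema()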
# COMMAND ----------
# DBTITLE 1,Functions to parse complex path URLs
from urllib.parse import urlparse

def parse_storage_account(uri):
    # Storage account is the first label of the host, e.g. <account>.dfs.core.windows.net
    res = urlparse(uri)
    return res.netloc.split(".")[0]

def parse_storage_container(uri):
    # Storage container is the first path segment after the host
    res = urlparse(uri)
    return res.path.split("/")[1]

def parse_table_path(uri):
    # Extract the table path (including sub-directories), covering URL encoding and the
    # file/metadata suffixes ADLS writes for tables. Returns None (NULL via the UDF) when
    # no known separator is found.
    path_seps = ["_delta_log", "part-", ".json", ".csv", "_committed", ".crc", ".parquet"]
    mid_path_seps = ["%3D", "="]
    raw_path_parts = urlparse(uri).path.split("/")
    # Partitioned tables: cut the path at the first partition key=value segment
    for sep in mid_path_seps:
        for idx, part in enumerate(raw_path_parts):
            if sep in part:
                return "/".join(raw_path_parts[2:idx])
    # Non-partitioned tables: cut the path at the first data/metadata file segment.
    # Order of priority matters (e.g. Delta paths contain _delta_log before part- for Parquet files)
    for sep in path_seps:
        for idx, part in enumerate(raw_path_parts):
            if sep in part:
                return "/".join(raw_path_parts[2:idx])
    return None

spark.udf.register("parse_storage_account", parse_storage_account)
spark.udf.register("parse_storage_container", parse_storage_container)
spark.udf.register("parse_table_path", parse_table_path)
# COMMAND ----------
# DBTITLE 1,Testing on subset
display(
spark.sql(f"""
SELECT
cast(time as timestamp) as time,
category,
operationName,
split_part(properties.clientRequestId, "--", 1) as clusterId,
uri,
parse_storage_account(uri) as storage_account,
parse_storage_container(uri) as storage_container,
parse_table_path(uri) as table_path
FROM {ADLS_Logs_schema}.{ADLS_read_logs}
WHERE LOWER(properties.userAgentHeader) LIKE "%databricks%"
AND regexp_extract(properties.clientRequestId, "(\\d{{4}}-\\d{{6}}-[a-z0-9]+)") != ""
AND operationName IN ("ReadFile")
LIMIT 100
""")
)
# COMMAND ----------
# DBTITLE 1,Regex-based approach - more error prone
# # Validate that table paths are as expected for each table type
# # NOTE: in testing, not all possible table types/storage patterns were accounted for, so regex adjustment may be needed
# display(
# spark.sql(f"""
# SELECT
# cast(time as timestamp) as time,
# category,
# operationName,
# -- regexp_extract(properties.clientRequestId, "(\\d{{4}}-\\d{{6}}-[a-z0-9]+)") as clusterId,
# split_part(properties.clientRequestId, "--", 1) as clusterId,
# uri,
# regexp_extract(uri, "https://(.*?)\\.dfs") as storage_account,
# -- regexp_extract(uri, "\.net/([\w-]+)[/?&]") as storage_container,
# regexp_extract(uri, "https?://[^/]+/([^/?&]*)") as storage_container,
# regexp_extract(uri, "https?://[^/]+/[^/]+/([^/?&]+(?:/[^/?&]+)*)") as file_key_raw,
# regexp_extract(regexp_extract(uri, "https?://[^/]+/[^/]+/([^/?&]+(?:/[^/?&]+)*)"), "(.*?)(?=/[^/]*(?:_delta_log|\\.[^.]+$))") as table_path,
# properties.objectKey
# FROM {ADLS_Logs_schema}.{ADLS_read_logs}
# WHERE LOWER(properties.userAgentHeader) LIKE "%databricks%"
# AND regexp_extract(properties.clientRequestId, "(\\d{{4}}-\\d{{6}}-[a-z0-9]+)") != ""
# AND operationName IN ("ReadFile")
# LIMIT 100
# """)
# )
# COMMAND ----------
# MAGIC %md # 02. Join Jobs to Tables via Cluster ID
# COMMAND ----------
display(
spark.sql(f"""
WITH job_clusters AS (
SELECT
job_id,
cluster_instance.cluster_id
FROM {Job_Run_schema}.{Job_Run_table}
WHERE state.result_state == "SUCCESS"
),
external_metastore AS (
SELECT
regexp_extract(Location, "@(.*?)\\.dfs") as storage_account,
regexp_extract(Location, "://(.*?)@") as storage_container,
regexp_extract(Location, "windows\\.net/(.*)") as table_path,
*
FROM {ExternaL_Metastore_Inventory_schema}.{ExternaL_Metastore_Inventory_table}
),
combined_logs AS (
SELECT * FROM {ADLS_Logs_schema}.{ADLS_read_logs}
UNION ALL
SELECT * FROM {ADLS_Logs_schema}.{ADLS_write_logs}
),
parsed_logs AS (
SELECT
cast(time as timestamp) as time,
category,
operationName,
split_part(properties.clientRequestId, "--", 1) as clusterId,
uri,
parse_storage_account(uri) as storage_account,
parse_storage_container(uri) as storage_container,
parse_table_path(uri) as table_path
FROM combined_logs
WHERE LOWER(properties.userAgentHeader) LIKE "%databricks%"
AND regexp_extract(properties.clientRequestId, "(\\d{{4}}-\\d{{6}}-[a-z0-9]+)") != ""
)
SELECT * FROM parsed_logs
WHERE operationName IN ("AppendFile", "ReadFile")
AND parse_table_path(uri) IS NOT NULL
""")
)
# COMMAND ----------
# MAGIC %md
# MAGIC # TO DO
# MAGIC * Join logic to determine which jobs read/write which tables (a hedged sketch of this join follows in the cell below)
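# COMMAND ----------
# DBTITLE 1,Hedged sketch: jobs -> clusters -> tables
# A hedged sketch of the join described in the TO DO above, reusing the CTEs from section 02:
# job runs are tied to logs via the cluster ID parsed from clientRequestId, and logs are tied to
# HMS tables via (storage_account, storage_container, table_path). Treat it as a starting point
# only; it assumes the cluster ID alone is enough to attribute a log line to a job.
display(
spark.sql(f"""
WITH job_clusters AS (
SELECT job_id, cluster_instance.cluster_id
FROM {Job_Run_schema}.{Job_Run_table}
WHERE state.result_state == "SUCCESS"
),
external_metastore AS (
SELECT
regexp_extract(Location, "@(.*?)\\.dfs") as storage_account,
regexp_extract(Location, "://(.*?)@") as storage_container,
regexp_extract(Location, "windows\\.net/(.*)") as table_path,
*
FROM {ExternaL_Metastore_Inventory_schema}.{ExternaL_Metastore_Inventory_table}
),
combined_logs AS (
SELECT * FROM {ADLS_Logs_schema}.{ADLS_read_logs}
UNION ALL
SELECT * FROM {ADLS_Logs_schema}.{ADLS_write_logs}
),
parsed_logs AS (
SELECT
operationName,
split_part(properties.clientRequestId, "--", 1) as clusterId,
parse_storage_account(uri) as storage_account,
parse_storage_container(uri) as storage_container,
parse_table_path(uri) as table_path
FROM combined_logs
WHERE LOWER(properties.userAgentHeader) LIKE "%databricks%"
AND operationName IN ("AppendFile", "ReadFile")
)
SELECT DISTINCT
jc.job_id,
pl.clusterId as cluster_id,
pl.operationName,
em.*
FROM parsed_logs pl
JOIN job_clusters jc ON pl.clusterId = jc.cluster_id
JOIN external_metastore em
ON pl.storage_account = em.storage_account
AND pl.storage_container = em.storage_container
AND pl.table_path = em.table_path
WHERE pl.table_path IS NOT NULL
""")
)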