2_setup_metadata.py
# Databricks notebook source
# MAGIC %md
# MAGIC For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/genome-wide-association-studies. Because of the length and complexity of the notebooks, that page does not display the full set; the complete set is available at https://github.com/databricks-industry-solutions/glow-solution-accelerator.
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Run metadata
# MAGIC
# MAGIC Warning: this notebook only runs on Databricks, since it uses `dbutils` and the Databricks REST API.
# MAGIC
# MAGIC It requires the `databricks-cli` package to be installed via pip
# MAGIC (it is already included in the `projectglow/databricks-glow` Docker container).
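# COMMAND ----------
# Optional sketch (not part of the original notebook): if databricks-cli is not already
# available on the cluster (for example, when not using the projectglow/databricks-glow
# container), it can be installed into the notebook environment before the imports below
# by uncommenting the following line:
# %pip install databricks-cli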
# COMMAND ----------
# MAGIC %run ./0_setup_constants_glow
# COMMAND ----------
import pyspark.sql.functions as fx
from pyspark.sql.types import *
from databricks_cli.sdk.service import JobsService
from databricks_cli.sdk.service import ClusterService
from databricks_cli.configure.config import _get_api_client
from databricks_cli.configure.provider import get_config
from pathlib import Path
# COMMAND ----------
dbfs_home_path = Path("dbfs:/home/{}/".format(user))
run_metadata_delta_path = str(dbfs_home_path / "genomics/data/delta/pipeline_runs_info_hail_glow.delta")
# COMMAND ----------
cluster_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('clusterId')
# COMMAND ----------
cs = ClusterService(_get_api_client(get_config()))
_list = cs.list_clusters()['clusters']
conv = lambda x: {c:v for c,v in x.items() if type(v) in (str, int)}
cluster_info = spark.createDataFrame([conv(x) for x in _list])
cluster_info = cluster_info.where(fx.col("cluster_id") == cluster_id)
worker_info = cluster_info.select("node_type_id", "num_workers", "spark_version", "creator_user_name").collect()
node_type_id = worker_info[0].node_type_id
n_workers = worker_info[0].num_workers
spark_version = worker_info[0].spark_version
creator_user_name = worker_info[0].creator_user_name
# COMMAND ----------
display(cluster_info)
# COMMAND ----------
print("spark_version = " + str(spark_version))
print("node_type_id = " + str(node_type_id))
print("n_workers = " + str(n_workers))
print("creator_user_name = " + str(creator_user_name))
# COMMAND ----------
#define schema for logging runs in delta lake
schema = StructType([StructField("datetime", TimestampType(), True),
                     StructField("n_samples", LongType(), True),
                     StructField("n_variants", LongType(), True),
                     StructField("n_covariates", LongType(), True),
                     StructField("n_phenotypes", LongType(), True),
                     StructField("method", StringType(), True),
                     StructField("test", StringType(), True),
                     StructField("library", StringType(), True),
                     StructField("spark_version", StringType(), True),
                     StructField("node_type_id", StringType(), True),
                     StructField("n_workers", LongType(), True),
                     StructField("n_cores", LongType(), True),
                     StructField("runtime", DoubleType(), True)
                    ])
# COMMAND ----------
#TODO add GCP
node_to_core_mapping = {"r5d.2xlarge": 8,
                        "r5d.4xlarge": 16,
                        "c5d.2xlarge": 8,
                        "i3.xlarge": 4,
                        "Standard_DS3_v2": 4,
                        "Standard_DS4_v2": 8,
                        "Standard_E4d_v4": 4,
                        "Standard_E8s_v3": 8,
                        "Standard_L8s_v2": 8,
                        "Standard_L64s_v2": 64
                       }
# COMMAND ----------
def lookup_cores(node_type_id, n_workers, node_to_core_mapping=node_to_core_mapping):
  cores_per_node = node_to_core_mapping[node_type_id]
  total_cores = cores_per_node * n_workers
  return total_cores

def log_metadata(datetime, n_samples, n_variants, n_covariates, n_binary_phenotypes, method, test, library, spark_version, node_type_id, n_workers, start_time, end_time, run_metadata_delta_path):
  """
  log metadata about each step in the pipeline and append to delta lake table
  """
  spark.sql("RESET") # resetting the runtime configurations specific to the current session is necessary to avoid `com.databricks.sql.io.FileReadException: Error while reading file`
  runtime = float("{:.2f}".format((end_time - start_time)))
  n_cores = lookup_cores(node_type_id, n_workers, node_to_core_mapping=node_to_core_mapping)
  l = [(datetime, n_samples, n_variants, n_covariates, n_binary_phenotypes, method, test, library, spark_version, node_type_id, n_workers, n_cores, runtime)]
  run_metadata_delta_df = spark.createDataFrame(l, schema=schema)
  run_metadata_delta_df.write.mode("append").format("delta").save(run_metadata_delta_path)
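# COMMAND ----------
# Illustrative usage only (not part of the original notebook): a downstream pipeline
# notebook could time a step and append a row to the run-metadata Delta table as shown
# below. The counts and the method/test/library labels are hypothetical.
#
# import time
# from datetime import datetime as dt
#
# start_time = time.time()
# # ... run a GWAS or quality-control step here ...
# end_time = time.time()
#
# log_metadata(dt.now(), n_samples=500000, n_variants=1000000, n_covariates=12,
#              n_binary_phenotypes=1, method="linear_regression", test="gwas",
#              library="glow", spark_version=spark_version, node_type_id=node_type_id,
#              n_workers=n_workers, start_time=start_time, end_time=end_time,
#              run_metadata_delta_path=run_metadata_delta_path)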