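"""Profile scheduler-side graph handling for a big set_index shuffle on a Coiled cluster.

Persists a synthetic timeseries DataFrame, re-indexes it with set_index (a full
shuffle), reports how large the serialized graph is, and records a py-spy profile
of the scheduler alongside a Dask performance report.
"""
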
import sys
import time
import pickle
import coiled
import dask
import dask.utils
import dask.dataframe
import distributed
import distributed.protocol
from distributed_pyspy import pyspy_on_scheduler


def print_sizeof_serialized_graph(x) -> float:
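    """Serialize the optimized graph of collection `x`; report sizes and timings.

    Returns the wall-clock time spent on these diagnostics so the caller can
    subtract it from its own measurements.
    """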
start = total_start = time.perf_counter()
    # Extract and optimize the task graph, as compute()/persist() would
    dsk = dask.base.collections_to_dsk([x], optimize_graph=True)
optimize_time = time.perf_counter() - start
start = time.perf_counter()
    # Pack the graph as the client would before submitting it to the scheduler
    packed = dsk.__dask_distributed_pack__(distributed.get_client(), x.__dask_keys__())
pack_time = time.perf_counter() - start
start = time.perf_counter()
    # Size under distributed's frame-based wire serialization
    frames = distributed.protocol.dumps(packed)
    dumps_time = time.perf_counter() - start
    dumps_size = sum(len(f) for f in frames)
    start = time.perf_counter()
    # Compare against a plain pickle of the same packed graph
    pickled = len(pickle.dumps(packed))
    pickle_time = time.perf_counter() - start
    print(
        f"Graph ({len(dsk)} optimized tasks) is:\n"
        f"* {dask.utils.format_bytes(dumps_size)} with distributed-dumps ({len(frames)} frames) - {dumps_time:.1f}s\n"
        f"* {dask.utils.format_bytes(pickled)} pickled - {pickle_time:.1f}s\n"
        f"Optimize: {optimize_time:.1f}s, pack: {pack_time:.1f}s"
    )
return time.perf_counter() - total_start


def main():
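    """Persist a synthetic timeseries DataFrame, then time a full set_index shuffle."""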
df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-06-30", # 720 ~partitions
partition_freq="1h",
freq="60s",
)
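    # freq="60s" within one-hour partitions means ~60 rows per partition, so the
    # run stresses graph size rather than data volume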
df = df.persist()
distributed.wait(df)
print("DataFrame persisted")
start = time.perf_counter()
    # Build the shuffle graph without executing it
    reindexed = df.set_index("id", compute=False)
    print(f"Reindexed graph generated in {time.perf_counter() - start:.1f} sec")
    extra_time = print_sizeof_serialized_graph(reindexed)

    # Now actually execute the shuffle
    df2 = reindexed.persist()
    distributed.wait(df2)
elapsed = time.perf_counter() - start
print(
f"{elapsed:.1f} sec total, {elapsed - extra_time:.1f} sec without diagnostics"
    )


if __name__ == "__main__":
    n_workers = 100
cluster = coiled.Cluster(
software="gjoseph92/profiling",
n_workers=n_workers,
worker_cpu=1,
worker_memory="4 GiB",
scheduler_cpu=4,
scheduler_memory="8 GiB",
shutdown_on_close=True,
)
client = distributed.Client(cluster)
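
    # distributed.scheduler.COMPILED is True only when the scheduler module was
    # built with Cython; this run is specifically profiling the compiled scheduler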
if not client.run_on_scheduler(lambda: distributed.scheduler.COMPILED):
print("Scheduler is not compiled!")
client.shutdown()
client.close()
sys.exit(1)
print(f"Waiting for {n_workers} workers...")
    client.wait_for_workers(n_workers)

    def disable_gc():
        # Keep garbage collection from skewing the scheduler profile:
        # https://github.com/benfred/py-spy/issues/389#issuecomment-833903190
import gc
gc.disable()
gc.set_threshold(0)
print("Disabling GC on scheduler")
client.run_on_scheduler(disable_gc)
print("Here we go!")
# This is key---otherwise we're uploading ~300MiB of graph to the scheduler
dask.config.set({"optimization.fuse.active": False})
test_name = "cython-nogc"
    with (
        # Dask performance report (task stream, bandwidth, etc.) for the run
        distributed.performance_report(f"results/{test_name}.html"),
        # Attach py-spy to the scheduler process for the duration of main()
        pyspy_on_scheduler(
            f"results/{test_name}.json",
            subprocesses=True,
            idle=True,
            native=True,
        ),
    ):
main()

    client.shutdown()
    client.close()