-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathasync.py
90 lines (68 loc) · 2.63 KB
/
async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import multiprocessing as mp
import os
import random
import chainer
import numpy as np
import random_seed
def set_shared_params(a, b):
"""
Args:
a (chainer.Link): link whose params are to be replaced
b (dict): dict that consists of (param_name, multiprocessing.Array)
"""
assert isinstance(a, chainer.Link)
for param_name, param in a.namedparams():
if param_name in b:
shared_param = b[param_name]
param.data = np.frombuffer(
shared_param, dtype=param.data.dtype).reshape(param.data.shape)
def set_shared_states(a, b):
assert isinstance(a, chainer.Optimizer)
assert hasattr(a, 'target'), 'Optimizer.setup must be called first'
for state_name, shared_state in b.items():
for param_name, param in shared_state.items():
old_param = a._states[state_name][param_name]
a._states[state_name][param_name] = np.frombuffer(
param,
dtype=old_param.dtype).reshape(old_param.shape)
def extract_params_as_shared_arrays(link):
assert isinstance(link, chainer.Link)
shared_arrays = {}
for param_name, param in link.namedparams():
shared_arrays[param_name] = mp.RawArray('f', param.data.ravel())
return shared_arrays
def share_params_as_shared_arrays(link):
shared_arrays = extract_params_as_shared_arrays(link)
set_shared_params(link, shared_arrays)
return shared_arrays
def share_states_as_shared_arrays(link):
shared_arrays = extract_states_as_shared_arrays(link)
set_shared_states(link, shared_arrays)
return shared_arrays
def extract_states_as_shared_arrays(optimizer):
assert isinstance(optimizer, chainer.Optimizer)
assert hasattr(optimizer, 'target'), 'Optimizer.setup must be called first'
shared_arrays = {}
for state_name, state in optimizer._states.items():
shared_arrays[state_name] = {}
for param_name, param in state.items():
shared_arrays[state_name][
param_name] = mp.RawArray('f', param.ravel())
return shared_arrays
def run_async(n_process, run_func):
"""Run experiments asynchronously.
Args:
n_process (int): number of processes
run_func: function that will be run in parallel
"""
processes = []
def set_seed_and_run(process_idx, run_func):
random_seed.set_random_seed(np.random.randint(0, 2 ** 32))
run_func(process_idx)
for process_idx in range(n_process):
processes.append(mp.Process(target=set_seed_and_run, args=(
process_idx, run_func)))
for p in processes:
p.start()
for p in processes:
p.join()