-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultiprocesslibrary.py
99 lines (67 loc) · 2.59 KB
/
multiprocesslibrary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/python
import config
import multiprocessing
import numpy as np
import subprocess
import os
import time
import logging
# Seconds that merge() sleeps between successive rounds of polling the
# subprocesses (the value is passed straight to time.sleep()).
MERGE_CHECK_PERIOD = 1
def get_core_count():
    """Return the number of CPUs reported for this machine.

    ``multiprocessing.cpu_count()`` is documented to raise
    ``NotImplementedError`` when the count cannot be determined. This
    function is called at import time to set ``NUM_CORES``, so an uncaught
    error would make the whole module unimportable; fall back to a single
    core instead.

    Returns:
        int: the detected CPU count, or 1 if it cannot be determined.
    """
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        return 1
# Evaluated once at import time; launch() splits its input ids into this
# many chunks and starts at most one subprocess per chunk.
NUM_CORES = get_core_count()
def create_environment():
    """Build the environment mapping that launch() passes to its subprocesses.

    Starts from a copy of the current process environment and adds the
    ``host``, ``port``, ``database``, ``user`` and ``password`` entries
    returned by ``config.raw_config()`` and ``config.agg_config()`` under
    the ``RAW_DB_*`` and ``AGG_DB_*`` variable names respectively.

    Returns:
        dict: the copied environment including the ten database variables.
    """
    child_env = os.environ.copy()

    def export(settings, variables):
        # Copy each listed setting into the environment under its variable name.
        for variable, key in variables:
            child_env[variable] = settings[key]

    export(config.raw_config(), (
        ('RAW_DB_HOST', 'host'),
        ('RAW_DB_PORT', 'port'),
        ('RAW_DB_NAME', 'database'),
        ('RAW_DB_USER', 'user'),
        ('RAW_DB_PASS', 'password'),
    ))
    export(config.agg_config(), (
        ('AGG_DB_HOST', 'host'),
        ('AGG_DB_PORT', 'port'),
        ('AGG_DB_NAME', 'database'),
        ('AGG_DB_USER', 'user'),
        ('AGG_DB_PASS', 'password'),
    ))
    return child_env
def serialize(data: list):
    """Return the items of ``data`` joined into one comma-separated string.

    Every item is converted with ``str`` before joining, so non-string
    items (for example integer ids) are accepted. An empty list yields an
    empty string.

    Args:
        data: the items to join.

    Returns:
        str: the items separated by ``","`` with no surrounding whitespace.
    """
    return ",".join(map(str, data))
def launch(script, parameters, osm_ids):
    """Start one subprocess per non-empty chunk of ``osm_ids``.

    ``osm_ids`` is split into ``NUM_CORES`` chunks with ``np.array_split``.
    For every non-empty chunk the command ``[script, parameters]`` is started
    with ``subprocess.Popen``. Each child receives the environment built by
    ``create_environment()`` plus an ``args`` variable holding that chunk's
    ids as a comma-separated string.

    Args:
        script: first element of the command line passed to ``Popen``.
        parameters: second element of the command line passed to ``Popen``.
        osm_ids: sequence of ids to distribute across the subprocesses.

    Returns:
        list: the started ``subprocess.Popen`` objects; empty when
        ``osm_ids`` is empty.
    """
    process_list = []
    if len(osm_ids) < 1:
        return process_list

    # divide inputs in evenly sized chunks
    input_lists = np.array_split(osm_ids, NUM_CORES)
    logging.debug("input args:")
    logging.debug(input_lists)

    # Child stdout is only passed through when this module's logger is at
    # DEBUG (or lower); otherwise it is discarded. The decision does not
    # depend on the chunk, so it is made once up front.
    logger = logging.getLogger(__name__)
    stdout = None
    if logger.getEffectiveLevel() > logging.DEBUG:
        stdout = subprocess.DEVNULL

    # The base environment is identical for every child; build it once and
    # give each child its own copy so the per-chunk "args" never leak across.
    base_env = create_environment()

    for chunk in input_lists:
        # array_split yields empty chunks when there are fewer ids than cores.
        if len(chunk) == 0:
            continue
        proc_env = dict(base_env)
        proc_env["args"] = serialize(chunk.tolist())
        logging.debug("launching %s %s", [script, parameters], proc_env["args"])
        process_list.append(
            subprocess.Popen([script, parameters], env=proc_env, stdout=stdout))
    return process_list
def merge(processes):
    """Wait for every process to finish and fail if any exited non-zero.

    The processes are polled in rounds, sleeping ``MERGE_CHECK_PERIOD``
    seconds between rounds. Completion is checked before sleeping, so an
    empty list or a list of already-finished processes returns without any
    delay.

    Args:
        processes: objects exposing ``poll()``, ``pid`` and ``returncode``
            (as ``subprocess.Popen`` does), e.g. the list from ``launch``.

    Raises:
        Exception: if at least one process finished with a non-zero return
            code. Every failed process is logged before raising.
    """
    while True:
        exit_codes = [proc.poll() for proc in processes]
        # poll() returns None while a process is still running.
        if all(code is not None for code in exit_codes):
            break
        time.sleep(MERGE_CHECK_PERIOD)

    failed = [proc for proc, code in zip(processes, exit_codes) if code != 0]
    for proc in failed:
        logging.error("%s process exited with return code %s",
                      proc.pid, proc.returncode)
    if failed:
        raise Exception('Aggregation subprocess error!')