Skip to content

Commit

Permalink
Feature/database (#36)
Browse files Browse the repository at this point in the history
* Fix #19 (#20)

* Fix #21 (#22)

* fix README (#23)

* Fixed a problem in which Snapshot could not be saved during parallel execution.

* Add function of delete any data in DB/FS.

* Fix code style

Co-authored-by: Takashi OKADA <tk.okada.github@gmail.com>
Co-authored-by: KanaiYuma-aist <105629713+KanaiYuma-aist@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 15, 2022
1 parent ebaeafb commit 3511f87
Show file tree
Hide file tree
Showing 66 changed files with 1,071 additions and 314 deletions.
1 change: 0 additions & 1 deletion aiaccel/argument.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ def __new__(cls):
p.add_argument('--config', '-c', type=str)
p.add_argument('--resume', type=int, default=None)
p.add_argument('--clean', nargs='?', const=True, default=False)
p.add_argument('--nosave', nargs='?', const=True, default=False)
p.add_argument('--fs', nargs='?', const=True, default=False)
return vars(p.parse_args())
48 changes: 0 additions & 48 deletions aiaccel/master/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,10 @@ def __init__(self, options: dict) -> None:
self.trial_number = self.config.trial_number.get()
self.serialize = Serializer(self.config, 'master', self.options)

barrier = multiprocessing.Barrier(3)
self.set_barrier(barrier)

# optimizer
self.o = create_optimizer(options['config'])(options)
self.o.set_barrier(barrier)
# scheduler
self.s = create_scheduler(options['config'])(options)
self.s.set_barrier(barrier)

self.worker_o = multiprocessing.Process(target=self.o.start)
self.worker_s = multiprocessing.Process(target=self.s.start)
Expand Down Expand Up @@ -250,49 +245,6 @@ def inner_loop_post_process(self) -> bool:

return True

def _serialize(self) -> None:
"""Serialize this module.
Returns:
dict: The serialized master objects.
"""
if self.options['nosave'] is True:
return
else:
self.serialize_data = {
'start_time': self.start_time,
'loop_start_time': self.loop_start_time
}

if self.current_max_trial_number is None:
return

self.serialize.serialize(
self.current_max_trial_number,
self.serialize_data,
self.get_native_random_state(),
self.get_numpy_random_state()
)

def _deserialize(self, trial_id: int) -> None:
"""Deserialize this module.
Args:
dict_objects(dict): A dictionary including serialized objects.
Returns:
None
"""
data = self.serialize.deserialize(trial_id)

loop_counts = data['optimization_variables']['loop_count']

if loop_counts is None:
return

self.loop_count = loop_counts
print(f"(master)set inner loop count: {self.loop_count}")

def other_process_is_alive(self) -> bool:
if (
not self.worker_o.is_alive() or
Expand Down
48 changes: 0 additions & 48 deletions aiaccel/module.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import aiaccel
import logging
import os
import threading
import time
import sys
from aiaccel.config import Config
from aiaccel.util.process import is_process_running
from multiprocessing import Barrier
from pathlib import Path
import numpy as np
import random
Expand All @@ -32,7 +30,6 @@ class AbstractModule(object):
5. call post_process()
Attributes:
barrier (multiprocessing.Barrier): A barrier to synchronize processes.
config (ConfileWrapper): A config object.
dict_hp (Path): A path to hp directory.
dict_lock (Path): A path to lock directory.
Expand Down Expand Up @@ -99,15 +96,13 @@ def __init__(self, options: dict) -> None:
self.hp_running = 0
self.hp_finished = 0
self.sleep_time = 1.0
self.barrier = None
self.seed = self.config.randseed.get()
self.storage = Storage(
self.ws,
fsmode=options['fs'],
config_path=self.config.config_path
)
self.management_trial_id = TrialId(self.options['config'])
self.barrier_timeout = (self.config.batch_job_timeout.get() * self.config.job_retry.get())
self.serialize_datas = {}
self.deserialize_datas = {}

Expand Down Expand Up @@ -191,17 +186,6 @@ def print_dict_state(self) -> None:
self.hp_running)
)

def set_barrier(self, barrier: Barrier) -> None:
"""Set a multiprocessing barrier.
Args:
barrier (multiprocessing.Barrier): A barrier object.
Returns:
None
"""
self.barrier = barrier

def set_logger(
self,
logger_name: str,
Expand Down Expand Up @@ -262,10 +246,6 @@ def pre_process(self) -> None:
self.storage.alive.set_any_process_state(module_type, 1)
self.storage.pid.set_any_process_pid(module_type, os.getpid())

self.set_native_random_seed()
self.set_numpy_random_seed()
self.resume()

def post_process(self) -> None:
"""Post-procedure after executed processes.
Expand Down Expand Up @@ -368,39 +348,11 @@ def loop(self) -> None:
self.wait()
self.loop_count += 1

if not self.is_barrier():
break

if not self.check_error():
break

self._serialize()

if not self.is_barrier():
break

self.loop_post_process()

def is_barrier(self) -> bool:
"""Is barrier waiting working well or not.
Returns:
bool: It returns false if the barrier object is not set or the
processes are not running. Otherwise, it returns true.
"""
self.logger.debug("Waiting for sync")

start_time = time.time()
check_cycle_time = 60

while self.is_process_alive() and (time.time() - start_time) < self.barrier_timeout:
try:
self.barrier.wait(check_cycle_time)
return True
except threading.BrokenBarrierError:
self.wait()
return False

def is_process_alive(self) -> bool:
"""Is processes(master, optimizer and scheduler) running or not.
Expand Down
36 changes: 16 additions & 20 deletions aiaccel/optimizer/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def register_ready(self, param: dict) -> str:

# wd/
self.trial_id.increment()
self._serialize()
param['trial_id'] = self.trial_id.get()
# for p in param['parameters']:
# self.storage.hp.set_any_trial_param(
Expand Down Expand Up @@ -149,6 +150,9 @@ def pre_process(self) -> None:
None
"""
super().pre_process()
self.set_native_random_seed()
self.set_numpy_random_seed()
self.resume()

def post_process(self) -> None:
"""Post-procedure after executed processes.
Expand Down Expand Up @@ -235,31 +239,23 @@ def inner_loop_post_process(self) -> bool:
return True

def _serialize(self) -> None:
"""Serialize this module.
Returns:
None
"""
if self.options['nosave'] is True:
pass
else:
self.serialize.serialize(
trial_id=self.current_max_trial_number,
optimization_variables=self.serialize_datas,
native_random_state=self.get_native_random_state(),
numpy_random_state=self.get_numpy_random_state()
)
pass

def _deserialize(self, trial_id: int) -> None:
"""Deserialize this module.
pass

def resume(self) -> None:
""" When in resume mode, load the previous
optimization data in advance.
Args:
dict_objects(dict): A dictionary including serialized objects.
None
Returns:
None
"""
d = self.serialize.deserialize(trial_id)
self.deserialize_datas = d['optimization_variables']
self.set_native_random_state(d['native_random_state'])
self.set_numpy_random_state(d['numpy_random_state'])
if (
self.options['resume'] is not None and
self.options['resume'] > 0
):
self._deserialize(self.options['resume'])
13 changes: 11 additions & 2 deletions aiaccel/optimizer/grid/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,12 @@ def _serialize(self) -> None:
'ready_params': self.ready_params,
'generate_index': self.generate_index
}
super()._serialize()
self.serialize.serialize(
trial_id=self.trial_id.integer,
optimization_variables=self.serialize_datas,
native_random_state=self.get_native_random_state(),
numpy_random_state=self.get_numpy_random_state()
)

def _deserialize(self, trial_id: int) -> None:
"""Deserialize this module.
Expand All @@ -186,7 +191,11 @@ def _deserialize(self, trial_id: int) -> None:
Returns:
None
"""
super()._deserialize(trial_id)
d = self.serialize.deserialize(trial_id)
self.deserialize_datas = d['optimization_variables']
self.set_native_random_state(d['native_random_state'])
self.set_numpy_random_state(d['numpy_random_state'])

self.ready_params = self.deserialize_datas['ready_params']
self.generate_index = self.deserialize_datas['generate_index']
self.num_of_generated_parameter = self.deserialize_datas['num_of_generated_parameter']
Expand Down
13 changes: 11 additions & 2 deletions aiaccel/optimizer/nelder_mead/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,12 @@ def _serialize(self) -> None:
'nelder_mead': self.nelder_mead.serialize(),
'order': self.order
}
super()._serialize()
self.serialize.serialize(
trial_id=self.trial_id.integer,
optimization_variables=self.serialize_datas,
native_random_state=self.get_native_random_state(),
numpy_random_state=self.get_numpy_random_state()
)

def _deserialize(self, trial_id: int) -> None:
"""Deserialize this module.
Expand All @@ -359,7 +364,11 @@ def _deserialize(self, trial_id: int) -> None:
Returns:
None
"""
super()._deserialize(trial_id)
d = self.serialize.deserialize(trial_id)
self.deserialize_datas = d['optimization_variables']
self.set_native_random_state(d['native_random_state'])
self.set_numpy_random_state(d['numpy_random_state'])

parameter_pool = copy.deepcopy(self.deserialize_datas['parameter_pool'])
for p_pool in parameter_pool:
for p_pool_param in p_pool['parameters']:
Expand Down
13 changes: 11 additions & 2 deletions aiaccel/optimizer/random/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ def _serialize(self) -> dict:
'num_of_generated_parameter': self.num_of_generated_parameter,
'loop_count': self.loop_count
}
super()._serialize()
self.serialize.serialize(
trial_id=self.trial_id.integer,
optimization_variables=self.serialize_datas,
native_random_state=self.get_native_random_state(),
numpy_random_state=self.get_numpy_random_state()
)

def _deserialize(self, trial_id: int) -> None:
""" Deserialize this module.
Expand All @@ -63,6 +68,10 @@ def _deserialize(self, trial_id: int) -> None:
Returns:
None
"""
super()._deserialize(trial_id)
d = self.serialize.deserialize(trial_id)
self.deserialize_datas = d['optimization_variables']
self.set_native_random_state(d['native_random_state'])
self.set_numpy_random_state(d['numpy_random_state'])

self.num_of_generated_parameter = self.deserialize_datas['num_of_generated_parameter']
self.loop_count = self.deserialize_datas['loop_count']
13 changes: 11 additions & 2 deletions aiaccel/optimizer/sobol/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ def _serialize(self) -> dict:
'loop_count': self.loop_count,
'generate_index': self.generate_index
}
super()._serialize()
self.serialize.serialize(
trial_id=self.trial_id.integer,
optimization_variables=self.serialize_datas,
native_random_state=self.get_native_random_state(),
numpy_random_state=self.get_numpy_random_state()
)

def _deserialize(self, trial_id: int) -> None:
"""Deserialize this module.
Expand All @@ -93,7 +98,11 @@ def _deserialize(self, trial_id: int) -> None:
Returns:
None
"""
super()._deserialize(trial_id)
d = self.serialize.deserialize(trial_id)
self.deserialize_datas = d['optimization_variables']
self.set_native_random_state(d['native_random_state'])
self.set_numpy_random_state(d['numpy_random_state'])

self.generate_index = self.deserialize_datas['generate_index']
self.loop_count = self.deserialize_datas['loop_count']
self.num_of_generated_parameter = self.deserialize_datas['num_of_generated_parameter']
13 changes: 11 additions & 2 deletions aiaccel/optimizer/tpe/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ def _serialize(self) -> None:
'parameter_pool': parameter_pool,
'study': self.study
}
super()._serialize()
self.serialize.serialize(
trial_id=self.trial_id.integer,
optimization_variables=self.serialize_datas,
native_random_state=self.get_native_random_state(),
numpy_random_state=self.get_numpy_random_state()
)

def _deserialize(self, trial_id: int) -> None:

Expand All @@ -183,7 +188,11 @@ def _deserialize(self, trial_id: int) -> None:
Returns:
None
"""
super()._deserialize(trial_id)
d = self.serialize.deserialize(trial_id)
self.deserialize_datas = d['optimization_variables']
self.set_native_random_state(d['native_random_state'])
self.set_numpy_random_state(d['numpy_random_state'])

parameter_pool = copy.deepcopy(self.deserialize_datas['parameter_pool'])
for _, params in parameter_pool.items():
for param in params:
Expand Down
Loading

0 comments on commit 3511f87

Please sign in to comment.