Skip to content

Commit

Permalink
Adding new configurable option address_probe_timeout to HTEX. (#1667)
Browse files Browse the repository at this point in the history
* Adding new configurable option address_probe_timeout to HTEX.

* fix typo

Co-authored-by: Ben Clifford <benc@hawaga.org.uk>
  • Loading branch information
yadudoc and benclifford authored May 6, 2020
1 parent 3ed5ac1 commit 4709b30
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
8 changes: 8 additions & 0 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ class HighThroughputExecutor(StatusHandlingExecutor, RepresentationMixin):
When there are a few tasks (<100) or when tasks are long running, this option should
be set to 0 for better load balancing. Default is 0.
address_probe_timeout : int
Managers attempt connecting over many different addesses to determine a viable address.
This option sets a time limit in seconds on the connection attempt. Default is 30s.
suppress_failure : Bool
If set, the interchange will suppress failures rather than terminate early. Default: True
Expand Down Expand Up @@ -172,6 +176,7 @@ def __init__(self,
heartbeat_threshold: int = 120,
heartbeat_period: int = 30,
poll_period: int = 10,
address_probe_timeout: int = 30,
suppress_failure: bool = True,
managed: bool = True,
worker_logdir_root: Optional[str] = None):
Expand All @@ -191,6 +196,7 @@ def __init__(self,
self.max_workers = max_workers
self.prefetch_capacity = prefetch_capacity
self.address = address
self.address_probe_timeout = address_probe_timeout
if self.address:
self.all_addresses = address
else:
Expand Down Expand Up @@ -236,6 +242,7 @@ def __init__(self,
"--logdir={logdir} "
"--block_id={{block_id}} "
"--hb_period={heartbeat_period} "
"--address_probe_timeout={address_probe_timeout} "
"--hb_threshold={heartbeat_threshold} ")

def initialize_scaling(self):
Expand All @@ -253,6 +260,7 @@ def initialize_scaling(self):

l_cmd = self.launch_cmd.format(debug=debug_opts,
prefetch_capacity=self.prefetch_capacity,
address_probe_timeout=self.address_probe_timeout,
addresses=self.all_addresses,
task_port=self.worker_task_port,
result_port=self.worker_result_port,
Expand Down
14 changes: 13 additions & 1 deletion parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Manager(object):
"""
def __init__(self,
addresses="127.0.0.1",
address_probe_timeout=30,
task_port="50097",
result_port="50098",
cores_per_worker=1,
Expand All @@ -68,6 +69,13 @@ def __init__(self,
"""
Parameters
----------
addresses : str
comma separated list of addresses for the interchange
address_probe_timeout : int
Timeout in seconds for the address probe to detect viable addresses
to the interchange. Default : 30s
worker_url : str
Worker url on which workers will attempt to connect back
Expand Down Expand Up @@ -114,7 +122,7 @@ def __init__(self,
logger.info("Manager started")

try:
ix_address = probe_addresses(addresses.split(','), task_port)
ix_address = probe_addresses(addresses.split(','), task_port, timeout=address_probe_timeout)
if not ix_address:
raise Exception("No viable address found")
else:
Expand Down Expand Up @@ -573,6 +581,8 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_
help="Heartbeat period in seconds. Uses manager default unless set")
parser.add_argument("--hb_threshold", default=120,
help="Heartbeat threshold in seconds. Uses manager default unless set")
parser.add_argument("--address_probe_timeout", default=30,
help="Timeout to probe for viable address to interchange. Default: 30s")
parser.add_argument("--poll", default=10,
help="Poll period used in milliseconds")
parser.add_argument("-r", "--result_port", required=True,
Expand All @@ -599,11 +609,13 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_
logger.info("addresses: {}".format(args.addresses))
logger.info("max_workers: {}".format(args.max_workers))
logger.info("poll_period: {}".format(args.poll))
logger.info("address_probe_timeout: {}".format(args.address_probe_timeout))
logger.info("Prefetch capacity: {}".format(args.prefetch_capacity))

manager = Manager(task_port=args.task_port,
result_port=args.result_port,
addresses=args.addresses,
address_probe_timeout=int(args.address_probe_timeout),
uid=args.uid,
block_id=args.block_id,
cores_per_worker=float(args.cores_per_worker),
Expand Down

0 comments on commit 4709b30

Please sign in to comment.