diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index b9fafcecb7..f39c46d1bf 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -134,6 +134,10 @@ class HighThroughputExecutor(StatusHandlingExecutor, RepresentationMixin): When there are a few tasks (<100) or when tasks are long running, this option should be set to 0 for better load balancing. Default is 0. + address_probe_timeout : int + Managers attempt connecting over many different addresses to determine a viable address. + This option sets a time limit in seconds on the connection attempt. Default is 30s. + suppress_failure : Bool If set, the interchange will suppress failures rather than terminate early. Default: True @@ -172,6 +176,7 @@ def __init__(self, heartbeat_threshold: int = 120, heartbeat_period: int = 30, poll_period: int = 10, + address_probe_timeout: int = 30, suppress_failure: bool = True, managed: bool = True, worker_logdir_root: Optional[str] = None): @@ -191,6 +196,7 @@ def __init__(self, self.max_workers = max_workers self.prefetch_capacity = prefetch_capacity self.address = address + self.address_probe_timeout = address_probe_timeout if self.address: self.all_addresses = address else: @@ -236,6 +242,7 @@ def __init__(self, "--logdir={logdir} " "--block_id={{block_id}} " "--hb_period={heartbeat_period} " + "--address_probe_timeout={address_probe_timeout} " "--hb_threshold={heartbeat_threshold} ") def initialize_scaling(self): @@ -253,6 +260,7 @@ def initialize_scaling(self): l_cmd = self.launch_cmd.format(debug=debug_opts, prefetch_capacity=self.prefetch_capacity, + address_probe_timeout=self.address_probe_timeout, addresses=self.all_addresses, task_port=self.worker_task_port, result_port=self.worker_result_port, diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index a21256f4c0..c3365d714d 100755 --- 
a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -54,6 +54,7 @@ class Manager(object): """ def __init__(self, addresses="127.0.0.1", + address_probe_timeout=30, task_port="50097", result_port="50098", cores_per_worker=1, @@ -68,6 +69,13 @@ def __init__(self, """ Parameters ---------- + addresses : str + comma separated list of addresses for the interchange + + address_probe_timeout : int + Timeout in seconds for the address probe to detect viable addresses + to the interchange. Default : 30s + worker_url : str Worker url on which workers will attempt to connect back @@ -114,7 +122,7 @@ def __init__(self, logger.info("Manager started") try: - ix_address = probe_addresses(addresses.split(','), task_port) + ix_address = probe_addresses(addresses.split(','), task_port, timeout=address_probe_timeout) if not ix_address: raise Exception("No viable address found") else: @@ -573,6 +581,8 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_ help="Heartbeat period in seconds. Uses manager default unless set") parser.add_argument("--hb_threshold", default=120, help="Heartbeat threshold in seconds. Uses manager default unless set") + parser.add_argument("--address_probe_timeout", default=30, + help="Timeout to probe for viable address to interchange. Default: 30s") parser.add_argument("--poll", default=10, help="Poll period used in milliseconds") parser.add_argument("-r", "--result_port", required=True, @@ -599,11 +609,13 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_ logger.info("addresses: {}".format(args.addresses)) logger.info("max_workers: {}".format(args.max_workers)) logger.info("poll_period: {}".format(args.poll)) + logger.info("address_probe_timeout: {}".format(args.address_probe_timeout)) logger.info("Prefetch capacity: {}".format(args.prefetch_capacity)) manager = Manager(task_port=args.task_port, result_port=args.result_port, addresses=args.addresses, + address_probe_timeout=int(args.address_probe_timeout), uid=args.uid, block_id=args.block_id, cores_per_worker=float(args.cores_per_worker),