|
24 | 24 | from ray._private.ray_constants import DEFAULT_CLIENT_RECONNECT_GRACE_PERIOD |
25 | 25 | from ray._private.runtime_env.py_modules import upload_py_modules_if_needed |
26 | 26 | from ray._private.runtime_env.working_dir import upload_working_dir_if_needed |
| 27 | +from ray._private.ray_constants import env_integer, env_float |
27 | 28 |
|
28 | 29 | # Use cloudpickle's version of pickle for UnpicklingError |
29 | 30 | from ray.cloudpickle.compat import pickle |
|
52 | 53 |
|
53 | 54 | logger = logging.getLogger(__name__) |
54 | 55 |
|
55 | | -INITIAL_TIMEOUT_SEC = 5 |
56 | | -MAX_TIMEOUT_SEC = 30 |
57 | | - |
| 56 | +INITIAL_TIMEOUT_SEC = env_integer("RAY_CLIENT_INITIAL_CONNECTION_TIMEOUT_S", 5) |
| 57 | +MAX_TIMEOUT_SEC = env_integer("RAY_CLIENT_MAX_CONNECTION_TIMEOUT_S", 30) |
58 | 58 | # The max amount of time an operation can run blocking in the server. This |
59 | 59 | # allows for Ctrl-C of the client to work without explicitly cancelling server |
60 | 60 | # operations. |
61 | | -MAX_BLOCKING_OPERATION_TIME_S: float = 2.0 |
| 61 | +MAX_BLOCKING_OPERATION_TIME_S: float = env_float("RAY_CLIENT_MAX_BLOCKING_OPERATION_TIME_S", 2.0) |
62 | 62 |
|
63 | 63 | # If the total size (bytes) of all outbound messages to schedule tasks since |
64 | 64 | # the connection began exceeds this value, a warning should be raised |
@@ -416,19 +416,14 @@ def get(self, vals, *, timeout: Optional[float] = None) -> Any: |
416 | 416 | else: |
417 | 417 | deadline = time.monotonic() + timeout |
418 | 418 |
|
419 | | - max_blocking_operation_time = MAX_BLOCKING_OPERATION_TIME_S |
420 | | - if "RAY_CLIENT_MAX_BLOCKING_OPERATION_TIME_S" in os.environ: |
421 | | - max_blocking_operation_time = float( |
422 | | - os.environ["RAY_CLIENT_MAX_BLOCKING_OPERATION_TIME_S"] |
423 | | - ) |
424 | 419 | while True: |
425 | 420 | if deadline: |
426 | 421 | op_timeout = min( |
427 | | - max_blocking_operation_time, |
| 422 | + MAX_BLOCKING_OPERATION_TIME_S, |
428 | 423 | max(deadline - time.monotonic(), 0.001), |
429 | 424 | ) |
430 | 425 | else: |
431 | | - op_timeout = max_blocking_operation_time |
| 426 | + op_timeout = MAX_BLOCKING_OPERATION_TIME_S |
432 | 427 | try: |
433 | 428 | res = self._get(to_get, op_timeout) |
434 | 429 | break |
|
0 commit comments