|
21 | 21 | import ray.cloudpickle as cloudpickle |
22 | 22 | import ray.core.generated.ray_client_pb2 as ray_client_pb2 |
23 | 23 | import ray.core.generated.ray_client_pb2_grpc as ray_client_pb2_grpc |
24 | | -from ray._private.ray_constants import DEFAULT_CLIENT_RECONNECT_GRACE_PERIOD |
| 24 | +from ray._private.ray_constants import ( |
| 25 | + DEFAULT_CLIENT_RECONNECT_GRACE_PERIOD, |
| 26 | + env_float, |
| 27 | + env_integer, |
| 28 | +) |
25 | 29 | from ray._private.runtime_env.py_modules import upload_py_modules_if_needed |
26 | 30 | from ray._private.runtime_env.working_dir import upload_working_dir_if_needed |
27 | 31 |
|
|
52 | 56 |
|
53 | 57 | logger = logging.getLogger(__name__) |
54 | 58 |
|
55 | | -INITIAL_TIMEOUT_SEC = 5 |
56 | | -MAX_TIMEOUT_SEC = 30 |
57 | | - |
| 59 | +INITIAL_TIMEOUT_SEC = env_integer("RAY_CLIENT_INITIAL_CONNECTION_TIMEOUT_S", 5) |
| 60 | +MAX_TIMEOUT_SEC = env_integer("RAY_CLIENT_MAX_CONNECTION_TIMEOUT_S", 30) |
58 | 61 | # The max amount of time an operation can run blocking in the server. This |
59 | 62 | # allows for Ctrl-C of the client to work without explicitly cancelling server |
60 | 63 | # operations. |
61 | | -MAX_BLOCKING_OPERATION_TIME_S: float = 2.0 |
| 64 | +MAX_BLOCKING_OPERATION_TIME_S: float = env_float( |
| 65 | + "RAY_CLIENT_MAX_BLOCKING_OPERATION_TIME_S", 2.0 |
| 66 | +) |
62 | 67 |
|
63 | 68 | # If the total size (bytes) of all outbound messages to schedule tasks since |
64 | 69 | # the connection began exceeds this value, a warning should be raised |
@@ -416,19 +421,14 @@ def get(self, vals, *, timeout: Optional[float] = None) -> Any: |
416 | 421 | else: |
417 | 422 | deadline = time.monotonic() + timeout |
418 | 423 |
|
419 | | - max_blocking_operation_time = MAX_BLOCKING_OPERATION_TIME_S |
420 | | - if "RAY_CLIENT_MAX_BLOCKING_OPERATION_TIME_S" in os.environ: |
421 | | - max_blocking_operation_time = float( |
422 | | - os.environ["RAY_CLIENT_MAX_BLOCKING_OPERATION_TIME_S"] |
423 | | - ) |
424 | 424 | while True: |
425 | 425 | if deadline: |
426 | 426 | op_timeout = min( |
427 | | - max_blocking_operation_time, |
| 427 | + MAX_BLOCKING_OPERATION_TIME_S, |
428 | 428 | max(deadline - time.monotonic(), 0.001), |
429 | 429 | ) |
430 | 430 | else: |
431 | | - op_timeout = max_blocking_operation_time |
| 431 | + op_timeout = MAX_BLOCKING_OPERATION_TIME_S |
432 | 432 | try: |
433 | 433 | res = self._get(to_get, op_timeout) |
434 | 434 | break |
|
0 commit comments