19 changes: 13 additions & 6 deletions .buildkite/copy_files.py
@@ -39,14 +39,16 @@ def perform_auth():
resp = requests.get(
"https://vop4ss7n22.execute-api.us-west-2.amazonaws.com/endpoint/",
auth=auth,
params={"job_id": os.environ["BUILDKITE_JOB_ID"]})
params={"job_id": os.environ["BUILDKITE_JOB_ID"]},
)
return resp


def handle_docker_login(resp):
pwd = resp.json()["docker_password"]
subprocess.call(
["docker", "login", "--username", "raytravisbot", "--password", pwd])
["docker", "login", "--username", "raytravisbot", "--password", pwd]
)


def gather_paths(dir_path) -> List[str]:
@@ -85,7 +87,7 @@ def upload_paths(paths, resp, destination):
"branch_wheels": f"{branch}/{sha}/{fn}",
"jars": f"jars/latest/{current_os}/{fn}",
"branch_jars": f"jars/{branch}/{sha}/{current_os}/{fn}",
"logs": f"bazel_events/{branch}/{sha}/{bk_job_id}/{fn}"
"logs": f"bazel_events/{branch}/{sha}/{bk_job_id}/{fn}",
}[destination]
of["file"] = open(path, "rb")
r = requests.post(c["url"], files=of)
@@ -94,14 +96,19 @@ def upload_paths(paths, resp, destination):

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Helper script to upload files to S3 bucket")
description="Helper script to upload files to S3 bucket"
)
parser.add_argument("--path", type=str, required=False)
parser.add_argument("--destination", type=str)
args = parser.parse_args()

assert args.destination in {
"branch_jars", "branch_wheels", "jars", "logs", "wheels",
"docker_login"
"branch_jars",
"branch_wheels",
"jars",
"logs",
"wheels",
"docker_login",
}
assert "BUILDKITE_JOB_ID" in os.environ
assert "BUILDKITE_COMMIT" in os.environ
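Note: the copy_files.py hunks above are mechanical Black output: closing brackets move onto their own line and the final argument gains a trailing comma. As a hedged illustration of Black's "magic trailing comma" (behavior of the black==21.12b0 pin this PR introduces; the snippet below is illustrative, not from the diff):

# With a trailing comma after the last element, Black keeps the collection
# exploded, one element per line, even though it would fit on one line.
xs = [
    1,
    2,
    3,
]

# Without a trailing comma, Black collapses the literal to a single line.
ys = [1, 2, 3]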
8 changes: 7 additions & 1 deletion .flake8
@@ -9,13 +9,14 @@ exclude =
python/.eggs/
python/ray/_private/thirdparty/*
python/ray/workflow/tests/mock_server.py
max-line-length = 79
max-line-length = 88
inline-quotes = "
ignore =
C408
E121
E123
E126
E203
E226
E24
E704
@@ -41,3 +42,8 @@ ignore =
B016
B017
avoid-escape = no
# Error E731 is ignored because of the migration from YAPF to Black.
# See https://github.com/ray-project/ray/issues/21315 for more information.
per-file-ignores =
rllib/evaluation/worker_set.py:E731
rllib/evaluation/sampler.py:E731
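Note: of the newly ignored codes, E203 is the standard Black/flake8 conflict: Black puts a space before the ":" in a slice whose bounds are compound expressions, which flake8 reports as "whitespace before ':'". E731 (lambda assignment) is unrelated to formatting and is scoped to the two rllib files above per the linked issue. A hedged sketch of both patterns (illustrative code, not from this PR):

# E203: Black's preferred slice formatting for compound bounds.
chunk = items[offset + 1 : offset + size]

# E731: assigning a lambda to a name instead of using def.
clip = lambda x: max(0.0, min(1.0, x))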
2 changes: 1 addition & 1 deletion .gitpod/Dockerfile
@@ -15,7 +15,7 @@ RUN set -x; apt update \
&& mv bazel.gpg /etc/apt/trusted.gpg.d/ \
&& echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | tee /etc/apt/sources.list.d/bazel.list \
&& apt update && apt install bazel-3.7.2 -y \
&& pip3 install cython==0.29.26 pytest pandas tree tabulate pexpect sklearn joblib yapf==0.23.0 flake8==3.9.1 mypy==0.782 flake8-quotes flake8-bugbear==21.9.2 setproctitle==1.1.10 psutil \
&& pip3 install cython==0.29.26 pytest pandas tree tabulate pexpect sklearn joblib black==21.12b0 yapf==0.23.0 flake8==3.9.1 mypy==0.782 flake8-quotes flake8-bugbear==21.9.2 setproctitle==1.1.10 psutil \
&& python3 -c 'print("startup --output_base=/workspace/ray/.bazel-cache\nstartup --host_jvm_args=-Xmx1800m\nbuild --jobs=6")' > /etc/bazel.bazelrc

RUN update-alternatives --install /usr/local/bin/python python /usr/bin/python3 30 \
8 changes: 5 additions & 3 deletions benchmarks/distributed/test_many_actors.py
@@ -51,8 +51,10 @@ def no_resource_leaks():
test_utils.wait_for_condition(no_resource_leaks)

rate = MAX_ACTORS_IN_CLUSTER / (end_time - start_time)
print(f"Success! Started {MAX_ACTORS_IN_CLUSTER} actors in "
f"{end_time - start_time}s. ({rate} actors/s)")
print(
f"Success! Started {MAX_ACTORS_IN_CLUSTER} actors in "
f"{end_time - start_time}s. ({rate} actors/s)"
)

if "TEST_OUTPUT_JSON" in os.environ:
out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
@@ -62,6 +64,6 @@ def no_resource_leaks():
"time": end_time - start_time,
"success": "1",
"_peak_memory": round(used_gb, 2),
"_peak_process_memory": usage
"_peak_process_memory": usage,
}
json.dump(results, out_file)
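Note: the rewrapped print relies on Python's compile-time concatenation of adjacent string literals, so the message is byte-for-byte unchanged; Black only moved the pieces inside the call's parentheses. A minimal sketch (values are made up):

msg = (
    "Success! Started 100 actors in "
    "2.0s. (50.0 actors/s)"
)
assert msg == "Success! Started 100 actors in 2.0s. (50.0 actors/s)"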
8 changes: 5 additions & 3 deletions benchmarks/distributed/test_many_pgs.py
@@ -77,8 +77,10 @@ def no_resource_leaks():
test_utils.wait_for_condition(no_resource_leaks)

rate = MAX_PLACEMENT_GROUPS / (end_time - start_time)
print(f"Success! Started {MAX_PLACEMENT_GROUPS} pgs in "
f"{end_time - start_time}s. ({rate} pgs/s)")
print(
f"Success! Started {MAX_PLACEMENT_GROUPS} pgs in "
f"{end_time - start_time}s. ({rate} pgs/s)"
)

if "TEST_OUTPUT_JSON" in os.environ:
out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
@@ -88,6 +90,6 @@ def no_resource_leaks():
"time": end_time - start_time,
"success": "1",
"_peak_memory": round(used_gb, 2),
"_peak_process_memory": usage
"_peak_process_memory": usage,
}
json.dump(results, out_file)
15 changes: 7 additions & 8 deletions benchmarks/distributed/test_many_tasks.py
@@ -16,9 +16,7 @@ def test_max_running_tasks(num_tasks):
def task():
time.sleep(sleep_time)

refs = [
task.remote() for _ in tqdm.trange(num_tasks, desc="Launching tasks")
]
refs = [task.remote() for _ in tqdm.trange(num_tasks, desc="Launching tasks")]

max_cpus = ray.cluster_resources()["CPU"]
min_cpus_available = max_cpus
@@ -48,8 +46,7 @@ def no_resource_leaks():


@click.command()
@click.option(
"--num-tasks", required=True, type=int, help="Number of tasks to launch.")
@click.option("--num-tasks", required=True, type=int, help="Number of tasks to launch.")
def test(num_tasks):
ray.init(address="auto")

@@ -66,8 +63,10 @@ def test(num_tasks):
test_utils.wait_for_condition(no_resource_leaks)

rate = num_tasks / (end_time - start_time - sleep_time)
print(f"Success! Started {num_tasks} tasks in {end_time - start_time}s. "
f"({rate} tasks/s)")
print(
f"Success! Started {num_tasks} tasks in {end_time - start_time}s. "
f"({rate} tasks/s)"
)

if "TEST_OUTPUT_JSON" in os.environ:
out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
@@ -77,7 +76,7 @@ def test(num_tasks):
"time": end_time - start_time,
"success": "1",
"_peak_memory": round(used_gb, 2),
"_peak_process_memory": usage
"_peak_process_memory": usage,
}
json.dump(results, out_file)

3 changes: 1 addition & 2 deletions benchmarks/distributed/wait_cluster.py
@@ -12,8 +12,7 @@ def num_alive_nodes():


@click.command()
@click.option(
"--num-nodes", required=True, type=int, help="The target number of nodes")
@click.option("--num-nodes", required=True, type=int, help="The target number of nodes")
def wait_cluster(num_nodes: int):
ray.init(address="auto")
while num_alive_nodes() != num_nodes:
4 changes: 2 additions & 2 deletions benchmarks/object_store/test_object_store.py
@@ -9,7 +9,7 @@
from tqdm import tqdm

NUM_NODES = 50
OBJECT_SIZE = 2**30
OBJECT_SIZE = 2 ** 30


def num_alive_nodes():
@@ -60,6 +60,6 @@ def sum(self, arr):
"broadcast_time": end - start,
"object_size": OBJECT_SIZE,
"num_nodes": NUM_NODES,
"success": "1"
"success": "1",
}
json.dump(results, out_file)
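Note: black==21.12b0 spaces the power operator like any other binary operator, hence 2**30 becoming 2 ** 30. A short sketch (the claim about later versions is an assumption, not from this PR):

OBJECT_SIZE = 2 ** 30  # 1 GiB, as black==21.12b0 formats it
# Assumption: Black >= 22.1 hugs simple powers and would rewrite this as 2**30.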
7 changes: 3 additions & 4 deletions benchmarks/single_node/test_single_node.py
@@ -13,7 +13,7 @@
MAX_RETURNS = 3000
MAX_RAY_GET_ARGS = 10000
MAX_QUEUED_TASKS = 1_000_000
MAX_RAY_GET_SIZE = 100 * 2**30
MAX_RAY_GET_SIZE = 100 * 2 ** 30


def assert_no_leaks():
@@ -189,8 +189,7 @@ def test_large_object():
print(f"Many returns time: {returns_time} ({MAX_RETURNS} returns)")
print(f"Ray.get time: {get_time} ({MAX_RAY_GET_ARGS} args)")
print(f"Queued task time: {queued_time} ({MAX_QUEUED_TASKS} tasks)")
print(f"Ray.get large object time: {large_object_time} "
f"({MAX_RAY_GET_SIZE} bytes)")
print(f"Ray.get large object time: {large_object_time} " f"({MAX_RAY_GET_SIZE} bytes)")

if "TEST_OUTPUT_JSON" in os.environ:
out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
@@ -205,6 +204,6 @@ def test_large_object():
"num_queued": MAX_QUEUED_TASKS,
"large_object_time": large_object_time,
"large_object_size": MAX_RAY_GET_SIZE,
"success": "1"
"success": "1",
}
json.dump(results, out_file)
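Note: the one-line print above still contains two adjacent f-strings; Black rewraps string literals but never merges or splits them, and adjacency still concatenates at compile time. A minimal sketch (illustrative values):

t, size = 1.5, 42
line = f"Ray.get large object time: {t} " f"({size} bytes)"
assert line == "Ray.get large object time: 1.5 (42 bytes)"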
55 changes: 41 additions & 14 deletions ci/remote-watch.py
@@ -66,7 +66,7 @@ def get_remote_url(remote):

def replace_suffix(base, old_suffix, new_suffix=""):
if base.endswith(old_suffix):
base = base[:len(base) - len(old_suffix)] + new_suffix
base = base[: len(base) - len(old_suffix)] + new_suffix
return base


@@ -199,12 +199,21 @@ def monitor():
expected_line = "{}\t{}".format(expected_sha, ref)

if should_keep_alive(git("show", "-s", "--format=%B", "HEAD^-")):
logger.info("Not monitoring %s on %s due to keep-alive on: %s", ref,
remote, expected_line)
logger.info(
"Not monitoring %s on %s due to keep-alive on: %s",
ref,
remote,
expected_line,
)
return

logger.info("Monitoring %s (%s) for changes in %s: %s", remote,
get_remote_url(remote), ref, expected_line)
logger.info(
"Monitoring %s (%s) for changes in %s: %s",
remote,
get_remote_url(remote),
ref,
expected_line,
)

for to_wait in yield_poll_schedule():
time.sleep(to_wait)
@@ -217,27 +226,46 @@ def monitor():
status = ex.returncode

if status == 2:
logger.info("Terminating job as %s has been deleted on %s: %s",
ref, remote, expected_line)
logger.info(
"Terminating job as %s has been deleted on %s: %s",
ref,
remote,
expected_line,
)
break
elif status != 0:
logger.error("Error %d: unable to check %s on %s: %s", status, ref,
remote, expected_line)
logger.error(
"Error %d: unable to check %s on %s: %s",
status,
ref,
remote,
expected_line,
)
else:
prev = expected_line
expected_line = detect_spurious_commit(line, expected_line, remote)
if expected_line != line:
logger.info(
"Terminating job as %s has been updated on %s\n"
" from:\t%s\n"
" to: \t%s", ref, remote, expected_line, line)
" to: \t%s",
ref,
remote,
expected_line,
line,
)
time.sleep(1) # wait for CI to flush output
break
if expected_line != prev:
logger.info(
"%s appeared to spuriously change on %s\n"
" from:\t%s\n"
" to: \t%s", ref, remote, prev, expected_line)
" to: \t%s",
ref,
remote,
prev,
expected_line,
)

return terminate_my_process_group()

@@ -259,9 +287,8 @@ def main(program, *args):

if __name__ == "__main__":
logging.basicConfig(
format="%(levelname)s: %(message)s",
stream=sys.stderr,
level=logging.DEBUG)
format="%(levelname)s: %(message)s", stream=sys.stderr, level=logging.DEBUG
)
try:
raise SystemExit(main(*sys.argv) or 0)
except KeyboardInterrupt:
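Note: the logger.info/logger.error reflows in this file change layout only: the calls keep lazy %-style formatting, where arguments are passed separately and interpolated only if the record is actually emitted. A hedged sketch (names and values are illustrative):

import logging

logger = logging.getLogger(__name__)

# Black explodes the argument list; interpolation semantics are untouched.
logger.info(
    "Monitoring %s (%s) for changes in %s: %s",
    "origin",
    "https://github.com/example/repo",
    "refs/heads/master",
    "deadbeef\trefs/heads/master",
)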