Add more benchmarks and Fix spilling #6

Merged · 3 commits · Jul 20, 2024
116 changes: 116 additions & 0 deletions four_stage_problem.py
@@ -0,0 +1,116 @@
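# Benchmark: a four-stage pipeline (produce -> consume -> inference -> write)
# used to compare the budget policy against Flink-style fixed concurrency.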
import datetime
import json
import os
import time

import numpy as np
import ray

import timeline_utils

LOG_FILE = "four_stage_problem.log"


class Logger:
def __init__(self, filename: str = LOG_FILE):
self._filename = filename
self._start_time = time.time()

def record_start(self):
self._start_time = time.time()
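        # Truncate the log file so the new run starts with an empty log.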
with open(self._filename, "w"):
pass

def log(self, payload: dict):
payload = {
**payload,
"time": time.time() - self._start_time,
"clock_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f"),
}
with open(self._filename, "a") as f:
f.write(json.dumps(payload) + "\n")

logger = Logger()

TIME_UNIT = 0.5  # seconds; stage sleep times below are multiples of this


def main(is_flink: bool):

os.environ["RAY_DATA_OP_RESERVATION_RATIO"] = "0"

NUM_CPUS = 8
NUM_GPUS = 4
NUM_ROWS_PER_TASK = 10
BUFFER_SIZE_LIMIT = 30
NUM_TASKS = 16 * 5
NUM_ROWS_TOTAL = NUM_ROWS_PER_TASK * NUM_TASKS
    BLOCK_SIZE = 10 * 1024 * 1024 * 10  # 100 MiB per row payload

def produce(batch):
logger.log({
"name": "producer_start",
"id": [int(x) for x in batch["id"]]})
time.sleep(TIME_UNIT * 10)
        for row_id in batch["id"]:
            yield {
                "id": [row_id],
                "image": [np.zeros(BLOCK_SIZE, dtype=np.uint8)],
            }

def consume(batch):
logger.log({"name": "consume", "id": int(batch["id"].item())})
time.sleep(TIME_UNIT)
return {"id": batch["id"], "result": [0 for _ in batch["id"]]}

def inference(batch):
logger.log({"name": "inference", "id": int(batch["id"].item())})
time.sleep(TIME_UNIT)
return {"id": batch["id"]}

def write(batch):
logger.log({"name": "write", "id": int(batch["id"].item())})
time.sleep(TIME_UNIT)
return {"id": batch["id"]}

data_context = ray.data.DataContext.get_current()
data_context.execution_options.verbose_progress = True
data_context.target_max_block_size = BLOCK_SIZE

    # Flink mode disables our budget policy in favor of fixed concurrency.
    data_context.is_budget_policy = not is_flink

    # Cap the object store at BUFFER_SIZE_LIMIT blocks so memory pressure
    # (and spilling) shows up quickly.
    ray.init(
        num_cpus=NUM_CPUS,
        num_gpus=NUM_GPUS,
        object_store_memory=BUFFER_SIZE_LIMIT * BLOCK_SIZE,
    )

ds = ray.data.range(NUM_ROWS_TOTAL, override_num_blocks=NUM_TASKS)

if is_flink:
ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK, concurrency=2)
ds = ds.map_batches(consume, batch_size=1, num_cpus=0.99, concurrency=3)
ds = ds.map_batches(inference, batch_size=1, num_gpus=1, concurrency=4)
ds = ds.map_batches(write, batch_size=1, num_cpus=0.99, concurrency=3)

else:
ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK)
ds = ds.map_batches(consume, batch_size=1, num_cpus=0.99)
ds = ds.map_batches(inference, batch_size=1, num_gpus=1)
ds = ds.map_batches(write, batch_size=1, num_cpus=0.99)

logger.record_start()

start_time = time.time()
logger.log({"name": "execution_start"})
    for i, _ in enumerate(ds.iter_batches(batch_size=NUM_ROWS_PER_TASK)):
        logger.log({"name": "iteration", "id": i})
end_time = time.time()
print(ds.stats())
print(ray._private.internal_api.memory_summary(stats_only=True))
print(f"Total time: {end_time - start_time:.4f}s")
timeline_utils.save_timeline_with_cpus_gpus(f"timeline_{'ray' if not is_flink else 'flink'}_four_stage.json", NUM_CPUS, NUM_GPUS)
ray.shutdown()

if __name__ == "__main__":
# main(is_flink=True)
main(is_flink=False)
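Aside: `Logger` writes one JSON object per line, so a finished run can be summarized offline. A minimal sketch, assuming the run above has produced `four_stage_problem.log` (the summary script itself is illustrative, not part of this PR):

```python
import json
from collections import Counter, defaultdict

# Count events per stage and note when each stage was last active.
counts, last_seen = Counter(), defaultdict(float)
with open("four_stage_problem.log") as f:
    for line in f:
        event = json.loads(line)
        counts[event["name"]] += 1
        last_seen[event["name"]] = event["time"]

for name in ("producer_start", "consume", "inference", "write", "iteration"):
    print(f"{name:15s} count={counts[name]:4d} last_at={last_seen[name]:8.2f}s")
```

The same works for the other benchmark logs, which share the `Logger` format.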
48 changes: 28 additions & 20 deletions python/ray/data/_internal/execution/streaming_executor_state.py
@@ -317,35 +317,43 @@ def mark_finished(self, exception: Optional[Exception] = None):

     def _get_average_ouput_size(self) -> float:
         return self.op._metrics.average_bytes_outputs_per_task

     def _get_grow_rate(self, resource_manager: ResourceManager) -> float:
-        cumulative_grow_rate = 0
-
-        # Assume no more than one output.
-        assert len(self.op.output_dependencies) <= 1
-        for op in self.op.output_dependencies:
-            logger.debug(
-                "@mzm: average_bytes_inputs_per_task "
-                f"{op._metrics.average_bytes_inputs_per_task}, "
-                f"average_task_duration: {op._metrics.average_task_duration}, "
-            )
+        time_for_pipeline_to_process_one_data = 0
+        next_op = self.op
+        output_input_multiplier = 1
+
+        while len(next_op.output_dependencies) > 0:
+            assert len(next_op.output_dependencies) == 1
+
+            next_op = next_op.output_dependencies[0]

-            # Initialize grow rate to be 0.
+            # Skip operators whose metrics are not yet available.
             if (
-                not op._metrics.average_task_duration
-                or not op._metrics.average_bytes_inputs_per_task
+                not next_op._metrics.average_task_duration
+                or not next_op._metrics.average_bytes_inputs_per_task
+                or not next_op._metrics.average_bytes_outputs_per_task
             ):
                 continue

-            cumulative_grow_rate += (
-                op._metrics.average_bytes_inputs_per_task
-                / op._metrics.average_task_duration
-                * (
+            time_for_op = output_input_multiplier * next_op._metrics.average_task_duration / next_op._metrics.average_bytes_inputs_per_task
+
+            if next_op.incremental_resource_usage().cpu:
+                # @MaoZiming: GPU-only ops take no CPU time, so don't count them.
+                time_for_pipeline_to_process_one_data += time_for_op
+
+            output_input_multiplier *= (next_op._metrics.average_bytes_outputs_per_task / next_op._metrics.average_bytes_inputs_per_task)
+
+        num_executors_not_running_op = (
             resource_manager.get_global_limits().cpu
             - self.op.num_active_tasks() * self.op.incremental_resource_usage().cpu
         )
-            )
-
-        return cumulative_grow_rate
+
+        if time_for_pipeline_to_process_one_data == 0:
+            return 0
+
+        return (1 / time_for_pipeline_to_process_one_data) * num_executors_not_running_op

     def _replenish_output_budget(self, resource_manager: ResourceManager) -> float:
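In words, the reworked `_get_grow_rate` estimates how many bytes per second of this operator's output the rest of the pipeline can absorb: it walks the (single) chain of downstream operators, sums the CPU time needed to push one output byte through each CPU-bound stage (average task duration over average input bytes per task, scaled by the cumulative output-to-input size ratio of the stages in between, since data can grow or shrink along the way), and multiplies the inverse of that sum by the number of CPU executors not already running this operator. A standalone sketch of the same computation, with illustrative stubs (`Metrics`, `OpStub`, and `grow_rate` are stand-ins, not Ray Data APIs):

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Metrics:
    average_task_duration: Optional[float]  # seconds per task
    average_bytes_inputs_per_task: Optional[float]
    average_bytes_outputs_per_task: Optional[float]


@dataclass
class OpStub:
    metrics: Metrics
    uses_cpu: bool  # False for GPU-only ops
    output_dependencies: List["OpStub"] = field(default_factory=list)


def grow_rate(op: OpStub, free_cpus: float) -> float:
    """Estimate bytes/sec of `op` output that the downstream pipeline can absorb."""
    time_per_byte = 0.0
    multiplier = 1.0  # cumulative output/input size ratio of the ops walked so far
    next_op = op
    while next_op.output_dependencies:
        next_op = next_op.output_dependencies[0]
        m = next_op.metrics
        if not (m.average_task_duration and m.average_bytes_inputs_per_task
                and m.average_bytes_outputs_per_task):
            continue  # no stats yet; skip this op
        if next_op.uses_cpu:  # GPU-only ops take no CPU time
            time_per_byte += multiplier * m.average_task_duration / m.average_bytes_inputs_per_task
        multiplier *= m.average_bytes_outputs_per_task / m.average_bytes_inputs_per_task
    if time_per_byte == 0:
        return 0.0
    return free_cpus / time_per_byte


# Example: a 0.5 s consume stage over 100 MiB inputs (1:1 size) feeding a
# GPU-only inference stage, with 6 idle CPU executors.
mib = 1024 * 1024
inference = OpStub(Metrics(0.5, 100 * mib, 100 * mib), uses_cpu=False)
consume = OpStub(Metrics(0.5, 100 * mib, 100 * mib), uses_cpu=True,
                 output_dependencies=[inference])
producer = OpStub(Metrics(5.0, 80, 1000 * mib), uses_cpu=True,
                  output_dependencies=[consume])
print(f"{grow_rate(producer, free_cpus=6) / mib:.0f} MiB/s")  # -> 1200 MiB/s
```

The GPU inference stage contributes no CPU time, matching the `incremental_resource_usage().cpu` check in the diff; in the real code, `free_cpus` corresponds to `num_executors_not_running_op`.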
2 changes: 1 addition & 1 deletion spiky_benchmark.py
@@ -102,5 +102,5 @@ def consume(batch):
     ray.shutdown()

 if __name__ == "__main__":
-    main(is_flink=True)
+    # main(is_flink=True)
     main(is_flink=False)
2 changes: 1 addition & 1 deletion test_large_e2e_backpressure.py
@@ -6,7 +6,7 @@
 import numpy as np

 import ray
-import ray.timeline_utils as timeline_utils
+import timeline_utils

 LOG_FILE = "test_large_e2e_backpressure.log"
109 changes: 109 additions & 0 deletions three_cpu_stage_problem.py
@@ -0,0 +1,109 @@
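# Benchmark: a three-stage, CPU-only pipeline (produce -> transform -> consume),
# the GPU-free counterpart of four_stage_problem.py.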
import datetime
import json
import os
import time

import numpy as np
import ray

import timeline_utils

LOG_FILE = "three_cpu_stage_problem.log"


class Logger:
def __init__(self, filename: str = LOG_FILE):
self._filename = filename
self._start_time = time.time()

def record_start(self):
self._start_time = time.time()
with open(self._filename, "w"):
pass

def log(self, payload: dict):
payload = {
**payload,
"time": time.time() - self._start_time,
"clock_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f"),
}
with open(self._filename, "a") as f:
f.write(json.dumps(payload) + "\n")

logger = Logger()

TIME_UNIT = 0.5  # seconds; stage sleep times below are multiples of this


def main(is_flink: bool):

os.environ["RAY_DATA_OP_RESERVATION_RATIO"] = "0"

NUM_CPUS = 8
NUM_ROWS_PER_TASK = 10
BUFFER_SIZE_LIMIT = 30
NUM_TASKS = 16 * 5
NUM_ROWS_TOTAL = NUM_ROWS_PER_TASK * NUM_TASKS
    BLOCK_SIZE = 10 * 1024 * 1024 * 10  # 100 MiB per row payload

def produce(batch):
logger.log({
"name": "producer_start",
"id": [int(x) for x in batch["id"]]})
time.sleep(TIME_UNIT * 10)
        for row_id in batch["id"]:
            yield {
                "id": [row_id],
                "image": [np.zeros(BLOCK_SIZE, dtype=np.uint8)],
            }

def transform(batch):
logger.log({"name": "transform", "id": int(batch["id"].item())})
time.sleep(TIME_UNIT)
return {
"id": batch["id"],
"image": [np.zeros(BLOCK_SIZE, dtype=np.uint8)]}

def consume(batch):
logger.log({"name": "consume", "id": int(batch["id"].item())})
time.sleep(TIME_UNIT)
return {"id": batch["id"], "result": [0 for _ in batch["id"]]}

data_context = ray.data.DataContext.get_current()
data_context.execution_options.verbose_progress = True
data_context.target_max_block_size = BLOCK_SIZE

    # Flink mode disables our budget policy in favor of fixed concurrency.
    data_context.is_budget_policy = not is_flink

ray.init(num_cpus=NUM_CPUS, object_store_memory=BUFFER_SIZE_LIMIT * BLOCK_SIZE)

ds = ray.data.range(NUM_ROWS_TOTAL, override_num_blocks=NUM_TASKS)

if is_flink:
ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK, concurrency=2)
ds = ds.map_batches(transform, batch_size=1, num_cpus=0.99, concurrency=3)
ds = ds.map_batches(consume, batch_size=1, num_cpus=1, concurrency=3)
else:
ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK)
ds = ds.map_batches(transform, batch_size=1, num_cpus=0.99)
ds = ds.map_batches(consume, batch_size=1, num_cpus=1)

logger.record_start()

start_time = time.time()
logger.log({"name": "execution_start"})
    for i, _ in enumerate(ds.iter_batches(batch_size=NUM_ROWS_PER_TASK)):
        logger.log({"name": "iteration", "id": i})
end_time = time.time()
print(ds.stats())
print(ray._private.internal_api.memory_summary(stats_only=True))
print(f"Total time: {end_time - start_time:.4f}s")
timeline_utils.save_timeline_with_cpus_gpus(f"timeline_{'ray' if not is_flink else 'flink'}_three_cpu_stage.json", NUM_CPUS, 0)
ray.shutdown()

if __name__ == "__main__":
main(is_flink=True)
# main(is_flink=False)
2 changes: 1 addition & 1 deletion three_stage_problem.py
@@ -104,5 +104,5 @@ def inference(batch):
     ray.shutdown()

 if __name__ == "__main__":
-    main(is_flink=True)
+    # main(is_flink=True)
     main(is_flink=False)
10 changes: 7 additions & 3 deletions timeline_utils.py
@@ -5,11 +5,15 @@
 producer_task_name = "task::ReadRange->MapBatches(produce)"
 consumer_task_name = "task::MapBatches(consume)"
 inference_task_name = "task::MapBatches(inference)"
+transform_task_name = "task::MapBatches(transform)"
+write_task_name = "task::MapBatches(write)"

 COLORS = {
     producer_task_name: "rail_response",
     consumer_task_name: "cq_build_passed",
-    inference_task_name: "cq_build_failed"
+    inference_task_name: "cq_build_failed",
+    transform_task_name: "rail_load",
+    write_task_name: "cq_build_running"
 }

@@ -31,7 +35,7 @@ def i_to_slot_name(i):
         assigned = False
         # Find an available slot for the worker
         for i in range(num_cpus + num_gpus):
-            if event['cat'] in [producer_task_name, consumer_task_name] and i > num_cpus:
+            if event['cat'] in [producer_task_name, consumer_task_name, write_task_name] and i > num_cpus:
                 continue
             if event['cat'] in [inference_task_name] and i < num_cpus:
                 continue

@@ -113,4 +117,4 @@ def read_and_save_timeline_with_cpus_gpus(addr: str, num_cpus: int, num_gpus: int):
     print(f"Processed and saved modified data to {addr}")

 # if __name__ == "__main__":
-#     read_and_save_timeline_with_cpus_gpus('timeline_flink_three_stage.json', 8, 4)
+#     read_and_save_timeline_with_cpus_gpus('timeline_flink_four_stage.json', 8, 4)
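The color values in `COLORS` appear to be reserved `cname` entries from the Chrome trace-viewer palette, which keeps each stage a fixed color across timelines. Saved dumps can also be re-processed offline with the helper whose signature appears above; a usage sketch (the file name is illustrative):

```python
import timeline_utils

# Re-assign CPU/GPU slots in a previously saved Ray timeline dump.
timeline_utils.read_and_save_timeline_with_cpus_gpus(
    "timeline_flink_four_stage.json", num_cpus=8, num_gpus=4
)
```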
6 changes: 3 additions & 3 deletions variable_duration_benchmark.py
@@ -4,7 +4,7 @@
 import time

 import numpy as np
-import ray.timeline_utils as timeline_utils
+import timeline_utils
 import ray

 LOG_FILE = "variable_duration_benchmark.log"

@@ -62,7 +62,7 @@ def produce(batch):

     def consume(batch):
         logger.log({"name": "consume", "id": int(batch["id"].item())})
-        if int(batch["id"]) < NUM_ROWS_TOTAL / 2:
+        if int(batch["id"].item()) < NUM_ROWS_TOTAL / 2:
             time.sleep(TIME_UNIT)
         else:
             time.sleep(TIME_UNIT * 2)

@@ -103,5 +103,5 @@ def consume(batch):
     ray.shutdown()

 if __name__ == "__main__":
-    main(is_flink=True)
+    # main(is_flink=True)
     main(is_flink=False)