diff --git a/four_stage_problem.py b/four_stage_problem.py
new file mode 100644
index 0000000000000..be04e6c2a9c3c
--- /dev/null
+++ b/four_stage_problem.py
@@ -0,0 +1,116 @@
+import datetime
+import json
+import os
+import time
+import numpy as np
+import timeline_utils
+import ray
+import psutil
+
+LOG_FILE = "four_stage_problem.log"
+
+
+class Logger:
+    def __init__(self, filename: str = LOG_FILE):
+        self._filename = filename
+        self._start_time = time.time()
+
+    def record_start(self):
+        self._start_time = time.time()
+        with open(self._filename, "w"):
+            pass
+
+    def log(self, payload: dict):
+        payload = {
+            **payload,
+            "time": time.time() - self._start_time,
+            "clock_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f"),
+        }
+        with open(self._filename, "a") as f:
+            f.write(json.dumps(payload) + "\n")
+
+
+logger = Logger()
+
+TIME_UNIT = 0.5
+
+
+def main(is_flink: bool):
+
+    os.environ["RAY_DATA_OP_RESERVATION_RATIO"] = "0"
+
+    NUM_CPUS = 8
+    NUM_GPUS = 4
+    NUM_ROWS_PER_TASK = 10
+    BUFFER_SIZE_LIMIT = 30
+    NUM_TASKS = 16 * 5
+    NUM_ROWS_TOTAL = NUM_ROWS_PER_TASK * NUM_TASKS
+    BLOCK_SIZE = 10 * 1024 * 1024 * 10
+
+    def produce(batch):
+        logger.log({
+            "name": "producer_start",
+            "id": [int(x) for x in batch["id"]]})
+        time.sleep(TIME_UNIT * 10)
+        for id in batch["id"]:
+            yield {
+                "id": [id],
+                "image": [np.zeros(BLOCK_SIZE, dtype=np.uint8)],
+            }
+
+    def consume(batch):
+        logger.log({"name": "consume", "id": int(batch["id"].item())})
+        time.sleep(TIME_UNIT)
+        return {"id": batch["id"], "result": [0 for _ in batch["id"]]}
+
+    def inference(batch):
+        logger.log({"name": "inference", "id": int(batch["id"].item())})
+        time.sleep(TIME_UNIT)
+        return {"id": batch["id"]}
+
+    def write(batch):
+        logger.log({"name": "write", "id": int(batch["id"].item())})
+        time.sleep(TIME_UNIT)
+        return {"id": batch["id"]}
+
+    data_context = ray.data.DataContext.get_current()
+    data_context.execution_options.verbose_progress = True
+    data_context.target_max_block_size = BLOCK_SIZE
+
+    if is_flink:
+        data_context.is_budget_policy = False  # Disable our policy.
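+        # NOTE: is_budget_policy is assumed to be a DataContext flag
+        # introduced elsewhere in this patch series; it is not part of
+        # stock Ray Data.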
+    else:
+        data_context.is_budget_policy = True
+
+    ray.init(num_cpus=NUM_CPUS, num_gpus=NUM_GPUS, object_store_memory=BUFFER_SIZE_LIMIT * BLOCK_SIZE)
+
+    ds = ray.data.range(NUM_ROWS_TOTAL, override_num_blocks=NUM_TASKS)
+
+    if is_flink:
+        ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK, concurrency=2)
+        ds = ds.map_batches(consume, batch_size=1, num_cpus=0.99, concurrency=3)
+        ds = ds.map_batches(inference, batch_size=1, num_gpus=1, concurrency=4)
+        ds = ds.map_batches(write, batch_size=1, num_cpus=0.99, concurrency=3)
+    else:
+        ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK)
+        ds = ds.map_batches(consume, batch_size=1, num_cpus=0.99)
+        ds = ds.map_batches(inference, batch_size=1, num_gpus=1)
+        ds = ds.map_batches(write, batch_size=1, num_cpus=0.99)
+
+    logger.record_start()
+
+    start_time = time.time()
+    logger.log({"name": "execution_start"})
+    for i, _ in enumerate(ds.iter_batches(batch_size=NUM_ROWS_PER_TASK)):
+        logger.log({"name": "iteration", "id": i})
+    end_time = time.time()
+
+    print(ds.stats())
+    print(ray._private.internal_api.memory_summary(stats_only=True))
+    print(f"Total time: {end_time - start_time:.4f}s")
+    timeline_utils.save_timeline_with_cpus_gpus(f"timeline_{'ray' if not is_flink else 'flink'}_four_stage.json", NUM_CPUS, NUM_GPUS)
+    ray.shutdown()
+
+
+if __name__ == "__main__":
+    # main(is_flink=True)
+    main(is_flink=False)
diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py
index b88b7f89d1044..667916ccc42ae 100644
--- a/python/ray/data/_internal/execution/streaming_executor_state.py
+++ b/python/ray/data/_internal/execution/streaming_executor_state.py
@@ -317,35 +317,43 @@ def mark_finished(self, exception: Optional[Exception] = None):
 
     def _get_average_ouput_size(self) -> float:
         return self.op._metrics.average_bytes_outputs_per_task
-
+
     def _get_grow_rate(self, resource_manager: ResourceManager) -> float:
-        cumulative_grow_rate = 0
-
-        # Assume no more than one output.
-        assert len(self.op.output_dependencies) <= 1
-        for op in self.op.output_dependencies:
-            logger.debug(
-                "@mzm: average_bytes_inputs_per_task "
-                f"{op._metrics.average_bytes_inputs_per_task}, "
-                f"average_task_duration: {op._metrics.average_task_duration}, "
-            )
+
+        time_for_pipeline_to_process_one_data = 0
+        next_op = self.op
+        output_input_multiplier = 1
+
+        while len(next_op.output_dependencies) > 0:
+            assert len(next_op.output_dependencies) == 1
+
+            next_op = next_op.output_dependencies[0]
+            # Ops whose metrics aren't populated yet contribute nothing
+            # to the estimate.
             if (
-                not op._metrics.average_task_duration
-                or not op._metrics.average_bytes_inputs_per_task
+                not next_op._metrics.average_task_duration
+                or not next_op._metrics.average_bytes_inputs_per_task
+                or not next_op._metrics.average_bytes_outputs_per_task
             ):
                 continue
-
-            cumulative_grow_rate += (
-                op._metrics.average_bytes_inputs_per_task
-                / op._metrics.average_task_duration
-                * (
+
+            time_for_op = output_input_multiplier * next_op._metrics.average_task_duration / next_op._metrics.average_bytes_inputs_per_task
+
+            if next_op.incremental_resource_usage().cpu:
+                # @MaoZiming: GPU ops take no CPU time, so only CPU ops count.
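+                # Summed across the loop, this estimates the CPU-seconds
+                # the downstream ops need to fully process one byte that
+                # self.op emits; output_input_multiplier rescales each
+                # op's per-input-byte cost into self.op's output bytes.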
+                time_for_pipeline_to_process_one_data += time_for_op
+
+            output_input_multiplier *= (next_op._metrics.average_bytes_outputs_per_task / next_op._metrics.average_bytes_inputs_per_task)
+
+        num_executors_not_running_op = (
             resource_manager.get_global_limits().cpu
             - self.op.num_active_tasks()
             * self.op.incremental_resource_usage().cpu
         )
-        )
-        return cumulative_grow_rate
+        if time_for_pipeline_to_process_one_data == 0:
+            return 0
+
+        return (1 / time_for_pipeline_to_process_one_data) * num_executors_not_running_op
 
     def _replenish_output_budget(self, resource_manager: ResourceManager) -> float:
diff --git a/spiky_benchmark.py b/spiky_benchmark.py
index b24331605a792..b3a9fb81fd2d7 100644
--- a/spiky_benchmark.py
+++ b/spiky_benchmark.py
@@ -102,5 +102,5 @@ def consume(batch):
     ray.shutdown()
 
 if __name__ == "__main__":
-    main(is_flink=True)
+    # main(is_flink=True)
     main(is_flink=False)
diff --git a/test_large_e2e_backpressure.py b/test_large_e2e_backpressure.py
index d9afc720a3a9b..2bc60675f9b2c 100644
--- a/test_large_e2e_backpressure.py
+++ b/test_large_e2e_backpressure.py
@@ -6,7 +6,7 @@
 import numpy as np
 
 import ray
-import ray.timeline_utils as timeline_utils
+import timeline_utils
 
 LOG_FILE = "test_large_e2e_backpressure.log"
diff --git a/three_cpu_stage_problem.py b/three_cpu_stage_problem.py
new file mode 100644
index 0000000000000..55cf43ac04f61
--- /dev/null
+++ b/three_cpu_stage_problem.py
@@ -0,0 +1,109 @@
+import datetime
+import json
+import os
+import time
+import numpy as np
+import timeline_utils
+import ray
+import psutil
+
+LOG_FILE = "three_cpu_stage_problem.log"
+
+
+class Logger:
+    def __init__(self, filename: str = LOG_FILE):
+        self._filename = filename
+        self._start_time = time.time()
+
+    def record_start(self):
+        self._start_time = time.time()
+        with open(self._filename, "w"):
+            pass
+
+    def log(self, payload: dict):
+        payload = {
+            **payload,
+            "time": time.time() - self._start_time,
+            "clock_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f"),
+        }
+        with open(self._filename, "a") as f:
+            f.write(json.dumps(payload) + "\n")
+
+
+logger = Logger()
+
+TIME_UNIT = 0.5
+
+
+def main(is_flink: bool):
+
+    os.environ["RAY_DATA_OP_RESERVATION_RATIO"] = "0"
+
+    NUM_CPUS = 8
+    NUM_ROWS_PER_TASK = 10
+    BUFFER_SIZE_LIMIT = 30
+    NUM_TASKS = 16 * 5
+    NUM_ROWS_TOTAL = NUM_ROWS_PER_TASK * NUM_TASKS
+    BLOCK_SIZE = 10 * 1024 * 1024 * 10
+
+    def produce(batch):
+        logger.log({
+            "name": "producer_start",
+            "id": [int(x) for x in batch["id"]]})
+        time.sleep(TIME_UNIT * 10)
+        for id in batch["id"]:
+            yield {
+                "id": [id],
+                "image": [np.zeros(BLOCK_SIZE, dtype=np.uint8)],
+            }
+
+    def transform(batch):
+        logger.log({"name": "transform", "id": int(batch["id"].item())})
+        time.sleep(TIME_UNIT)
+        return {
+            "id": batch["id"],
+            "image": [np.zeros(BLOCK_SIZE, dtype=np.uint8)]}
+
+    def consume(batch):
+        logger.log({"name": "consume", "id": int(batch["id"].item())})
+        time.sleep(TIME_UNIT)
+        return {"id": batch["id"], "result": [0 for _ in batch["id"]]}
+
+    data_context = ray.data.DataContext.get_current()
+    data_context.execution_options.verbose_progress = True
+    data_context.target_max_block_size = BLOCK_SIZE
+
+    if is_flink:
+        data_context.is_budget_policy = False  # Disable our policy.
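+        # NOTE: "Flink" mode stands in for static scheduling here: each
+        # stage is pinned to a fixed concurrency below, with the budget
+        # policy turned off.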
+    else:
+        data_context.is_budget_policy = True
+
+    ray.init(num_cpus=NUM_CPUS, object_store_memory=BUFFER_SIZE_LIMIT * BLOCK_SIZE)
+
+    ds = ray.data.range(NUM_ROWS_TOTAL, override_num_blocks=NUM_TASKS)
+
+    if is_flink:
+        ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK, concurrency=2)
+        ds = ds.map_batches(transform, batch_size=1, num_cpus=0.99, concurrency=3)
+        ds = ds.map_batches(consume, batch_size=1, num_cpus=1, concurrency=3)
+    else:
+        ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK)
+        ds = ds.map_batches(transform, batch_size=1, num_cpus=0.99)
+        ds = ds.map_batches(consume, batch_size=1, num_cpus=1)
+
+    logger.record_start()
+
+    start_time = time.time()
+    logger.log({"name": "execution_start"})
+    for i, _ in enumerate(ds.iter_batches(batch_size=NUM_ROWS_PER_TASK)):
+        logger.log({"name": "iteration", "id": i})
+    end_time = time.time()
+
+    print(ds.stats())
+    print(ray._private.internal_api.memory_summary(stats_only=True))
+    print(f"Total time: {end_time - start_time:.4f}s")
+    timeline_utils.save_timeline_with_cpus_gpus(f"timeline_{'ray' if not is_flink else 'flink'}_three_cpu_stage.json", NUM_CPUS, 0)
+    ray.shutdown()
+
+
+if __name__ == "__main__":
+    main(is_flink=True)
+    # main(is_flink=False)
diff --git a/three_stage_problem.py b/three_stage_problem.py
index 41c9621b7794a..adaa8183afa06 100644
--- a/three_stage_problem.py
+++ b/three_stage_problem.py
@@ -104,5 +104,5 @@ def inference(batch):
     ray.shutdown()
 
 if __name__ == "__main__":
-    main(is_flink=True)
+    # main(is_flink=True)
     main(is_flink=False)
diff --git a/timeline_utils.py b/timeline_utils.py
index 9558b6fb5671b..973e60711bc3e 100644
--- a/timeline_utils.py
+++ b/timeline_utils.py
@@ -5,11 +5,15 @@
 producer_task_name = "task::ReadRange->MapBatches(produce)"
 consumer_task_name = "task::MapBatches(consume)"
 inference_task_name = "task::MapBatches(inference)"
+transform_task_name = "task::MapBatches(transform)"
+write_task_name = "task::MapBatches(write)"
 
 COLORS = {
     producer_task_name: "rail_response",
     consumer_task_name: "cq_build_passed",
-    inference_task_name: "cq_build_failed"
+    inference_task_name: "cq_build_failed",
+    transform_task_name: "rail_load",
+    write_task_name: "cq_build_running"
 }
 
 def assign_slots(events, num_cpus, num_gpus):
@@ -31,7 +35,7 @@ def i_to_slot_name(i):
         assigned = False
         # Find an available slot for the worker
         for i in range(num_cpus + num_gpus):
-            if event['cat'] in [producer_task_name, consumer_task_name] and i > num_cpus:
+            if event['cat'] in [producer_task_name, consumer_task_name, write_task_name] and i > num_cpus:
                 continue
             if event['cat'] in [inference_task_name] and i < num_cpus:
                 continue
@@ -113,4 +117,4 @@ def read_and_save_timeline_with_cpus_gpus(addr: str, num_cpus: int, num_gpus: in
     print(f"Processed and saved modified data to {addr}")
 
 # if __name__ == "__main__":
-#     read_and_save_timeline_with_cpus_gpus('timeline_flink_three_stage.json', 8, 4)
+#     read_and_save_timeline_with_cpus_gpus('timeline_flink_four_stage.json', 8, 4)
diff --git a/variable_duration_benchmark.py b/variable_duration_benchmark.py
index bc0ecf97adde4..f7371718d30d3 100644
--- a/variable_duration_benchmark.py
+++ b/variable_duration_benchmark.py
@@ -4,7 +4,7 @@
 import time
 
 import numpy as np
-import ray.timeline_utils as timeline_utils
+import timeline_utils
 import ray
 
 LOG_FILE = "variable_duration_benchmark.log"
@@ -62,7 +62,7 @@ def produce(batch):
 
 def consume(batch):
     logger.log({"name": "consume", "id": int(batch["id"].item())})
-    if int(batch["id"]) < NUM_ROWS_TOTAL / 2:
+    if int(batch["id"].item()) < NUM_ROWS_TOTAL / 2:
         time.sleep(TIME_UNIT)
     else:
         time.sleep(TIME_UNIT * 2)
@@ -103,5 +103,5 @@ def consume(batch):
     ray.shutdown()
 
 if __name__ == "__main__":
-    main(is_flink=True)
+    # main(is_flink=True)
     main(is_flink=False)