diff --git a/python/ray/data/_internal/block_list.py b/python/ray/data/_internal/block_list.py index 5425fa66ac4ac..d04c5ec658a53 100644 --- a/python/ray/data/_internal/block_list.py +++ b/python/ray/data/_internal/block_list.py @@ -8,9 +8,7 @@ class BlockList: """A list of blocks that may be computed or pending computation. - In the basic version of BlockList, all blocks are known ahead of time. In - LazyBlockList, blocks are not yet computed, so the number of blocks may - change after execution due to block splitting. + All blocks are known ahead of time """ def __init__( @@ -69,7 +67,6 @@ def get_blocks(self) -> List[ObjectRef[Block]]: The length of this iterator is not known until execution. """ self._check_if_cleared() - # Overriden in LazyBlockList for bulk evaluation. return list(self._blocks) def get_blocks_with_metadata(self) -> List[Tuple[ObjectRef[Block], BlockMetadata]]: @@ -78,7 +75,7 @@ def get_blocks_with_metadata(self) -> List[Tuple[ObjectRef[Block], BlockMetadata Prefer calling this instead of the iter form for performance if you don't need lazy evaluation. """ - self.get_blocks() # Force bulk evaluation in LazyBlockList. + self.get_blocks() return list(self.iter_blocks_with_metadata()) def iter_blocks_with_metadata( diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index 953d924027978..44fde949b3a6f 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -11,21 +11,11 @@ PhysicalOperator, RefBundle, ) -from ray.data._internal.lazy_block_list import LazyBlockList -from ray.data._internal.logical.interfaces.logical_plan import LogicalPlan -from ray.data._internal.logical.operators.read_operator import Read from ray.data._internal.logical.optimizers import get_execution_plan -from ray.data._internal.logical.rules.set_read_parallelism import ( - compute_additional_split_factor, -) from ray.data._internal.logical.util import record_operators_usage from ray.data._internal.plan import ExecutionPlan -from ray.data._internal.planner.plan_read_op import ( - apply_output_blocks_handling_to_read_task, -) from ray.data._internal.stats import DatasetStats from ray.data.block import Block, BlockMetadata, List -from ray.data.context import DataContext from ray.types import ObjectRef # Warn about tasks larger than this. @@ -111,55 +101,6 @@ def execute_to_legacy_block_list( return block_list -def get_legacy_lazy_block_list_read_only( - plan: ExecutionPlan, -) -> LazyBlockList: - """For a read-only plan, construct a LazyBlockList with ReadTasks from the - input Datasource or Reader. Note that the plan and the underlying ReadTasks - are not executed, only their known metadata is fetched. - - Args: - plan: The legacy plan to execute. - - Returns: - The output as a legacy LazyBlockList. - """ - assert plan.is_read_only(), "This function only supports read-only plans." - assert isinstance(plan._logical_plan, LogicalPlan) - read_logical_op = plan._logical_plan.dag - assert isinstance(read_logical_op, Read) - - # In the full dataset execution, the logic in ApplyAdditionalSplitToOutputBlocks - # is normally executed as part of the MapOperator created in the - # LogicalPlan -> PhysicalPlan plan translation. In this case, since we - # get the ReadTasks directly from the Datasource or Reader, - # we need to manually apply this logic in order to update the ReadTasks. 
- ctx = DataContext.get_current() - (parallelism, _, estimated_num_blocks, k,) = compute_additional_split_factor( - read_logical_op._datasource_or_legacy_reader, - read_logical_op._parallelism, - read_logical_op._mem_size, - ctx.target_max_block_size, - cur_additional_split_factor=None, - ) - read_tasks = read_logical_op._datasource_or_legacy_reader.get_read_tasks( - parallelism - ) - for read_task in read_tasks: - apply_output_blocks_handling_to_read_task(read_task, k) - - block_list = LazyBlockList( - read_tasks, - read_logical_op.name, - ray_remote_args=read_logical_op._ray_remote_args, - owned_by_consumer=False, - ) - # Update the estimated number of blocks after applying optimizations - # and fetching metadata (e.g. SetReadParallelismRule). - block_list._estimated_num_blocks = estimated_num_blocks - return block_list - - def _get_execution_dag( executor: Executor, plan: ExecutionPlan, diff --git a/python/ray/data/_internal/lazy_block_list.py b/python/ray/data/_internal/lazy_block_list.py deleted file mode 100644 index bdc7f66973630..0000000000000 --- a/python/ray/data/_internal/lazy_block_list.py +++ /dev/null @@ -1,431 +0,0 @@ -import uuid -from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union - -import ray -from ray.data._internal.block_list import BlockList -from ray.data._internal.memory_tracing import trace_allocation -from ray.data._internal.progress_bar import ProgressBar -from ray.data._internal.remote_fn import cached_remote_fn -from ray.data._internal.stats import DatasetStats, _get_or_create_stats_actor -from ray.data.block import ( - Block, - BlockAccessor, - BlockExecStats, - BlockMetadata, - BlockPartitionMetadata, - MaybeBlockPartition, -) -from ray.data.context import DataContext -from ray.data.datasource import ReadTask -from ray.types import ObjectRef - - -class LazyBlockList(BlockList): - """A BlockList that submits tasks lazily on-demand. - - This BlockList is used for implementing read operations (e.g., to avoid - needing to read all files of a Dataset when the user is just wanting to - .take() the first few rows or view the schema). - """ - - def __init__( - self, - tasks: List[ReadTask], - read_op_name: Optional[str] = None, - block_partition_refs: Optional[List[ObjectRef[MaybeBlockPartition]]] = None, - block_partition_meta_refs: Optional[List[ObjectRef[BlockMetadata]]] = None, - cached_metadata: Optional[List[BlockPartitionMetadata]] = None, - ray_remote_args: Optional[Dict[str, Any]] = None, - stats_uuid: str = None, - *, - owned_by_consumer: bool, - ): - """Create a LazyBlockList on the provided read tasks. - - Args: - tasks: The read tasks that will produce the blocks of this lazy block list. - _read_op_name: An optional name for the read operator, derived from the - underlying Datasource - block_partition_refs: An optional list of already submitted read task - futures (i.e. block partition refs). This should be the same length as - the tasks argument. - block_partition_meta_refs: An optional list of block partition metadata - refs. This should be the same length as the tasks argument. - cached_metadata: An optional list of already computed AND fetched metadata. - This serves as a cache of fetched block metadata. Note that each entry - in cached_metadata represents the list of output blocks metadata per - the read task. One task can produce multiple output blocks. - ray_remote_args: Ray remote arguments for the read tasks. - stats_uuid: UUID for the dataset stats, used to group and fetch read task - stats. 
If not provided, a new UUID will be created. - """ - self._tasks = tasks - self._read_op_name = read_op_name - self._num_blocks = len(self._tasks) - if stats_uuid is None: - stats_uuid = uuid.uuid4() - self._stats_uuid = stats_uuid - self._execution_started = False - self._remote_args = ray_remote_args or {} - # Block partition metadata that have already been computed and fetched. - if cached_metadata is not None: - self._cached_metadata = cached_metadata - else: - self._cached_metadata = [None] * len(tasks) - # Block partition metadata that have already been computed. - if block_partition_meta_refs is not None: - self._block_partition_meta_refs = block_partition_meta_refs - else: - self._block_partition_meta_refs = [None] * len(tasks) - # Block partitions that have already been computed. - if block_partition_refs is not None: - self._block_partition_refs = block_partition_refs - else: - self._block_partition_refs = [None] * len(tasks) - assert len(tasks) == len(self._block_partition_refs), ( - tasks, - self._block_partition_refs, - ) - assert len(tasks) == len(self._block_partition_meta_refs), ( - tasks, - self._block_partition_meta_refs, - ) - assert len(tasks) == len(self._cached_metadata), ( - tasks, - self._cached_metadata, - ) - # Whether the block list is owned by consuming APIs, and if so it can be - # eagerly deleted after read by the consumer. - self._owned_by_consumer = owned_by_consumer - self._stats_actor = _get_or_create_stats_actor() - # This field can be set to indicate the number of estimated output blocks, - # since each read task may produce multiple output blocks after splitting. - self._estimated_num_blocks = None - - def __repr__(self): - return f"LazyBlockList(owned_by_consumer={self._owned_by_consumer})" - - def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]: - """Get the metadata for all blocks.""" - if all(meta is not None for meta in self._cached_metadata): - # Always return fetched metadata if we already have it. - metadata = self._flatten_metadata(self._cached_metadata) - elif not fetch_if_missing: - metadata = [ - m if m is not None else [t.get_metadata()] - for m, t in zip(self._cached_metadata, self._tasks) - ] - metadata = self._flatten_metadata(metadata) - else: - _, metadata = self._get_blocks_with_metadata() - return metadata - - def stats(self) -> DatasetStats: - """Create DatasetStats for this LazyBlockList.""" - return DatasetStats( - # Make a copy of metadata, as the DatasetStats may mutate it in-place. - metadata={"Read": self.get_metadata(fetch_if_missing=False).copy()}, - parent=None, - needs_stats_actor=True, - stats_uuid=self._stats_uuid, - ) - - def copy(self) -> "LazyBlockList": - return LazyBlockList( - self._tasks.copy(), - read_op_name=self._read_op_name, - block_partition_refs=self._block_partition_refs.copy(), - block_partition_meta_refs=self._block_partition_meta_refs.copy(), - cached_metadata=self._cached_metadata, - ray_remote_args=self._remote_args.copy(), - owned_by_consumer=self._owned_by_consumer, - stats_uuid=self._stats_uuid, - ) - - def clear(self): - """Clears all object references (block partitions and base block partitions) - from this lazy block list. 
- """ - self._block_partition_refs = [None for _ in self._block_partition_refs] - self._block_partition_meta_refs = [ - None for _ in self._block_partition_meta_refs - ] - self._cached_metadata = [None for _ in self._cached_metadata] - self._stats_actor = None - - def is_cleared(self) -> bool: - return all(ref is None for ref in self._block_partition_refs) - - def _check_if_cleared(self): - pass # LazyBlockList can always be re-computed. - - def get_blocks(self) -> List[ObjectRef[Block]]: - """Bulk version of iter_blocks(). - - Prefer calling this instead of the iter form for performance if you - don't need lazy evaluation. - """ - blocks, _ = self._get_blocks_with_metadata() - return blocks - - def get_blocks_with_metadata(self) -> List[Tuple[ObjectRef[Block], BlockMetadata]]: - """Bulk version of iter_blocks_with_metadata(). - - Prefer calling this instead of the iter form for performance if you - don't need lazy evaluation. - """ - blocks, metadata = self._get_blocks_with_metadata() - return list(zip(blocks, metadata)) - - def _get_blocks_with_metadata( - self, - ) -> Tuple[List[ObjectRef[Block]], List[BlockMetadata]]: - """Get all underlying block futures and concrete metadata. - - This will block on the completion of the underlying read tasks and will fetch - all block metadata outputted by those tasks. - """ - block_refs, meta_refs = [], [] - for block_ref, meta_ref in self._iter_block_partition_refs(): - block_refs.append(block_ref) - meta_refs.append(meta_ref) - # If block splitting is enabled, fetch the partitions through generator. - read_progress_bar = ProgressBar("Read progress", total=len(block_refs)) - # Handle duplicates (e.g. due to unioning the same dataset). - unique_refs = list(set(block_refs)) - generators = read_progress_bar.fetch_until_complete(unique_refs) - - ref_to_blocks = {} - ref_to_metadata = {} - for ref, generator in zip(unique_refs, generators): - refs_list = list(generator) - meta = ray.get(refs_list.pop(-1)) - ref_to_blocks[ref] = refs_list - ref_to_metadata[ref] = meta - - output_block_refs = [] - for idx, ref in enumerate(block_refs): - output_block_refs += ref_to_blocks[ref] - self._cached_metadata[idx] = ref_to_metadata[ref] - return output_block_refs, self._flatten_metadata(self._cached_metadata) - - def compute_to_blocklist(self) -> BlockList: - """Launch all tasks and return a concrete BlockList.""" - blocks, metadata = self._get_blocks_with_metadata() - return BlockList(blocks, metadata, owned_by_consumer=self._owned_by_consumer) - - def ensure_metadata_for_first_block(self) -> Optional[BlockMetadata]: - """Ensure that the metadata is fetched and set for the first block. - - This will only block execution in order to fetch the post-read metadata for the - first block if the pre-read metadata for the first block has no schema. - - Returns: - None if the block list is empty, the metadata for the first block otherwise. - """ - if not self._tasks: - return None - metadata = self._tasks[0].get_metadata() - if metadata.schema is not None: - # If pre-read schema is not null, we consider it to be "good enough" and use - # it. - return metadata - # Otherwise, we trigger computation (if needed), wait until the task completes, - # and fetch the block partition metadata. - try: - block_partition_ref, metadata_ref = next(self._iter_block_partition_refs()) - except (StopIteration, ValueError): - # Dataset is empty (no blocks) or was manually cleared. - pass - else: - # This blocks until the underlying read task is finished. 
- generator = ray.get(block_partition_ref) - blocks_ref = list(generator) - metadata = ray.get(blocks_ref[-1]) - self._cached_metadata[0] = metadata - return metadata - - def iter_blocks_with_metadata( - self, - block_for_metadata: bool = False, - ) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]: - """Iterate over the blocks along with their metadata. - - Note that, if block_for_metadata is False (default), this iterator returns - pre-read metadata from the ReadTasks given to this LazyBlockList so it doesn't - have to block on the execution of the read tasks. Therefore, the metadata may be - under-specified, e.g. missing schema or the number of rows. If fully-specified - block metadata is required, pass block_for_metadata=True. When dynamic block - splitting is enabled, always block on the execution of the read tasks. - - The length of this iterator is not known until execution. - - Args: - block_for_metadata: Whether we should block on the execution of read tasks - in order to obtain fully-specified block metadata. - - Returns: - An iterator of block references and the corresponding block metadata. - """ - outer = self - - class Iter: - def __init__(self): - self._base_iter = outer._iter_block_partition_refs() - self._pos = -1 - self._buffer = [] - - def __iter__(self): - return self - - def __next__(self): - while not self._buffer: - self._pos += 1 - generator_ref, _ = next(self._base_iter) - generator = ray.get(generator_ref) - refs = list(generator) - # This blocks until the read task completes, returning - # fully-specified block metadata for each output block. - metadata = ray.get(refs.pop(-1)) - assert len(metadata) == len(refs) - for block_ref, meta in zip(refs, metadata): - self._buffer.append((block_ref, meta)) - return self._buffer.pop(0) - - return Iter() - - def _iter_block_partition_refs( - self, - ) -> Iterator[ - Tuple[ - ObjectRef[MaybeBlockPartition], - Union[None, ObjectRef[BlockMetadata]], - ] - ]: - """Iterate over the block futures and their corresponding metadata futures. - - This does NOT block on the execution of each submitted task. - """ - outer = self - - class Iter: - def __init__(self): - self._pos = -1 - - def __iter__(self): - return self - - def __next__(self): - self._pos += 1 - if self._pos < len(outer._tasks): - return outer._get_or_compute(self._pos) - raise StopIteration - - return Iter() - - def _get_or_compute( - self, - i: int, - ) -> Tuple[ObjectRef[MaybeBlockPartition], Union[None, ObjectRef[BlockMetadata]]]: - assert i < len(self._tasks), i - # Check if we need to compute more block_partition_refs. - if not self._block_partition_refs[i]: - # Exponentially increase the number computed per batch. - for j in range(max(i + 1, i * 2)): - if j >= len(self._block_partition_refs): - break - if not self._block_partition_refs[j]: - ( - self._block_partition_refs[j], - self._block_partition_meta_refs[j], - ) = self._submit_task(j) - assert self._block_partition_refs[i], self._block_partition_refs - trace_allocation( - self._block_partition_refs[i], f"LazyBlockList.get_or_compute({i})" - ) - return self._block_partition_refs[i], self._block_partition_meta_refs[i] - - def _submit_task( - self, task_idx: int - ) -> Tuple[ObjectRef[MaybeBlockPartition], Union[None, ObjectRef[BlockMetadata]]]: - """Submit the task with index task_idx. 
- - NOTE: When dynamic block splitting is enabled, returns - Tuple[ObjectRef[DynamicObjectRefGenerator], None] instead of - Tuple[ObjectRef[Block], ObjectRef[BlockMetadata]], and the blocks metadata will - be fetched as the last element in DynamicObjectRefGenerator. - """ - if self._stats_actor is None: - self._stats_actor = _get_or_create_stats_actor() - stats_actor = self._stats_actor - if not self._execution_started: - # NOTE: We should wait for `record_start` to finish here. - # Otherwise, `record_task` may arrive before `record_start`, and - # the stats will be lost. - ray.get(stats_actor.record_start.remote(self._stats_uuid)) - self._execution_started = True - task = self._tasks[task_idx] - return ( - cached_remote_fn(_execute_read_task_split) - .options(num_returns="dynamic", **self._remote_args) - .remote( - i=task_idx, - task=task, - context=DataContext.get_current(), - stats_uuid=self._stats_uuid, - stats_actor=stats_actor, - ), - None, - ) - - def _num_computed(self) -> int: - i = 0 - for b in self._block_partition_refs: - if b is not None: - i += 1 - return i - - def _flatten_metadata( - self, metadata: List[BlockPartitionMetadata] - ) -> List[BlockMetadata]: - """Flatten the metadata of computed blocks into a list. - - This is required because dynamic block splitting can produce multiple output - blocks from each task. - """ - return [meta for meta_list in metadata for meta in meta_list] - - -def _execute_read_task_split( - i: int, - task: ReadTask, - context: DataContext, - stats_uuid: str, - stats_actor: ray.actor.ActorHandle, -) -> Iterable[Union[Block, List[BlockMetadata]]]: - """Execute read task with dynamic block splitting. - - Returns an Iterable of blocks followed by their metadata. - Example of return value for 3 blocks: - (Block1, Block2, Block3, [BlockMetadata1, BlockMetadata2, BlockMetadata3]) - """ - DataContext._set_current(context) - - # Execute the task. - blocks = task() - - input_files = task.get_metadata().input_files - blocks_metadata = [] - block_exec_stats = BlockExecStats.builder() - for block in blocks: - metadata = BlockAccessor.for_block(block).get_metadata( - input_files=input_files, - exec_stats=block_exec_stats.build(), - ) - yield block - blocks_metadata.append(metadata) - block_exec_stats = BlockExecStats.builder() - - stats_actor.record_task.remote(stats_uuid, i, blocks_metadata) - # Return metadata of blocks as a list at the end. 
- yield blocks_metadata diff --git a/python/ray/data/_internal/planner/plan_read_op.py b/python/ray/data/_internal/planner/plan_read_op.py index 02fead6909e2c..49c025cc904fe 100644 --- a/python/ray/data/_internal/planner/plan_read_op.py +++ b/python/ray/data/_internal/planner/plan_read_op.py @@ -1,4 +1,4 @@ -from typing import Iterable, List, Optional +from typing import Iterable, List import ray import ray.cloudpickle as cloudpickle @@ -8,7 +8,6 @@ from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.map_transformer import ( - ApplyAdditionalSplitToOutputBlocks, BlockMapTransformFn, BuildOutputBlocksMapTransformFn, MapTransformer, @@ -17,7 +16,6 @@ from ray.data._internal.logical.operators.read_operator import Read from ray.data._internal.util import _warn_on_high_parallelism, call_with_retry from ray.data.block import Block -from ray.data.context import DataContext from ray.data.datasource.datasource import ReadTask TASK_SIZE_WARN_THRESHOLD_BYTES = 100000 @@ -115,33 +113,3 @@ def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]: compute_strategy=TaskPoolStrategy(op._concurrency), ray_remote_args=op._ray_remote_args, ) - - -def apply_output_blocks_handling_to_read_task( - read_task: ReadTask, - additional_split_factor: Optional[int], -): - """Patch the read task and apply output blocks handling logic. - This function is only used for compability with the legacy LazyBlockList code path. - """ - transform_fns: List[MapTransformFn] = [] - transform_fns.append(BuildOutputBlocksMapTransformFn.for_blocks()) - - if additional_split_factor is not None: - transform_fns.append( - ApplyAdditionalSplitToOutputBlocks(additional_split_factor) - ) - - map_transformer = MapTransformer(transform_fns) - ctx = DataContext.get_current() - map_transformer.set_target_max_block_size(ctx.target_max_block_size) - - original_read_fn = read_task._read_fn - - def new_read_fn(): - blocks = original_read_fn() - # We pass None as the TaskContext because we don't have access to it here. - # This is okay because the transform functions don't use the TaskContext. - return map_transformer.apply_transform(blocks, None) # type: ignore - - read_task._read_fn = new_read_fn
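Note (not part of the patch above): the deleted LazyBlockList existed so that read-only plans could avoid executing every read task when a user only wants the first few rows or the schema. With this change, that behavior comes from the streaming executor rather than from LazyBlockList's on-demand task submission, so the lazy block bookkeeping is no longer needed. The snippet below is a minimal, illustrative usage sketch using only public ray.data APIs; the dataset, the no-op transform, and the assumption that the streaming executor is the default execution backend are choices made for the example, not something introduced by this diff.

import ray

# Building a dataset records a logical plan; no read tasks run yet.
ds = ray.data.range(1_000_000)

# Transformations are also recorded lazily; nothing executes here either.
ds = ds.map_batches(lambda batch: batch)

# Consuming a small slice triggers streaming execution, which only runs as
# many read tasks as needed to produce the requested rows -- the use case the
# deleted LazyBlockList docstring described (".take() the first few rows").
print(ds.take(5))

# Schema access similarly avoids a full read when the datasource can report
# metadata up front.
print(ds.schema())

# Materializing runs the whole plan and yields fully computed blocks, i.e. the
# plain BlockList case where all blocks are known ahead of time.
materialized = ds.materialize()
print(materialized.stats())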