Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Discussion] def Mapper may not be ideal #1444

Open
keunwoochoi opened this issue Feb 16, 2025 · 1 comment
Open

[Discussion] def Mapper may not be ideal #1444

keunwoochoi opened this issue Feb 16, 2025 · 1 comment

Comments

@keunwoochoi
Copy link

At 0.10.1, torchdata.nodes.Mapper is a function, not a class.

code

def Mapper(source: BaseNode[X], map_fn: Callable[[X], T]) -> "ParallelMapper[T]":
    """Returns a :class:`ParallelMapper` node with num_workers=0, which will execute map_fn in the current process/thread.

    Args:
        source (BaseNode[X]): The source node to map over.
        map_fn (Callable[[X], T]): The function to apply to each item from the source node.
    """
    return ParallelMapper(
        source=source,
        map_fn=map_fn,
        num_workers=0,
    )

I'm sure it has been discussed before making this decision. But I'd like to ask / discuss. An alternative would be something like this.

class Mapper(ParallelMapper[T]):
    """Mapper applies map_fn to each item from source sequentially.
    
    This is a simplified version of ParallelMapper that operates in a single thread,
    equivalent to ParallelMapper with num_workers=0.

    Args:
        source (BaseNode[X]): The source node to map over.
        map_fn (Callable[[X], T]): The function to apply to each item from the source node.
        snapshot_frequency (int): The frequency at which to snapshot the state of the source node. Default is 1.
        prebatch (Optional[int]): Optionally perform pre-batching of items from source before mapping.
          For small items, this may improve throughput at the expense of peak memory.
    """

    def __init__(
        self,
        source: BaseNode[X],
        map_fn: Callable[[X], T],
        snapshot_frequency: int = 1,
        prebatch: Optional[int] = None,
    ):
        # Call parent constructor with num_workers=0 and other params set to their simplest form
        # since we're doing sequential processing
        super().__init__(
            source=source,
            map_fn=map_fn,
            num_workers=0,  # Key difference - forces sequential processing
            in_order=True,  # Always in order for sequential processing
            method="thread",  # Method doesn't matter since num_workers=0
            multiprocessing_context=None,  # Not used since no parallel processing
            max_concurrent=None,  # Not used since no parallel processing
            snapshot_frequency=snapshot_frequency,
            prebatch=prebatch,
        )

    def reset(self, initial_state: Optional[Dict[str, Any]] = None):
        super().reset(initial_state)
        if initial_state is not None:
            self._it.reset(initial_state[self.IT_STATE_KEY])
        else:
            self._it.reset()

    def next(self) -> T:
        return next(self._it)  # type: ignore[arg-type, union-attr]

    def get_state(self) -> Dict[str, Any]:
        return {self.IT_STATE_KEY: self._it.state_dict()}  # type: ignore[union-attr]

In my quick thought, this seems pretty good. What am I missing in the decision making process? While I'm not sure about it, the downside of the current approach is pretty clear. Mapper looks like a class but it is not. It confused me, and it would confuse other users too. For example, I tried to subclass Mapper and surely it didn't work.

@andrewkho
Copy link
Contributor

HI @keunwoochoi I think this makes a lot of sense, generally I personally try to avoid inheritance whenever possible but IMO this is a reasonable use and we could land this change.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants