
Conversation

@xinyuangui2
Contributor

@xinyuangui2 xinyuangui2 commented Aug 11, 2025

This PR implements local mode support for Ray Train v2, allowing training functions to run in the same process when num_workers=0 is specified in the ScalingConfig. This feature provides a lightweight alternative for development, debugging, and single-node training scenarios.

Key Changes:

Core Infrastructure

  • Abstract TrainContext: Refactored TrainContext into an abstract base class with concrete implementations for distributed and local modes (see the illustrative sketch after this list)
    • Created LocalTrainContext and DistributedTrainContext implementations
  • TrainFnUtils Implementations:
    • DistributedTrainFnUtils: Handles distributed training scenarios
    • LocalTrainFnUtils: Provides local mode implementation with simulated distributed training APIs
  • LocalController: New controller class for managing local mode training execution

API Enhancements

  • ScalingConfig: Added support for num_workers=0 with informational logging
  • DataParallelTrainer: Enhanced to detect local mode and route to appropriate controller

Framework Integration

  • PyTorch: Updated device detection to work correctly in local mode
  • Other Frameworks: Full support for TensorFlow, Lightning, LightGBM, JAX, XGBoost, and HuggingFace Transformers

Testing

  • Comprehensive Test Suite: Added extensive tests covering all supported frameworks in local mode
  • Edge Cases: Validation of metrics reporting, checkpointing, and dataset handling
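
The sketch below illustrates the abstract-context pattern described above. It is illustrative only: the class names follow the description in this PR, but the method set and constructors shown here are assumptions, not Ray's actual definitions.

from abc import ABC, abstractmethod

class TrainContext(ABC):
    # Shared interface; concrete subclasses back it with real or simulated state.
    @abstractmethod
    def get_world_size(self) -> int: ...

    @abstractmethod
    def get_world_rank(self) -> int: ...

class DistributedTrainContext(TrainContext):
    # Backed by the actual worker group topology in distributed runs.
    def __init__(self, world_size: int, world_rank: int):
        self._world_size = world_size
        self._world_rank = world_rank

    def get_world_size(self) -> int:
        return self._world_size

    def get_world_rank(self) -> int:
        return self._world_rank

class LocalTrainContext(TrainContext):
    # Local mode runs the training function in the current process,
    # so it simulates a single worker.
    def get_world_size(self) -> int:
        return 1

    def get_world_rank(self) -> int:
        return 0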

Usage Example:

import ray
from ray.train import ScalingConfig
from ray.train.v2.api.data_parallel_trainer import DataParallelTrainer

def train_fn():
    # Your training code here
    ray.train.report({"accuracy": 0.95})

trainer = DataParallelTrainer(
    train_fn, 
    scaling_config=ScalingConfig(num_workers=0)  # Local mode
)
result = trainer.fit()
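
Inside the training function, the usual context APIs remain available in local mode. The snippet below is a small sketch of how a user could inspect them; the assumption (per the description above) is that local mode simulates a single worker, i.e. world size 1 and rank 0.

import ray.train

def train_fn():
    ctx = ray.train.get_context()
    # Expected to report one simulated worker in local mode.
    print(ctx.get_world_rank(), ctx.get_world_size())
    ray.train.report({"accuracy": 0.95})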

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: xgui <xgui@anyscale.com>
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Summary of Changes

Hello @xinyuangui2, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the flexibility of Ray's TorchTrainer by enabling it to operate in a standalone mode, independent of the full Ray Train distributed execution framework. This change is particularly beneficial for local development, debugging, or scenarios where the overhead of a distributed Ray cluster is not required. It introduces a new backend to manage local training execution, including integration with torchrun, and refactors key internal APIs to support both distributed and local training contexts seamlessly. The overall impact is a more versatile TorchTrainer that can be adopted for a wider range of PyTorch training workflows.

Highlights

  • Standalone TorchTrainer Execution: This PR introduces the capability for TorchTrainer to run training loops without requiring the full Ray Train distributed execution environment. This is achieved by adding a new running_without_ray_train flag to the TorchTrainer constructor.
  • New Local Training Backend: A new backend, TorchBackendWithoutRayTrain, has been implemented to manage the execution flow when TorchTrainer is used in this standalone mode. This backend handles local process group initialization (e.g., for torchrun) and executes the user-defined training function directly.
  • API Abstraction and Context Management: The core TrainContext and TrainFnUtils classes have been refactored into Abstract Base Classes (ABCs). New concrete implementations, TrainContextWithoutRayTrain and TorchWithoutRayTrainTrainFnUtils, provide the necessary context and utility functions tailored for non-Ray Train execution, ensuring consistent API usage.
  • Device Management Utilities: New get_device and get_devices utility functions have been added to ray.train.torch.train_loop_utils to correctly identify and return the appropriate torch.device (CPU or CUDA) based on whether the training is managed by Ray Train or running locally.
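
For reference, the device helpers called out above are exposed as ray.train.torch.get_device() and ray.train.torch.get_devices(). A minimal usage sketch inside a training function follows; whether it picks CPU or CUDA in the standalone path depends on the detection logic described in the highlight above.

import torch
import ray.train.torch

def train_fn():
    # Returns the accelerator assigned to this worker, or CPU when no GPU is available.
    device = ray.train.torch.get_device()
    model = torch.nn.Linear(8, 1).to(device)
    batch = torch.randn(4, 8, device=device)
    print(model(batch).shape)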

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant and valuable feature: the ability to run TorchTrainer without a full Ray Train environment. This is excellent for local development, debugging, and single-node training. The overall approach, which involves abstracting TrainContext and TrainFnUtils, is well-designed. I've identified a couple of critical bugs related to an undefined attribute and a missing return value, along with some suggestions to improve the design and maintainability. Once these issues are addressed, this will be a great addition to Ray Train.

Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 changed the title [Train] TorchTrainer without ray train mode [Train] TorchTrainer without ray train controller mode Aug 13, 2025
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 changed the title [Train] TorchTrainer without ray train controller mode [Train] Run the train function locally when scaling_config.num_workers == 0 Aug 17, 2025
xinyuangui2 and others added 4 commits August 17, 2025 18:20
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 marked this pull request as ready for review August 18, 2025 19:08
@xinyuangui2 xinyuangui2 requested review from a team as code owners August 18, 2025 19:08
@ray-gardener ray-gardener bot added the train Ray Train Related Issue label Aug 19, 2025
Signed-off-by: xgui <xgui@anyscale.com>
xinyuangui2 and others added 2 commits August 20, 2025 12:17
Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
Signed-off-by: xgui <xgui@anyscale.com>
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Sep 3, 2025
@matthewdeng matthewdeng merged commit 117a642 into ray-project:master Sep 3, 2025
6 of 7 checks passed
sampan-s-nayak pushed a commit to sampan-s-nayak/ray that referenced this pull request Sep 8, 2025
…roject#55487)

jugalshah291 pushed a commit to jugalshah291/ray_fork that referenced this pull request Sep 11, 2025
…t#55689)

This PR moves the implementations of collectives to `TrainFnUtils`. This
would unblock the local mode that is introduced in
ray-project#55487

---------

Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: jugalshah291 <shah.jugal291@gmail.com>
jugalshah291 pushed a commit to jugalshah291/ray_fork that referenced this pull request Sep 11, 2025
…roject#55487)

wyhong3103 pushed a commit to wyhong3103/ray that referenced this pull request Sep 12, 2025
…roject#55487)

matthewdeng added a commit that referenced this pull request Sep 18, 2025
…h torchrun (#56218)

This PR extends the Ray Train v2 local mode support (from #55487) to
enable users to launch multiple local mode processes using torchrun for
PyTorch distributed training. **With this new feature, users can easily
switch between torchrun and Ray Train without modifying their training
code.**


### Note

Ray Data on multiple processes is not supported. This might need to wait for
#55114 or similar components.

## Key Changes

### Multi-Process Local Mode Support
- **`LocalTorchController`**: New controller that detects torchrun env
variables and sets contexts accordingly
- **Torchrun Integration**: Users can now launch multiple local mode
processes using `torchrun` command
- **Environment Detection**: Automatically detects torchrun environment
variables and initializes distributed training
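
A minimal sketch of this kind of detection follows, assuming the standard variables that torchrun exports to each worker process (RANK, WORLD_SIZE, LOCAL_RANK); the exact variables the controller consults are not spelled out here.

```python
import os

def launched_by_torchrun() -> bool:
    # torchrun sets these for every process it spawns.
    return all(v in os.environ for v in ("RANK", "WORLD_SIZE", "LOCAL_RANK"))

if launched_by_torchrun():
    rank = int(os.environ["RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    print(f"torchrun worker {rank}/{world_size}")
```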

## Usage Example

```python
import os
import tempfile

import torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Compose

import ray
from ray.train import Checkpoint, CheckpointConfig, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer
from ray.train.v2.api.config import FailureConfig
import ray.train.torch

def train_func():
    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    # [1] Prepare model.
    model = ray.train.torch.prepare_model(model)
    criterion = CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.001)

    # Data
    transform = Compose([ToTensor(), Normalize((0.28604,), (0.32025,))])
    data_dir = os.path.join(tempfile.gettempdir(), "data")
    train_data = FashionMNIST(root=data_dir, train=True, download=True, transform=transform)
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    # [2] Prepare dataloader.
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(10):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # [3] Report metrics and checkpoint.
        metrics = {"loss": loss.item(), "epoch": epoch}
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            torch.save(
                model.state_dict(),
                os.path.join(temp_checkpoint_dir, "model.pt")
            )
            ray.train.report(
                metrics,
                checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),
            )
        if ray.train.get_context().get_world_rank() == 0:
            print(metrics)

# Configuration for local mode
use_gpu = True
scaling_config = ScalingConfig(num_workers=0, use_gpu=use_gpu)  # Local mode
run_config = RunConfig(checkpoint_config=CheckpointConfig(num_to_keep=1))

# Note: Ray Data not supported with multiple processes in local mode
# For multi-process training, use PyTorch DataLoader as shown above

# Initialize the Trainer
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=scaling_config,
    run_config=run_config,
)

# Train the model
result = trainer.fit()
```

### Running Options:

```bash
# Option 1: Single process local mode
RAY_TRAIN_V2_ENABLED=1 python test.py

# Option 2: Multi-process local mode with torchrun
RAY_TRAIN_V2_ENABLED=1 torchrun --standalone --nnodes=1 --nproc-per-node=4 test.py

# Option 3: Switch to distributed Ray Train (change num_workers=4)
# Same training code works across all modes!
```

---------

Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
Co-authored-by: matthewdeng <matthew.j.deng@gmail.com>
zma2 pushed a commit to zma2/ray that referenced this pull request Sep 23, 2025
…h torchrun (ray-project#56218)

ZacAttack pushed a commit to ZacAttack/ray that referenced this pull request Sep 24, 2025
…h torchrun (ray-project#56218)

elliot-barn pushed a commit that referenced this pull request Sep 24, 2025
…h torchrun (#56218)

marcostephan pushed a commit to marcostephan/ray that referenced this pull request Sep 24, 2025
…h torchrun (ray-project#56218)

elliot-barn pushed a commit that referenced this pull request Sep 27, 2025
…h torchrun (#56218)

dstrodtman pushed a commit to dstrodtman/ray that referenced this pull request Oct 6, 2025
…t#55689)

dstrodtman pushed a commit that referenced this pull request Oct 6, 2025
dstrodtman pushed a commit that referenced this pull request Oct 6, 2025
…h torchrun (#56218)

snorkelopstesting2-coder pushed a commit to snorkel-marlin-repos/ray-project_ray_pr_55487_845d0704-f603-4119-a84c-c718ea697641 that referenced this pull request Oct 11, 2025
snorkelopstesting2-coder added a commit to snorkel-marlin-repos/ray-project_ray_pr_55487_845d0704-f603-4119-a84c-c718ea697641 that referenced this pull request Oct 11, 2025
… (num_workers=0)

Merged from original PR #55487
Original: ray-project/ray#55487
snorkelopstesting2-coder pushed a commit to snorkel-marlin-repos/ray-project_ray_pr_55487_ad8ef9be-5d2d-4f36-ab8e-33ca20d8c314 that referenced this pull request Oct 11, 2025
snorkelopstesting2-coder added a commit to snorkel-marlin-repos/ray-project_ray_pr_55487_ad8ef9be-5d2d-4f36-ab8e-33ca20d8c314 that referenced this pull request Oct 11, 2025
… (num_workers=0)

Merged from original PR #55487
Original: ray-project/ray#55487
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025
…h torchrun (ray-project#56218)

snorkelopstesting3-bot pushed a commit to snorkel-marlin-repos/ray-project_ray_pr_55487_ec2971a4-aa1b-47c7-83d4-f4d2fe4a2a1e that referenced this pull request Oct 22, 2025
snorkelopstesting2-coder added a commit to snorkel-marlin-repos/ray-project_ray_pr_55487_ec2971a4-aa1b-47c7-83d4-f4d2fe4a2a1e that referenced this pull request Oct 22, 2025
… (num_workers=0)

Merged from original PR #55487
Original: ray-project/ray#55487
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…t#55689)

landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…roject#55487)

landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…h torchrun (ray-project#56218)

Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
…h torchrun (ray-project#56218)
