@nrghosh nrghosh commented Oct 30, 2025

Refactor data.llm processor configs to support nested stage configuration

Summary

Refactors Ray Data LLM processor configuration from flat boolean flags (apply_chat_template, tokenize, detokenize, has_image) to nested, typed stage configs. This enables per-stage control over batch_size, concurrency, runtime_env, num_cpus, and memory while maintaining backward compatibility with legacy boolean flags.

What's Changing

Before: All stages (chat template, tokenization, engine, detokenization) inherit the same processor-level defaults (batch_size, concurrency, runtime_env). Configuration is done via flat boolean flags with no per-stage customization.

After: Each stage can be configured independently via nested StageConfig objects. Processor-level defaults still apply, but stages can override them individually. Legacy boolean flags continue to work (with deprecation warnings).

How It Works

  1. Stage Config Models: New Pydantic models (ChatTemplateStageConfig, TokenizerStageConfig, etc.) define per-stage settings with optional overrides.

  2. Flexible Input: Processor configs accept stage configs in three forms:

    • bool: tokenize_stage=True → enabled with processor defaults
    • dict: tokenize_stage={"batch_size": 128} → enabled with overrides
    • StageConfig: tokenize_stage=TokenizerStageConfig(batch_size=128) → typed config
  3. Resolution & Merging: The resolve_stage_config() function:

    • Converts bool/dict/StageConfig → typed StageConfig instance
    • Merges processor-level defaults (batch_size, concurrency, runtime_env, model_source) into stage config
    • Stage-specific overrides take precedence over processor defaults
    • Creates a copy to prevent mutation when reusing configs across processors
  4. Backward Compatibility: A root_validator automatically coerces legacy boolean flags (apply_chat_template, tokenize, etc.) into the new nested format and emits deprecation warnings.
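The resolution-and-merge steps above can be sketched with a stdlib-only stand-in. The real implementation uses Pydantic models inside Ray Data; the names here mirror the PR description but the code is illustrative, not the shipped class:

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, Optional, Union

@dataclass
class StageConfig:
    # Simplified stand-in for the Pydantic _StageConfigBase.
    enabled: bool = True
    batch_size: Optional[int] = None
    concurrency: Optional[int] = None
    runtime_env: Optional[Dict[str, Any]] = None

def resolve_stage_config(
    value: Union[bool, dict, StageConfig], defaults: Dict[str, Any]
) -> StageConfig:
    # 1. Coerce bool / dict / StageConfig into a typed instance.
    if isinstance(value, bool):
        cfg = StageConfig(enabled=value)
    elif isinstance(value, dict):
        cfg = StageConfig(**value)
    elif isinstance(value, StageConfig):
        cfg = replace(value)  # copy so reused configs are never mutated
    else:
        raise TypeError(f"Unsupported stage config type: {type(value)}")
    # 2. Merge processor-level defaults; stage-specific overrides win.
    for key, default in defaults.items():
        if getattr(cfg, key, None) is None:
            setattr(cfg, key, default)
    return cfg
```

For example, `resolve_stage_config({"batch_size": 128}, {"batch_size": 64, "concurrency": 4})` keeps the stage's `batch_size=128` but fills `concurrency=4` from the processor defaults.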

Problem

Current config uses flat boolean flags (apply_chat_template, tokenize, detokenize, has_image) with processor-level defaults. This prevents:

  • Per-stage resource tuning (e.g., different batch_size for tokenization vs engine)
  • Clear ownership of stage-specific parameters
  • Extensibility for new stages without modifying shared config classes
  • Alignment with Ray Serve's nested Pydantic config pattern

Solution

Introduce typed StageConfig models per stage, extend OfflineProcessorConfig to accept nested configs (bool | dict | StageConfig), and update builders to resolve and merge stage configs with processor defaults.

Architecture Diagram

BEFORE (Flat Config):
┌──────────────────────────────┐
│ vLLMEngineProcessorConfig    │
│ ├─ apply_chat_template: bool │
│ ├─ tokenize: bool            │
│ ├─ detokenize: bool          │
│ ├─ batch_size: int (shared)  │
│ └─ concurrency: int (shared) │
└──────────────────────────────┘
              │
              ▼
    All stages inherit same values

AFTER (Nested Config):
┌─────────────────────────────────────────┐
│ vLLMEngineProcessorConfig               │
│ ├─ chat_template_stage:                 │
│ │   ├─ enabled: bool                    │
│ │   ├─ batch_size: Optional[int]        │
│ │   ├─ concurrency: Optional[int]       │
│ │   └─ chat_template: Optional[str]     │
│ ├─ tokenize_stage:                      │
│ │   ├─ enabled: bool                    │
│ │   ├─ batch_size: Optional[int]        │
│ │   └─ concurrency: Optional[int]       │
│ ├─ batch_size: int (processor default)  │
│ └─ concurrency: int (processor default) │
└─────────────────────────────────────────┘
              │
              ▼
    resolve_stage_config() merges:
    stage override OR processor default

Changes

1. New StageConfig Models (stages/configs.py)

  • ChatTemplateStageConfig, TokenizerStageConfig, DetokenizeStageConfig, PrepareImageStageConfig
  • Base class with enabled, batch_size, concurrency, runtime_env
  • resolve_stage_config() function converts bool|dict|StageConfig → typed config with merged defaults
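A simplified view of that model hierarchy, using stdlib dataclasses in place of the actual Pydantic classes (field names follow the PR description; treat this as a sketch, not the shipped definitions):

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class _StageConfigBase:
    # Knobs shared by every stage; None means "inherit the processor default".
    enabled: bool = True
    batch_size: Optional[int] = None
    concurrency: Optional[int] = None
    runtime_env: Optional[Dict[str, Any]] = None
    num_cpus: Optional[float] = None
    memory: Optional[float] = None

@dataclass
class ChatTemplateStageConfig(_StageConfigBase):
    # Stage-specific overrides; fall back to processor-level values when None.
    model: Optional[str] = None
    chat_template: Optional[str] = None
    chat_template_kwargs: Optional[Dict[str, Any]] = None

@dataclass
class TokenizerStageConfig(_StageConfigBase):
    model: Optional[str] = None

@dataclass
class DetokenizeStageConfig(_StageConfigBase):
    model: Optional[str] = None

@dataclass
class PrepareImageStageConfig(_StageConfigBase):
    pass
```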

2. Extended Processor Config (processor/base.py)

  • Add nested fields: chat_template_stage, tokenize_stage, detokenize_stage, prepare_image_stage
  • root_validator coerces legacy booleans → stage configs
  • Emit deprecation warnings when legacy fields used
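The coercion can be pictured as a mapping from legacy flag to nested field. This is a stdlib sketch of the idea; the shipped code does this inside a Pydantic root_validator on OfflineProcessorConfig and also folds `chat_template` into `chat_template_stage`:

```python
import warnings
from typing import Any, Dict

# Legacy boolean flag -> nested stage field, per the PR description.
_LEGACY_TO_STAGE = {
    "apply_chat_template": "chat_template_stage",
    "tokenize": "tokenize_stage",
    "detokenize": "detokenize_stage",
    "has_image": "prepare_image_stage",
}

def coerce_legacy_flags(values: Dict[str, Any]) -> Dict[str, Any]:
    for legacy, stage in _LEGACY_TO_STAGE.items():
        # Only coerce (and warn) when the legacy flag was explicitly set
        # and the user did not already supply the nested config.
        if legacy in values and stage not in values:
            warnings.warn(
                f"`{legacy}` is deprecated; use `{stage}` instead, e.g. "
                f'{stage}={{"enabled": True}}.',
                DeprecationWarning,
            )
            values[stage] = {"enabled": bool(values.pop(legacy))}
    return values
```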

3. Builder Updates (processor/vllm_engine_proc.py, processor/sglang_engine_proc.py)

  • Use resolve_stage_config() for all stages
  • Merge stage-specific overrides with processor defaults
  • Normalize concurrency (int → tuple) per stage
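The int-to-tuple normalization described above (including the None fallback a later commit in this PR adds) looks roughly like the following; the helper name mirrors the one mentioned in the commit messages, but this is a sketch:

```python
from typing import Optional, Tuple, Union

def normalize_cpu_stage_concurrency(
    concurrency: Union[int, Tuple[int, int], None],
) -> Tuple[int, int]:
    """Normalize a stage's concurrency to an autoscaling (min, max) tuple."""
    if concurrency is None:
        # Neither a stage override nor a processor default: one worker.
        return (1, 1)
    if isinstance(concurrency, int):
        # CPU stages autoscale between 1 and n workers for a bare int.
        return (1, concurrency)
    return concurrency
```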

Migration

Legacy code (still works, emits warnings):

```python
config = vLLMEngineProcessorConfig(
    model_source="...",
    apply_chat_template=True,  # Deprecated
    tokenize=True,             # Deprecated
)
```

New code (nested configs):

```python
config = vLLMEngineProcessorConfig(
    model_source="...",
    chat_template_stage=ChatTemplateStageConfig(batch_size=128),
    tokenize_stage={"enabled": True, "concurrency": 2},
)
```

Benefits

  1. Per-stage control: Tune batch_size, concurrency, runtime_env independently per stage
  2. Type safety: Pydantic validation catches config errors early
  3. YAML-friendly: Nested configs serialize cleanly
  4. Backward compatible: Legacy flags work with deprecation warnings
  5. Extensible: Easy to add new stages without modifying processor config
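On the YAML-friendly point: because each stage is plain nested data, a whole config can round-trip through serialization without custom handling. A minimal illustration with JSON (the model name and field values here are hypothetical):

```python
import json

# Hypothetical nested config as plain data; a mapping like this lines up
# 1:1 with the stage-config fields, which is what makes YAML/JSON configs
# straightforward compared to flat boolean flags.
config = {
    "model_source": "some-org/some-model",
    "chat_template_stage": {"enabled": True, "batch_size": 128},
    "tokenize_stage": {"enabled": True, "concurrency": 2},
}

restored = json.loads(json.dumps(config))
assert restored == config  # lossless round trip
```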

Implementation Status

  • Stage 1: StageConfig scaffolding + OfflineProcessorConfig extension
  • Stage 2: Resolver function + vLLM builder updates
  • Stage 3: SGLang processor updates
  • Stage 4: Deprecation warnings
  • Stage 5: Docstring updates
  • Stage 6: Tests

Related

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Stage config resolver and merging
- resolver function
- update vLLM builder to merge stage configs with processor defaults

Changes:
- Add resolve_stage_config() function in stages/configs.py to convert
  bool|dict|StageConfig -> typed StageConfig with processor defaults merged
- Update build_vllm_engine_processor() to use resolver for all stages:
  - PrepareImageStage, ChatTemplateStage, TokenizeStage, DetokenizeStage
- Each stage now respects per-stage overrides for:
  - batch_size: stage-specific override, falls back to processor default
  - concurrency: stage-specific override (normalized int -> tuple), falls back
  - runtime_env: stage-specific override, falls back to processor default
  - model: stage-specific model override for tokenizer/chat template stages
- Keep backward compatibility

This unlocks per-stage resource tuning while preserving the processor-first UX/API.

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
… config resolver

Apply resolver and merge logic to SGLang processor for parity with vLLM.

Changes:
- Update build_sglang_engine_processor() to use resolve_stage_config() for:
  - ChatTemplateStage, TokenizeStage, DetokenizeStage
- Each stage respects per-stage overrides (batch_size, concurrency, runtime_env, model)
- Maintains backward compatibility with legacy boolean flags
- Consistent behavior with vLLM processor

Note: ServeDeployment and HttpRequest processors don't use OfflineProcessorConfig
and only have single stages, so they don't require stage config refactoring.

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
…y boolean flags

Emit deprecation warnings when legacy boolean flags are used, guiding users
to migrate to nested stage configs.

Changes:
- Update _coerce_legacy_to_stage_config() root_validator in OfflineProcessorConfig
  to emit logger.warning() when legacy fields are detected:
  - `apply_chat_template` / `chat_template` -> `chat_template_stage`
  - `tokenize` -> `tokenize_stage`
  - `detokenize` -> `detokenize_stage`
  - `has_image` -> `prepare_image_stage`
- Warnings include examples showing how to use the new nested config API
- Warnings only emitted when legacy fields are explicitly set (not on defaults)
- Maintains backward compatibility - legacy flags still work

This provides clear migration guidance while preserving existing functionality.
Users will see helpful warnings pointing them to the new API without breaking
their code.

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
@nrghosh nrghosh force-pushed the nrghosh/data-llm-config-refactor branch from 2c3086a to e97df77 Compare November 12, 2025 19:07
Update docstrings in ray.data.llm to document nested stage configs and
backward compatibility with legacy boolean flags.

Changes:
- Update vLLMEngineProcessorConfig docstring:
  - Replace legacy field docs (apply_chat_template, tokenize, etc.) with
    nested stage config fields (chat_template_stage, tokenize_stage, etc.)
  - Note that legacy fields are deprecated but still supported
  - Mention per-stage control over batch_size, concurrency, runtime_env
- Update SGLangEngineProcessorConfig docstring:
  - Same updates as vLLM config
- Update build_llm_processor docstring:
  - Mention nested stage config support in config parameter
  - Note backward compatibility with legacy flags

Docstrings remain concise and focus on essential information for users.

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
@nrghosh nrghosh force-pushed the nrghosh/data-llm-config-refactor branch from e97df77 to 275a93d Compare November 12, 2025 19:29
@nrghosh nrghosh marked this pull request as ready for review November 12, 2025 20:49
@nrghosh nrghosh requested a review from a team as a code owner November 12, 2025 20:49
chat_template=config.chat_template,
model=chat_template_stage_cfg.model or config.model_source,
chat_template=chat_template_stage_cfg.chat_template
or config.chat_template,

Bug: Chat Template Kwargs Silently Ignored

The builder ignores chat_template_stage_cfg.chat_template_kwargs and only uses the chat_template_kwargs parameter passed to the builder function. When users configure chat_template_stage with chat_template_kwargs, those settings are silently ignored, causing the stage to use incorrect or missing template kwargs.


@nrghosh nrghosh requested review from a team and richardliaw November 12, 2025 20:55
nrghosh commented Nov 12, 2025

/gemini review

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request is a great refactoring of the LLM processor configuration, moving from flat boolean flags to nested, typed stage configs. This significantly improves per-stage configurability and aligns with modern configuration patterns. The backward compatibility is handled well with deprecation warnings.

My review focuses on a few areas to further improve the implementation:

  • Improving the resolve_stage_config helper to be more complete.
  • Reducing code duplication in the processor builder functions for better maintainability.

Comment on lines 101 to 134
# Resolve and build ChatTemplateStage if enabled
chat_template_stage_cfg = resolve_stage_config(
getattr(config, "chat_template_stage", config.apply_chat_template),
ChatTemplateStageConfig,
processor_defaults,
)
if chat_template_stage_cfg.enabled:
# Use stage-specific concurrency if set, otherwise processor default
stage_concurrency = (
chat_template_stage_cfg.concurrency
if chat_template_stage_cfg.concurrency is not None
else config.get_concurrency()
)
# Normalize concurrency to tuple if needed
if isinstance(stage_concurrency, int):
stage_concurrency = (stage_concurrency, stage_concurrency)

stages.append(
ChatTemplateStage(
fn_constructor_kwargs=dict(
model=config.model_source,
chat_template=config.chat_template,
model=chat_template_stage_cfg.model or config.model_source,
chat_template=chat_template_stage_cfg.chat_template
or config.chat_template,
chat_template_kwargs=chat_template_kwargs,
),
map_batches_kwargs=dict(
zero_copy_batch=True,
concurrency=config.get_concurrency(),
batch_size=config.batch_size,
runtime_env=config.runtime_env,
concurrency=stage_concurrency,
batch_size=chat_template_stage_cfg.batch_size or config.batch_size,
runtime_env=chat_template_stage_cfg.runtime_env
or config.runtime_env,
),
)
)

medium

Similar to vllm_engine_proc.py, there is a lot of repeated code for building each stage (ChatTemplateStage, TokenizeStage, DetokenizeStage). The logic for resolving configuration, handling concurrency, and creating the stage object is duplicated.

To improve code maintainability, consider refactoring this repeated logic into a helper function. This would make the build_sglang_engine_processor function more concise and easier to manage.

Comment on lines 150 to 176
# Resolve and build PrepareImageStage if enabled
image_stage_cfg = resolve_stage_config(
getattr(config, "prepare_image_stage", config.has_image),
PrepareImageStageConfig,
processor_defaults,
)
if image_stage_cfg.enabled:
# Use stage-specific concurrency if set, otherwise processor default
stage_concurrency = (
image_stage_cfg.concurrency
if image_stage_cfg.concurrency is not None
else config.get_concurrency()
)
# Normalize concurrency to tuple if needed
if isinstance(stage_concurrency, int):
stage_concurrency = (stage_concurrency, stage_concurrency)

stages.append(
PrepareImageStage(
map_batches_kwargs=dict(
zero_copy_batch=True,
concurrency=config.get_concurrency(),
batch_size=config.batch_size,
concurrency=stage_concurrency,
batch_size=image_stage_cfg.batch_size or config.batch_size,
runtime_env=image_stage_cfg.runtime_env or config.runtime_env,
),
)
)

medium

There's a significant amount of duplicated code for building each stage (PrepareImageStage, ChatTemplateStage, TokenizeStage, DetokenizeStage). The logic for resolving the stage config, determining concurrency, and constructing the stage is nearly identical for each.

To improve maintainability and reduce redundancy, you could extract this logic into a helper function. This would make the build_vllm_engine_processor function much cleaner and easier to read and would centralize the stage creation logic.

Comment on lines 72 to 75
if resolved.batch_size is None and "batch_size" in processor_defaults:
resolved.batch_size = processor_defaults["batch_size"]
if resolved.runtime_env is None and "runtime_env" in processor_defaults:
resolved.runtime_env = processor_defaults["runtime_env"]

medium

The resolve_stage_config function is very helpful for merging processor-level defaults into stage-specific configurations. I noticed that it handles batch_size and runtime_env, but concurrency is missing. The docstring for processor_defaults even mentions concurrency as an expected key.

To make this function more complete and to reduce repetitive code in the processor builders, consider also merging the concurrency default here.

This would require updating the processor_defaults dictionary in vllm_engine_proc.py and sglang_engine_proc.py to include concurrency, and would allow simplifying the concurrency handling logic in those files.

Suggested change
if resolved.batch_size is None and "batch_size" in processor_defaults:
resolved.batch_size = processor_defaults["batch_size"]
if resolved.runtime_env is None and "runtime_env" in processor_defaults:
resolved.runtime_env = processor_defaults["runtime_env"]
if resolved.batch_size is None and "batch_size" in processor_defaults:
resolved.batch_size = processor_defaults["batch_size"]
if resolved.concurrency is None and "concurrency" in processor_defaults:
resolved.concurrency = processor_defaults["concurrency"]
if resolved.runtime_env is None and "runtime_env" in processor_defaults:
resolved.runtime_env = processor_defaults["runtime_env"]

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
…tage autoscaling

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
…source control

Expose num_cpus and memory as optional fields in _StageConfigBase to enable
per-stage resource control (ray remote args). These fields are extracted
from stage configs and passed to map_batches_kwargs for all CPU stages.

Example usage:
```
config = vLLMEngineProcessorConfig(
    model_id="...",
    tokenize_stage=TokenizerStageConfig(
        num_cpus=2.0,      # Per-stage CPU control
        memory=1000000,    # Per-stage memory control
    ),
    chat_template_stage=ChatTemplateStageConfig(
        num_cpus=1.0,      # Different resources for different stages
    ),
)
```

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
memory: Optional[float] = Field(
default=None,
description="Heap memory in bytes to reserve for each map worker in this stage.",
)

Bug: Stage Config: Missing Validation, Hidden Errors

The _StageConfigBase class lacks validation for concurrency and batch_size fields. Users can pass invalid values like negative integers or invalid tuple ranges (e.g., {"concurrency": -1} or {"concurrency": (5, 2)}), which bypass validation and cause cryptic errors later when Ray Data attempts to use them. The processor-level validate_concurrency validator exists but doesn't apply to stage-specific configs.
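One way to close the gap this comment describes would be a validator run against each stage config's concurrency. This is a hypothetical helper sketching the checks, not code from the PR:

```python
from typing import Tuple, Union

def validate_stage_concurrency(
    concurrency: Union[int, Tuple[int, int], None],
) -> None:
    """Reject negative concurrency and inverted (min, max) ranges early."""
    if concurrency is None:
        return  # None means "inherit the processor default"
    if isinstance(concurrency, int):
        if concurrency < 1:
            raise ValueError(f"concurrency must be >= 1, got {concurrency}")
        return
    lo, hi = concurrency
    if lo < 1 or hi < lo:
        raise ValueError(f"invalid concurrency range ({lo}, {hi})")
```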


Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
nrghosh commented Nov 13, 2025

/gemini review

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request is a well-executed refactoring of the LLM processor configuration, moving from flat boolean flags to nested, typed stage configs. This significantly improves per-stage control and extensibility while maintaining backward compatibility. My review focuses on a few areas where the implementation can be made more robust and maintainable by reducing code duplication and handling edge cases in configuration resolution more explicitly.

Comment on lines 75 to 77
else:
# Fallback: create enabled=True config
resolved = stage_config_cls(enabled=True)

high

The fallback else block in this function can lead to unexpected behavior. If stage_cfg_value is None or another unexpected type, it is silently converted to a stage config with enabled=True. This is likely not the user's intent, especially if they explicitly pass None to disable a stage.

For robustness, it would be better to raise a TypeError for unsupported types. If None is a supported value, it should probably be handled explicitly to mean enabled=False.

Suggested change
else:
# Fallback: create enabled=True config
resolved = stage_config_cls(enabled=True)
else:
raise TypeError(
f"Unsupported type for stage config: {type(stage_cfg_value)}. "
"Expected bool, dict, or a _StageConfigBase object."
)

Comment on lines 102 to 194
# Resolve and build ChatTemplateStage if enabled
chat_template_stage_cfg = resolve_stage_config(
getattr(config, "chat_template_stage", config.apply_chat_template),
ChatTemplateStageConfig,
processor_defaults,
)
if chat_template_stage_cfg.enabled:
# Use stage-specific concurrency if set, otherwise processor default
stage_concurrency = (
chat_template_stage_cfg.concurrency
if chat_template_stage_cfg.concurrency is not None
else config.get_concurrency()
)
# Normalize concurrency to tuple if needed
# CPU stages use autoscaling (1, n) for int concurrency
if isinstance(stage_concurrency, int):
stage_concurrency = (1, stage_concurrency)

stages.append(
ChatTemplateStage(
fn_constructor_kwargs=dict(
model=config.model_source,
chat_template=config.chat_template,
chat_template_kwargs=chat_template_kwargs,
model=chat_template_stage_cfg.model
if chat_template_stage_cfg.model is not None
else config.model_source,
chat_template=chat_template_stage_cfg.chat_template
if chat_template_stage_cfg.chat_template is not None
else config.chat_template,
chat_template_kwargs=chat_template_stage_cfg.chat_template_kwargs
if chat_template_stage_cfg.chat_template_kwargs is not None
else chat_template_kwargs,
),
map_batches_kwargs=dict(
zero_copy_batch=True,
concurrency=config.get_concurrency(),
batch_size=config.batch_size,
runtime_env=config.runtime_env,
concurrency=stage_concurrency,
batch_size=chat_template_stage_cfg.batch_size
if chat_template_stage_cfg.batch_size is not None
else config.batch_size,
**{
k: v
for k, v in {
"runtime_env": chat_template_stage_cfg.runtime_env,
"num_cpus": chat_template_stage_cfg.num_cpus,
"memory": chat_template_stage_cfg.memory,
}.items()
if v is not None
},
),
)
)

if config.tokenize:
# Resolve and build TokenizeStage if enabled
tokenize_stage_cfg = resolve_stage_config(
getattr(config, "tokenize_stage", config.tokenize),
TokenizerStageConfig,
processor_defaults,
)
if tokenize_stage_cfg.enabled:
# Use stage-specific concurrency if set, otherwise processor default
stage_concurrency = (
tokenize_stage_cfg.concurrency
if tokenize_stage_cfg.concurrency is not None
else config.get_concurrency()
)
# Normalize concurrency to tuple if needed
# CPU stages use autoscaling (1, n) for int concurrency
if isinstance(stage_concurrency, int):
stage_concurrency = (1, stage_concurrency)

stages.append(
TokenizeStage(
fn_constructor_kwargs=dict(
model=config.model_source,
model=tokenize_stage_cfg.model
if tokenize_stage_cfg.model is not None
else config.model_source,
),
map_batches_kwargs=dict(
zero_copy_batch=True,
concurrency=config.get_concurrency(),
batch_size=config.batch_size,
runtime_env=config.runtime_env,
concurrency=stage_concurrency,
batch_size=tokenize_stage_cfg.batch_size
if tokenize_stage_cfg.batch_size is not None
else config.batch_size,
**{
k: v
for k, v in {
"runtime_env": tokenize_stage_cfg.runtime_env,
"num_cpus": tokenize_stage_cfg.num_cpus,
"memory": tokenize_stage_cfg.memory,
}.items()
if v is not None
},
),
)
)

medium

There is significant code duplication in building the different stages (ChatTemplateStage, TokenizeStage). The logic for resolving configuration, determining concurrency, and constructing map_batches_kwargs is repeated for each stage. This makes the code hard to read and maintain.

This repeated logic could be extracted into a helper function.

Additionally, there are several redundant checks. For example:

batch_size=chat_template_stage_cfg.batch_size
if chat_template_stage_cfg.batch_size is not None
else config.batch_size

The resolve_stage_config function already merges the processor-level defaults, so chat_template_stage_cfg.batch_size should not be None at this point. These checks can be simplified (e.g., to batch_size=chat_template_stage_cfg.batch_size), which would make the code cleaner. This applies to concurrency and model parameters as well.


+1

Comment on lines 151 to 282
# Resolve and build PrepareImageStage if enabled
image_stage_cfg = resolve_stage_config(
getattr(config, "prepare_image_stage", config.has_image),
PrepareImageStageConfig,
processor_defaults,
)
if image_stage_cfg.enabled:
# Use stage-specific concurrency if set, otherwise processor default
stage_concurrency = (
image_stage_cfg.concurrency
if image_stage_cfg.concurrency is not None
else config.get_concurrency()
)
# Normalize concurrency to tuple if needed
# CPU stages use autoscaling (1, n) for int concurrency
if isinstance(stage_concurrency, int):
stage_concurrency = (1, stage_concurrency)

stages.append(
PrepareImageStage(
map_batches_kwargs=dict(
zero_copy_batch=True,
concurrency=config.get_concurrency(),
batch_size=config.batch_size,
concurrency=stage_concurrency,
batch_size=image_stage_cfg.batch_size
if image_stage_cfg.batch_size is not None
else config.batch_size,
**{
k: v
for k, v in {
"runtime_env": image_stage_cfg.runtime_env,
"num_cpus": image_stage_cfg.num_cpus,
"memory": image_stage_cfg.memory,
}.items()
if v is not None
},
),
)
)
if config.apply_chat_template:

# Resolve and build ChatTemplateStage if enabled
chat_template_stage_cfg = resolve_stage_config(
getattr(config, "chat_template_stage", config.apply_chat_template),
ChatTemplateStageConfig,
processor_defaults,
)
if chat_template_stage_cfg.enabled:
# Use stage-specific concurrency if set, otherwise processor default
stage_concurrency = (
chat_template_stage_cfg.concurrency
if chat_template_stage_cfg.concurrency is not None
else config.get_concurrency()
)
# Normalize concurrency to tuple if needed
# CPU stages use autoscaling (1, n) for int concurrency
if isinstance(stage_concurrency, int):
stage_concurrency = (1, stage_concurrency)

stages.append(
ChatTemplateStage(
fn_constructor_kwargs=dict(
model=config.model_source,
chat_template=config.chat_template,
chat_template_kwargs=chat_template_kwargs,
model=chat_template_stage_cfg.model
if chat_template_stage_cfg.model is not None
else config.model_source,
chat_template=chat_template_stage_cfg.chat_template
if chat_template_stage_cfg.chat_template is not None
else config.chat_template,
chat_template_kwargs=chat_template_stage_cfg.chat_template_kwargs
if chat_template_stage_cfg.chat_template_kwargs is not None
else chat_template_kwargs,
),
map_batches_kwargs=dict(
zero_copy_batch=True,
concurrency=config.get_concurrency(),
batch_size=config.batch_size,
runtime_env=config.runtime_env,
concurrency=stage_concurrency,
batch_size=chat_template_stage_cfg.batch_size
if chat_template_stage_cfg.batch_size is not None
else config.batch_size,
**{
k: v
for k, v in {
"runtime_env": chat_template_stage_cfg.runtime_env,
"num_cpus": chat_template_stage_cfg.num_cpus,
"memory": chat_template_stage_cfg.memory,
}.items()
if v is not None
},
),
)
)

if config.tokenize:
# Resolve and build TokenizeStage if enabled
tokenize_stage_cfg = resolve_stage_config(
getattr(config, "tokenize_stage", config.tokenize),
TokenizerStageConfig,
processor_defaults,
)
if tokenize_stage_cfg.enabled:
# Use stage-specific concurrency if set, otherwise processor default
stage_concurrency = (
tokenize_stage_cfg.concurrency
if tokenize_stage_cfg.concurrency is not None
else config.get_concurrency()
)
# Normalize concurrency to tuple if needed
# CPU stages use autoscaling (1, n) for int concurrency
if isinstance(stage_concurrency, int):
stage_concurrency = (1, stage_concurrency)

stages.append(
TokenizeStage(
fn_constructor_kwargs=dict(
model=config.model_source,
model=tokenize_stage_cfg.model
if tokenize_stage_cfg.model is not None
else config.model_source,
),
map_batches_kwargs=dict(
zero_copy_batch=True,
concurrency=config.get_concurrency(),
batch_size=config.batch_size,
runtime_env=config.runtime_env,
concurrency=stage_concurrency,
batch_size=tokenize_stage_cfg.batch_size
if tokenize_stage_cfg.batch_size is not None
else config.batch_size,
**{
k: v
for k, v in {
"runtime_env": tokenize_stage_cfg.runtime_env,
"num_cpus": tokenize_stage_cfg.num_cpus,
"memory": tokenize_stage_cfg.memory,
}.items()
if v is not None
},
),
)
)

medium

There is significant code duplication in building the different stages (PrepareImageStage, ChatTemplateStage, TokenizeStage). The logic for resolving configuration, determining concurrency, and constructing map_batches_kwargs is repeated for each stage. This makes the code hard to read and maintain.

This repeated logic could be extracted into a helper function.

Additionally, there are several redundant checks. For example:

batch_size=image_stage_cfg.batch_size
if image_stage_cfg.batch_size is not None
else config.batch_size

The resolve_stage_config function already merges the processor-level defaults, so image_stage_cfg.batch_size should not be None at this point. These checks can be simplified (e.g., to batch_size=image_stage_cfg.batch_size), which would make the code cleaner. This applies to concurrency and model parameters as well.

…ported types

Replace silent fallback that converted unexpected types to enabled=True
with explicit TypeError. This prevents bugs where None or invalid types
are silently treated as enabled stages.

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Nov 13, 2025
@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant and well-executed refactoring of the LLM processor configurations. By moving from flat boolean flags to nested, typed StageConfig objects, it provides much-needed per-stage control over resources and improves type safety and extensibility. The implementation maintains backward compatibility through a root_validator, which is a great approach for a smooth transition.

My review focuses on a small area for simplification in the processor builder functions, where the backward-compatibility logic appears to be duplicated. Overall, the changes are excellent and significantly improve the configuration API for Ray Data LLMs.

nrghosh and others added 4 commits November 17, 2025 16:44
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Nikhil G <nrghosh@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Nikhil G <nrghosh@users.noreply.github.com>
…solution and backward compatibility

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
normalize_cpu_stage_concurrency can receive None when stage_cfg.concurrency
is None (e.g., when resolve_stage_config doesn't merge defaults).
Previously returned None, violating Tuple[int, int] return type contract.

Fix: explicitly handle None by defaulting to (1, 1)

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
@nrghosh nrghosh changed the title [wip] [data.llm] Ray Data LLM Config Refactor [data.llm] Ray Data LLM Config Refactor Nov 18, 2025
stage: Dict[str, Any] = {"enabled": enabled}
if values.get("chat_template") is not None:
stage["chat_template"] = values["chat_template"]
values["chat_template_stage"] = stage

Bug: Stage Field Type Inconsistency

The chat_template_stage field is unconditionally coerced to a dict when not explicitly provided, while other stage fields (tokenize_stage, detokenize_stage, prepare_image_stage) are only coerced when their legacy counterparts are provided. This creates inconsistent behavior: when no stage config is provided, chat_template_stage becomes a dict {"enabled": True} but tokenize_stage remains a boolean True (from the Field default). This inconsistency could confuse users who access these fields directly and expect uniform types.


@jeffreyjeffreywang

Shall we migrate existing tests (e.g. test_vllm_engine_proc.py) to adopt the new schema or do you think we can leave them as is until we begin raising errors for the legacy schema?

@kouroshHakha

Shall we migrate existing tests (e.g. test_vllm_engine_proc.py) to adopt the new schema or do you think we can leave them as is until we begin raising errors for the legacy schema?

+1

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
nrghosh commented Nov 18, 2025

Shall we migrate existing tests (e.g. test_vllm_engine_proc.py) to adopt the new schema or do you think we can leave them as is until we begin raising errors for the legacy schema?

Yes, updated. Whether/when we fully deprecate the old schema is a roadmap decision. cc @richardliaw

- Fix: conditional chat_template_stage construction in legacy coercion
- The logic unconditionally set `chat_template_stage` to a default dict
  even when no legacy fields were present, bypassing the Field default.
- Now the stage config is only constructed when legacy fields are actually detected.

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
@nrghosh nrghosh added the go add ONLY when ready to merge, run all tests label Nov 19, 2025
Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
@kouroshHakha kouroshHakha changed the title [data.llm] Ray Data LLM Config Refactor [data][llm] Ray Data LLM Config Refactor Nov 19, 2025
@kouroshHakha kouroshHakha merged commit 367c7fe into ray-project:master Nov 19, 2025
6 checks passed
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Signed-off-by: Nikhil G <nrghosh@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>
400Ping pushed a commit to 400Ping/ray that referenced this pull request Nov 21, 2025
Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Signed-off-by: Nikhil G <nrghosh@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Signed-off-by: Nikhil G <nrghosh@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Signed-off-by: Nikhil G <nrghosh@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
