| 1 | +# JailedStream Implementation |
| 2 | + |
| 3 | +## Overview |
| 4 | + |
| 5 | +`JailedStream` is a standalone stream transformer for "jail" detection in token streams. It is configured through a builder: once a configured start sequence is detected, it accumulates tokens instead of yielding them, then releases them as a single chunk when the jail ends. |
| 6 | + |
| 7 | +## Key Features |
| 8 | + |
| 9 | +- **Builder Pattern**: Clean configuration API using the builder pattern |
| 10 | +- **Configurable Sequences**: Support for multiple start/end jail sequences |
| 11 | +- **Tool Call Parsing**: Integrated tool call detection and parsing |
| 12 | +- **Stream Macro**: Uses the `stream!` macro from the `async-stream` crate (`async_stream::stream!`) for a clean async implementation |
| 13 | +- **Standalone**: Completely independent of existing code |
| 14 | +- **Annotations**: Preserves annotations for observability |
| 15 | + |
| 16 | +## Implementation |
| 17 | + |
| 18 | +### Location |
| 19 | +- Main implementation: `lib/llm/src/protocols/openai/chat_completions/jail.rs` |
| 20 | +- Examples: `lib/llm/src/protocols/openai/chat_completions/jail_example.rs` |
| 21 | + |
| 22 | +### Usage |
| 23 | + |
| 24 | +```rust |
| 25 | +use crate::protocols::openai::chat_completions::jail::JailedStream; |
| 26 | +use dynamo_runtime::engine::{AsyncEngineContextProvider, ResponseStream}; |
| 27 | + |
| 28 | +// Get your ResponseStream with context |
| 29 | +let response_stream: Pin<Box<ResponseStream<_>>> = get_stream_from_engine(); |
| 30 | + |
| 31 | +// Extract context BEFORE passing to apply |
| 32 | +let context = response_stream.context(); |
| 33 | + |
| 34 | +// Apply jail transformation (ResponseStream implements Stream) |
| 35 | +let jail = JailedStream::builder() |
| 36 | +    .tool_call_parser("nemotron_deci") |
| 37 | +    .build(); |
| 38 | + |
| 39 | +let jailed_stream = jail.apply(response_stream); |
| 40 | + |
| 41 | +// Re-wrap with context when needed for engine consumption |
| 42 | +let final_stream = ResponseStream::new(Box::pin(jailed_stream), context); |
| 43 | +``` |
| 44 | + |
| 45 | +### Advanced Configuration |
| 46 | + |
| 47 | +```rust |
| 48 | +// With custom jail sequences |
| 49 | +let jail = JailedStream::builder() |
| 50 | +    .jail_start_sequence("<TOOLCALL>") |
| 51 | +    .jail_end_sequence("</TOOLCALL>") |
| 52 | +    .tool_call_parser("nemotron_deci") |
| 53 | +    .build(); |
| 54 | + |
| 55 | +// With multiple sequences |
| 56 | +let jail = JailedStream::builder() |
| 57 | +    .jail_start_sequences(vec!["<TOOLCALL>", "<FUNCTION>"]) |
| 58 | +    .jail_end_sequences(vec!["</TOOLCALL>", "</FUNCTION>"]) |
| 59 | +    .tool_call_parser("harmony") |
| 60 | +    .build(); |
| 61 | +``` |
| 62 | + |
| 63 | +## How It Works |
| 64 | + |
| 65 | +1. **Detection**: When a jail start sequence (or tool call start) is detected, the stream enters "jail" mode |
| 66 | +2. **Accumulation**: While jailed, tokens are accumulated in memory instead of being yielded |
| 67 | +3. **Annotations**: While jailed, empty chunks that carry the annotations are still sent downstream for observability |
| 68 | +4. **Release**: When a jail end sequence is detected OR the stream ends: |
| 69 | +   - Accumulated content is parsed for tool calls |
| 70 | +   - A single chunk with the parsed content is yielded |
| 71 | +5. **Pass-through**: Non-jailed content passes through unchanged |
| 72 | + |
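The steps above can be sketched with a simplified, synchronous model. This is not the code in `jail.rs`: it assumes plain string chunks and a single start/end pair, and every name in it (`Jail`, `push`, `finish`, `run`) is hypothetical. It skips tool call parsing and annotations, releases the accumulated text verbatim, and does not match sequences that are split across chunk boundaries.

```rust
/// Simplified model of the jail state machine (hypothetical names).
/// The real `JailedStream` works on async streams of annotated
/// chat-completion chunks; this sketch works on plain strings.
pub struct Jail {
    start: String,
    end: String,
    /// `Some(..)` while jailed; holds the accumulated text.
    buffer: Option<String>,
}

impl Jail {
    pub fn new(start: &str, end: &str) -> Self {
        Jail {
            start: start.to_string(),
            end: end.to_string(),
            buffer: None,
        }
    }

    /// Feed one chunk; returns the chunk to yield downstream, if any.
    pub fn push(&mut self, chunk: &str) -> Option<String> {
        if let Some(buf) = self.buffer.as_mut() {
            // Accumulation: already jailed, keep buffering.
            buf.push_str(chunk);
        } else if chunk.contains(self.start.as_str()) {
            // Detection: a start sequence appeared, enter jail mode.
            self.buffer = Some(chunk.to_string());
        } else {
            // Pass-through: non-jailed content is yielded unchanged.
            return Some(chunk.to_string());
        }
        self.try_release()
    }

    /// Release: once the end sequence has been seen, emit one chunk.
    fn try_release(&mut self) -> Option<String> {
        let done = self
            .buffer
            .as_deref()
            .map_or(false, |b| b.contains(self.end.as_str()));
        if done {
            self.buffer.take()
        } else {
            None
        }
    }

    /// Release on stream end: flush whatever is still jailed.
    pub fn finish(&mut self) -> Option<String> {
        self.buffer.take()
    }
}

/// Drive the state machine over a fixed list of chunks.
pub fn run(chunks: &[&str], start: &str, end: &str) -> Vec<String> {
    let mut jail = Jail::new(start, end);
    let mut out: Vec<String> = chunks.iter().filter_map(|c| jail.push(c)).collect();
    out.extend(jail.finish());
    out
}
```

For example, passing `["Hello ", "<TOOLCALL>", "[1]", "</TOOLCALL>", " bye"]` to `run` with `<TOOLCALL>` and `</TOOLCALL>` yields three chunks: `Hello `, `<TOOLCALL>[1]</TOOLCALL>`, and ` bye`.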
| 73 | +## Testing |
| 74 | + |
| 75 | +The implementation includes the following tests: |
| 76 | + |
| 77 | +- `test_jailed_stream_with_start_end_sequences`: Tests explicit jail sequences |
| 78 | +- `test_jailed_stream_with_tool_calls`: Tests tool call detection and parsing |
| 79 | +- `test_jailed_stream_no_jailing`: Tests normal pass-through behavior |
| 80 | + |
| 81 | +Run tests with: |
| 82 | +```bash |
| 83 | +cargo test -p dynamo-llm jail --lib |
| 84 | +``` |
| 85 | + |
| 86 | +## Benefits |
| 87 | + |
| 88 | +1. **Standalone**: No modifications to existing code required |
| 89 | +2. **Clean API**: Builder pattern makes configuration intuitive |
| 90 | +3. **Flexible**: Supports multiple jail detection strategies |
| 91 | +4. **Maintainable**: Uses `stream!` macro for cleaner async code |
| 92 | +5. **Testable**: Comprehensive test suite with shared utilities |
| 93 | +6. **Efficient**: No unnecessary boxing or context handling in the library |
| 94 | +7. **Composable**: Can chain multiple stream transformers before re-adding context |
| 95 | + |
| 96 | +## Performance Optimizations |
| 97 | + |
| 98 | +- **No Boxing in Library**: Returns `impl Stream` instead of `Pin<Box<ResponseStream>>` |
| 99 | +- **Stack Pinning**: Uses `tokio::pin!()` instead of `Box::pin()` inside the library, avoiding a heap allocation |
| 100 | +- **No Context Overhead**: JailedStream doesn't manage AsyncEngineContext |
| 101 | +- **Lazy Evaluation**: Chunks are processed only as the consumer polls the stream |
| 102 | +- **Efficient State Management**: Minimal cloning, only when entering jail state |
| 103 | + |
| 104 | +## Integration Options |
| 105 | + |
| 106 | +To replace the existing `apply_tool_calling_jail_internal` function: |
| 107 | + |
| 108 | +```rust |
| 109 | +// In preprocessor.rs |
| 110 | +pub fn apply_tool_calling_jail_with_parser( |
| 111 | +    &self, |
| 112 | +    stream: ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, |
| 113 | +) -> ManyOut<Annotated<NvCreateChatCompletionStreamResponse>> { |
| 114 | +    let context = stream.context(); // extract before `apply` consumes the stream |
| 115 | +    let jail = JailedStream::builder() |
| 116 | +        .tool_call_parser(self.tool_call_parser.clone()) |
| 117 | +        .build(); |
| 118 | +    ResponseStream::new(Box::pin(jail.apply(stream)), context) // re-wrap, as in Usage |
| 119 | +} |
| 120 | +``` |
| 121 | + |
| 122 | +## Future Enhancements |
| 123 | + |
| 124 | +- Add support for regex patterns for jail sequences |
| 125 | +- Add metrics/telemetry for jail detection |
| 126 | +- Support for partial sequence matching across chunk boundaries |
| 127 | +- Configurable accumulation limits |
| 128 | +- Support for nested jails |
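As an illustration of the partial-matching item above, one possible approach is to hold back the longest suffix of the current text that could still grow into a jail sequence and pass the rest through. The sketch below is an assumption about how that could look, not planned or existing code; `partial_match_len` and `split_held_back` are hypothetical names.

```rust
/// Length in bytes of the longest suffix of `text` that is a proper
/// prefix of `seq`, or 0 if there is none. Hypothetical helper.
pub fn partial_match_len(text: &str, seq: &str) -> usize {
    // A proper prefix is at most `seq.len() - 1` bytes long.
    let max = seq.len().saturating_sub(1).min(text.len());
    (1..=max)
        .rev()
        // Only slice `seq` at valid UTF-8 character boundaries.
        .find(|&n| seq.is_char_boundary(n) && text.ends_with(&seq[..n]))
        .unwrap_or(0)
}

/// Split `text` into the part that is safe to yield now and the tail
/// that must be held back until the next chunk arrives.
pub fn split_held_back<'a>(text: &'a str, seq: &str) -> (&'a str, &'a str) {
    let n = partial_match_len(text, seq);
    text.split_at(text.len() - n)
}
```

With `seq` set to `<TOOLCALL>`, the text `Hello <TOOL` splits into `Hello ` (safe to yield) and `<TOOL` (held back), while `Hello` is yielded whole. A complete occurrence of the sequence is not a partial match and would still be handled by the normal detection step.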