From 48166931cca4e043a7ec9613e5b4b69c074c3644 Mon Sep 17 00:00:00 2001 From: Pouyanpi <13303554+Pouyanpi@users.noreply.github.com> Date: Fri, 24 Oct 2025 09:14:12 +0200 Subject: [PATCH 1/3] fix(streaming): raise error when stream_async used with disabled output rails streaming When output rails are configured but output.streaming.enabled is False (or not set), calling stream_async() would result in undefined behavior or hangs due to the conflict between streaming expectations and blocking output rail processing. This change adds explicit validation in stream_async() to detect this misconfiguration and raise a clear ValueError with actionable guidance: - Set rails.output.streaming.enabled = True to use streaming with output rails - Use generate_async() instead for non-streaming with output rails Updated affected tests to expect and validate the new error behavior instead of relying on the previous buggy behavior. --- nemoguardrails/rails/llm/llmrails.py | 10 +++ tests/test_parallel_streaming_output_rails.py | 24 +++---- tests/test_streaming.py | 44 +++++++++++++ tests/test_streaming_output_rails.py | 65 ++++++++----------- 4 files changed, 93 insertions(+), 50 deletions(-) diff --git a/nemoguardrails/rails/llm/llmrails.py b/nemoguardrails/rails/llm/llmrails.py index 3505dad07..3842c6e29 100644 --- a/nemoguardrails/rails/llm/llmrails.py +++ b/nemoguardrails/rails/llm/llmrails.py @@ -1259,6 +1259,16 @@ def stream_async( ) -> AsyncIterator[str]: """Simplified interface for getting directly the streamed tokens from the LLM.""" + if len(self.config.rails.output.flows) > 0 and ( + not self.config.rails.output.streaming + or not self.config.rails.output.streaming.enabled + ): + raise ValueError( + "stream_async() cannot be used when output rails are configured but " + "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled to True in your configuration, or use " + "generate_async() instead of stream_async()." + ) # if an external generator is provided, use it directly if generator: if ( diff --git a/tests/test_parallel_streaming_output_rails.py b/tests/test_parallel_streaming_output_rails.py index 8b1166ef8..c028226b9 100644 --- a/tests/test_parallel_streaming_output_rails.py +++ b/tests/test_parallel_streaming_output_rails.py @@ -605,21 +605,21 @@ async def test_parallel_streaming_output_rails_performance_benefits(): async def test_parallel_streaming_output_rails_default_config_behavior( parallel_output_rails_default_config, ): - """Tests parallel output rails with default streaming configuration""" + """Tests that stream_async raises an error with default config (no explicit streaming config)""" - llm_completions = [ - ' express greeting\nbot express greeting\n "Hi, how are you doing?"', - ' "This is a test message with default streaming config."', - ] + from nemoguardrails import LLMRails - chunks = await run_parallel_self_check_test( - parallel_output_rails_default_config, llm_completions - ) + llmrails = LLMRails(parallel_output_rails_default_config) - response = "".join(chunks) - assert len(response) > 0 - assert len(chunks) > 0 - assert "test message" in response + with pytest.raises(ValueError) as exc_info: + async for chunk in llmrails.stream_async( + messages=[{"role": "user", "content": "Hi!"}] + ): + pass + + assert "stream_async() cannot be used when output rails are configured" in str( + exc_info.value + ) await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()}) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 114bb7dd1..8031271f0 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -474,6 +474,50 @@ def _calculate_number_of_actions(input_length, chunk_size, context_size): return math.ceil((input_length - context_size) / (chunk_size - context_size)) +@pytest.mark.asyncio +async def test_streaming_with_output_rails_disabled_raises_error(): + config = RailsConfig.from_content( + config={ + "models": [], + "rails": { + "output": { + "flows": {"self check output"}, + "streaming": { + "enabled": False, + }, + } + }, + "streaming": True, + "prompts": [{"task": "self_check_output", "content": "a test template"}], + }, + colang_content=""" + define user express greeting + "hi" + + define flow + user express greeting + bot tell joke + """, + ) + + chat = TestChat( + config, + llm_completions=[], + streaming=True, + ) + + with pytest.raises(ValueError) as exc_info: + async for chunk in chat.app.stream_async( + messages=[{"role": "user", "content": "Hi!"}], + ): + pass + + assert "stream_async() cannot be used when output rails are configured" in str( + exc_info.value + ) + assert "output.streaming.enabled is False" in str(exc_info.value) + + @pytest.mark.asyncio async def test_streaming_error_handling(): """Test that errors during streaming are properly formatted and returned.""" diff --git a/tests/test_streaming_output_rails.py b/tests/test_streaming_output_rails.py index 501353c1b..926f785fc 100644 --- a/tests/test_streaming_output_rails.py +++ b/tests/test_streaming_output_rails.py @@ -89,14 +89,16 @@ def output_rails_streaming_config_default(): @pytest.mark.asyncio async def test_stream_async_streaming_disabled(output_rails_streaming_config_default): - """Tests if stream_async returns a StreamingHandler instance when streaming is disabled""" + """Tests that stream_async raises an error when output rails are configured but streaming is disabled""" llmrails = LLMRails(output_rails_streaming_config_default) - result = llmrails.stream_async(prompt="test") - assert isinstance( - result, StreamingHandler - ), "Expected StreamingHandler instance when streaming is disabled" + with pytest.raises(ValueError) as exc_info: + llmrails.stream_async(prompt="test") + + assert "stream_async() cannot be used when output rails are configured" in str( + exc_info.value + ) @pytest.mark.asyncio @@ -175,32 +177,19 @@ async def test_streaming_output_rails_blocked_explicit(output_rails_streaming_co async def test_streaming_output_rails_blocked_default_config( output_rails_streaming_config_default, ): - """Tests if output rails streaming default config do not block content with BLOCK keyword""" + """Tests that stream_async raises an error with default config (output rails without explicit streaming config)""" - # text with a BLOCK keyword - llm_completions = [ - ' express greeting\nbot express greeting\n "Hi, how are you doing?"', - ' "This is a [BLOCK] joke that should be blocked."', - ] + llmrails = LLMRails(output_rails_streaming_config_default) - chunks = await run_self_check_test( - output_rails_streaming_config_default, llm_completions - ) + with pytest.raises(ValueError) as exc_info: + async for chunk in llmrails.stream_async( + messages=[{"role": "user", "content": "Hi!"}] + ): + pass - expected_error = { - "error": { - "message": "Blocked by self check output rails.", - "type": "guardrails_violation", - "param": "self check output", - "code": "content_blocked", - } - } - - error_chunks = [ - json.loads(chunk) for chunk in chunks if chunk.startswith('{"error":') - ] - assert len(error_chunks) == 0 - assert expected_error not in error_chunks + assert "stream_async() cannot be used when output rails are configured" in str( + exc_info.value + ) await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()}) @@ -235,19 +224,19 @@ async def test_streaming_output_rails_blocked_at_start(output_rails_streaming_co async def test_streaming_output_rails_default_config_not_blocked_at_start( output_rails_streaming_config_default, ): - """Tests blocking with BLOCK at the very beginning of the response does not return abort sse""" + """Tests that stream_async raises an error with default config (output rails without explicit streaming config)""" - llm_completions = [ - ' express greeting\nbot express greeting\n "Hi, how are you doing?"', - ' "[BLOCK] This should be blocked immediately at the start."', - ] + llmrails = LLMRails(output_rails_streaming_config_default) - chunks = await run_self_check_test( - output_rails_streaming_config_default, llm_completions - ) + with pytest.raises(ValueError) as exc_info: + async for chunk in llmrails.stream_async( + messages=[{"role": "user", "content": "Hi!"}] + ): + pass - with pytest.raises(JSONDecodeError): - json.loads(chunks[0]) + assert "stream_async() cannot be used when output rails are configured" in str( + exc_info.value + ) await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()}) From 714a7d621f8eb771e2b84bd09f60640663182cc0 Mon Sep 17 00:00:00 2001 From: Pouyanpi <13303554+Pouyanpi@users.noreply.github.com> Date: Fri, 24 Oct 2025 09:52:20 +0200 Subject: [PATCH 2/3] apply review suggestions --- nemoguardrails/rails/llm/llmrails.py | 23 +++++---- tests/test_parallel_streaming_output_rails.py | 7 ++- tests/test_streaming.py | 51 +++++++++++++++++-- tests/test_streaming_output_rails.py | 42 ++------------- 4 files changed, 71 insertions(+), 52 deletions(-) diff --git a/nemoguardrails/rails/llm/llmrails.py b/nemoguardrails/rails/llm/llmrails.py index 3842c6e29..a03efdee9 100644 --- a/nemoguardrails/rails/llm/llmrails.py +++ b/nemoguardrails/rails/llm/llmrails.py @@ -1248,6 +1248,18 @@ async def generate_async( new_message["tool_calls"] = tool_calls return new_message + def _validate_streaming_with_output_rails(self) -> None: + if len(self.config.rails.output.flows) > 0 and ( + not self.config.rails.output.streaming + or not self.config.rails.output.streaming.enabled + ): + raise ValueError( + "stream_async() cannot be used when output rails are configured but " + "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled to True in your configuration, or use " + "generate_async() instead of stream_async()." + ) + def stream_async( self, prompt: Optional[str] = None, @@ -1259,16 +1271,7 @@ def stream_async( ) -> AsyncIterator[str]: """Simplified interface for getting directly the streamed tokens from the LLM.""" - if len(self.config.rails.output.flows) > 0 and ( - not self.config.rails.output.streaming - or not self.config.rails.output.streaming.enabled - ): - raise ValueError( - "stream_async() cannot be used when output rails are configured but " - "output.streaming.enabled is False. Either set " - "rails.output.streaming.enabled to True in your configuration, or use " - "generate_async() instead of stream_async()." - ) + self._validate_streaming_with_output_rails() # if an external generator is provided, use it directly if generator: if ( diff --git a/tests/test_parallel_streaming_output_rails.py b/tests/test_parallel_streaming_output_rails.py index c028226b9..8f607f88e 100644 --- a/tests/test_parallel_streaming_output_rails.py +++ b/tests/test_parallel_streaming_output_rails.py @@ -617,8 +617,11 @@ async def test_parallel_streaming_output_rails_default_config_behavior( ): pass - assert "stream_async() cannot be used when output rails are configured" in str( - exc_info.value + assert str(exc_info.value) == ( + "stream_async() cannot be used when output rails are configured but " + "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled to True in your configuration, or use " + "generate_async() instead of stream_async()." ) await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()}) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 8031271f0..24920ec39 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -512,10 +512,55 @@ async def test_streaming_with_output_rails_disabled_raises_error(): ): pass - assert "stream_async() cannot be used when output rails are configured" in str( - exc_info.value + assert str(exc_info.value) == ( + "stream_async() cannot be used when output rails are configured but " + "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled to True in your configuration, or use " + "generate_async() instead of stream_async()." + ) + + +@pytest.mark.asyncio +async def test_streaming_with_output_rails_no_streaming_config_raises_error(): + config = RailsConfig.from_content( + config={ + "models": [], + "rails": { + "output": { + "flows": {"self check output"}, + } + }, + "streaming": True, + "prompts": [{"task": "self_check_output", "content": "a test template"}], + }, + colang_content=""" + define user express greeting + "hi" + + define flow + user express greeting + bot tell joke + """, + ) + + chat = TestChat( + config, + llm_completions=[], + streaming=True, + ) + + with pytest.raises(ValueError) as exc_info: + async for chunk in chat.app.stream_async( + messages=[{"role": "user", "content": "Hi!"}], + ): + pass + + assert str(exc_info.value) == ( + "stream_async() cannot be used when output rails are configured but " + "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled to True in your configuration, or use " + "generate_async() instead of stream_async()." ) - assert "output.streaming.enabled is False" in str(exc_info.value) @pytest.mark.asyncio diff --git a/tests/test_streaming_output_rails.py b/tests/test_streaming_output_rails.py index 926f785fc..1a49667c0 100644 --- a/tests/test_streaming_output_rails.py +++ b/tests/test_streaming_output_rails.py @@ -87,20 +87,6 @@ def output_rails_streaming_config_default(): ) -@pytest.mark.asyncio -async def test_stream_async_streaming_disabled(output_rails_streaming_config_default): - """Tests that stream_async raises an error when output rails are configured but streaming is disabled""" - - llmrails = LLMRails(output_rails_streaming_config_default) - - with pytest.raises(ValueError) as exc_info: - llmrails.stream_async(prompt="test") - - assert "stream_async() cannot be used when output rails are configured" in str( - exc_info.value - ) - - @pytest.mark.asyncio async def test_stream_async_streaming_enabled(output_rails_streaming_config): """Tests if stream_async returns does not return StreamingHandler instance when streaming is enabled""" @@ -187,8 +173,11 @@ async def test_streaming_output_rails_blocked_default_config( ): pass - assert "stream_async() cannot be used when output rails are configured" in str( - exc_info.value + assert str(exc_info.value) == ( + "stream_async() cannot be used when output rails are configured but " + "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled to True in your configuration, or use " + "generate_async() instead of stream_async()." ) await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()}) @@ -220,27 +209,6 @@ async def test_streaming_output_rails_blocked_at_start(output_rails_streaming_co await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()}) -@pytest.mark.asyncio -async def test_streaming_output_rails_default_config_not_blocked_at_start( - output_rails_streaming_config_default, -): - """Tests that stream_async raises an error with default config (output rails without explicit streaming config)""" - - llmrails = LLMRails(output_rails_streaming_config_default) - - with pytest.raises(ValueError) as exc_info: - async for chunk in llmrails.stream_async( - messages=[{"role": "user", "content": "Hi!"}] - ): - pass - - assert "stream_async() cannot be used when output rails are configured" in str( - exc_info.value - ) - - await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()}) - - async def simple_token_generator() -> AsyncIterator[str]: """Simple generator that yields tokens.""" tokens = ["Hello", " ", "world", "!"] From 6852d7b180b453164ef4f3526aaebc3dc9a56e26 Mon Sep 17 00:00:00 2001 From: Pouyanpi <13303554+Pouyanpi@users.noreply.github.com> Date: Fri, 24 Oct 2025 18:06:12 +0200 Subject: [PATCH 3/3] apply review suggestions from @tgasser-nv --- nemoguardrails/rails/llm/llmrails.py | 2 +- tests/test_parallel_streaming_output_rails.py | 2 +- tests/test_streaming.py | 4 ++-- tests/test_streaming_output_rails.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nemoguardrails/rails/llm/llmrails.py b/nemoguardrails/rails/llm/llmrails.py index a03efdee9..430b3dbee 100644 --- a/nemoguardrails/rails/llm/llmrails.py +++ b/nemoguardrails/rails/llm/llmrails.py @@ -1255,7 +1255,7 @@ def _validate_streaming_with_output_rails(self) -> None: ): raise ValueError( "stream_async() cannot be used when output rails are configured but " - "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled is False. Either set " "rails.output.streaming.enabled to True in your configuration, or use " "generate_async() instead of stream_async()." ) diff --git a/tests/test_parallel_streaming_output_rails.py b/tests/test_parallel_streaming_output_rails.py index 8f607f88e..4359388fd 100644 --- a/tests/test_parallel_streaming_output_rails.py +++ b/tests/test_parallel_streaming_output_rails.py @@ -619,7 +619,7 @@ async def test_parallel_streaming_output_rails_default_config_behavior( assert str(exc_info.value) == ( "stream_async() cannot be used when output rails are configured but " - "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled is False. Either set " "rails.output.streaming.enabled to True in your configuration, or use " "generate_async() instead of stream_async()." ) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 24920ec39..8fb1ac22a 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -514,7 +514,7 @@ async def test_streaming_with_output_rails_disabled_raises_error(): assert str(exc_info.value) == ( "stream_async() cannot be used when output rails are configured but " - "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled is False. Either set " "rails.output.streaming.enabled to True in your configuration, or use " "generate_async() instead of stream_async()." ) @@ -557,7 +557,7 @@ async def test_streaming_with_output_rails_no_streaming_config_raises_error(): assert str(exc_info.value) == ( "stream_async() cannot be used when output rails are configured but " - "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled is False. Either set " "rails.output.streaming.enabled to True in your configuration, or use " "generate_async() instead of stream_async()." ) diff --git a/tests/test_streaming_output_rails.py b/tests/test_streaming_output_rails.py index 1a49667c0..8583a2fb9 100644 --- a/tests/test_streaming_output_rails.py +++ b/tests/test_streaming_output_rails.py @@ -175,7 +175,7 @@ async def test_streaming_output_rails_blocked_default_config( assert str(exc_info.value) == ( "stream_async() cannot be used when output rails are configured but " - "output.streaming.enabled is False. Either set " + "rails.output.streaming.enabled is False. Either set " "rails.output.streaming.enabled to True in your configuration, or use " "generate_async() instead of stream_async()." )