
fix: The workflow cannot function properly after the conversation flow is disconnected #2428

Merged
merged 1 commit into main from pr@main@fix_workflow_chat on Feb 27, 2025

Conversation

shaohuzhang1
Contributor

fix: The workflow cannot function properly after the conversation flow is disconnected


f2c-ci-robot bot commented Feb 27, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.


f2c-ci-robot bot commented Feb 27, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment


async def gzip_wrapper():
async for chunk in orignal_iterator:
async for chunk in original_iterator:
yield compress_string(
chunk,
max_random_bytes=self.max_random_bytes,
Contributor Author


The provided code appears to be for processing API responses that can include streamed content. The key line of concern is:

async def gzip_wrapper():
    async for chunk in orignal_iterator:

This line has one real issue:

  1. Misspelled Name: orignal_iterator should be original_iterator; as written, the loop raises a NameError at runtime, which is exactly what this PR corrects.

Corrected version:

async def gzip_wrapper():
    async for chunk in original_iterator:
        yield compress_string(
            chunk,
            max_random_bytes=self.max_random_bytes,
        )

Additional Optimization Suggestion:
Ensure that error handling and resource management are incorporated into this loop. For example, you might want to handle exceptions that could occur during iteration, such as network errors or changes in streaming content midway through processing.

Here's a more robust version of the function including basic exception handling:

async def gzip_wrapper():
    try:
        async for chunk in original_iterator:
            yield compress_string(
                chunk,
                max_random_bytes=self.max_random_bytes,
            )
    except Exception as e:
        # Log or re-raise depending on requirements
        logging.exception("An error occurred while processing the stream: %s", e)
        return

In this revised code, the compression step is wrapped in a try block so that errors raised mid-stream (for example, a dropped connection on the original iterator) are reported instead of silently terminating the response. Note that the loop must use async for, since original_iterator is an asynchronous iterator; iterating it with a plain for loop, or calling it as if it were a function, would fail with a TypeError.
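As a self-contained sketch of the pattern discussed above — compressing streamed chunks one at a time inside an async generator — here is a runnable example. The names original_iterator and compress_chunk, and the use of gzip.compress, are illustrative assumptions, not the project's actual helpers:

```python
import asyncio
import gzip


async def original_iterator():
    # Simulated upstream stream of response chunks.
    for part in (b"hello ", b"streamed ", b"world"):
        yield part


def compress_chunk(chunk: bytes) -> bytes:
    # Stand-in (assumption) for the project's compress_string helper.
    return gzip.compress(chunk)


async def gzip_wrapper():
    # Compress each streamed chunk as it arrives; errors propagate to the caller.
    async for chunk in original_iterator():
        yield compress_chunk(chunk)


async def main() -> bytes:
    # Each yielded item is an independently decompressible gzip member.
    return b"".join([gzip.decompress(c) async for c in gzip_wrapper()])


print(asyncio.run(main()))  # b'hello streamed world'
```

Because each chunk is compressed independently, a client can start decompressing before the stream ends, which is the point of wrapping the iterator rather than buffering the whole response.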

@@ -382,6 +382,8 @@ def await_result(self):
                 break
             yield chunk
         finally:
+            while self.is_run():
+                pass
             details = self.get_runtime_details()
             message_tokens = sum([row.get('message_tokens') for row in details.values() if
                                   'message_tokens' in row and row.get('message_tokens') is not None])
Contributor Author


The code you provided appears to be part of an asynchronous implementation where await_result is responsible for gathering chunks from some process until it is done. The current version has a few potential improvements:

  1. Ensure Resource Cleanup: make sure all resources are properly released once self.is_run() returns False. This might include closing connections, stopping background tasks, or releasing locks.

  2. Avoid Busy-Waiting: the added while self.is_run(): pass spins the CPU at full speed until the workflow finishes. If the intent is to wait for completion before collecting runtime details, prefer a blocking wait (an event, a join, or at minimum a short sleep inside the loop).

  3. Optimization Considerations:

    • Ensure there aren't unused imports at the top.
    • Review whether values used within get_runtime_details() can be returned directly without additional processing.
  4. Code Readability: brief comments would help readability; refactor parts of the function if they grow too long.

Here’s a simplified version with these considerations (note that get_runtime_details() returns a dict, so the sum must iterate details.values()):

def await_result(self):
    try:
        ...  # yield chunks as in the original loop
    finally:
        while self.is_run():
            time.sleep(0.01)  # wait for completion without a pure busy-wait
        details = self.get_runtime_details()
        message_tokens = sum(
            row['message_tokens']
            for row in details.values()
            if row.get('message_tokens') is not None
        )

This approach assumes that is_run() eventually returns False once the workflow completes, and that get_runtime_details() provides the metrics needed elsewhere. Adjustments can be made based on the specific requirements and constraints of your system architecture.
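To illustrate the event-based alternative to the busy-wait mentioned in point 2, here is a generic sketch; the Runner class, its method names, and the shape of details are all hypothetical stand-ins for the project's actual workflow runner:

```python
import threading
import time


class Runner:
    """Hypothetical runner that signals completion instead of being polled."""

    def __init__(self):
        self._done = threading.Event()
        self.details = {}

    def run(self):
        # Simulated background work producing per-node runtime details.
        time.sleep(0.05)
        self.details = {"node1": {"message_tokens": 3}, "node2": {}}
        self._done.set()

    def wait_until_finished(self, timeout=None):
        # Blocks without spinning the CPU, unlike `while self.is_run(): pass`.
        return self._done.wait(timeout)


runner = Runner()
threading.Thread(target=runner.run).start()
runner.wait_until_finished(timeout=1)

# Same token-summing logic as the diff, over the dict's values.
message_tokens = sum(
    row["message_tokens"]
    for row in runner.details.values()
    if row.get("message_tokens") is not None
)
print(message_tokens)  # 3
```

Event.wait parks the waiting thread in the OS scheduler until set() is called, so completion is observed promptly with no polling loop at all.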

@shaohuzhang1 shaohuzhang1 merged commit afaf3d6 into main Feb 27, 2025
4 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@main@fix_workflow_chat branch February 27, 2025 06:21