Skip to content

Multiple streams can now be consumed at the same time #1522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 2, 2024

Conversation

ericallam
Copy link
Member

@ericallam ericallam commented Dec 2, 2024

Previously subscribing to a single realtime stream would block other run updates and streams from being consumed, this update fixes it so multiple streams can be consumed at once and they don't block the run updates from being consumed.

Copy link

changeset-bot bot commented Dec 2, 2024

🦋 Changeset detected

Latest commit: 52b7627

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 11 packages
Name Type
@trigger.dev/core Patch
@trigger.dev/build Patch
trigger.dev Patch
@trigger.dev/sdk Patch
@internal/redis-worker Patch
@internal/zod-worker Patch
@trigger.dev/react-hooks Patch
@trigger.dev/rsc Patch
@trigger.dev/database Patch
@trigger.dev/otlp-importer Patch
@internal/testcontainers Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Copy link
Contributor

coderabbitai bot commented Dec 2, 2024

Walkthrough

The pull request introduces significant enhancements to the @trigger.dev/core package, enabling the simultaneous consumption of multiple data streams. Key modifications include updates to the StreamSubscription interface, which now returns a Promise<ReadableStream<unknown>> instead of accepting a callback. This change simplifies the streaming process. Additionally, the publish-prerelease.sh script has been improved for better error handling and user interaction, ensuring a more robust execution flow during the publishing process.

Changes

File Path Change Summary
.changeset/modern-nails-refuse.md Introduced support for simultaneous consumption of multiple streams in the @trigger.dev/core package.
packages/core/src/v3/apiClient/runStream.ts Updated StreamSubscription and SSEStreamSubscription to return ReadableStream<unknown> instead of a callback.
packages/core/test/runStream.test.ts Modified TestStreamSubscription to return ReadableStream<unknown>; added DelayedTestShapeProvider and TestShapeProvider.
scripts/publish-prerelease.sh Enhanced error handling and control flow; improved user interaction for command execution in the publishing process.

Possibly related PRs

🐇 In the meadow, streams now flow,
Multiple paths where data can go.
With promises bright, and errors tamed,
Our code sings sweet, no longer shamed.
Hopping along, we celebrate this day,
For streams unite in a joyous ballet! 🌼

Warning

There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure.

🔧 eslint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

packages/core/src/v3/apiClient/runStream.ts

Oops! Something went wrong! :(

ESLint: 8.45.0

ESLint couldn't find the config "custom" to extend from. Please check that the name of the config is correct.

The config "custom" was referenced from the config file in "/.eslintrc.js".

If you still have problems, please stop by https://eslint.org/chat/help to chat with the team.


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (4)
packages/core/src/v3/apiClient/runStream.ts (1)

253-277: Handle Errors in Stream Processing Gracefully

In the withStreams method, errors caught in the stream pipeline are currently only logged to the console. Consider enhancing this by propagating the errors to upstream consumers or adding retry logic if appropriate to ensure robust error handling.

For example, you can modify the error handling as follows:

                    .catch((error) => {
-                      console.error(`Error in stream ${streamKey}:`, error);
+                      controller.error(new Error(`Error in stream ${streamKey}: ${error.message}`));
                    });

This change will signal the error to the consumer of the stream rather than just logging it.

packages/core/test/runStream.test.ts (1)

16-24: Simplify ReadableStream Creation in Test Subscription

Consider simplifying the subscribe method by using the ReadableStream constructor's default behavior or by utilizing async generators for better readability.

Here's an example using an async generator:

     async subscribe(): Promise<ReadableStream<unknown>> {
-        return new ReadableStream({
-          start: async (controller) => {
-            for (const chunk of this.chunks) {
-              controller.enqueue(chunk);
-            }
-            controller.close();
-          },
-        });
+        const generator = async function* () {
+          for (const chunk of this.chunks) {
+            yield chunk;
+          }
+        }.bind(this);
+        return new ReadableStream({
+          async pull(controller) {
+            const { value, done } = await generator().next();
+            if (done) {
+              controller.close();
+            } else {
+              controller.enqueue(value);
+            }
+          },
+        });
     }
.changeset/modern-nails-refuse.md (1)

5-5: Enhance Change Description for Clarity

Consider providing a more detailed description of the changes in the changeset to help users understand the impact and benefits of the update.

For example:

Multiple streams can now be consumed simultaneously. This enhancement allows for concurrent processing of data streams, improving performance and efficiency when handling multiple asynchronous tasks.
scripts/publish-prerelease.sh (1)

41-49: Improve Error Handling for Changeset Version Command

The current script may not handle all failure cases of the changeset version command. If the command fails for reasons other than "No unreleased changesets found," the script might not exit appropriately.

Consider updating the error handling to accurately capture and handle different failure scenarios:

 echo "Running: pnpm exec changeset version --snapshot $version"
-if output=$(pnpm exec changeset version --snapshot $version 2>&1); then
+if output=$(pnpm exec changeset version --snapshot $version 2>&1); then
+    if echo "$output" | grep -q "No unreleased changesets found"; then
+        echo "No unreleased changesets found. Exiting."
+        exit 0
+    fi
+elif echo "$output" | grep -q "Some specific known error"; then
+    echo "Known error occurred: $output"
+    exit 1
 else
     echo "Error running changeset version command"
+    echo "$output"
     exit 1
 fi

This adjustment ensures that unexpected errors cause the script to exit and provides error output for debugging.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between f7bf7bc and c5d67b5.

📒 Files selected for processing (4)
  • .changeset/modern-nails-refuse.md (1 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (3 hunks)
  • packages/core/test/runStream.test.ts (1 hunks)
  • scripts/publish-prerelease.sh (1 hunks)
🔇 Additional comments (2)
packages/core/src/v3/apiClient/runStream.ts (1)

100-100: Update Implementations and Usages of subscribe Method

The subscribe method signature in the StreamSubscription interface has changed to return a Promise<ReadableStream<unknown>>. Ensure that all implementations and usages of this method throughout the codebase are updated to match the new signature to prevent any runtime errors.

Run the following script to find all implementations and usages of the subscribe method:

✅ Verification successful

Let's also search for any class that might implement this interface with a different pattern:


Based on the search results, I can now provide a final response:

The subscribe Method Implementation is Already Consistent

The search results show that there are only two implementations of the StreamSubscription interface:

  1. SSEStreamSubscription in packages/core/src/v3/apiClient/runStream.ts which already implements the correct signature:
async subscribe(): Promise<ReadableStream<unknown>>
  1. A test implementation in packages/core/test/runStream.test.ts which also matches the correct signature:
async subscribe(): Promise<ReadableStream<unknown>>

All implementations already return Promise<ReadableStream<unknown>> as required. Other subscribe method occurrences in the codebase are unrelated to this interface (they belong to Redis subscriptions and other messaging systems).

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all implementations and usages of the `subscribe` method.

# Find implementations of the `subscribe` method
ast-grep --pattern $'class $_ implements StreamSubscription {
  $$$
  subscribe() {
    $$$
  }
  $$$
}'

# Find usages of the `subscribe` method
rg 'subscribe\(' -t js -A 2

Length of output: 135


Script:

#!/bin/bash
# Let's try a different approach to find implementations and usages

# Find files containing StreamSubscription interface or implementations
rg -l "StreamSubscription" --type ts

# Find all subscribe method declarations and calls
rg "subscribe\(" --type ts -A 2

# Find interface definition
rg "interface StreamSubscription" --type ts -A 5

Length of output: 5532


Script:

#!/bin/bash
# Search for class implementations with 'subscribe' method
ast-grep --pattern 'class $_ {
  $$$
  subscribe() {
    $$$
  }
}'

Length of output: 69

packages/core/test/runStream.test.ts (1)

16-24: Ensure subscribe Method in Tests Aligns with Updated Interface

The subscribe method in TestStreamSubscription now returns a Promise<ReadableStream<unknown>>, aligning it with the updated StreamSubscription interface. Verify that all test cases using this method are updated to handle the ReadableStream appropriately.

Comment on lines 114 to 136
async subscribe(): Promise<ReadableStream<unknown>> {
return fetch(this.url, {
headers: {
Accept: "text/event-stream",
...this.options.headers,
},
signal: this.options.signal,
});

if (!response.body) {
throw new Error("No response body");
}

const reader = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.getReader();

while (true) {
const { done, value } = await reader.read();

if (done) break;

await onChunk(safeParseJSON(value.data));
}
}).then((response) => {
if (!response.body) {
throw new Error("No response body");
}

return () => reader.cancel();
return response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(safeParseJSON(chunk.data));
},
})
);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add Error Handling for Fetch Response

In the SSEStreamSubscription class, the subscribe method performs a fetch request but doesn't check if the response is successful (e.g., HTTP status 200). If the response is an error (like 4xx or 5xx), this could lead to unexpected behavior.

Apply this diff to handle HTTP errors appropriately:

         }).then((response) => {
+          if (!response.ok) {
+            throw new Error(`Network response was not ok: ${response.status} ${response.statusText}`);
+          }

           if (!response.body) {
             throw new Error("No response body");
           }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async subscribe(): Promise<ReadableStream<unknown>> {
return fetch(this.url, {
headers: {
Accept: "text/event-stream",
...this.options.headers,
},
signal: this.options.signal,
});
if (!response.body) {
throw new Error("No response body");
}
const reader = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
await onChunk(safeParseJSON(value.data));
}
}).then((response) => {
if (!response.body) {
throw new Error("No response body");
}
return () => reader.cancel();
return response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(safeParseJSON(chunk.data));
},
})
);
});
async subscribe(): Promise<ReadableStream<unknown>> {
return fetch(this.url, {
headers: {
Accept: "text/event-stream",
...this.options.headers,
},
signal: this.options.signal,
}).then((response) => {
if (!response.ok) {
throw new Error(`Network response was not ok: ${response.status} ${response.statusText}`);
}
if (!response.body) {
throw new Error("No response body");
}
return response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(safeParseJSON(chunk.data));
},
})
);
});

Copy link

pkg-pr-new bot commented Dec 2, 2024

@trigger.dev/build

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/build@1522

@trigger.dev/core

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/core@1522

@trigger.dev/react-hooks

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/react-hooks@1522

trigger.dev

npm i https://pkg.pr.new/triggerdotdev/trigger.dev@1522

@trigger.dev/rsc

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/rsc@1522

@trigger.dev/sdk

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/sdk@1522

commit: 52b7627

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
packages/core/src/v3/apiClient/runStream.ts (2)

101-101: Excellent architectural improvement!

The simplified StreamSubscription interface returning Promise<ReadableStream<unknown>> is a significant improvement that:

  • Enables better composition with other stream processing tools
  • Provides native backpressure handling
  • Allows for easier error propagation
  • Simplifies resource cleanup

115-146: Consider adding timeout and retry mechanisms

While the stream implementation is solid, consider enhancing reliability with:

  1. Timeout handling for the fetch request
  2. Retry mechanism for temporary failures
 async subscribe(): Promise<ReadableStream<unknown>> {
+  const controller = new AbortController();
+  const timeout = setTimeout(() => controller.abort(), 30000); // 30s timeout
+
   return fetch(this.url, {
     headers: {
       Accept: "text/event-stream",
       ...this.options.headers,
     },
-    signal: this.options.signal,
+    signal: this.options.signal 
+      ? composeAbortSignals(this.options.signal, controller.signal)
+      : controller.signal,
   }).then((response) => {
+    clearTimeout(timeout);
     if (!response.ok) {
+      if (response.status >= 500 || response.status === 429) {
+        // Retry after 1s for server errors or rate limiting
+        return new Promise((resolve) => setTimeout(resolve, 1000))
+          .then(() => this.subscribe());
+      }
       throw ApiError.generate(
         response.status,
         {},
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between c5d67b5 and 52b7627.

📒 Files selected for processing (1)
  • packages/core/src/v3/apiClient/runStream.ts (4 hunks)
🔇 Additional comments (1)
packages/core/src/v3/apiClient/runStream.ts (1)

123-130: Robust error handling implementation!

The error handling has been properly implemented using ApiError with detailed status and headers information.

Comment on lines +263 to +287
const stream = await subscription.subscribe();

// Create the pipeline and start it
stream
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue({
type: streamKey,
chunk: chunk as TStreams[typeof streamKey],
run,
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
},
})
)
.pipeTo(
new WritableStream({
write(chunk) {
controller.enqueue(chunk);
},
})
)
.catch((error) => {
console.error(`Error in stream ${streamKey}:`, error);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling in stream pipeline

The current error handling only logs to console, which could lead to silent failures. Consider:

  1. Propagating errors to the caller
  2. Adding error recovery mechanisms
  3. Providing error events that consumers can listen to
              stream
                .pipeThrough(
                  new TransformStream({
                    transform(chunk, controller) {
                      controller.enqueue({
                        type: streamKey,
                        chunk: chunk as TStreams[typeof streamKey],
                        run,
                      } as StreamPartResult<RunShape<TRunTypes>, TStreams>);
                    },
                  })
                )
                .pipeTo(
                  new WritableStream({
                    write(chunk) {
                      controller.enqueue(chunk);
                    },
+                   abort(reason) {
+                     controller.error(new Error(`Stream ${streamKey} aborted: ${reason}`));
+                   }
                  })
                )
                .catch((error) => {
-                 console.error(`Error in stream ${streamKey}:`, error);
+                 // Propagate error to consumer
+                 controller.error(new Error(`Error in stream ${streamKey}: ${error.message}`));
+                 // Attempt recovery by resubscribing after a delay
+                 setTimeout(() => {
+                   if (!activeStreams.has(streamKey)) return;
+                   subscription.subscribe()
+                     .then(/* ... handle resubscription ... */)
+                     .catch(/* ... handle resubscription failure ... */);
+                 }, 1000);
                });

Committable suggestion skipped: line range outside the PR's diff.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant