
Realtime streams now powered by electric #1541


Merged · 3 commits · Dec 9, 2024

Conversation

ericallam (Member) commented Dec 9, 2024

Replaces the current Redis stream implementation of Realtime streams with one powered by Electric, which simplifies things massively. This is done in a way that is backwards compatible with previous clients, and it is configurable via the REALTIME_STREAM_VERSION env var, which will probably be removed eventually once all streaming happens through Electric. Also in this PR:


changeset-bot bot commented Dec 9, 2024

🦋 Changeset detected

Latest commit: be97bde

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 11 packages:

| Name | Type |
| --- | --- |
| @trigger.dev/react-hooks | Patch |
| @trigger.dev/sdk | Patch |
| @trigger.dev/build | Patch |
| @trigger.dev/core | Patch |
| @trigger.dev/rsc | Patch |
| @trigger.dev/database | Patch |
| @trigger.dev/otlp-importer | Patch |
| trigger.dev | Patch |
| @internal/redis-worker | Patch |
| @internal/zod-worker | Patch |
| @internal/testcontainers | Patch |



coderabbitai bot commented Dec 9, 2024

Warning

There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure.

🔧 eslint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

packages/core/src/v3/apiClient/runStream.ts

Oops! Something went wrong! :(

ESLint: 8.45.0

ESLint couldn't find the config "custom" to extend from. Please check that the name of the config is correct.

The config "custom" was referenced from the config file in "/.eslintrc.js".

If you still have problems, please stop by https://eslint.org/chat/help to chat with the team.

Walkthrough

The pull request introduces several significant changes to the application's real-time streaming capabilities. Key modifications include the addition of a new enumeration field REALTIME_STREAM_VERSION in the EnvironmentSchema, updates to the SpanPresenter class for enhanced metadata handling, and the introduction of new API routes for versioned real-time streams. Additionally, new classes for database and Redis streaming are created, along with various updates to existing components to support these enhancements. The changes also include updates to Docker configurations and database migrations to accommodate the new streaming architecture.

Changes

| File Path | Change Summary |
| --- | --- |
| apps/webapp/app/env.server.ts | Added REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1") to EnvironmentSchema. |
| apps/webapp/app/presenters/v3/SpanPresenter.server.ts | Updated metadata processing in getRun method to include $$streamsVersion and refined context object construction. |
| apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts | Updated imports and modified action and loader functions to use v1RealtimeStreams with additional parameters. |
| apps/webapp/app/routes/realtime.v2.streams.$runId.$streamId.ts | Introduced new routes for v2 real-time streams with action and loader functions, implementing schema validation. |
| apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts | Added DatabaseRealtimeStreams class with methods for streaming responses and ingesting data. |
| apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts | Renamed RealtimeStreams to RedisRealtimeStreams and updated methods for improved request handling. |
| apps/webapp/app/services/realtime/types.ts | Added interfaces StreamIngestor and StreamResponder for managing streaming data. |
| apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts | Updated service initialization to use RedisRealtimeStreams. |
| apps/webapp/app/services/realtime/v2StreamsGlobal.server.ts | Introduced initializeDatabaseRealtimeStreams for managing database streams. |
| apps/webapp/app/services/realtimeClient.server.ts | Added streamChunks method to RealtimeClient for chunk streaming and updated error handling. |
| apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts | Added TRIGGER_REALTIME_STREAM_VERSION variable to environment variable resolution functions. |
| apps/webapp/test/realtimeClient.test.ts | Updated test imports to use containerWithElectricAndRedisTest. |
| docker/Dockerfile.postgres | Created new Dockerfile for PostgreSQL 14 with partition management. |
| docker/docker-compose.yml | Updated database service to build from a custom Dockerfile and added command options. |
| internal-packages/database/prisma/migrations/20241206135145_create_realtime_chunks_table/migration.sql | Created RealtimeStreamChunk table with specified columns and an index. |
| internal-packages/database/prisma/migrations/20241208074324_add_created_at_index_to_realtime_stream_chunks/migration.sql | Added index on createdAt column and renamed existing index. |
| internal-packages/database/prisma/schema.prisma | Added RealtimeStreamChunk model with defined fields and indexes. |
| internal-packages/testcontainers/src/index.ts | Modified container types for testing with Redis context. |
| packages/cli-v3/src/entryPoints/deploy-run-worker.ts | Updated StandardMetadataManager initialization to include stream version parameter. |
| packages/cli-v3/src/entryPoints/dev-run-worker.ts | Modified StandardMetadataManager instantiation to include stream version. |
| packages/core/package.json | Updated @electric-sql/client dependency version and enhanced exports structure. |
| packages/core/src/v3/apiClient/index.ts | Added getHeaders() method to ApiClient class. |
| packages/core/src/v3/apiClient/runStream.ts | Introduced ElectricStreamSubscription and ElectricStreamSubscriptionFactory classes for new streaming mechanism. |
| packages/core/src/v3/apiClient/stream.ts | Updated zodShapeStream function and added ReadableShapeStream class for handling shape messages. |
| packages/core/src/v3/runMetadata/manager.ts | Enhanced StandardMetadataManager to handle stream versions and manage active streams. |
| packages/core/src/v3/runMetadata/metadataStream.ts | Modified MetadataOptions type to include headers and version properties. |
| packages/core/src/v3/schemas/api.ts | Added SubscribeRealtimeStreamChunkRawShape schema for real-time stream chunk data. |
| packages/core/test/runStream.test.ts | Updated test cases to use new stream creation functions and removed old shape provider implementations. |
| references/nextjs-realtime/src/app/realtime/page.tsx | Added RuntimeComparisonPage component for real-time streaming comparison. |
| references/nextjs-realtime/src/components/RealtimeComparison.tsx | Introduced RealtimeComparison component for displaying streaming data from OpenAI. |
| references/nextjs-realtime/src/trigger/ai.ts | Updated STREAMS type and simplified openaiStreaming task for handling streamed text. |
| references/nextjs-realtime/src/trigger/example.ts | Modified exampleTask function to remove random failure simulation and reduce timeout duration. |
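
The StreamIngestor interface from services/realtime/types.ts is the seam both implementations share. For illustration, here is a hedged sketch of the REALTIME_STREAM_VERSION routing; the two service constants stand in for the real v1 (Redis) and v2 (Electric-backed) singletons, whose actual names may differ:

```ts
// Hedged sketch of the REALTIME_STREAM_VERSION toggle. The interface matches
// the StreamIngestor summarized above; the declared constants are stand-ins
// for the real service singletons in apps/webapp/app/services/realtime/.
type StreamVersion = "v1" | "v2";

interface StreamIngestor {
  ingestData(
    stream: ReadableStream<Uint8Array>,
    runId: string,
    streamId: string
  ): Promise<Response>;
}

declare const redisRealtimeStreams: StreamIngestor; // v1 (legacy)
declare const databaseRealtimeStreams: StreamIngestor; // v2 (Electric-backed)

function selectIngestor(version: StreamVersion): StreamIngestor {
  return version === "v2" ? databaseRealtimeStreams : redisRealtimeStreams;
}
```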

Possibly related PRs

  • Realtime: improve scope access to runs with tags and batches #1511: This PR enhances the accessibility of runs by allowing public access tokens scoped to tags or batches to access associated runs, which relates to the introduction of the REALTIME_STREAM_VERSION field in the main PR as it may affect how streams are managed based on versioning.
  • Multiple streams can now be consumed at the same time #1522: This PR introduces functionality for simultaneous consumption of multiple streams, which is directly relevant to the changes in the main PR that involve the REALTIME_STREAM_VERSION, as it may impact how different versions of streams are handled concurrently.
  • Fix cancelled runs breaking realtime subscriptions #1533: This PR addresses issues with real-time functionality when a run is cancelled, which is related to the main PR's focus on managing stream versions and ensuring stability in real-time operations.

🐰 In a world of streams, both new and old,
A versioning tale in code unfolds.
With chunks of data, we hop and play,
Real-time magic, come what may!
So let’s celebrate this vibrant change,
As our streams evolve, they rearrange! 🌟




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

🧹 Outside diff range and nitpick comments (18)
apps/webapp/app/routes/realtime.v2.streams.$runId.$streamId.ts (1)

19-21: Ensure robust request body validation

While checking for the presence of request.body is good, consider verifying that the body contains valid data and is not empty to prevent potential processing of invalid streams.
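
A minimal sketch of a stricter guard, assuming a Remix action and using the Content-Length header as a cheap emptiness check (an assumption; streamed bodies may omit the header):

```ts
import { type ActionFunctionArgs } from "@remix-run/server-runtime";

// Hypothetical guard: reject missing or clearly empty bodies before handing
// the stream to the ingestor.
export async function action({ request }: ActionFunctionArgs) {
  const contentLength = request.headers.get("content-length");

  if (!request.body || contentLength === "0") {
    return new Response("Request body is required", { status: 400 });
  }

  // ...pass request.body (a ReadableStream<Uint8Array>) to ingestData as before
  return new Response("ok", { status: 200 });
}
```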

apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (1)

33-84: Consider handling backpressure for large data streams

In the ingestData method, processing large streams without backpressure control may lead to memory issues. Consider implementing flow control mechanisms to handle large or high-throughput streams efficiently.
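
One concrete flow-control option is pull-based consumption, sketched below: the next read is not issued until the previous write settles, so the consumer throttles the producer. Here writeChunk is a hypothetical persistence callback, not the class's actual method:

```ts
// Sketch of pull-based ingestion: not reading ahead while a write is in
// flight gives natural backpressure without extra buffering.
async function ingestWithBackpressure(
  stream: ReadableStream<Uint8Array>,
  writeChunk: (chunk: Uint8Array) => Promise<void> // e.g. a database insert
): Promise<void> {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      await writeChunk(value); // throttles the next read()
    }
  } finally {
    reader.releaseLock();
  }
}
```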

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)

Line range hint 17-31: Unused parameter 'request' in 'streamResponse' method

The request parameter is accepted but not utilized within the streamResponse method. If it's unnecessary, consider removing it to improve code clarity.

apps/webapp/app/services/realtimeClient.server.ts (1)

Line range hint 110-125: Refactor to eliminate duplicated code in URL construction methods

The methods #constructRunsElectricUrl and #constructChunksElectricUrl share similar logic for constructing the electric URL, differing mainly in the table name. To improve maintainability and reduce code duplication, consider refactoring these methods to use a common helper function that accepts the table name as a parameter.

Apply this diff to refactor the code:

+  #constructElectricUrl(
+    url: URL | string,
+    whereClause: string,
+    tableName: string,
+    clientVersion?: string
+  ): URL {
+    const $url = new URL(url.toString());
+    const electricUrl = new URL(`${this.options.electricOrigin}/v1/shape`);
+
+    // Copy over all the url search params to the electric url
+    $url.searchParams.forEach((value, key) => {
+      electricUrl.searchParams.set(key, value);
+    });
+
+    electricUrl.searchParams.set("where", whereClause);
+    electricUrl.searchParams.set("table", tableName);
+
+    if (!clientVersion) {
+      // If the client version is not provided, we're using an older client
+      // This means the client will be sending shape_id instead of handle
+      electricUrl.searchParams.set("handle", electricUrl.searchParams.get("shape_id") ?? "");
+    }
+
+    return electricUrl;
+  }
-
-  #constructRunsElectricUrl(url: URL | string, whereClause: string, clientVersion?: string): URL {
-    // Existing code...
-  }
-
-  #constructChunksElectricUrl(url: URL | string, whereClause: string, clientVersion?: string): URL {
-    // Existing code...
-  }

Update the calls to use the new method:

-    const electricUrl = this.#constructRunsElectricUrl(url, whereClause, clientVersion);
+    const electricUrl = this.#constructElectricUrl(url, whereClause, 'public."TaskRun"', clientVersion);

-    const electricUrl = this.#constructChunksElectricUrl(url, whereClause, clientVersion);
+    const electricUrl = this.#constructElectricUrl(url, whereClause, 'public."RealtimeStreamChunk"', clientVersion);

This refactor centralizes the URL construction logic and reduces duplication.

Also applies to: 144-163

packages/core/src/v3/apiClient/runStream.ts (1)

223-223: Remove console.log statement or replace with proper logging

The console.log statement at line 223 can clutter output in production environments and may expose sensitive information. Consider removing it or replacing it with a standardized logging mechanism.

Apply this diff to remove the debug statement:

-    console.log("Creating ElectricStreamSubscription with URL:", url);

If needed for debugging, use a proper logging library that can be configured based on the environment.

packages/core/test/runStream.test.ts (1)

280-286: Ensure proper binding of 'this' when overriding methods

In the override of createSubscription, ensure that the context (this) is correctly bound to avoid unexpected behavior.

Apply this diff to add an explicit return type annotation to the arrow-function override:

-    streamFactory.createSubscription = (
+    streamFactory.createSubscription = (
       metadata: Record<string, unknown>,
       runId: string,
       streamKey: string
-    ) => {
+    ): StreamSubscription => {
        streamCreationCount++;
        return originalCreate(metadata, runId, streamKey);
      };

Because the override is an arrow function, this stays lexically bound; the explicit StreamSubscription return type also documents the override's contract.

internal-packages/database/prisma/migrations/20241208074324_add_created_at_index_to_realtime_stream_chunks/migration.sql (1)

2-2: Consider using CREATE INDEX CONCURRENTLY to avoid locking the table during index creation

When adding a new index to a large table in a production environment, using CREATE INDEX CONCURRENTLY can prevent locking writes during the index creation process.

Apply this diff to create the index without locking the table:

- CREATE INDEX "RealtimeStreamChunk_createdAt_idx" ON "RealtimeStreamChunk"("createdAt");
+ CREATE INDEX CONCURRENTLY "RealtimeStreamChunk_createdAt_idx" ON "RealtimeStreamChunk"("createdAt");

Note: Ensure your database environment supports concurrent index creation and adjust transaction settings accordingly.

references/nextjs-realtime/src/app/realtime/page.tsx (1)

7-11: Add loading state while token is being generated

Consider showing a loading indicator while the token is being generated to improve user experience.

+import { Suspense } from "react";
+
 export default async function RuntimeComparisonPage() {
   const accessToken = await auth.createTriggerPublicToken("openai-streaming");
 
   return (
     <main className="flex min-h-screen items-center justify-center p-4 bg-gray-900">
-      <RealtimeComparison accessToken={accessToken} />
+      <Suspense fallback={<div className="text-white">Initializing streaming...</div>}>
+        <RealtimeComparison accessToken={accessToken} />
+      </Suspense>
     </main>
   );
 }
apps/webapp/app/services/realtime/types.ts (2)

3-10: Add JSDoc documentation for StreamIngestor interface.

While the interface is well-structured, adding JSDoc documentation would help clarify:

  • Expected behavior for different stream types
  • Error handling expectations
  • Success/failure conditions

Add documentation like this:

 // Interface for stream ingestion
+/**
+ * Handles ingestion of streaming data.
+ * @throws {Error} When stream ingestion fails
+ * @returns {Promise<Response>} A response indicating ingestion success/failure
+ */
 export interface StreamIngestor {
   ingestData(
     stream: ReadableStream<Uint8Array>,
     runId: string,
     streamId: string
   ): Promise<Response>;
 }

12-21: Add JSDoc documentation for StreamResponder interface.

The interface would benefit from documentation explaining:

  • The purpose of each parameter
  • How AbortSignal is expected to be handled
  • Response format specifications

Add documentation like this:

 // Interface for stream response
+/**
+ * Handles streaming responses for authenticated environments.
+ * @param request The incoming request
+ * @param signal AbortSignal for cancellation handling
+ * @returns {Promise<Response>} A streaming response
+ */
 export interface StreamResponder {
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)

Line range hint 5-18: Consider adding Redis connection resilience.

The Redis configuration would benefit from additional resilience features:

  • Retry strategy for connection failures
  • Connection pool settings
  • Error event handling

Consider enhancing the configuration:

 function initializeRedisRealtimeStreams() {
   return new RedisRealtimeStreams({
     redis: {
       port: env.REDIS_PORT,
       host: env.REDIS_HOST,
       username: env.REDIS_USERNAME,
       password: env.REDIS_PASSWORD,
       enableAutoPipelining: true,
+      retryStrategy: (times) => Math.min(times * 50, 2000),
+      maxRetriesPerRequest: 3,
+      enableOfflineQueue: true,
       ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
       keyPrefix: "tr:realtime:streams:",
     },
   });
 }
references/nextjs-realtime/src/trigger/example.ts (2)

18-18: Consider using more granular progress updates.

The 2-second delay between progress updates might not provide enough feedback for monitoring the task's progress.

Consider adding intermediate progress updates:

-    await setTimeout(2000);
+    // Update progress every 500ms
+    for (let i = 1; i <= 4; i++) {
+      await setTimeout(500);
+      metadata.set("status", { type: "started", progress: 0.1 + (i * 0.1) });
+    }

Line range hint 26-29: Optimize large payload generation.

The current approach of generating a large payload could cause memory issues:

  • Creating a 1MB string in memory
  • Using Array.fill and join operations

Consider using a streaming approach or chunking:

-    // Generate a return payload that is more than 128KB
-    const bigPayload = new Array(100000).fill("a".repeat(10)).join("");
-
-    return { message: bigPayload };
+    // Stream the large payload in chunks
+    const chunkSize = 1024 * 16; // 16KB chunks
+    const chunks = Math.ceil(100000 * 10 / chunkSize);
+    return {
+      message: new ReadableStream({
+        start(controller) {
+          let sent = 0;
+          const interval = setInterval(() => {
+            if (sent >= chunks) {
+              clearInterval(interval);
+              controller.close();
+              return;
+            }
+            controller.enqueue("a".repeat(chunkSize));
+            sent++;
+          }, 100);
+        }
+      })
+    };
packages/core/src/v3/runMetadata/metadataStream.ts (1)

72-72: Consider header defaults

The headers fallback to an empty object, which is safe but might benefit from default headers for consistency.

-        headers: this.options.headers ?? {},
+        headers: {
+          'Content-Type': 'application/json',
+          ...this.options.headers ?? {}
+        },
references/nextjs-realtime/src/components/RealtimeComparison.tsx (2)

30-30: Fix typo in the prompt text

"San Fransico" is misspelled. It should be "San Francisco".

-                "Based on the temperature, will I need to wear extra clothes today in San Fransico? Please be detailed.",
+                "Based on the temperature, will I need to wear extra clothes today in San Francisco? Please be detailed.",

24-35: Consider environment-specific rendering of debug button

The debug button should only be visible in non-production environments.

+        {process.env.NODE_ENV !== 'production' && (
           <Button
             className="bg-gray-100 text-gray-900 hover:bg-gray-200 font-semibold text-xs"
             onClick={() => {
               trigger.submit({
                 model: "gpt-4o-mini",
                 prompt:
                   "Based on the temperature, will I need to wear extra clothes today in San Francisco? Please be detailed.",
               });
             }}
           >
             Debug LLM Streaming
           </Button>
+        )}
packages/cli-v3/src/entryPoints/deploy-run-worker.ts (1)

108-109: LGTM! Consider improving type safety.

The stream version configuration is correctly implemented with a fallback value. However, we could improve type safety by using a constant or type from a shared location.

Consider extracting the stream version type to a shared location:

+import { StreamVersion } from "@trigger.dev/core/v3";
 const runMetadataManager = new StandardMetadataManager(
   apiClientManager.clientOrThrow(),
   getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
-  (getEnvVar("TRIGGER_REALTIME_STREAM_VERSION") ?? "v1") as "v1" | "v2"
+  (getEnvVar("TRIGGER_REALTIME_STREAM_VERSION") ?? "v1") as StreamVersion
 );
packages/core/src/v3/schemas/api.ts (1)

731-742: Consider enhancing schema validation and documentation.

The schema structure is good, but could benefit from:

  1. Adding .min(0) validation for the sequence number to ensure it's non-negative
  2. Adding JSDoc comments to document the purpose of each field
 export const SubscribeRealtimeStreamChunkRawShape = z.object({
+  /** Unique identifier for the stream chunk */
   id: z.string(),
+  /** Reference to the associated run */
   runId: z.string(),
+  /** Sequence number for ordering chunks */
-  sequence: z.number(),
+  sequence: z.number().min(0),
+  /** Key for the stream chunk */
   key: z.string(),
+  /** Value/payload of the stream chunk */
   value: z.string(),
+  /** Timestamp when the chunk was created */
   createdAt: z.coerce.date(),
 });
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b411313 and 2d5ec15.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (33)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1 hunks)
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (3 hunks)
  • apps/webapp/app/routes/realtime.v2.streams.$runId.$streamId.ts (1 hunks)
  • apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (1 hunks)
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2 hunks)
  • apps/webapp/app/services/realtime/types.ts (1 hunks)
  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (2 hunks)
  • apps/webapp/app/services/realtime/v2StreamsGlobal.server.ts (1 hunks)
  • apps/webapp/app/services/realtimeClient.server.ts (5 hunks)
  • apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts (2 hunks)
  • apps/webapp/test/realtimeClient.test.ts (3 hunks)
  • apps/webapp/test/realtimeStreams.test.ts (0 hunks)
  • docker/Dockerfile.postgres (1 hunks)
  • docker/docker-compose.yml (2 hunks)
  • internal-packages/database/prisma/migrations/20241206135145_create_realtime_chunks_table/migration.sql (1 hunks)
  • internal-packages/database/prisma/migrations/20241208074324_add_created_at_index_to_realtime_stream_chunks/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (1 hunks)
  • internal-packages/testcontainers/src/index.ts (3 hunks)
  • packages/cli-v3/src/entryPoints/deploy-run-worker.ts (1 hunks)
  • packages/cli-v3/src/entryPoints/dev-run-worker.ts (1 hunks)
  • packages/core/package.json (1 hunks)
  • packages/core/src/v3/apiClient/index.ts (1 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (7 hunks)
  • packages/core/src/v3/apiClient/stream.ts (3 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (3 hunks)
  • packages/core/src/v3/runMetadata/metadataStream.ts (2 hunks)
  • packages/core/src/v3/schemas/api.ts (1 hunks)
  • packages/core/test/runStream.test.ts (10 hunks)
  • references/nextjs-realtime/src/app/realtime/page.tsx (1 hunks)
  • references/nextjs-realtime/src/components/RealtimeComparison.tsx (1 hunks)
  • references/nextjs-realtime/src/trigger/ai.ts (2 hunks)
  • references/nextjs-realtime/src/trigger/example.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • apps/webapp/test/realtimeStreams.test.ts
✅ Files skipped from review due to trivial changes (1)
  • apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
🔇 Additional comments (33)
docker/Dockerfile.postgres (2)

1-1: LGTM! Using a stable PostgreSQL version.

The choice of postgres:14 as the base image is appropriate for production use.


3-5: Verify pg_partman extension installation.

The installation of postgresql-14-partman looks correct. Let's verify that the extension can be created and is available for use.

✅ Verification successful

The pg_partman extension installation is correctly configured.

The installation of postgresql-14-partman package in the Dockerfile is sufficient for enabling partitioning capabilities in PostgreSQL. While there are no explicit SQL files using the pg_partman extension in the codebase, this is not a concern as:

  1. The package installation ensures the extension is available in the PostgreSQL instance
  2. The extension can be created when needed using CREATE EXTENSION pg_partman;
  3. Partitioning can be implemented later when required without modifying the Dockerfile
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify if there are any existing SQL files that attempt to create or use the pg_partman extension
# This helps ensure the extension will be properly utilized

# Search for pg_partman extension creation
rg -l "CREATE EXTENSION.*pg_partman" 

# Search for potential partition management related SQL
rg -l "pg_partman" --type sql

Length of output: 68


Script:

#!/bin/bash
# Let's check for any SQL files or database migration files in the repository
fd -e sql -e psql

# Also check for any database initialization or setup scripts
fd -g "*db*" -g "*database*" -g "*schema*" -g "*migration*"

# Check for any mentions of partitioning in SQL or related files
rg -l "partition" --type sql
rg -l "partition" -g "*db*" -g "*database*" -g "*schema*" -g "*migration*"

Length of output: 62441

docker/docker-compose.yml (3)

16-18: LGTM! Proper custom build configuration.

The build configuration correctly references the new Dockerfile for the custom PostgreSQL image with pg_partman.


35-36: Verify pg_partman_bgw worker configuration.

The addition of pg_partman_bgw to shared_preload_libraries is correct. However, we should verify if additional configuration is needed for the background worker.

✅ Verification successful

The pg_partman_bgw configuration appears to be minimal and sufficient

The current configuration in docker-compose.yml only enables the background worker through shared_preload_libraries, which is the primary requirement for pg_partman's background worker functionality. Since we couldn't find any additional configuration files or SQL scripts related to pg_partman in the codebase, this suggests a minimal setup is being used. The background worker will operate with default settings, which is acceptable for most use cases.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any additional pg_partman configuration files or settings

# Search for pg_partman related configuration
rg -l "pg_partman.*config" 

# Search for any SQL files that might configure the background worker
rg -l "pg_partman_bgw" --type sql

Length of output: 62


Script:

#!/bin/bash
# Let's search more broadly for pg_partman related configurations and settings

# Search for any pg_partman mentions in SQL files
rg -l "pg_partman" --type sql

# Search for any postgresql.conf or similar config files
fd -g "*postgresql*.conf*"

# Search for any mentions of pg_partman in yaml/yml files
rg -l "pg_partman" --type yaml --type yml

# Look for database initialization or setup scripts
fd -g "init*.sql"
fd -g "*setup*.sql"

Length of output: 170


Line range hint 67-77: Consider upgrading Electric version and verify compatibility.

The Electric service is using version 0.9.4. Given this is a significant change to realtime streams, we should verify if this is the latest stable version and ensure compatibility with PostgreSQL 14 and pg_partman.

Consider documenting the following in the project's README or deployment guide:

  1. The reason for choosing Electric for realtime streams
  2. Any specific configuration requirements for Electric with pg_partman
  3. Deployment considerations for the background worker
apps/webapp/app/routes/realtime.v2.streams.$runId.$streamId.ts (2)

14-43: Action route implementation is correct

The action route is correctly defined and handles data ingestion appropriately.


47-87: Loader route implementation is correct

The loader route is properly set up to stream responses using v2RealtimeStreams.

apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (1)

16-31: streamResponse method implementation is correct

The streamResponse method correctly streams data using the realtimeClient.

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)

Line range hint 78-85: Ensure proper cleanup of Redis connections

The cleanup function is called in the finally block, but in case of an abrupt termination, ensure that the Redis connection is always properly closed to prevent resource leaks.

packages/core/src/v3/apiClient/stream.ts (2)

Line range hint 20-49: Validate the removal of 'async' from 'zodShapeStream'

The async keyword has been removed from the zodShapeStream function. Confirm that all asynchronous operations are correctly handled and that this change does not introduce any unintended side effects.


73-175: Implementation of ReadableShapeStream is well-structured

The ReadableShapeStream class effectively manages the streaming and parsing of shape messages, improving code maintainability and separation of concerns.

apps/webapp/app/services/realtimeClient.server.ts (1)

Line range hint 184-195: Verify whether concurrency limits should be applied to initial and non-live requests

In the #performElectricRequest method, the concurrency limit is only enforced for live requests with a shapeId. For initial value requests (line 184) and non-live requests (line 190), the concurrency limit is not applied. Consider whether concurrency control should also be enforced in these cases to prevent potential resource exhaustion from a high number of concurrent initial or non-live requests.

Would you like to verify if the current load could lead to resource exhaustion due to unmanaged concurrency in these scenarios?
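
If the limit were extended to those paths, a small counting semaphore would be enough. A sketch follows; this is not the webapp's actual limiter:

```ts
// Minimal counting semaphore sketch for bounding concurrent Electric requests.
class Semaphore {
  #waiters: Array<() => void> = [];
  #available: number;

  constructor(limit: number) {
    this.#available = limit;
  }

  async acquire(): Promise<void> {
    if (this.#available > 0) {
      this.#available -= 1;
      return;
    }
    await new Promise<void>((resolve) => this.#waiters.push(resolve));
  }

  release(): void {
    const next = this.#waiters.shift();
    if (next) next(); // hand the slot directly to a waiter
    else this.#available += 1;
  }
}

// Usage: await sem.acquire(); try { ...fetch... } finally { sem.release(); }
```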

internal-packages/database/prisma/migrations/20241208074324_add_created_at_index_to_realtime_stream_chunks/migration.sql (1)

5-5: Verify the impact of renaming the index on dependent code or scripts

Renaming the index from "RealtimeStreamChunk_runId" to "RealtimeStreamChunk_runId_idx" might affect any database queries, stored procedures, or scripts that reference the index by name. Verify that there are no dependencies on the original index name.

Would you like assistance in searching the codebase for references to the old index name?

internal-packages/database/prisma/migrations/20241206135145_create_realtime_chunks_table/migration.sql (1)

12-13: Verify index usage patterns

The index on runId alone might not be as effective as the composite index suggested above. Consider monitoring query patterns to ensure this single-column index is necessary alongside the composite index.

✅ Verification successful

Single-column index on runId is appropriate

Based on the schema analysis, the single-column index on runId is sufficient. The schema shows that RealtimeStreamChunk has a subsequent migration that adds a separate index on createdAt, which complements the runId index. This setup allows for efficient queries on either column independently, which is more versatile than a composite index in this case, especially since there's no evidence of frequent combined queries on both columns.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for queries using runId to verify index usage patterns
rg -A 5 "RealtimeStreamChunk.*where.*runId" 

Length of output: 45


Script:

#!/bin/bash
# Let's search for any Prisma queries related to RealtimeStreamChunk
rg -A 5 "RealtimeStreamChunk.*find"

# Also search for any direct SQL queries or schema definitions
rg -A 5 "RealtimeStreamChunk" --type sql

Length of output: 2248


Script:

#!/bin/bash
# Let's check the schema file to understand the complete model definition and any relations
fd schema.prisma --exec cat {}

# Also search for any ordering or filtering patterns in the codebase
rg -A 5 "orderBy.*createdAt|where.*runId.*createdAt"

Length of output: 66280

apps/webapp/app/services/realtime/v2StreamsGlobal.server.ts (1)

13-13: Verify singleton initialization across the codebase

Ensure that the singleton instance is properly managed across the application lifecycle and there are no memory leaks.

✅ Verification successful


Singleton initialization is properly implemented and consistent

The singleton pattern is consistently used across the codebase for managing global instances, and the DatabaseRealtimeStreams implementation follows the same pattern as other services. The singleton is:

  • Properly scoped with a unique key "dbRealtimeStreams"
  • Follows the same initialization pattern as other server-side singletons
  • Used in a controlled server-side context
  • Has clean implementation with proper error handling and logging
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other singleton usages to verify consistency
rg "singleton\\(" --type ts

# Search for DatabaseRealtimeStreams usage
rg "DatabaseRealtimeStreams" --type ts

Length of output: 4730

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)

19-19: Verify singleton initialization across worker processes.

The singleton pattern needs careful consideration in a Node.js environment with multiple worker processes.

Let's verify the singleton usage:

✅ Verification successful

Let me analyze the singleton implementation by checking its source.


Singleton implementation is safe for multi-process environments

The current singleton implementation using globalThis is safe across worker processes because:

  1. Each Node.js process maintains its own isolated globalThis scope
  2. The implementation uses lazy initialization with null coalescing
  3. Redis-dependent services (like realtime streams) handle their own connection pooling

The pattern is consistently used across 25+ services in the codebase, all following the same initialization pattern, making it a reliable approach for managing global instances within each worker process.
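
For context, the pattern being described looks roughly like this. This is a reconstruction; the actual helper in apps/webapp/app/utils/singleton.ts may differ in detail:

```ts
// Reconstruction of a lazy, globalThis-scoped singleton helper.
export function singleton<T>(name: string, getValue: () => T): T {
  const store = globalThis as unknown as {
    __singletons?: Record<string, unknown>;
  };
  store.__singletons ??= {};
  store.__singletons[name] ??= getValue(); // lazy init, once per process
  return store.__singletons[name] as T;
}
```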

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other singleton usages and potential race conditions
rg -A 3 "singleton\\(" --type ts

Length of output: 8641


Script:

#!/bin/bash
# Find the singleton implementation
rg -l "function singleton" --type ts

# If found, show the implementation
ast-grep --pattern 'function singleton($_) {
  $$$
}'

Length of output: 129


Script:

#!/bin/bash
# Let's see the actual implementation of singleton
cat apps/webapp/app/utils/singleton.ts

Length of output: 279

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (3)

4-4: LGTM: Clean versioning transition

The import change from realtimeStreams to v1RealtimeStreams aligns with the versioning strategy for realtime streams.


19-19: LGTM: Action maintains backward compatibility

The action function's transition to v1RealtimeStreams preserves the existing parameter structure, ensuring backward compatibility.


53-60: Verify environment parameter usage

The addition of authentication.environment to streamResponse suggests environment-specific stream handling. This change could affect stream behavior based on environment context.

✅ Verification successful

Environment parameter is correctly used in stream handling

The environment parameter is properly integrated into both Redis and Database stream implementations. In the Redis implementation, it's available for potential environment-specific handling, while in the Database implementation, it's actively used in the realtimeClient.streamChunks call. This is a valid and intentional design pattern for environment-aware stream handling.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other environment-dependent stream handling
ast-grep --pattern 'streamResponse($$$, $_, environment, $$$)'

Length of output: 64


Script:

#!/bin/bash
# Let's try a broader search for streamResponse usage
rg "streamResponse" -A 3

# Also search for the implementation of streamResponse
ast-grep --pattern 'streamResponse = function($$$) { $$$ }' 
ast-grep --pattern 'streamResponse($$$) { $$$ }'

Length of output: 2672


Script:

#!/bin/bash
# Let's examine the implementation files more closely
cat apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
cat apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts
cat apps/webapp/app/services/realtime/types.ts

Length of output: 8075

packages/core/src/v3/runMetadata/metadataStream.ts (2)

67-69: LGTM: Proper version handling

The URL construction correctly handles versioning with a safe fallback to "v1".


6-8: Validate header usage security

While adding custom headers provides flexibility, ensure that sensitive headers are properly validated or filtered.

✅ Verification successful

Let me check the implementation of the MetadataStream class to see how these headers are used.


Let me try a different pattern to find the MetadataStream implementation.


Let me check how the headers are used in the actual request.


Let me check if there are any security validations or filters applied to these headers at a higher level in the API client.



Headers are safely handled with appropriate controls

The implementation shows that headers are properly managed with several security controls in place:

  • Headers are processed through the API client which adds required authentication and version headers
  • Custom headers are filtered to remove undefined values
  • Headers are normalized and validated at multiple levels
  • The MetadataStream only passes headers through a controlled fetch request to an authenticated endpoint
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for header validation or security measures
rg -A 5 "headers.*Record<string, string>" 

Length of output: 8959


Script:

#!/bin/bash
# Check MetadataStream implementation for header usage
ast-grep --pattern 'class MetadataStream {
  $$$
}'

Length of output: 53


Script:

#!/bin/bash
# Search for the MetadataStream class and its usage of headers
rg -A 10 "class MetadataStream" packages/core/src/v3/runMetadata/metadataStream.ts

Length of output: 604


Script:

#!/bin/bash
# Search for fetch or request usage in metadataStream.ts
rg -A 10 "fetch\(" packages/core/src/v3/runMetadata/metadataStream.ts

Length of output: 426


Script:

#!/bin/bash
# Search for header validation in the API client
rg -A 10 "headers.*\{" packages/core/src/v3/apiClient/

Length of output: 8591

internal-packages/testcontainers/src/index.ts (3)

23-24: LGTM: Clear context type separation

The type definitions clearly separate concerns between Electric-only and Electric+Redis contexts, making test requirements explicit.


101-105: LGTM: Minimal Electric test context

The containerWithElectricTest correctly includes only the necessary dependencies for Electric-based tests.


Line range hint 107-114: Verify test coverage transition

The new containerWithElectricAndRedisTest suggests tests need both Electric and Redis. Ensure existing tests are updated accordingly.

✅ Verification successful

Let me gather more context about the test files to understand the transition better.


No test coverage transition needed - new test fixture is only used by new tests

The verification shows that containerWithElectricAndRedisTest is only used in new tests within realtimeClient.test.ts that specifically require Redis functionality. The original containerWithElectricTest fixture still exists and is not being replaced, indicating this is an addition rather than a transition that would require updating existing tests.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing test usage
rg "containerWithElectricTest|containerWithElectricAndRedisTest" --type ts

Length of output: 703


Script:

#!/bin/bash
# Check the content of the test file to understand the usage
rg -B5 -A5 "containerWithElectricAndRedisTest" apps/webapp/test/realtimeClient.test.ts

# Look for other test files that might be using containerWithElectricTest
rg -l "containerWithElectricTest" --type ts

Length of output: 1290

references/nextjs-realtime/src/trigger/ai.ts (1)

12-15: LGTM: STREAMS type update

The addition of openaiText to the STREAMS type is well-structured and aligns with the new streaming requirements.

packages/core/src/v3/runMetadata/manager.ts (1)

23-24: LGTM: Well-structured stream versioning implementation

The addition of streamsVersion parameter and its integration with metadata management is well-implemented. The code includes proper error handling and stream limits.

Also applies to: 235-235, 244-246

apps/webapp/test/realtimeClient.test.ts (2)

1-1: LGTM: Updated container setup to include Redis

The change to use containerWithElectricAndRedisTest aligns with the migration to electric-powered realtime streams that now require Redis.


6-6: Verify Redis integration in test environment

The test cases now use a container with both Electric and Redis. Let's verify the Redis configuration is properly initialized.

Also applies to: 142-142, 221-221

✅ Verification successful

Let me gather more information about the Redis configuration by checking the test container setup.


Let me check one more thing to verify the test container setup itself.


Let me try one last search with a different pattern to locate the test container setup.


Redis integration is properly configured in test environment

The Redis configuration is correctly initialized in the test environment. The verification shows:

  • Redis container is properly set up using @testcontainers/redis
  • Redis client is correctly instantiated with container connection details
  • RealtimeClient tests consistently use Redis with proper options
  • The test container setup includes proper cleanup and resource management
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check Redis configuration in test container setup
# Test: Search for Redis configuration in test container setup
rg -A 5 "containerWithElectricAndRedisTest" "apps/webapp/test"

# Test: Look for Redis options usage
ast-grep --pattern 'redis: redis.options'

Length of output: 1947


Script:

#!/bin/bash
# Check the implementation of containerWithElectricAndRedisTest
fd containerWithElectricAndRedisTest --exec cat {}

# Look for Redis initialization in test files
rg -l "redis" "apps/webapp/test"

# Check Redis client initialization
ast-grep --pattern 'new RealtimeClient({
  $$$,
  redis: $_,
  $$$
})'

Length of output: 4553


Script:

#!/bin/bash
# Check the testcontainers implementation
fd -e ts testcontainers --exec cat {}

# Look for Redis container setup
rg -A 10 "startRedis" 

Length of output: 63


Script:

#!/bin/bash
# Search for Redis container setup in internal packages
fd -g "**/internal/**/testcontainers*" --exec cat {}

# Alternative search for Redis container configuration
rg -A 5 "Redis.*Container" 

Length of output: 4904

apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1)

218-220: LGTM: Added stream version filtering in metadata

The addition of "$$streamsVersion" to filtered keys properly handles the new versioning system while maintaining clean metadata output.

packages/cli-v3/src/entryPoints/dev-run-worker.ts (1)

90-91: LGTM: Added stream version configuration with backward compatibility

The StandardMetadataManager now properly handles stream versioning while maintaining backward compatibility with v1 as the default.

✅ Verification successful

Let me gather more information about the stream version handling to ensure it's consistently implemented.


Let me try one more search with corrected commands:



LGTM: Stream version configuration is properly implemented

The stream version configuration is correctly implemented with proper type safety and consistent usage across the codebase:

  • Default version "v1" is maintained for backward compatibility
  • Version is properly typed as "v1" | "v2" throughout the codebase
  • Environment variable handling is consistent in both dev and deploy workers
  • Version configuration is properly integrated with StandardMetadataManager and stream subscription factories
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify stream version configuration
# Test: Check for consistent version handling across files
rg -A 2 "TRIGGER_REALTIME_STREAM_VERSION"

# Test: Look for version type definitions
ast-grep --pattern 'type $_ = "v1" | "v2"'

Length of output: 1330


Script:

#!/bin/bash
# Check for any stream version related types or interfaces
ast-grep --pattern 'type $name = { $$$version$$$: "v1" | "v2"$$$}'

# Look for version-related constants or enums
rg -A 2 '"v1"|"v2"' --type typescript

# Check StandardMetadataManager implementation
ast-grep --pattern 'class StandardMetadataManager {
  $$$
}'

Length of output: 206


Script:

#!/bin/bash
# Look for version-related constants or enums
rg -A 2 '"v1"|"v2"'

# Check StandardMetadataManager usage and implementation
rg -A 5 "StandardMetadataManager"

# Look for any stream version related configurations
rg -A 2 "stream.*version|version.*stream" -i

Length of output: 54444

apps/webapp/app/env.server.ts (1)

246-247: LGTM! Verify the stream version configuration across environments.

The addition of REALTIME_STREAM_VERSION with enum validation and a default value is well-implemented. This ensures type safety and backward compatibility.
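
For reference, the field in question, reconstructed from the change summary (surrounding fields elided):

```ts
import { z } from "zod";

// Excerpt: the new flag in EnvironmentSchema, enum-validated with a
// backward-compatible default.
const EnvironmentSchema = z.object({
  // ...existing fields...
  REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
});
```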

Let's verify the stream version configuration:

✅ Verification successful

LGTM! The stream version configuration is properly synchronized across environments.

The verification confirms that:

  • The REALTIME_STREAM_VERSION is properly defined in env.server.ts with enum validation
  • It's correctly propagated to workers through TRIGGER_REALTIME_STREAM_VERSION in both development and production environments via environmentVariablesRepository.server.ts
  • The workers (deploy-run-worker.ts and dev-run-worker.ts) handle the version configuration consistently with the same default value
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for the usage of REALTIME_STREAM_VERSION across the codebase
# to ensure consistent version handling.

# Test: Search for stream version configuration in route handlers
rg -A 5 'realtime\.v[12]\.streams'

# Test: Search for version-specific implementations
rg -A 5 'REALTIME_STREAM_VERSION|TRIGGER_REALTIME_STREAM_VERSION'

Length of output: 3151

packages/core/package.json (1)

185-185: Verify the changelog for @electric-sql/client upgrade.

The upgrade from 0.7.1 to 0.9.0 aligns with the PR objective. However, we should verify the changes introduced in this upgrade.

Let's check the changelog and any breaking changes:

packages/core/src/v3/apiClient/index.ts (1)

142-144: Review security implications of exposing headers.

The new getHeaders() method exposes internal headers. While this might be necessary for the real-time streaming functionality, consider:

  1. Adding JSDoc to document the intended usage
  2. Adding a warning about security implications
  3. Potentially limiting the exposed headers to only those necessary
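
A hedged sketch of suggestions 1 and 3 combined, assuming the client keeps its headers in a Record<string, string>; the function and allow-list names are hypothetical:

```ts
// Hypothetical allow-list so getHeaders() exposes only what streaming needs.
// Authorization is included because stream subscriptions must authenticate.
const EXPOSABLE_HEADERS = new Set(["authorization", "trigger-version"]);

/**
 * Returns a copy of the client headers limited to an allow-list.
 * WARNING: anything returned here may end up in client-side fetches.
 */
function getSafeHeaders(all: Record<string, string>): Record<string, string> {
  return Object.fromEntries(
    Object.entries(all).filter(([key]) => EXPOSABLE_HEADERS.has(key.toLowerCase()))
  );
}
```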


pkg-pr-new bot commented Dec 9, 2024

@trigger.dev/build

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/build@1541

@trigger.dev/core

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/core@1541

@trigger.dev/react-hooks

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/react-hooks@1541

trigger.dev

npm i https://pkg.pr.new/triggerdotdev/trigger.dev@1541

@trigger.dev/rsc

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/rsc@1541

@trigger.dev/sdk

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/sdk@1541

commit: be97bde


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Outside diff range and nitpick comments (10)
.changeset/lemon-cherries-greet.md (1)

6-6: Enhance the changeset description with more details.

While the current description captures the main changes, it would be helpful to provide more context about:

  • What "electric" is and its benefits
  • The specific conditions that triggered the re-rendering issue
  • Any migration steps or breaking changes that users should be aware of

Consider expanding the description like this:

-Realtime streams now powered by electric. Also, this change fixes a realtime bug that was causing too many re-renders, even on records that didn't change
+Realtime streams are now powered by electric, a [brief description of what electric is and its benefits]. This change also fixes a performance issue where components would unnecessarily re-render when the underlying record data remained unchanged.
+
+## Changes
+- Migrated realtime streams to use electric
+- Fixed excessive re-renders in realtime components
+
+## Migration
+[Add any necessary migration steps or breaking changes]
references/nextjs-realtime/src/components/RealtimeComparison.tsx (2)

10-10: Add validation for NEXT_PUBLIC_TRIGGER_API_URL

The environment variable process.env.NEXT_PUBLIC_TRIGGER_API_URL might be undefined. Consider adding a default value or throwing an error to handle the case when it's not set.

Apply this diff to handle the potential undefined value:

-        baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
+        baseURL:
+          process.env.NEXT_PUBLIC_TRIGGER_API_URL ??
+          (() => {
+            throw new Error("NEXT_PUBLIC_TRIGGER_API_URL is not defined");
+          })(),

19-21: Remove unnecessary onComplete callback if not used

The onComplete callback is defined but doesn't perform any meaningful actions besides logging to the console. If it's not needed, consider removing it to simplify the code.

Apply this diff to remove the onComplete callback:

-        onComplete: (...args) => {
-          console.log("Run completed!", args);
-        },
packages/react-hooks/src/hooks/useRealtime.ts (3)

15-23: Improve documentation for onComplete callback

The documentation for the onComplete callback could be more precise. Specify that the callback is invoked when the run completes successfully, an error occurs, or the subscription is stopped, and clarify the parameters.

Apply this diff to enhance the documentation:

/**
-   * Callback this is called when the run completes, an error occurs, or the subscription is stopped.
+   * Callback invoked when the run completes, an error occurs, or the subscription is stopped.
    *
-   * @param {RealtimeRun<TTask>} run - The run object
-   * @param {Error} [err] - The error that occurred
+   * @param {RealtimeRun<TTask>} run - The current run state
+   * @param {Error | undefined} err - The error that occurred, if any
    */

66-71: Avoid using SWR for simple state management

Using SWR for managing the isComplete state might be overkill since it's a simple boolean flag that doesn't benefit from revalidation or caching. Consider using useState instead for simplicity.

Apply this diff to replace SWR with useState:

-const { data: isComplete = false, mutate: setIsComplete } = useSWR<boolean>(
-  [idKey, "complete"],
-  null
-);
+const [isComplete, setIsComplete] = useState(false);

205-209: Consistent state management for isComplete

Similar to the previous comment, avoid using SWR for managing the isComplete state in useRealtimeRunWithStreams. Use useState for consistency and simplicity.

Apply this diff to replace SWR with useState:

-const { data: isComplete = false, mutate: setIsComplete } = useSWR<boolean>(
-  [idKey, "complete"],
-  null
-);
+const [isComplete, setIsComplete] = useState(false);
packages/core/src/v3/apiClient/stream.ts (1)

103-205: Optimize state management in ReadableShapeStream class

The ReadableShapeStream class maintains a #currentState map and processes messages accordingly. Review the logic to ensure that state updates are efficient and thread-safe, especially in concurrent environments.

Consider potential race conditions or memory leaks if the stream processes a large number of messages. Implement proper synchronization or use more efficient data structures if necessary.
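
If unbounded growth of the #currentState map is a concern, one option is a FIFO-capped map. A hypothetical sketch, not part of this PR:

```ts
// Hypothetical FIFO-capped map: once maxEntries is reached, the oldest key
// is evicted before a new one is inserted.
class CappedMap<K, V> {
  #map = new Map<K, V>();

  constructor(private readonly maxEntries: number) {}

  set(key: K, value: V): void {
    if (!this.#map.has(key) && this.#map.size >= this.maxEntries) {
      const oldest = this.#map.keys().next().value as K | undefined;
      if (oldest !== undefined) this.#map.delete(oldest);
    }
    this.#map.set(key, value);
  }

  get(key: K): V | undefined {
    return this.#map.get(key);
  }
}
```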

docs/frontend/react-hooks/realtime.mdx (1)

65-88: Enhance documentation for the onComplete callback.

The documentation for the new onComplete callback is clear, but could be improved by:

  1. Adding type information for the callback parameters
  2. Including an example of error handling in the callback

Consider updating the example to:

 const { run, error } = useRealtimeRun(runId, {
   accessToken: publicAccessToken,
-  onComplete: (run, error) => {
-    console.log("Run completed", run);
+  onComplete: (run: Run, error: Error | null) => {
+    if (error) {
+      console.error("Run failed:", error);
+      return;
+    }
+    console.log("Run completed successfully:", run);
   },
 });
packages/core/src/v3/apiClient/runStream.ts (2)

90-124: Consider adding error handling for invalid API URLs.

The code uses environment variables for API URLs but doesn't validate them beyond the fallback to "https://api.trigger.dev".

Consider adding URL validation:

+function isValidUrl(url: string): boolean {
+  try {
+    new URL(url);
+    return true;
+  } catch {
+    return false;
+  }
+}

 const version1 = new SSEStreamSubscriptionFactory(
-  getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
+  (() => {
+    const url = getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev";
+    if (!isValidUrl(url)) {
+      throw new Error(`Invalid API URL: ${url}`);
+    }
+    return url;
+  })(),
   {
     headers: options?.headers,
     signal: abortController.signal,
   }
 );

250-279: Add logging for version selection in VersionedStreamSubscriptionFactory.

The version selection logic would benefit from logging to help with debugging.

Consider adding debug logs:

 const version =
   typeof metadata.$$streamsVersion === "string" ? metadata.$$streamsVersion : "v1";

+console.debug(`Selected stream version ${version} for run ${runId}`);
+
 if (version === "v1") {
   return this.version1.createSubscription(metadata, runId, streamKey, baseUrl);
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 2d5ec15 and 8525be2.

📒 Files selected for processing (7)
  • .changeset/lemon-cherries-greet.md (1 hunks)
  • docs/frontend/react-hooks/realtime.mdx (1 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (7 hunks)
  • packages/core/src/v3/apiClient/stream.ts (3 hunks)
  • packages/react-hooks/src/hooks/useRealtime.ts (7 hunks)
  • references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx (1 hunks)
  • references/nextjs-realtime/src/components/RealtimeComparison.tsx (1 hunks)
🔇 Additional comments (7)
references/nextjs-realtime/src/components/RealtimeComparison.tsx (2)

32-34: Verify the model name is available to this account

The model "gpt-4o-mini" may not be available on every OpenAI account. If requests fail with a model-not-found error, switch to a model the account can access, such as "gpt-4o" or "gpt-3.5-turbo".

Apply this diff to switch models:

-                  model: "gpt-4o-mini",
+                  model: "gpt-3.5-turbo",

62-72: Add loading and error states to enhance user experience

The component lacks handling for loading and error states during the streaming process. Implementing these states can improve the user experience by providing feedback during data retrieval and handling potential errors.

Consider adding state management for isLoading and error, and update the UI accordingly.
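A minimal sketch of what that could look like, assuming the hook surfaces an error object the way useRealtimeRun does; Spinner and ErrorBanner are hypothetical placeholder components, and runId/publicAccessToken come from the surrounding component:

// Sketch only: derive loading and error states from the hook's results.
const { run, error, streams } = useRealtimeRunWithStreams(runId, {
  accessToken: publicAccessToken,
});

// No run and no error yet means the subscription is still connecting
const isLoading = !run && !error;

if (isLoading) return <Spinner />;
if (error) return <ErrorBanner message={error.message} />;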

packages/react-hooks/src/hooks/useRealtime.ts (1)

264-270: ⚠️ Potential issue

Add dependency to the effect hook

Ensure all relevant dependencies are included in the dependency array for the effect hook that handles the onComplete callback in useRealtimeRunWithStreams.

Apply this diff to include the missing dependency:

-}, [isComplete, run, error, options?.onComplete]);
+}, [isComplete, run, error, options?.onComplete, options?.enabled]);

Likely invalid or redundant comment.

references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx (1)

30-32: Ensure proper cleanup of timeout in useEffect

The useEffect hook sets a timeout to update the accessToken after 2 seconds. If the component unmounts before the timeout executes, the timeout might not be cleared properly, leading to state updates on an unmounted component. Verify that clearTimeout is correctly called.


packages/core/src/v3/apiClient/stream.ts (1)

73-101: ⚠️ Potential issue

Ensure proper handling of abort signals in createAsyncIterableReadable

The createAsyncIterableReadable function adds an abort event listener but does not handle the case where the signal is already aborted before the listener is added. This could lead to the stream not being closed when the signal is already aborted. Consider checking signal.aborted and handling it immediately.

Apply this diff to handle already aborted signals:

+if (signal.aborted) {
+  reader.cancel();
+  controller.close();
+  return;
+}
+
 signal.addEventListener("abort", () => {
   queueMicrotask(() => {
     reader.cancel();
     controller.close();
   });
 });

Likely invalid or redundant comment.

packages/core/src/v3/apiClient/runStream.ts (2)

Line range hint 3-23: LGTM! Clean import organization.

The imports are well-organized and properly grouped by functionality.


302-324: 🛠️ Refactor suggestion

Consider adding memory leak prevention for the packet cache.

The packetCache grows unbounded as new packets are processed.

Consider implementing a cache size limit:

 private packetCache = new Map<string, any>();
+private readonly MAX_CACHE_SIZE = 1000;
+
+private trimCache() {
+  if (this.packetCache.size > this.MAX_CACHE_SIZE) {
+    const entriesToRemove = this.packetCache.size - this.MAX_CACHE_SIZE;
+    const keys = Array.from(this.packetCache.keys()).slice(0, entriesToRemove);
+    keys.forEach(key => this.packetCache.delete(key));
+  }
+}

 private async transformRunShape(row: SubscribeRunRawShape): Promise<RunShape<TRunTypes>> {
   // ... existing code ...
   const result = await conditionallyImportAndParsePacket(packet, this.options.client);
   this.packetCache.set(`${row.friendlyId}/${key}`, result);
+  this.trimCache();
   // ... rest of the code ...
 }

Likely invalid or redundant comment.

Comment on lines +2 to +3
"@trigger.dev/react-hooks": patch
"@trigger.dev/sdk": patch

💡 Codebase verification

Based on the evidence gathered, I can now provide a final response:

A minor version bump is warranted for these changes

The changes introduce significant new functionality and behavioral improvements:

  1. A new streaming backend powered by "electric"
  2. Fixes to the realtime functionality affecting render behavior
  3. Multiple files affected across the codebase (react-hooks, core, webapp) related to realtime/streaming functionality

These changes represent new features and significant improvements rather than just bug fixes, which aligns with semantic versioning principles for minor version bumps.

🔗 Analysis chain

Consider using minor version bumps instead of patches.

Given that this change introduces a new streaming backend ("electric") and fixes behavioral issues, it might be more appropriate to use minor version bumps instead of patches. Minor versions typically indicate new features or significant changes while maintaining backward compatibility.

Let's check the recent version history of these packages:


Let me gather more information about the changes to better assess the version bump type.


Let me search for changes related to the streaming backend and realtime functionality to better assess the impact.


Let me try a different search approach without specifying file types.


Let me try one final search with simpler commands.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check recent version history and commit messages for these packages

# Look for version-related changes in package.json files
rg -A 1 '"version":' packages/react-hooks/package.json packages/sdk/package.json

# Look for recent changesets
fd -e md . .changeset/ -x cat {}

Length of output: 1215


Script:

#!/bin/bash
# Look for changes related to the streaming backend and realtime features
rg -B2 -A2 "electric" --type ts --type tsx
rg -B2 -A2 "realtime" --type ts --type tsx

# Look for package.json files to check current versions
fd package.json -x cat {}

Length of output: 62603


Script:

#!/bin/bash
# Look for changes in source files related to streaming and realtime features
rg -B2 -A2 "electric|realtime|stream" --type typescript --type javascript

# Look for recent commits related to these changes
git log -n 5 --grep="electric\|realtime\|stream"

Length of output: 110


Script:

#!/bin/bash
# Search for electric and realtime related changes
rg -B2 -A2 "electric|realtime|stream" --type ts --type tsx --type js --type jsx

# Look for any recent changesets
fd -e md . .changeset/ -x grep -l "electric\|realtime\|stream" {} \; -exec cat {} \;

Length of output: 384


Script:

#!/bin/bash
# Look for the content of the changeset file
cat .changeset/lemon-cherries-greet.md

# Simple grep for electric/realtime in ts files
find . -type f -name "*.ts" -o -name "*.tsx" | xargs grep -l "electric\|realtime"

Length of output: 1951

Comment on lines +209 to +248
// Real implementation for production
export class ElectricStreamSubscription implements StreamSubscription {
  constructor(
    private url: string,
    private options: { headers?: Record<string, string>; signal?: AbortSignal }
  ) {}

  async subscribe(): Promise<ReadableStream<unknown>> {
    return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options).pipeThrough(
      new TransformStream({
        transform(chunk, controller) {
          controller.enqueue(safeParseJSON(chunk.value));
        },
      })
    );
  }
}

export class ElectricStreamSubscriptionFactory implements StreamSubscriptionFactory {
  constructor(
    private baseUrl: string,
    private options: { headers?: Record<string, string>; signal?: AbortSignal }
  ) {}

  createSubscription(
    metadata: Record<string, unknown>,
    runId: string,
    streamKey: string,
    baseUrl?: string
  ): StreamSubscription {
    if (!runId || !streamKey) {
      throw new Error("runId and streamKey are required");
    }

    return new ElectricStreamSubscription(
      `${baseUrl ?? this.baseUrl}/realtime/v2/streams/${runId}/${streamKey}`,
      this.options
    );
  }
}

🛠️ Refactor suggestion

Consider adding retry logic for stream connections.

The ElectricStreamSubscription implementation doesn't handle temporary network failures or connection drops.

Consider implementing a retry mechanism:

 async subscribe(): Promise<ReadableStream<unknown>> {
+  const maxRetries = 3;
+  const backoff = (attempt: number) => Math.min(1000 * Math.pow(2, attempt), 10000);
+  
+  for (let attempt = 0; attempt < maxRetries; attempt++) {
+    try {
       return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options).pipeThrough(
         new TransformStream({
           transform(chunk, controller) {
             controller.enqueue(safeParseJSON(chunk.value));
           },
         })
       );
+    } catch (error) {
+      if (attempt === maxRetries - 1) throw error;
+      await new Promise(resolve => setTimeout(resolve, backoff(attempt)));
+    }
+  }
+  throw new Error("Failed to establish stream connection after maximum retries");
 }

…e duplicate issues by converting the electric Shape materialized view into a ReadableStream of changes
@ericallam ericallam force-pushed the realtime-stream-updates-and-fix branch from 8525be2 to 88c1b38 Compare December 9, 2024 21:14

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 10

🧹 Outside diff range and nitpick comments (9)
docs/frontend/react-hooks/realtime.mdx (2)

77-82: Enhance the example with practical use cases

The current example with console.log could be more illustrative of real-world usage.

Consider enhancing the example to show more practical scenarios:

   const { run, error } = useRealtimeRun(runId, {
     accessToken: publicAccessToken,
     onComplete: (run, error) => {
-      console.log("Run completed", run);
+      if (error) {
+        toast.error(`Run failed: ${error.message}`);
+        return;
+      }
+      
+      if (run.status === "COMPLETED") {
+        toast.success("Process completed successfully!");
+        router.push(`/results/${run.id}`);
+      }
     },
   });

65-88: Document the relationship with REALTIME_STREAM_VERSION

The PR objectives mention that this feature is related to the new Electric-powered streams implementation, controlled by the REALTIME_STREAM_VERSION environment variable.

Consider adding a note about the environment variable configuration:

> Note: The behavior of realtime hooks, including the `onComplete` callback, can be affected by the `REALTIME_STREAM_VERSION` environment variable. This variable determines whether the hooks use the legacy Redis-based implementation (v1) or the new Electric-powered implementation (v2).
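For illustration, a hedged sketch of how server-side code might read that flag; the canonical schema lives in apps/webapp/app/env.server.ts, so treat this standalone snippet as an assumption about its shape:

import { z } from "zod";

// Sketch: parse the flag with a safe default so unset environments
// keep the legacy v1 behavior.
const RealtimeStreamVersion = z.enum(["v1", "v2"]).default("v1");

const streamsVersion = RealtimeStreamVersion.parse(
  process.env.REALTIME_STREAM_VERSION
);
// streamsVersion is "v1" unless REALTIME_STREAM_VERSION=v2 is set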
packages/core/src/v3/runMetadata/manager.ts (1)

235-235: Consider cleanup of version metadata

The $$streamsVersion metadata is set but never cleaned up when all streams are done. Consider adding cleanup in the waitForAllStreams method when all streams complete.

Here's a suggested implementation:

  public async waitForAllStreams(timeout: number = 60_000): Promise<void> {
    if (this.activeStreams.size === 0) {
      return;
    }

    const promises = Array.from(this.activeStreams.values()).map((stream) => stream.wait());

    try {
      await Promise.race([
        Promise.allSettled(promises),
        new Promise<void>((resolve, _) => setTimeout(() => resolve(), timeout)),
      ]);
+     // Clean up streams metadata when all streams are done
+     if (this.activeStreams.size === 0) {
+       delete this.store?.$$streamsVersion;
+       await this.flush();
+     }
    } catch (error) {
      console.error("Error waiting for streams to finish:", error);
    }
  }
packages/core/src/v3/apiClient/stream.ts (2)

Line range hint 26-29: Avoid hardcoding the Electric version number

The Electric version is hardcoded in the headers. This could lead to maintenance issues and version mismatches.

Consider moving this to a configuration constant or environment variable:

    headers: {
      ...options?.headers,
-     "x-trigger-electric-version": "0.8.1",
+     "x-trigger-electric-version": ELECTRIC_VERSION,
    },

103-106: Consider memory management for #currentState

The class maintains all rows in memory without any cleanup mechanism.

Consider implementing:

  1. A maximum size limit for the state map
  2. A cleanup strategy for old/unused entries
  3. An LRU cache implementation for better memory management, as sketched below
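A minimal sketch of option 3, assuming a Map-based LRU is acceptable; the 10,000-row cap is illustrative, not a measured value:

// Sketch: a size-capped Map that evicts the oldest entry once the cap
// is exceeded. Map preserves insertion order, so deleting and
// re-inserting a key on update moves it to the end, giving LRU-like
// eviction of the least recently written rows.
class BoundedRowState<V> {
  #rows = new Map<string, V>();

  constructor(private maxRows: number = 10_000) {}

  set(key: string, value: V): void {
    this.#rows.delete(key); // refresh recency on update
    this.#rows.set(key, value);

    if (this.#rows.size > this.maxRows) {
      // Iteration order is insertion order, so the first key is oldest
      const oldest = this.#rows.keys().next().value;
      if (oldest !== undefined) {
        this.#rows.delete(oldest);
      }
    }
  }

  get(key: string): V | undefined {
    return this.#rows.get(key);
  }
}

Swapping the raw Map in ReadableShapeStream for something like this trades strict correctness on very old rows for bounded memory, which only makes sense if updates to evicted rows can be treated as inserts.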
packages/core/src/v3/apiClient/runStream.ts (3)

90-125: Consider using constants for version strings and adding documentation.

The version handling implementation could be improved by:

  1. Using constants for version strings to avoid magic strings
  2. Adding documentation about the version selection logic
+// Version constants for stream implementations
+const STREAM_VERSION = {
+  V1: "v1",
+  V2: "v2",
+} as const;
+
+/**
+ * Creates a versioned stream subscription based on the environment configuration.
+ * Version 1: Uses Server-Sent Events (SSE)
+ * Version 2: Uses Electric streaming
+ */
 const version1 = new SSEStreamSubscriptionFactory(
   getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
   {
     headers: options?.headers,
     signal: abortController.signal,
   }
 );

138-143: Add JSDoc documentation for interface parameters.

The StreamSubscriptionFactory interface would benefit from parameter documentation to improve maintainability.

 export interface StreamSubscriptionFactory {
+  /**
+   * Creates a new stream subscription.
+   * @param metadata - Metadata for stream configuration and versioning
+   * @param runId - Unique identifier for the run
+   * @param streamKey - Key identifying the specific stream
+   * @param baseUrl - Optional base URL for the API endpoint
+   * @returns A stream subscription instance
+   */
   createSubscription(
     metadata: Record<string, unknown>,
     runId: string,
     streamKey: string,
     baseUrl?: string
   ): StreamSubscription;
 }

250-279: Enhance type safety for stream versions.

Consider using a union type for stream versions to catch invalid versions at compile time.

+type StreamVersion = "v1" | "v2";
+
 export class VersionedStreamSubscriptionFactory implements StreamSubscriptionFactory {
   constructor(
     private version1: StreamSubscriptionFactory,
     private version2: StreamSubscriptionFactory
   ) {}

   createSubscription(
     metadata: Record<string, unknown>,
     runId: string,
     streamKey: string,
     baseUrl?: string
   ): StreamSubscription {
     if (!runId || !streamKey) {
       throw new Error("runId and streamKey are required");
     }

-    const version =
-      typeof metadata.$$streamsVersion === "string" ? metadata.$$streamsVersion : "v1";
+    const version = (typeof metadata.$$streamsVersion === "string" ? 
+      metadata.$$streamsVersion : "v1") as StreamVersion;

     if (version === "v1") {
       return this.version1.createSubscription(metadata, runId, streamKey, baseUrl);
     }

     if (version === "v2") {
       return this.version2.createSubscription(metadata, runId, streamKey, baseUrl);
     }

     throw new Error(`Unknown stream version: ${version}`);
   }
 }
internal-packages/database/prisma/schema.prisma (1)

2674-2675: Add string length constraints for key and value fields.

Consider adding maximum length constraints to prevent storing excessively large strings:

-  key   String
-  value String
+  key   String @db.VarChar(255)
+  value String @db.Text
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 8525be2 and 88c1b38.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (37)
  • .changeset/lemon-cherries-greet.md (1 hunks)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1 hunks)
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (3 hunks)
  • apps/webapp/app/routes/realtime.v2.streams.$runId.$streamId.ts (1 hunks)
  • apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (1 hunks)
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2 hunks)
  • apps/webapp/app/services/realtime/types.ts (1 hunks)
  • apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (2 hunks)
  • apps/webapp/app/services/realtime/v2StreamsGlobal.server.ts (1 hunks)
  • apps/webapp/app/services/realtimeClient.server.ts (5 hunks)
  • apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts (2 hunks)
  • apps/webapp/test/realtimeClient.test.ts (3 hunks)
  • apps/webapp/test/realtimeStreams.test.ts (0 hunks)
  • docker/Dockerfile.postgres (1 hunks)
  • docker/docker-compose.yml (2 hunks)
  • docs/frontend/react-hooks/realtime.mdx (1 hunks)
  • internal-packages/database/prisma/migrations/20241206135145_create_realtime_chunks_table/migration.sql (1 hunks)
  • internal-packages/database/prisma/migrations/20241208074324_add_created_at_index_to_realtime_stream_chunks/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (1 hunks)
  • internal-packages/testcontainers/src/index.ts (3 hunks)
  • packages/cli-v3/src/entryPoints/deploy-run-worker.ts (1 hunks)
  • packages/cli-v3/src/entryPoints/dev-run-worker.ts (1 hunks)
  • packages/core/package.json (1 hunks)
  • packages/core/src/v3/apiClient/index.ts (1 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (7 hunks)
  • packages/core/src/v3/apiClient/stream.ts (3 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (3 hunks)
  • packages/core/src/v3/runMetadata/metadataStream.ts (2 hunks)
  • packages/core/src/v3/schemas/api.ts (1 hunks)
  • packages/core/test/runStream.test.ts (10 hunks)
  • packages/react-hooks/src/hooks/useRealtime.ts (7 hunks)
  • references/nextjs-realtime/src/app/realtime/page.tsx (1 hunks)
  • references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx (1 hunks)
  • references/nextjs-realtime/src/components/RealtimeComparison.tsx (1 hunks)
  • references/nextjs-realtime/src/trigger/ai.ts (2 hunks)
  • references/nextjs-realtime/src/trigger/example.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • apps/webapp/test/realtimeStreams.test.ts
🚧 Files skipped from review as they are similar to previous changes (22)
  • .changeset/lemon-cherries-greet.md
  • apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
  • references/nextjs-realtime/src/trigger/example.ts
  • references/nextjs-realtime/src/app/realtime/page.tsx
  • apps/webapp/app/services/realtime/v2StreamsGlobal.server.ts
  • apps/webapp/app/env.server.ts
  • internal-packages/database/prisma/migrations/20241208074324_add_created_at_index_to_realtime_stream_chunks/migration.sql
  • packages/core/src/v3/schemas/api.ts
  • docker/Dockerfile.postgres
  • internal-packages/database/prisma/migrations/20241206135145_create_realtime_chunks_table/migration.sql
  • references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx
  • apps/webapp/test/realtimeClient.test.ts
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts
  • docker/docker-compose.yml
  • packages/core/src/v3/runMetadata/metadataStream.ts
  • internal-packages/testcontainers/src/index.ts
  • packages/cli-v3/src/entryPoints/deploy-run-worker.ts
  • packages/core/src/v3/apiClient/index.ts
  • references/nextjs-realtime/src/trigger/ai.ts
  • packages/core/package.json
  • apps/webapp/app/services/realtime/types.ts
  • packages/cli-v3/src/entryPoints/dev-run-worker.ts
👮 Files not reviewed due to content moderation or server errors (3)
  • references/nextjs-realtime/src/components/RealtimeComparison.tsx
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • apps/webapp/app/services/realtimeClient.server.ts
🔇 Additional comments (9)
packages/core/src/v3/runMetadata/manager.ts (2)

23-24: LGTM: Well-structured version parameter addition

The addition of the streamsVersion parameter with a default value of "v1" ensures backward compatibility while enabling the transition to the new Electric-powered streams.


235-235: LGTM: Stream versioning implementation

The changes properly implement stream versioning by:

  1. Recording the version in metadata
  2. Passing necessary headers for authentication
  3. Configuring the stream instance with the correct version

Let's verify the version handling across the codebase:

Also applies to: 244-244, 246-246

✅ Verification successful

Let me gather more information about the MetadataStream implementation and version handling.


Based on the gathered information, I can now provide the final response:

Stream versioning implementation is correctly handled

The implementation is well-designed and doesn't require changes because:

  • The version handling is consistent across the codebase, with proper fallback to "v1"
  • The $$streamsVersion metadata is appropriately filtered out in the SpanPresenter
  • The metadata is scoped to the stream instance and doesn't require explicit cleanup
  • The versioning is properly propagated through the MetadataStream constructor
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for consistent version handling across the codebase
echo "Checking version handling in stream consumers..."
rg -l "useRealtimeRun|useRealtimeRunWithStreams" 

echo "Checking MetadataStream version handling..."
ast-grep --pattern 'class MetadataStream {
  $$$
  constructor($params) {
    $$$
    version: $_
    $$$
  }
  $$$
}'

Length of output: 868


Script:

#!/bin/bash
# Check MetadataStream implementation and version handling
echo "Searching for MetadataStream class definition..."
rg "class MetadataStream" -A 10

echo "Checking version usage in stream-related files..."
rg "streamsVersion" -A 2

echo "Checking cleanup patterns in manager..."
rg "cleanup|dispose|clear" packages/core/src/v3/runMetadata/manager.ts

Length of output: 2657

packages/core/src/v3/apiClient/stream.ts (1)

39-47: Improve error handling in schema validation

The current error handling immediately terminates the stream on validation failure.

The previous review comment about handling parsing errors gracefully is still applicable here. Consider implementing a more resilient error handling strategy that allows the stream to continue processing valid messages.
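One possible shape for that strategy, sketched under the assumption that dropping an occasional malformed message is acceptable for this stream (safeParse is standard zod, which the codebase already uses):

import { z } from "zod";

// Sketch: validate each chunk with safeParse and skip invalid chunks
// instead of erroring the whole stream.
function lenientValidationTransform<T extends z.ZodTypeAny>(schema: T) {
  return new TransformStream<unknown, z.output<T>>({
    transform(chunk, controller) {
      const result = schema.safeParse(chunk);

      if (result.success) {
        controller.enqueue(result.data);
      } else {
        // Keep the stream alive; surface the failure out-of-band
        console.warn("Dropping invalid stream message:", result.error.message);
      }
    },
  });
}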

apps/webapp/app/routes/realtime.v2.streams.$runId.$streamId.ts (1)

19-21: Ensure proper handling of empty request bodies

While checking for an empty request.body is good, it's important to consider that request.body could be a ReadableStream even if it doesn't contain data. Consider validating the content of the body to ensure it contains the expected data.
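A sketch of one way to validate this, assuming it is acceptable to consume the first chunk and rebuild the stream; requireNonEmptyBody is a hypothetical helper, not part of this PR:

// Sketch: peek at the first chunk, reject if the body stream is empty,
// otherwise return a new stream that replays the peeked chunk first.
async function requireNonEmptyBody(
  body: ReadableStream<Uint8Array>
): Promise<ReadableStream<Uint8Array>> {
  const reader = body.getReader();
  const first = await reader.read();

  if (first.done) {
    throw new Response("Empty request body", { status: 400 });
  }

  return new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(first.value);
    },
    async pull(controller) {
      const { done, value } = await reader.read();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });
}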

packages/core/src/v3/apiClient/runStream.ts (2)

209-225: Consider adding retry logic for stream connections.

The ElectricStreamSubscription implementation doesn't handle temporary network failures or connection drops.


348-409: Improve error handling in stream pipeline.

The error handling in the stream pipeline only logs errors without proper error propagation.

packages/core/test/runStream.test.ts (1)

44-81: Well-structured test utilities with clear separation of concerns.

The implementation of createTestShapeStream and createDelayedTestShapeStream provides good test coverage for both immediate and delayed streaming scenarios.

packages/react-hooks/src/hooks/useRealtime.ts (1)

264-270: Add dependency to the effect hook.

The effect hook that handles the onComplete callback is missing options?.enabled in its dependency array.

internal-packages/database/prisma/schema.prisma (1)

2671-2685: Consider enhancing database model constraints and indexes.

The model structure is good, but consider these improvements for better data integrity and query performance:

  1. Add a unique constraint on (runId, sequence) to ensure ordered chunks
  2. Rely on that constraint for ordered retrieval: Postgres backs a unique constraint with an index, so a separate compound index on the same columns would be redundant
 model RealtimeStreamChunk {
   id String @id @default(cuid())

   key   String
   value String

   sequence Int

   runId String

   createdAt DateTime @default(now())

   @@index([runId])
   @@index([createdAt])
+  @@unique([runId, sequence])
 }

Comment on lines +65 to +66
You can supply an `onComplete` callback to the `useRealtimeRun` hook to be called when the run is completed or errored. This is useful if you want to perform some action when the run is completed, like navigating to a different page or showing a notification.


🛠️ Refactor suggestion

Enhance the documentation of the onComplete callback

The explanation should clarify:

  1. The specific conditions that trigger the callback (e.g., run states that are considered "complete")
  2. The types of the run and error parameters

Consider expanding the explanation like this:

-You can supply an `onComplete` callback to the `useRealtimeRun` hook to be called when the run is completed or errored. This is useful if you want to perform some action when the run is completed, like navigating to a different page or showing a notification.
+You can supply an `onComplete` callback to the `useRealtimeRun` hook that will be called when the run reaches a terminal state (COMPLETED, FAILED, or CANCELLED). The callback receives two parameters: the final run object and any error that occurred. This is useful if you want to perform some action when the run is completed, like navigating to a different page or showing a notification.

Comment on lines +83 to +88
signal.addEventListener("abort", () => {
  queueMicrotask(() => {
    reader.cancel();
    controller.close();
  });
});

🛠️ Refactor suggestion

Clean up abort signal listener

The abort signal event listener is not removed, which could lead to memory leaks.

Add cleanup logic:

-      signal.addEventListener("abort", () => {
+      const abortHandler = () => {
         queueMicrotask(() => {
           reader.cancel();
           controller.close();
         });
-      });
+      };
+      signal.addEventListener("abort", abortHandler);
+      
+      // Clean up the listener when the stream is done
+      reader.closed.finally(() => {
+        signal.removeEventListener("abort", abortHandler);
+      });

Comment on lines +90 to +98
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    controller.close();
    break;
  }

  controller.enqueue(value);
}

⚠️ Potential issue

Add error handling to the read loop

The while loop could potentially continue indefinitely if an error occurs during reading.

Add try-catch block to handle potential errors:

-      while (true) {
-        const { done, value } = await reader.read();
-        if (done) {
-          controller.close();
-          break;
-        }
-
-        controller.enqueue(value);
-      }
+      try {
+        while (true) {
+          const { done, value } = await reader.read();
+          if (done) {
+            controller.close();
+            break;
+          }
+          controller.enqueue(value);
+        }
+      } catch (error) {
+        controller.error(error);
+      }

Comment on lines +126 to +149
if (isChangeMessage(message)) {
  switch (message.headers.operation) {
    case "insert": {
      this.#currentState.set(message.key, message.value);
      controller.enqueue(message.value);
      break;
    }
    case "update": {
      const existingRow = this.#currentState.get(message.key);
      if (existingRow) {
        const updatedRow = {
          ...existingRow,
          ...message.value,
        };
        this.#currentState.set(message.key, updatedRow);
        controller.enqueue(updatedRow);
      } else {
        this.#currentState.set(message.key, message.value);
        controller.enqueue(message.value);
      }
      break;
    }
  }
}

⚠️ Potential issue

Add support for delete operations

The switch statement only handles 'insert' and 'update' operations, but not 'delete'.

Add delete operation handling:

   switch (message.headers.operation) {
     case "insert": {
       this.#currentState.set(message.key, message.value);
       controller.enqueue(message.value);
       break;
     }
     case "update": {
       const existingRow = this.#currentState.get(message.key);
       if (existingRow) {
         const updatedRow = {
           ...existingRow,
           ...message.value,
         };
         this.#currentState.set(message.key, updatedRow);
         controller.enqueue(updatedRow);
       } else {
         this.#currentState.set(message.key, message.value);
         controller.enqueue(message.value);
       }
       break;
     }
+    case "delete": {
+      const existingRow = this.#currentState.get(message.key);
+      if (existingRow) {
+        this.#currentState.delete(message.key);
+        controller.enqueue({ ...existingRow, _deleted: true });
+      }
+      break;
+    }
   }

Comment on lines +124 to +160
transform: (messages, controller) => {
  messages.forEach((message) => {
    if (isChangeMessage(message)) {
      switch (message.headers.operation) {
        case "insert": {
          this.#currentState.set(message.key, message.value);
          controller.enqueue(message.value);
          break;
        }
        case "update": {
          const existingRow = this.#currentState.get(message.key);
          if (existingRow) {
            const updatedRow = {
              ...existingRow,
              ...message.value,
            };
            this.#currentState.set(message.key, updatedRow);
            controller.enqueue(updatedRow);
          } else {
            this.#currentState.set(message.key, message.value);
            controller.enqueue(message.value);
          }
          break;
        }
      }
    }

    if (isControlMessage(message)) {
      switch (message.headers.control) {
        case "must-refetch":
          this.#currentState.clear();
          this.#error = false;
          break;
      }
    }
  });
},

🛠️ Refactor suggestion

Add error handling in message processing

The message processing logic lacks error handling for malformed messages or processing failures.

Wrap the message processing in try-catch:

   transform: (messages, controller) => {
+    try {
       messages.forEach((message) => {
         if (isChangeMessage(message)) {
           // ... existing code ...
         }
         if (isControlMessage(message)) {
           // ... existing code ...
         }
       });
+    } catch (error) {
+      console.error('Error processing message:', error);
+      controller.error(error);
+    }
   },

Comment on lines +53 to +66
  return $replica.taskRun.findFirst({
    where: {
      friendlyId: params.runId,
      runtimeEnvironmentId: auth.environment.id,
    },
    include: {
      batch: {
        select: {
          friendlyId: true,
        },
      },
    },
  });
},

⚠️ Potential issue

Confirm that runTags and taskIdentifier are available in the authorization resource

The authorization resource accesses run.runTags and run.taskIdentifier. Because the query uses include, Prisma returns all scalar fields, so these are available today; selecting them explicitly, however, makes the dependency visible and guards against a later refactor to select dropping them. Note that Prisma rejects select and include at the same level, so the query must move to a single select.

Apply this diff to select the fields explicitly:

 return $replica.taskRun.findFirst({
   where: {
     friendlyId: params.runId,
     runtimeEnvironmentId: auth.environment.id,
   },
-  include: {
+  select: {
+    id: true,
+    friendlyId: true,
+    runTags: true,
+    taskIdentifier: true,
     batch: {
       select: {
         friendlyId: true,
       },
     },
   },
 });

Comment on lines +42 to +76

while (true) {
  const { done, value } = await reader.read();

  if (done) {
    break;
  }

  logger.debug("[DatabaseRealtimeStreams][ingestData] Reading data", {
    streamId,
    runId,
    value,
  });

  const chunks = value
    .split("\n")
    .filter((chunk) => chunk) // Remove empty lines
    .map((line) => {
      return {
        sequence: sequence++,
        value: line,
      };
    });

  await this.options.prisma.realtimeStreamChunk.createMany({
    data: chunks.map((chunk) => {
      return {
        runId,
        key: streamId,
        sequence: chunk.sequence,
        value: chunk.value,
      };
    }),
  });
}

⚠️ Potential issue

Fix potential data corruption due to incomplete stream chunks

When reading from the ReadableStream, the value may contain incomplete lines if a line is split across chunks. This can lead to data corruption when splitting the value by \n. Implement buffering of incomplete lines between reads to ensure data integrity.

Apply this diff to handle incomplete lines:

const reader = textStream.getReader();
+ let remainder = '';
let sequence = 0;

while (true) {
  const { done, value } = await reader.read();

  if (done) {
    break;
  }

- const chunks = value
+ remainder += value;
+ const lines = remainder.split('\n');
+ remainder = lines.pop() ?? '';
+ const chunks = lines
    .filter((chunk) => chunk) // Remove empty lines
    .map((line) => {
      return {
        sequence: sequence++,
        value: line,
      };
    });

  await this.options.prisma.realtimeStreamChunk.createMany({
    data: chunks.map((chunk) => {
      return {
        runId,
        key: streamId,
        sequence: chunk.sequence,
        value: chunk.value,
      };
    }),
  });
}

+ // Handle any remaining data after the loop
+ if (remainder) {
+   await this.options.prisma.realtimeStreamChunk.create({
+     data: {
+       runId,
+       key: streamId,
+       sequence: sequence++,
+       value: remainder,
+     },
+   });
+ }

@@ -16,4 +16,4 @@ function initializeRealtimeStreams() {
});
}

-export const realtimeStreams = singleton("realtimeStreams", initializeRealtimeStreams);
+export const v1RealtimeStreams = singleton("realtimeStreams", initializeRedisRealtimeStreams);

⚠️ Potential issue

Update the singleton key to prevent naming collisions

The singleton is currently registered with the key "realtimeStreams", which could lead to collisions with other versions. Update the key to "v1RealtimeStreams" for clarity and to avoid potential conflicts.

Apply this diff:

-export const v1RealtimeStreams = singleton("realtimeStreams", initializeRedisRealtimeStreams);
+export const v1RealtimeStreams = singleton("v1RealtimeStreams", initializeRedisRealtimeStreams);

Comment on lines +54 to +60
return v1RealtimeStreams.streamResponse(
  request,
  run.friendlyId,
  params.streamId,
  authentication.environment,
  request.signal
);

🛠️ Refactor suggestion

Use run.id instead of run.friendlyId for consistency

In the streamResponse call, use run.id to maintain consistency with other parts of the codebase and to ensure accurate identification of the run.

Apply this diff:

return v1RealtimeStreams.streamResponse(
  request,
- run.friendlyId,
+ run.id,
  params.streamId,
  authentication.environment,
  request.signal
);

Comment on lines +2677 to +2679
sequence Int

runId String

🛠️ Refactor suggestion

Add sequence validation and foreign key constraint.

Consider these improvements for better data integrity:

  1. Validate that sequence is non-negative; Prisma's @db.UnsignedInt is MySQL-only and this project targets Postgres, so enforce the check with a CHECK constraint in a migration or at the application level
  2. Add a foreign key constraint to ensure runId exists (assuming runId references the TaskRun model used elsewhere in this PR)

   sequence Int

-  runId String
+  runId String
+  run   TaskRun @relation(fields: [runId], references: [id], onDelete: Cascade)



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (4)
packages/core/test/runStream.test.ts (1)

Line range hint 83-478: Consider adding tests for error handling and cancellation

The test suite is comprehensive for happy paths but missing some important edge cases:

  1. Stream error handling
  2. AbortController signal handling
  3. Cleanup verification

Consider adding these test cases:

it("should handle stream errors gracefully", async () => {
  const errorStream = new ReadableStream({
    start(controller) {
      controller.error(new Error("Stream failed"));
    },
  });

  const subscription = new RunSubscription({
    runShapeStream: errorStream,
    streamFactory: new TestStreamSubscriptionFactory(),
    abortController: new AbortController(),
  });

  await expect(convertAsyncIterableToArray(subscription)).rejects.toThrow("Stream failed");
});

it("should respect abort controller signals", async () => {
  const shapes = [/* test shapes */];
  const abortController = new AbortController();
  
  const subscription = new RunSubscription({
    runShapeStream: createDelayedTestShapeStream(shapes),
    streamFactory: new TestStreamSubscriptionFactory(),
    abortController,
  });

  // Abort after first result
  setTimeout(() => abortController.abort(), 50);
  
  await expect(convertAsyncIterableToArray(subscription)).rejects.toThrow("AbortError");
});
packages/core/src/v3/apiClient/runStream.ts (3)

85-112: Enhance abort signal handling with cleanup and error reporting.

The abort signal implementation is good but could be improved for better cleanup and error handling.

Consider these improvements:

 const abortController = new AbortController();

 options?.signal?.addEventListener(
   "abort",
   () => {
     if (!abortController.signal.aborted) {
+      const reason = options.signal?.reason;
       abortController.abort();
+      console.debug(`Stream aborted: ${reason instanceof Error ? reason.message : reason}`);
     }
   },
-  { once: true }
+  { signal: abortController.signal }  // Auto-cleanup when internal controller aborts
 );

245-274: Enhance version handling robustness.

The version selection logic could be more maintainable and informative.

Consider these improvements:

+const STREAM_VERSIONS = {
+  V1: "v1",
+  V2: "v2",
+} as const;
+
 export class VersionedStreamSubscriptionFactory implements StreamSubscriptionFactory {
   constructor(
     private version1: StreamSubscriptionFactory,
     private version2: StreamSubscriptionFactory
   ) {}

   createSubscription(
     metadata: Record<string, unknown>,
     runId: string,
     streamKey: string,
     baseUrl?: string
   ): StreamSubscription {
     if (!runId || !streamKey) {
       throw new Error("runId and streamKey are required");
     }

     const version =
-      typeof metadata.$$streamsVersion === "string" ? metadata.$$streamsVersion : "v1";
+      typeof metadata.$$streamsVersion === "string" ? metadata.$$streamsVersion.toLowerCase() : STREAM_VERSIONS.V1;
+    console.debug(`Using stream version: ${version}`);

-    if (version === "v1") {
+    if (version === STREAM_VERSIONS.V1) {
       return this.version1.createSubscription(metadata, runId, streamKey, baseUrl);
     }

-    if (version === "v2") {
+    if (version === STREAM_VERSIONS.V2) {
       return this.version2.createSubscription(metadata, runId, streamKey, baseUrl);
     }

+    console.warn(`Falling back to ${STREAM_VERSIONS.V1} for unknown version: ${version}`);
+    return this.version1.createSubscription(metadata, runId, streamKey, baseUrl);
-    throw new Error(`Unknown stream version: ${version}`);
   }
 }

204-274: Add JSDoc documentation for the new streaming classes.

The new streaming implementation would benefit from comprehensive documentation explaining the versioning strategy and usage patterns.

Consider adding documentation:

+/**
+ * Implements the Electric-powered v2 stream subscription.
+ * @implements {StreamSubscription}
+ */
 export class ElectricStreamSubscription implements StreamSubscription {
   // ...
 }

+/**
+ * Factory for creating Electric-powered v2 stream subscriptions.
+ * @implements {StreamSubscriptionFactory}
+ */
 export class ElectricStreamSubscriptionFactory implements StreamSubscriptionFactory {
   // ...
 }

+/**
+ * Factory that manages stream versioning based on metadata.
+ * Supports both v1 (SSE) and v2 (Electric) implementations.
+ * @implements {StreamSubscriptionFactory}
+ */
 export class VersionedStreamSubscriptionFactory implements StreamSubscriptionFactory {
   // ...
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 88c1b38 and be97bde.

📒 Files selected for processing (2)
  • packages/core/src/v3/apiClient/runStream.ts (7 hunks)
  • packages/core/test/runStream.test.ts (10 hunks)
🔇 Additional comments (4)
packages/core/test/runStream.test.ts (2)

34-38: LGTM: Clean signature update for metadata support

The addition of the metadata parameter to createSubscription maintains a clean interface while supporting the new Electric streams functionality.


500-505: LGTM: Improved error message formatting

The reformatted error message maintains clarity while improving code readability.

packages/core/src/v3/apiClient/runStream.ts (2)

204-243: Consider implementing retry logic for stream stability.

The Electric stream implementation could benefit from retry logic to handle temporary network failures.

This was previously suggested in a past review. The recommendation for implementing retry logic with exponential backoff still applies here.


343-404: 🛠️ Refactor suggestion

Improve stream pipeline error handling and cleanup.

The stream pipeline implementation needs more robust error handling and resource cleanup.

The previous review comment about improving error handling in the stream pipeline still applies. Additionally, consider these improvements:

 return createAsyncIterableReadable(
   this.stream,
   {
     transform: async (run, controller) => {
+      try {
         controller.enqueue({
           type: "run",
           run,
         });

         if (
           run.metadata &&
           "$$streams" in run.metadata &&
           Array.isArray(run.metadata.$$streams)
         ) {
           for (const streamKey of run.metadata.$$streams) {
             if (typeof streamKey !== "string") {
+              console.warn(`Invalid stream key type: ${typeof streamKey}`);
               continue;
             }

             if (!activeStreams.has(streamKey)) {
               activeStreams.add(streamKey);

               const subscription = this.options.streamFactory.createSubscription(
                 run.metadata,
                 run.id,
                 streamKey,
                 this.options.client?.baseUrl
               );

               const stream = await subscription.subscribe();

               stream
                 .pipeThrough(
                   new TransformStream({
                     transform(chunk, controller) {
+                      try {
                         controller.enqueue({
                           type: streamKey,
                           chunk: chunk as TStreams[typeof streamKey],
                           run,
                         } as StreamPartResult<RunShape<TRunTypes>, TStreams>);
+                      } catch (error) {
+                        controller.error(error);
+                      }
                     },
                   })
                 )
                 .pipeTo(
                   new WritableStream({
                     write(chunk) {
                       controller.enqueue(chunk);
                     },
+                    close() {
+                      activeStreams.delete(streamKey);
+                    },
                   })
                 )
                 .catch((error) => {
-                  console.error(`Error in stream ${streamKey}:`, error);
+                  if (!this.options.abortController.signal.aborted) {
+                    console.error(`Fatal error in stream ${streamKey}:`, error);
+                    this.options.abortController.abort(error);
+                  }
+                  activeStreams.delete(streamKey);
                 });
             }
           }
         }
+      } catch (error) {
+        controller.error(error);
+      }
     },
   },
   this.options.abortController.signal
 );
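
With these changes, both `close()` and the `catch` handler remove the key from `activeStreams`, so a stream that ends (cleanly or with an error) can be re-subscribed if the same key reappears in the run metadata; it is worth confirming that this re-subscription behavior is intended.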

Comment on lines +59 to +81
function createDelayedTestShapeStream(
  shapes: SubscribeRunRawShape[]
): ReadableStream<SubscribeRunRawShape> {
  return new ReadableStream({
    start: async (controller) => {
      // Emit first shape immediately
      if (shapes.length > 0) {
        controller.enqueue(shapes[0]);
      }

      let currentShapeIndex = 1;

      // Emit remaining shapes with delay
      const interval = setInterval(() => {
        if (currentShapeIndex >= shapes.length) {
          clearInterval(interval);
          controller.close();
          return;
        }
        controller.enqueue(shapes[currentShapeIndex++]!);
      }, 100);
    },
  });
}

⚠️ Potential issue

Fix potential memory leak in delayed stream implementation

The delayed stream implementation has several issues that need to be addressed:

  1. The interval is not cleaned up if the stream is aborted or encounters an error
  2. The delay is hard-coded to 100ms

Consider this improved implementation:

 function createDelayedTestShapeStream(
   shapes: SubscribeRunRawShape[]
 ): ReadableStream<SubscribeRunRawShape> {
+  let intervalId: ReturnType<typeof setInterval> | undefined;
+
   return new ReadableStream({
     start: async (controller) => {
       // Emit first shape immediately
       if (shapes.length > 0) {
         controller.enqueue(shapes[0]);
       }
 
       let currentShapeIndex = 1;
 
       // Emit remaining shapes with delay
-      const interval = setInterval(() => {
+      intervalId = setInterval(() => {
         if (currentShapeIndex >= shapes.length) {
-          clearInterval(interval);
+          clearInterval(intervalId);
           controller.close();
           return;
         }
         controller.enqueue(shapes[currentShapeIndex++]!);
       }, 100);
     },
+    // The Streams API calls cancel() when the consumer aborts the stream,
+    // so the interval is cleared even if not all shapes were emitted
+    cancel() {
+      clearInterval(intervalId);
+    },
   });
 }

Also consider:

  1. Making the delay configurable via a parameter (sketched below)
  2. Adding error handling for malformed shapes
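
A sketch of the first point, with the hard-coded 100ms delay hoisted into a parameter (the signature change is an assumption for this test helper, not code from the PR):

// Sketch only: the 100ms delay becomes a parameter so tests can tune it
function createDelayedTestShapeStream(
  shapes: SubscribeRunRawShape[],
  delayMs = 100
): ReadableStream<SubscribeRunRawShape> {
  let intervalId: ReturnType<typeof setInterval> | undefined;

  return new ReadableStream({
    start(controller) {
      // Emit first shape immediately
      if (shapes.length > 0) {
        controller.enqueue(shapes[0]);
      }

      let currentShapeIndex = 1;

      // Emit remaining shapes at the configured delay
      intervalId = setInterval(() => {
        if (currentShapeIndex >= shapes.length) {
          clearInterval(intervalId);
          controller.close();
          return;
        }
        controller.enqueue(shapes[currentShapeIndex++]!);
      }, delayMs);
    },
    // Clean up the interval if the consumer cancels the stream
    cancel() {
      clearInterval(intervalId);
    },
  });
}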

Committable suggestion skipped: line range outside the PR's diff.

@ericallam ericallam merged commit 9970b9b into main Dec 9, 2024
10 checks passed
@ericallam ericallam deleted the realtime-stream-updates-and-fix branch December 9, 2024 22:09