feat: replicate task runs to clickhouse to power dashboard improvements #2035
Walkthrough
This update introduces a comprehensive logical replication pipeline between PostgreSQL and ClickHouse, including new internal packages for ClickHouse and replication, detailed schema migrations, service orchestration, and robust integration tests. It adds new environment variables, refactors event bus types, enhances run metadata, and expands the webapp with new endpoints, UI routes, and bulk run management features. Supporting scripts, Docker configurations, and test utilities are also included.
Sequence Diagram(s)
sequenceDiagram
participant Postgres
participant LogicalReplicationClient
participant Redis
participant RunsReplicationService
participant ClickHouse
Note over Postgres,ClickHouse: Replication Startup
LogicalReplicationClient->>Redis: Acquire leader lock
Redis-->>LogicalReplicationClient: Lock acquired
LogicalReplicationClient->>Postgres: Create publication/slot if needed
LogicalReplicationClient->>Postgres: Start replication stream (pgoutput)
Postgres-->>LogicalReplicationClient: Send WAL changes
Note over LogicalReplicationClient,RunsReplicationService: Replication Event Handling
LogicalReplicationClient->>RunsReplicationService: Emit WAL events (insert/update/delete)
RunsReplicationService->>RunsReplicationService: Buffer events by transaction
RunsReplicationService->>ClickHouse: Flush batch insert (concurrent, batched)
ClickHouse-->>RunsReplicationService: Insert result/ack
Note over RunsReplicationService,LogicalReplicationClient: Acknowledgement
RunsReplicationService->>LogicalReplicationClient: Acknowledge LSN
LogicalReplicationClient->>Postgres: Send ack for LSN
Note over LogicalReplicationClient: Leadership/Shutdown
LogicalReplicationClient->>Redis: Extend/release leader lock
LogicalReplicationClient->>Postgres: Stop/teardown replication slot (on teardown)
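To make the buffering step concrete, here is a minimal TypeScript sketch of the pattern the diagram describes: WAL events are grouped per transaction and flushed to ClickHouse as one batch before the LSN is acknowledged. The types, method names, and flush mechanics are illustrative assumptions, not the actual RunsReplicationService code.

```ts
// Hypothetical sketch: buffer WAL events per transaction, flush to ClickHouse in batches.
type WalEvent = { tag: "insert" | "update" | "delete"; xid: number; lsn: string; row: unknown };

class ReplicationBuffer {
  private byTransaction = new Map<number, WalEvent[]>();

  add(event: WalEvent) {
    const events = this.byTransaction.get(event.xid) ?? [];
    events.push(event);
    this.byTransaction.set(event.xid, events);
  }

  // Called when a COMMIT message arrives for a transaction.
  async commit(
    xid: number,
    lsn: string,
    insertBatch: (rows: unknown[]) => Promise<void>,
    ack: (lsn: string) => Promise<void>
  ) {
    const events = this.byTransaction.get(xid) ?? [];
    this.byTransaction.delete(xid);
    await insertBatch(events.map((e) => e.row)); // batched insert into ClickHouse
    await ack(lsn); // only acknowledge the LSN once the batch has been accepted
  }
}
```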
Actionable comments posted: 28
🔭 Outside diff range comments (1)
packages/core/src/v3/streams/asyncIterableStream.ts (1)
52-97: ⚠️ Potential issue: Fix potential issue with the optional transformer parameter.
The transformer parameter has been made optional, but on line 94, the code still unconditionally pipes the stream through a TransformStream with the transformer. This could cause runtime errors if transformer is undefined.
Consider modifying the implementation to handle the optional transformer:
- const transformedStream = stream.pipeThrough(new TransformStream(transformer));
+ const transformedStream = transformer ? stream.pipeThrough(new TransformStream(transformer)) : stream;
♻️ Duplicate comments (3)
internal-packages/replication/src/client.test.ts (3)
111-118: Remove console logging statements. Similar to the first test, these console logs should be removed to keep test output clean.
165-165: Replace fixed delay with polling. Same issue as in the first test: fixed delays should be replaced with polling mechanisms.
179-179: Remove console.log statement. This debugging console.log should be removed before committing.
🧹 Nitpick comments (50)
packages/core/src/v3/streams/asyncIterableStream.ts (1)
99-105: Good addition of an AsyncGenerator convenience function. This helper function provides a useful wrapper for creating streams from AsyncGenerator objects. However, unlike the updated function above, this one still requires a transformer parameter. Consider whether it should also have an optional transformer for consistency (see the sketch below).
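A minimal sketch of how an optional transformer could be handled for the AsyncGenerator helper as well. The function name mirrors the helper discussed here, but the exact signature is an assumption:

```ts
// Sketch: only pipe through a TransformStream when a transformer is provided.
function createAsyncIterableStreamFromAsyncGenerator<S, T = S>(
  generator: AsyncGenerator<S>,
  transformer?: Transformer<S, T>
): ReadableStream<T> {
  const stream = new ReadableStream<S>({
    async pull(controller) {
      const { value, done } = await generator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });

  return transformer
    ? stream.pipeThrough(new TransformStream<S, T>(transformer))
    : (stream as unknown as ReadableStream<T>);
}
```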
apps/webapp/remix.config.js (1)
26-26: Bundle `redlock` for the server build. You've added `"redlock"` to `serverDependenciesToBundle`, which is required for Redis-based leader election in replication. Ensure the `redlock` package is declared with an appropriate version in `apps/webapp/package.json`, so the bundler can resolve it.
apps/webapp/app/routes/api.v1.runs.$runId.tags.ts (1)
69-80: Wrap tag creation and update in a transaction. If `createTag` succeeds but the subsequent `update` fails, you could end up with orphaned tag records. Consider using `prisma.$transaction` to group tag creation and the update atomically:
- for (const tag of newTags) {
-   const tagRecord = await createTag({ tag, projectId });
-   if (tagRecord) tagIds.push(tagRecord.id);
- }
- await prisma.taskRun.update({ ... });
+ await prisma.$transaction(async (tx) => {
+   const created = await Promise.all(
+     newTags.map((tag) => createTag({ tag, projectId }, { prisma: tx }))
+   );
+   const allIds = [...existingTags.map((t) => t.id), ...created.map((t) => t.id)];
+   await tx.taskRun.update({
+     where: { friendlyId: parsedParams.data.runId, runtimeEnvironmentId: authenticationResult.environment.id },
+     data: {
+       tags: { connect: allIds.map((id) => ({ id })) },
+       runTags: { push: newTags },
+     },
+   });
+ });
internal-packages/database/prisma/migrations/20250428211853_add_environment_type_and_org_id_to_task_run/migration.sql (1)
1-3: Consider indexing new columns for query performance. Adding `environmentType` and `organizationId` to `TaskRun` will enable richer filtering, but high-volume queries on these fields may suffer without indexes. You might add after the ALTER TABLEs:
CREATE INDEX ON "TaskRun"("organizationId");
CREATE INDEX ON "TaskRun"("environmentType");
internal-packages/clickhouse/schema/001_create_databases.sql (1)
1-7: Make database creation idempotent. To avoid errors on repeated migrations, consider using `IF NOT EXISTS` / `IF EXISTS` clauses:
-- +goose up
-CREATE DATABASE trigger_dev;
+CREATE DATABASE IF NOT EXISTS trigger_dev;
-- +goose down
-DROP DATABASE trigger_dev;
+DROP DATABASE IF EXISTS trigger_dev;
apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts (1)
54-55: Remove or utilize selected timestamps. `updatedAt` and `createdAt` are included in the `select`, but not used in this service. Consider either dropping them to reduce the data payload or extending the logic (e.g., in logging or the message payload) to make use of these timestamps.
internal-packages/replication/src/errors.ts (1)
1-5: Simplify the error class constructor. The current constructor doesn't add any functionality beyond what the parent Error class provides. You can simplify this class by removing the unnecessary constructor:
-export class LogicalReplicationClientError extends Error {
-  constructor(message: string) {
-    super(message);
-  }
-}
+export class LogicalReplicationClientError extends Error {}
This change maintains the same functionality while making the code more concise. If you need to keep the constructor for future extensions, consider adding a comment explaining the intent.
🧰 Tools
🪛 Biome (1.9.4)
[error] 2-4: This constructor is unnecessary.
Unsafe fix: Remove the unnecessary constructor.
(lint/complexity/noUselessConstructor)
scripts/start-prometheus.sh (1)
1-5: Consider adding pre-flight checks and usage instructions. The script currently assumes that the Prometheus binary and config file exist. To improve robustness and user experience, you could verify their presence before invoking Prometheus, and print a helpful error message if they're missing.
#!/bin/bash
set -e
+ if ! command -v prometheus >/dev/null; then
+   echo "Error: prometheus CLI not found in PATH. Please install Prometheus."
+   exit 1
+ fi
+
+ if [ ! -f ./.configs/prometheus.yml ]; then
+   echo "Error: Configuration file ./.configs/prometheus.yml not found."
+   exit 1
+ fi
prometheus --config.file=./.configs/prometheus.yml --storage.tsdb.path=/tmp/prom-data
internal-packages/clickhouse/Dockerfile (1)
1-4: Pin the Go base image to a specific version or use a slimmer variant. Using the unversioned `golang` image can lead to non-reproducible builds and large image sizes. Consider specifying a minor version (e.g., `golang:1.21-alpine`) to improve build stability and reduce size.
package.json (1)
25-25: Consistency: consider adding `--remove-orphans` to the `dev:docker:build` script. Your `dev:docker` script now includes `--remove-orphans`, but `dev:docker:build` does not. Including it there too will clean up leftover containers when rebuilding.
internal-packages/clickhouse/tsconfig.build.json (1)
1-21: Consider aligning ES targets across packages. The configuration is appropriate for the ClickHouse package. However, it targets ES2019 while the replication package targets ES2020. Consider aligning these for consistency across the codebase, unless there are specific compatibility requirements.
- "target": "ES2019",
- "lib": ["ES2019", "DOM", "DOM.Iterable", "DOM.AsyncIterable"],
+ "target": "ES2020",
+ "lib": ["ES2020", "DOM", "DOM.Iterable", "DOM.AsyncIterable"],
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1)
1439-1446: Consider removing the unused variable. The `run` variable is assigned but never used or returned. This appears to be an oversight or potential preparation for future code.
- const run = await prisma.taskRun.update({
+ await prisma.taskRun.update({
    where: {
      id: runId,
    },
    data: {
      status: "WAITING_FOR_DEPLOY",
    },
  });
apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts (1)
6-37: Well-structured API route with proper authentication. The API route properly authenticates requests, checks for admin privileges, and handles errors. However, two improvements could be made:
- Consider checking if `runsReplicationInstance` exists before calling `stop()`.
- Add more specific error handling or logging for different failure scenarios.
try { + if (!runsReplicationInstance) { + return json({ error: "Runs replication service is not initialized" }, { status: 400 }); + } await runsReplicationInstance?.stop(); return json({ success: true, }); } catch (error) { + console.error("Error stopping runs replication:", error); return json({ error: error instanceof Error ? error.message : error }, { status: 400 }); }apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts (1)
33-42: Consider validating that runsReplicationInstance exists. Similar to the stop route, it would be better to explicitly check if `runsReplicationInstance` is initialized before attempting to call methods on it.
  try {
    const body = await request.json();
    const { insertStrategy } = schema.parse(body);
+   if (!runsReplicationInstance) {
+     return json({ error: "Runs replication service is not initialized" }, { status: 400 });
+   }
    await runsReplicationInstance?.start(insertStrategy);
    return json({
      success: true,
    });
  } catch (error) {
+   console.error("Error starting runs replication:", error);
    return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
  }
apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts (2)
29-29: Consider handling the case when runsReplicationInstance is undefined. The optional chaining operator (`?.`) is used, which means the instance might be undefined. Consider adding explicit handling for this case.
- await runsReplicationInstance?.teardown();
+ if (!runsReplicationInstance) {
+   return json({ error: "Runs replication service is not initialized" }, { status: 400 });
+ }
+
+ await runsReplicationInstance.teardown();
14-22: Consider simplifying user verification. The authentication and user verification logic has some redundancy. Since `authenticateApiRequestWithPersonalAccessToken` already validates the token, consider simplifying by directly checking admin status.
- const user = await prisma.user.findUnique({
-   where: {
-     id: authenticationResult.userId,
-   },
- });
-
- if (!user) {
-   return json({ error: "Invalid or Missing API key" }, { status: 401 });
- }
+ const user = await prisma.user.findUniqueOrThrow({
+   where: {
+     id: authenticationResult.userId,
+   },
+   select: { admin: true }
+ });
internal-packages/clickhouse/package.json (2)
4-4
: Consider starting with version 0.0.1. For a newly created package, it's common practice to start with version 0.0.1. The current version 0.0.2 suggests there was a previous version, which might be confusing for future maintainers.
- "version": "0.0.2", + "version": "0.0.1",
26-28
: Consider adding a descriptive comment for the migration commands. The migration commands would benefit from a brief comment explaining what they do, especially for new team members who might not be familiar with the ClickHouse migration process.
"scripts": { "clean": "rimraf dist", "typecheck": "tsc --noEmit", "build": "pnpm run clean && tsc -p tsconfig.build.json", "dev": "tsc --watch -p tsconfig.build.json", + // Run ClickHouse migrations using Docker Compose "db:migrate": "docker compose -p triggerdotdev-docker -f ../../docker/docker-compose.yml up clickhouse_migrator --build", "db:migrate:down": "GOOSE_COMMAND=down pnpm run db:migrate",
internal-packages/clickhouse/README.md (2)
19-20
: Consider consistent formatting of code blocks. The formatting of code blocks appears inconsistent. For clarity, ensure consistent use of backticks around code elements.
- `raw_`: Input data tables - `tmp_{yourname}_`: Temporary tables for experiments, add your name, so it's easy to identify ownership. + `raw_` : Input data tables + `tmp_{yourname}_` : Temporary tables for experiments, add your name, so it's easy to identify ownership.🧰 Tools
🪛 LanguageTool
[uncategorized] ~19-~19: Loose punctuation mark.
Context: ...tion][version]### Prefixes -
raw: Input data tables -
tmp_{yourname}_`: ...(UNLIKELY_OPENING_PUNCTUATION)
1-64
: Consider adding a section on migrations and schema management. The README thoroughly covers naming conventions but lacks information about how migrations are managed, which would be valuable since the package includes migration scripts and commands.
Add a section on migrations:
## Migrations and Schema Management We use [Goose](https://github.com/pressly/goose) for managing ClickHouse migrations. Migrations are stored in the `schema` directory and can be executed using the package scripts: ```bash # Run migrations pnpm run db:migrate # Rollback migrations pnpm run db:migrate:downMigration files follow the Goose format with SQL statements for schema changes.
internal-packages/testcontainers/src/utils.ts (2)
61-63: Add logging for migration steps. Adding logging for the migration process would improve observability, consistent with how the other container setup functions handle logging.
// Now we run the migrations
const migrationsPath = path.resolve(__dirname, "../../clickhouse/schema");
+ if (isDebug) {
+   console.log(`Running ClickHouse migrations from ${migrationsPath}`);
+ }
await runClickhouseMigrations(client, migrationsPath);
+ if (isDebug) {
+   console.log("ClickHouse migrations completed successfully");
+ }
51-70: Consider adding a verification step similar to the Redis container. For consistency with the Redis container setup, which includes a verification step, consider adding a similar verification function for ClickHouse. I recommend adding a separate verification function similar to `verifyRedisConnection`:
async function verifyClickHouseConnection(container: ClickHouseContainer) {
  const client = createClient({
    url: container.getConnectionUrl(),
  });
  const containerMetadata = {
    containerId: container.getId().slice(0, 12),
    containerName: container.getName(),
    containerNetworkNames: container.getNetworkNames(),
  };
  try {
    await client.ping();
  } catch (error) {
    if (isDebug) {
      console.log("verifyClickHouseConnection: ping error", error, containerMetadata);
    }
    throw new Error("verifyClickHouseConnection: ping error", { cause: error });
  }
}
Then call this from `createClickHouseContainer`.
internal-packages/clickhouse/tsconfig.test.json (1)
11-11: Consider enabling verbatimModuleSyntax for more predictable imports. Setting `verbatimModuleSyntax` to `false` allows TypeScript to transform import statements. For more predictable behavior, consider enabling this option.
- "verbatimModuleSyntax": false,
+ "verbatimModuleSyntax": true,
internal-packages/clickhouse/src/client/client.test.ts (1)
6-148
: Comprehensive ClickHouse client test suite. The test suite properly validates the core functionality of inserting and querying data with various configurations. It tests both synchronous and asynchronous operations with appropriate validations.
Consider replacing the hardcoded delay on line 136 with a polling mechanism. Fixed delays can cause flaky tests in CI environments with varying performance characteristics. A polling approach that checks for the data with a timeout would be more reliable.
- // Now we wait for the data to be flushed - await setTimeout(2000); - - // Querying now should return the data - const [queryErrorAsyncDontWait2, resultAsyncDontWait2] = await querySmokeTest({}); + // Use polling to check for data with a reasonable timeout + let resultAsyncDontWait2; + let queryErrorAsyncDontWait2; + const maxAttempts = 10; + const pollInterval = 300; // ms + + for (let attempt = 0; attempt < maxAttempts; attempt++) { + [queryErrorAsyncDontWait2, resultAsyncDontWait2] = await querySmokeTest({}); + + if (resultAsyncDontWait2.some(r => + r.message === "async-dont-wait-hello" && r.number === 42) && + resultAsyncDontWait2.some(r => + r.message === "async-dont-wait-world" && r.number === 100)) { + break; + } + + await setTimeout(pollInterval); + }internal-packages/replication/src/client.test.ts (3)
27-34
: Remove console logging statements. Console logs should be removed from test files as they create noise in the test output. If you need to debug, consider using the test framework's logging utilities or conditional logging.
- client.events.on("data", (data) => { - console.log(data); - logs.push(data); - }); - - client.events.on("error", (error) => { - console.error(error); - }); + client.events.on("data", (data) => { + logs.push(data); + }); + + client.events.on("error", (error) => { + // Consider failing the test or capturing errors in an array + fail(`Unexpected error in replication client: ${error}`); + });
81-81
: Replace fixed delay with polling. Using a fixed delay for asynchronous operations can lead to flaky tests. Consider implementing a polling mechanism that checks for the expected condition with a timeout.
- // Wait for a bit of time - await setTimeout(50); - - // Now we should see the row in the logs - expect(logs.length).toBeGreaterThan(0); + // Poll for logs with a timeout + const timeout = 2000; // 2 seconds + const interval = 50; // 50ms + const startTime = Date.now(); + + while (logs.length === 0) { + if (Date.now() - startTime > timeout) { + throw new Error(`Timed out waiting for replication logs (${timeout}ms)`); + } + await setTimeout(interval); + } + + expect(logs.length).toBeGreaterThan(0);
1-184
: Extract test data creation into a helper function. Both tests duplicate the same code for creating test data (organization, project, runtimeEnvironment, taskRun). This could be extracted into a helper function to reduce duplication.
Create a helper function to set up test data:
async function createTestData(prisma) { const organization = await prisma.organization.create({ data: { title: "test", slug: "test", }, }); const project = await prisma.project.create({ data: { name: "test", slug: "test", organizationId: organization.id, externalRef: "test", }, }); const runtimeEnvironment = await prisma.runtimeEnvironment.create({ data: { slug: "test", type: "DEVELOPMENT", projectId: project.id, organizationId: organization.id, apiKey: "test", pkApiKey: "test", shortcode: "test", }, }); // Create TaskRun const taskRun = await prisma.taskRun.create({ data: { friendlyId: "run_1234", taskIdentifier: "my-task", payload: JSON.stringify({ foo: "bar" }), traceId: "1234", spanId: "1234", queue: "test", runtimeEnvironmentId: runtimeEnvironment.id, projectId: project.id, }, }); return { organization, project, runtimeEnvironment, taskRun }; }Then use this in both tests to simplify the code.
internal-packages/replication/src/stream.test.ts (2)
24-36
: Consider extracting the subscription options into a shared helper. Both tests use similar subscription configuration. Consider creating a helper function to reduce code duplication.
+ function createTestSubscription<T>( + postgresContainer: any, + redisOptions: any, + filterTags: string[] = ["insert"], + abortTimeout: number = 10000 + ) { + return createSubscription<T>({ + name: "test_stream", + publicationName: "test_publication_stream", + slotName: "test_slot_stream", + pgConfig: { + connectionString: postgresContainer.getConnectionUri(), + }, + table: "TaskRun", + redisOptions, + filterTags, + abortSignal: AbortSignal.timeout(abortTimeout), + }); + }
85-91
: Remove console.log from test code. Production code and tests should not contain debugging console.log statements.
- console.log(received);
apps/webapp/app/services/runsReplicationInstance.server.ts (2)
53-67
: Consider adding a shutdown log message. While you log service startup success and failure, there's no logging for service shutdown. This would be helpful for troubleshooting.
process.on("SIGTERM", service.shutdown.bind(service)); process.on("SIGINT", service.shutdown.bind(service)); + + // Add logging for shutdown events + const shutdownHandler = async () => { + logger.info("🗃️ Runs replication service shutting down"); + await service.shutdown(); + }; + + process.on("SIGTERM", shutdownHandler); + process.on("SIGINT", shutdownHandler);
65-66
: Avoid using bind for event listeners. Using
.bind()
for event listeners can cause issues with listener removal. Consider using arrow functions instead.- process.on("SIGTERM", service.shutdown.bind(service)); - process.on("SIGINT", service.shutdown.bind(service)); + process.on("SIGTERM", () => service.shutdown()); + process.on("SIGINT", () => service.shutdown());docker/docker-compose.yml (2)
108-119
: Consider adding container names for all services. While the main services have container_name set, ch-ui is missing this property. For consistency, consider adding it.
ch-ui: + container_name: ch-ui image: ghcr.io/caioricciuti/ch-ui:latest
108-119
: Consider implementing a restart policy for clickhouse_migrator. The ch-ui and clickhouse services have restart policies, but clickhouse_migrator doesn't. Consider adding a restart policy based on your needs.
clickhouse_migrator: build: context: ../internal-packages/clickhouse dockerfile: ./Dockerfile + restart: on-failure depends_on: clickhouse: condition: service_healthy
internal-packages/clickhouse/src/index.ts (1)
71-88
: Consider adding URL validation in the fromEnv method. The
fromEnv
method instantiates clients from environment variables without validating that the URLs are properly formatted. Consider adding validation to handle malformed URLs gracefully.static fromEnv(): ClickHouse { if ( typeof process.env.CLICKHOUSE_WRITER_URL === "string" && typeof process.env.CLICKHOUSE_READER_URL === "string" ) { + try { + // Validate URLs + new URL(process.env.CLICKHOUSE_WRITER_URL); + new URL(process.env.CLICKHOUSE_READER_URL); + } catch (error) { + console.error("Invalid ClickHouse URL format:", error); + // Fall back to NoopClient + return new ClickHouse({}); + } return new ClickHouse({ writerUrl: process.env.CLICKHOUSE_WRITER_URL, readerUrl: process.env.CLICKHOUSE_READER_URL, writerName: process.env.CLICKHOUSE_WRITER_NAME, readerName: process.env.CLICKHOUSE_READER_NAME, }); } + try { + if (process.env.CLICKHOUSE_URL) { + new URL(process.env.CLICKHOUSE_URL); + } + } catch (error) { + console.error("Invalid ClickHouse URL format:", error); + // Fall back to NoopClient + return new ClickHouse({}); + } return new ClickHouse({ url: process.env.CLICKHOUSE_URL, name: process.env.CLICKHOUSE_NAME, }); }internal-packages/clickhouse/src/taskRuns.ts (4)
5-47
: Comprehensive schema for task runs with proper defaults. The
TaskRunV1
schema is well-structured and includes all necessary fields for tracking task runs. Default values are appropriately provided for optional fields.Consider standardizing the use of
.int()
validators for numeric fields - some fields use it while others don't, which could lead to inconsistency in how integer types are handled.
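For illustration, a hedged sketch of what uniformly applied `.int()` validators could look like with zod; the field names below are invented for the example and are not taken from the actual `TaskRunV1` schema:

```ts
import { z } from "zod";

// Sketch: apply .int() uniformly to fields that should only ever hold integers.
const ExampleCounters = z.object({
  attempt_number: z.number().int().default(1),
  usage_duration_ms: z.number().int().default(0),
  cost_in_cents: z.number().default(0), // fractional values allowed, so no .int()
});
```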
49-65
: Consider using z.infer for a consistent type definition. The
TaskRunV1
type is defined using z.input<typeof TaskRunV1>
while later the RawTaskRunPayloadV1
type uses z.infer<typeof RawTaskRunPayloadV1>
. Consider standardizing on one approach for consistency.-export type TaskRunV1 = z.input<typeof TaskRunV1>; +export type TaskRunV1 = z.infer<typeof TaskRunV1>;
51-65
: Extract common ClickHouse settings to reduce duplication. The insert functions have identical ClickHouse settings. Consider extracting these to a common constant to reduce duplication and ensure consistency.
+const DEFAULT_INSERT_SETTINGS: ClickHouseSettings = { + async_insert: 1, + wait_for_async_insert: 0, + async_insert_max_data_size: "1000000", + async_insert_busy_timeout_ms: 1000, + enable_json_type: 1, +}; export function insertTaskRuns(ch: ClickhouseWriter, settings?: ClickHouseSettings) { return ch.insert({ name: "insertTaskRuns", table: "trigger_dev.task_runs_v1", schema: TaskRunV1, settings: { - async_insert: 1, - wait_for_async_insert: 0, - async_insert_max_data_size: "1000000", - async_insert_busy_timeout_ms: 1000, - enable_json_type: 1, + ...DEFAULT_INSERT_SETTINGS, ...settings, }, }); }
75-89
: Apply the same settings extraction here. Similarly, extract and reuse the common ClickHouse settings here.
export function insertRawTaskRunPayloads(ch: ClickhouseWriter, settings?: ClickHouseSettings) { return ch.insert({ name: "insertRawTaskRunPayloads", table: "trigger_dev.raw_task_runs_payload_v1", schema: RawTaskRunPayloadV1, settings: { - async_insert: 1, - wait_for_async_insert: 0, - async_insert_max_data_size: "1000000", - async_insert_busy_timeout_ms: 1000, - enable_json_type: 1, + ...DEFAULT_INSERT_SETTINGS, ...settings, }, }); }internal-packages/clickhouse/src/client/errors.ts (2)
10-15
: Leverage the nativeErrorOptions.cause
instead of re-implementing itNode ≥ 16.9 & browsers already expose the
cause
option onError
.
Forwarding the supplied cause to the super-constructor keeps the full stack-trace chain and avoids duplicating a field that is already present onError
.- constructor(opts: { message: string; cause?: BaseError; context?: TContext }) { - super(opts.message); + constructor(opts: { message: string; cause?: BaseError; context?: TContext }) { + // Pass the cause up so `.stack` prints the whole chain. + super(opts.message, opts.cause ? { cause: opts.cause } : undefined);(No other changes required.)
17-21
:toString()
may blow up on complex context objects
JSON.stringify
throws on circular references and silently dropsbigint
/symbol
values.
Consider a safer serialisation (e.g.safe-stable-stringify
,util.inspect
) or guard the call.- return `${this.name}: ${this.message} - ${JSON.stringify( - this.context - )} - caused by ${this.cause?.toString()}`; + const ctx = (() => { + try { + return JSON.stringify(this.context); + } catch { + return "[unserialisable context]"; + } + })(); + return `${this.name}: ${this.message} - ${ctx} - caused by ${this.cause?.toString()}`;internal-packages/clickhouse/schema/003_create_task_runs_v1.sql (1)
88-94
:FINAL
in the view can be expensive
SELECT … FINAL
forces a full merge ofReplacingMergeTree
parts on every query, defeating ClickHouse’s normal laziness and causing noticeable latency on large tables.
If you only need deduplication for rare analytic queries, consider:
- Dropping
FINAL
and relying on background merges, or- Keeping this view as-is but adding another
task_runs_live_v1
view withoutFINAL
for UI paths that don’t need perfect deduplication.internal-packages/testcontainers/src/index.ts (1)
181-188
:metadata
is created but never used – remove or forward it
withContainerSetup()
returns{ metadata }
but the variable is discarded, which makes the call towithContainerSetup
look suspicious.
If the metadata is genuinely unnecessary in this fixture, remove the destructuring to keep the intent clear; otherwise pass it down touseContainer
just like other fixtures do (they include it in the logged metadata).internal-packages/replication/src/client.ts (1)
626-636
:setInterval
callback swallows async errorsThe async lambda passed to
setInterval
is not awaited, so any rejection is unhandled and will crash the process in Node ≥ 15.Wrap the body in a try/catch or detach the async logic:
this.ackIntervalTimer = setInterval(() => { (async () => { try { if (this._isStopped) return; ... } catch (err) { this.events.emit("error", err instanceof Error ? err : new Error(String(err))); } })(); }, 1000);internal-packages/clickhouse/src/client/types.ts (1)
7-13
: Consider tightening parameter typing to removeany
and avoid double-meaning of “params”.
params
is used twice:
- In the request object (
req.params
) where it actually represents the schema of the parameters, and- Inside the
options
object (options.params
) where it represents the runtime values.Besides being easy to confuse, the type for
TIn
/TOut
isz.ZodSchema<any>
, which throws away the compile-time information Zod can give you.Suggested direction:
- query<TIn extends z.ZodSchema<any>, TOut extends z.ZodSchema<any>>(req: { + query< + TIn extends z.ZodTypeAny, + TOut extends z.ZodTypeAny + >(req: { - params?: TIn; + paramSchema?: TIn; ... - }): ClickhouseQueryFunction<z.input<TIn>, z.output<TOut>>; + }): ClickhouseQueryFunction<z.input<TIn>, z.output<TOut>>;This:
• Keeps generic inference,
• Removes theany
escape hatch, and
• Disambiguates schema vs values.(Changing the public API is breaking, so only apply if you can propagate the rename quickly.)
internal-packages/replication/src/stream.ts (1)
37-47
: Replace console.log
calls with structured logging or remove them. The debug
console.log()
statements will leak to stdout in production and can severely impact performance for high-throughput streams. Please use the project-wide Logger
(or pass one in via the caller) or remove the calls entirely.- console.log("ReadableStream.start"); + // logger.debug("ReadableStream.start"); … - console.log("ReadableStream.data"); + // logger.debug("ReadableStream.data");apps/webapp/app/services/runsReplicationService.server.ts (1)
654-656
:TODO
– spreading large arrays is O(n²) memory-wiseThe code already uses
push(...items)
which is optimal; the comment suggests consideringconcat
, which would allocate a new array each time.
You can safely delete the TODO to avoid confusion.apps/webapp/test/runsReplicationService.test.ts (2)
611-624
: Prefer the enum over hard-coded string literals forstatus
Within the same file both the
TaskRunStatus
enum and raw strings ("PENDING"
,"COMPLETED_SUCCESSFULLY"
) are used. Mixing the two loosens type-safety and risks typos slipping through.- status: "PENDING", + status: TaskRunStatus.PENDING,Repeat for all occurrences.
Also applies to: 1262-1270
1120-1145
: Large-scale stress test can be made ~5-10× fasterFor the 1 000-run stress test a fixed
await setTimeout(5000)
is used. Empirically the replication finishes much sooner; conversely, a slow machine might still be busy after 5 s. Re-using the polling helper proposed above will both shorten green runs and harden the test.Additionally, consider reducing
flushIntervalMs
to10
ms (while keeping the production default higher) just for this test to speed the end-to-end path.docker/dev-compose.yml (1)
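A hedged sketch of the kind of polling helper referred to in the stress-test comment above; the helper name and the query callback are assumptions, and the real test would poll ClickHouse for the replicated rows instead of sleeping a fixed 5 s:

```ts
// Sketch: poll a condition until it returns true or the timeout elapses.
import { setTimeout as sleep } from "node:timers/promises";

async function waitFor(condition: () => Promise<boolean>, timeoutMs = 10_000, intervalMs = 100) {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await condition()) return;
    await sleep(intervalMs);
  }
  throw new Error(`Condition not met within ${timeoutMs}ms`);
}

// Example usage in the stress test (query function assumed):
// await waitFor(async () => (await queryReplicatedRunCount()) === 1000);
```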
29-37
: PostgreSQL command section could be simplified. YAML list notation with alternating
- -c
pairs is verbose and easy to mis-order. Consider a single string:command: > -c listen_addresses=* -c wal_level=logical -c shared_preload_libraries=pg_partman_bgwNo functional change, but improves readability.
apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.next.runs._index/route.tsx (1)
330-341
: Form loading state detection can be brittle
const isLoading = navigation.formAction === formAction;
navigation.formAction
contains the fully-qualified URL sent by the browser, which might have a different origin or trailing slash thanformAction
. Safer:const isLoading = navigation.state === "submitting" && navigation.formAction?.endsWith(formAction);Prevents false positives when different actions are active in parallel.
Also applies to: 387-398
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (89)
.configs/prometheus.yml
(1 hunks)apps/webapp/app/env.server.ts
(1 hunks)apps/webapp/app/metrics.server.ts
(1 hunks)apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.next.runs._index/route.tsx
(1 hunks)apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.next.runs/route.tsx
(1 hunks)apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts
(1 hunks)apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts
(1 hunks)apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts
(1 hunks)apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
(1 hunks)apps/webapp/app/runEngine/services/triggerTask.server.ts
(1 hunks)apps/webapp/app/runEngine/types.ts
(0 hunks)apps/webapp/app/services/runsReplicationInstance.server.ts
(1 hunks)apps/webapp/app/services/runsReplicationService.server.ts
(1 hunks)apps/webapp/app/utils/pathBuilder.ts
(1 hunks)apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
(2 hunks)apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
(2 hunks)apps/webapp/app/v3/services/completeAttempt.server.ts
(4 hunks)apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
(2 hunks)apps/webapp/app/v3/services/enqueueDelayedRun.server.ts
(2 hunks)apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts
(2 hunks)apps/webapp/app/v3/services/finalizeTaskRun.server.ts
(1 hunks)apps/webapp/app/v3/services/triggerScheduledTask.server.ts
(1 hunks)apps/webapp/app/v3/services/triggerTask.server.ts
(1 hunks)apps/webapp/app/v3/services/triggerTaskV1.server.ts
(2 hunks)apps/webapp/package.json
(3 hunks)apps/webapp/remix.config.js
(1 hunks)apps/webapp/test/engine/triggerTask.test.ts
(0 hunks)apps/webapp/test/runsReplicationService.test.ts
(1 hunks)apps/webapp/tsconfig.json
(1 hunks)docker/Dockerfile
(3 hunks)docker/dev-compose.yml
(4 hunks)docker/docker-compose.yml
(2 hunks)docker/scripts/entrypoint.sh
(1 hunks)internal-packages/clickhouse/Dockerfile
(1 hunks)internal-packages/clickhouse/README.md
(1 hunks)internal-packages/clickhouse/package.json
(1 hunks)internal-packages/clickhouse/schema/001_create_databases.sql
(1 hunks)internal-packages/clickhouse/schema/002_create_smoke_test.sql
(1 hunks)internal-packages/clickhouse/schema/003_create_task_runs_v1.sql
(1 hunks)internal-packages/clickhouse/src/client/client.test.ts
(1 hunks)internal-packages/clickhouse/src/client/client.ts
(1 hunks)internal-packages/clickhouse/src/client/errors.ts
(1 hunks)internal-packages/clickhouse/src/client/noop.ts
(1 hunks)internal-packages/clickhouse/src/client/types.ts
(1 hunks)internal-packages/clickhouse/src/index.ts
(1 hunks)internal-packages/clickhouse/src/taskRuns.test.ts
(1 hunks)internal-packages/clickhouse/src/taskRuns.ts
(1 hunks)internal-packages/clickhouse/tsconfig.build.json
(1 hunks)internal-packages/clickhouse/tsconfig.json
(1 hunks)internal-packages/clickhouse/tsconfig.src.json
(1 hunks)internal-packages/clickhouse/tsconfig.test.json
(1 hunks)internal-packages/clickhouse/vitest.config.ts
(1 hunks)internal-packages/database/prisma/migrations/20250428211853_add_environment_type_and_org_id_to_task_run/migration.sql
(1 hunks)internal-packages/database/prisma/schema.prisma
(1 hunks)internal-packages/replication/README.md
(1 hunks)internal-packages/replication/package.json
(1 hunks)internal-packages/replication/src/client.test.ts
(1 hunks)internal-packages/replication/src/client.ts
(1 hunks)internal-packages/replication/src/errors.ts
(1 hunks)internal-packages/replication/src/index.ts
(1 hunks)internal-packages/replication/src/pgoutput.ts
(1 hunks)internal-packages/replication/src/stream.test.ts
(1 hunks)internal-packages/replication/src/stream.ts
(1 hunks)internal-packages/replication/tsconfig.build.json
(1 hunks)internal-packages/replication/tsconfig.json
(1 hunks)internal-packages/replication/tsconfig.src.json
(1 hunks)internal-packages/replication/tsconfig.test.json
(1 hunks)internal-packages/replication/vitest.config.ts
(1 hunks)internal-packages/run-engine/src/engine/eventBus.ts
(8 hunks)internal-packages/run-engine/src/engine/index.ts
(5 hunks)internal-packages/run-engine/src/engine/systems/checkpointSystem.ts
(3 hunks)internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts
(2 hunks)internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
(4 hunks)internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts
(1 hunks)internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
(13 hunks)internal-packages/run-engine/src/engine/systems/ttlSystem.ts
(4 hunks)internal-packages/run-engine/src/engine/types.ts
(1 hunks)internal-packages/run-engine/src/index.ts
(1 hunks)internal-packages/testcontainers/package.json
(1 hunks)internal-packages/testcontainers/src/clickhouse.ts
(1 hunks)internal-packages/testcontainers/src/index.ts
(4 hunks)internal-packages/testcontainers/src/utils.ts
(2 hunks)internal-packages/tracing/src/index.ts
(1 hunks)package.json
(2 hunks)packages/core/src/v3/streams/asyncIterableStream.ts
(2 hunks)packages/core/src/v3/tryCatch.ts
(1 hunks)references/hello-world/package.json
(1 hunks)scripts/build-dockerfile.sh
(1 hunks)scripts/start-prometheus.sh
(1 hunks)
💤 Files with no reviewable changes (2)
- apps/webapp/app/runEngine/types.ts
- apps/webapp/test/engine/triggerTask.test.ts
🧰 Additional context used
🧬 Code Graph Analysis (9)
apps/webapp/app/v3/services/triggerTaskV1.server.ts (1)
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1)
environment
(2169-2192)
apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts (4)
apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts (1)
action
(11-45)apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts (1)
action
(6-37)apps/webapp/app/db.server.ts (1)
prisma
(100-100)apps/webapp/app/services/runsReplicationInstance.server.ts (1)
runsReplicationInstance
(9-12)
internal-packages/replication/src/client.test.ts (3)
internal-packages/testcontainers/src/index.ts (1)
postgresAndRedisTest
(210-216)apps/webapp/app/db.server.ts (1)
prisma
(100-100)internal-packages/replication/src/client.ts (1)
LogicalReplicationClient
(84-638)
internal-packages/replication/src/stream.test.ts (3)
internal-packages/testcontainers/src/index.ts (1)
postgresAndRedisTest
(210-216)apps/webapp/app/db.server.ts (1)
prisma
(100-100)internal-packages/replication/src/stream.ts (2)
Transaction
(18-24)createSubscription
(144-171)
internal-packages/clickhouse/src/client/types.ts (2)
packages/core/src/v3/tryCatch.ts (1)
Result
(5-5)internal-packages/clickhouse/src/client/errors.ts (2)
QueryError
(33-42)InsertError
(24-32)
apps/webapp/app/v3/services/completeAttempt.server.ts (1)
packages/core/src/v3/schemas/common.ts (2)
TaskRunError
(198-203)TaskRunError
(205-205)
internal-packages/testcontainers/src/index.ts (2)
internal-packages/testcontainers/src/clickhouse.ts (1)
StartedClickHouseContainer
(70-145)internal-packages/testcontainers/src/utils.ts (3)
withContainerSetup
(179-204)createClickHouseContainer
(51-70)useContainer
(206-230)
internal-packages/replication/src/stream.ts (4)
internal-packages/replication/src/client.ts (3)
LogicalReplicationClientOptions
(11-73)LogicalReplicationClient
(84-638)lsn
(517-521)internal-packages/replication/src/pgoutput.ts (4)
MessageInsert
(42-46)MessageUpdate
(90-96)MessageDelete
(36-41)PgoutputMessage
(11-21)apps/webapp/app/services/runsReplicationService.server.ts (1)
lsn
(190-261)packages/core/src/v3/streams/asyncIterableStream.ts (1)
createAsyncIterableStreamFromAsyncIterable
(52-97)
internal-packages/clickhouse/src/taskRuns.ts (1)
internal-packages/clickhouse/src/client/types.ts (1)
ClickhouseWriter
(54-61)
🪛 Biome (1.9.4)
internal-packages/replication/src/errors.ts
[error] 2-4: This constructor is unnecessary.
Unsafe fix: Remove the unnecessary constructor.
(lint/complexity/noUselessConstructor)
internal-packages/replication/src/stream.test.ts
[error] 66-83: Promise executor functions should not be async
.
(lint/suspicious/noAsyncPromiseExecutor)
[error] 160-179: Promise executor functions should not be async
.
(lint/suspicious/noAsyncPromiseExecutor)
internal-packages/replication/src/pgoutput.ts
[error] 336-336: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 337-337: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 341-341: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 342-342: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 343-343: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
🪛 Gitleaks (8.21.2)
internal-packages/clickhouse/src/taskRuns.test.ts
58-58: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
🪛 Checkov (3.2.334)
internal-packages/clickhouse/Dockerfile
[MEDIUM] 10-11: Basic Auth Credentials
(CKV_SECRET_4)
docker/dev-compose.yml
[MEDIUM] 42-43: Basic Auth Credentials
(CKV_SECRET_4)
🪛 LanguageTool
internal-packages/clickhouse/README.md
[uncategorized] ~19-~19: Loose punctuation mark.
Context: ...tion][version] ### Prefixes -
raw: Input data tables -
tmp_{yourname}_`: ...
(UNLIKELY_OPENING_PUNCTUATION)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: units / 🧪 Unit Tests
🔇 Additional comments (91)
references/hello-world/package.json (1)
15-16
: Good addition of a deployment script. Adding the deploy script enables users to easily deploy the hello-world reference project, which aligns with the broader infrastructure enhancements in this PR.
internal-packages/replication/tsconfig.test.json (1)
1-21
: Well-configured TypeScript test setup.The TypeScript configuration properly sets up the test environment for the replication package with appropriate compiler options, strict type checking, and Vitest integration. This follows project patterns and TypeScript best practices.
internal-packages/testcontainers/package.json (1)
8-8
: Ensure version compatibility with the ClickHouse server. The new
@clickhouse/client
dependency (^1.11.1
) enables ClickHouse support in test containers. Please verify that this client version is compatible with the ClickHouse server version defined in your Docker Compose setup. Additionally, confirm whether@clickhouse/client
should reside underdependencies
(runtime) ordevDependencies
(tests only) to avoid bloating production installs..configs/prometheus.yml (1)
1-8
: Verify metrics endpoint and scrape settings. This config scrapes
localhost:3030
every 15s under the job nametrigger-dev
. Confirm that your replication service indeed exposes Prometheus metrics on port 3030 (and on the default/metrics
path). If your endpoint path differs, add ametrics_path
entry. You may also want to specify ascrape_timeout
or adjustevaluation_interval
to suit your needs.apps/webapp/app/routes/api.v1.runs.$runId.tags.ts (1)
82-95
: Dropped the unused update result for clarity. The call to
prisma.taskRun.update
is now unassigned, removing an unnecessary variable. This cleanup clarifies intent and avoids lint warnings for unused values.internal-packages/run-engine/src/engine/types.ts (1)
121-122
: Add optional scheduling metadata to TriggerParams. You've extended
TriggerParams
with scheduleId
and scheduleInstanceId
, enabling propagation of scheduling context. Please verify that all consumers of TriggerParams
(e.g., trigger services, event serializers) are updated to handle these fields, and consider adding JSDoc comments to explain their usage (a sketch follows below).
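A hedged sketch of the suggested JSDoc annotations; only the two new optional properties are shown and the surrounding TriggerParams fields are omitted:

```ts
// Sketch: document why the optional scheduling fields exist on TriggerParams.
type TriggerParamsSchedulingSketch = {
  /** The id of the schedule that triggered this run, if it was started by the scheduler. */
  scheduleId?: string;
  /** The id of the concrete schedule instance (occurrence) that produced this run. */
  scheduleInstanceId?: string;
};
```
apps/webapp/app/v3/services/triggerTask.server.ts (1)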
32-33
: Ensure V1 support for new scheduling IDs
ThescheduleId
andscheduleInstanceId
fields are now part ofTriggerTaskServiceOptions
. Verify that the V1 code path (inTriggerTaskServiceV1
) and any downstream serialization persist these values, otherwise they will be silently dropped.internal-packages/run-engine/src/index.ts (1)
3-3
: Export the new EventBusEvents
type
IncludingEventBusEvents
alongsideEventBusEventArgs
aligns with the enriched event payloads. Confirm all consumers update their imports to use the new type where appropriate.apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts (1)
2-3
: Approve addition of env
andlogger
imports
You correctly addedenv
(used at line 34) andlogger
(used at line 30). These imports are now required for configuration and error logging.internal-packages/replication/tsconfig.json (1)
1-8
: Appropriate TypeScript configuration for the replication packageThe configuration is well-structured for a Node.js TypeScript package, using modern Node16 module resolution and properly referencing separate configurations for source and test files. The custom condition
@triggerdotdev/source
enables conditional exports which is useful for package resolution flexibility.internal-packages/clickhouse/tsconfig.json (1)
1-8
: Consistent TypeScript configuration across internal packagesThis configuration mirrors the one in the replication package, demonstrating a consistent approach to TypeScript configuration across internal packages. This consistency will make maintenance easier as the project evolves.
apps/webapp/app/runEngine/services/triggerTask.server.ts (1)
307-309
: Correctly adds scheduling metadata to task triggersThe added
scheduleId
andscheduleInstanceId
properties appropriately extend the trigger metadata to include scheduling information, which enables tracking the relationship between scheduled tasks and their runs.apps/webapp/tsconfig.json (1)
6-6
: Upgraded TypeScript target to ES2020This change updates the TypeScript compilation target from ES2019 to ES2020, enabling newer JavaScript features like the nullish coalescing operator (??) and optional chaining (?.).
Also applies to: 13-13
apps/webapp/package.json (2)
149-149
: New dependency for concurrent operation limitingAdding
p-limit
is a good choice for controlling concurrency in the replication pipeline, which typically needs to limit simultaneous database operations.
197-198
: Added internal replication and ClickHouse packagesThese workspace dependencies enable the replication pipeline functionality described in the PR title.
internal-packages/tracing/src/index.ts (1)
48-54
: Added utility function for error recording in spansThis function extracts common error recording logic from the
startSpan
function, allowing standalone error recording in spans without the full wrapper pattern. This improves code reuse and consistency across the replication system.internal-packages/database/prisma/schema.prisma (2)
1734-1735
: Added environment type to TaskRun modelThis optional field enhances the metadata available for task runs, allowing for environment-specific filtering and analytics in the new ClickHouse dashboards.
1739-1740
: Added organization ID to TaskRun modelAdding the organization ID as an optional field improves the ability to filter and query task runs by organization in the ClickHouse-powered dashboards.
apps/webapp/app/v3/services/finalizeTaskRun.server.ts (1)
94-98
: Sanitize error only once before the update
Refactoring to computetaskRunError
viasanitizeError(error)
exactly once is a clean optimization. Please verify that passingundefined
forerror
in the Prisma update will omit the field rather than overwrite or clear it unintentionally.apps/webapp/app/v3/services/triggerTaskV1.server.ts (2)
378-380
: Enrich taskRun with environmentType and organizationId
Great addition ofenvironmentType
andorganizationId
to the creation payload. Please confirm that your Prisma schema and migrations have been updated to include these new columns on thetaskRun
model.
439-441
: Include scheduleId and scheduleInstanceId for scheduled tasks
AddingscheduleId
andscheduleInstanceId
ensures scheduled runs are traceable. Verify that these optional fields exist in the database schema and are propagated through the replication pipeline.internal-packages/clickhouse/vitest.config.ts (1)
1-19
: LGTM: Test configuration appropriate for ClickHouse integration testingThe configuration is well-structured with appropriate settings for database integration tests. The longer timeout (60s) and sequential execution (disabled parallelism, single thread) are good choices when working with external database systems like ClickHouse.
internal-packages/replication/src/index.ts (1)
1-4
: LGTM: Good module organizationClean barrel file implementation that properly consolidates exports from the replication package modules, making imports cleaner for consumers.
internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts (1)
103-122
: LGTM: Good event emission enhancementThe added event emission properly captures the run status change to "PENDING" after the transaction completes, with comprehensive context data for downstream consumers. This change enhances system observability.
packages/core/src/v3/tryCatch.ts (1)
2-5
: LGTM: Good type exportsExporting these utility types is a good practice that enables better type safety across the codebase by allowing other modules to reuse them.
apps/webapp/app/metrics.server.ts (1)
7-9
: Type declarations enhance clarity and maintainability.Adding the explicit
MetricsRegister
type and return type annotation toinitializeMetricsRegister
improves type safety and makes the code more self-documenting. This change aligns well with the needs of the new replication service that will rely on these metrics.apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.next.runs/route.tsx (1)
1-10
: Clean layout implementation using Remix patterns.This parent route component follows Remix's best practices by using
Outlet
to render child routes and wrapping them in a consistent layout container. The implementation is concise and properly structured.apps/webapp/app/v3/services/createTaskRunAttempt.server.ts (1)
162-162
: Appropriate data consistency enhancement.Adding the
attemptNumber
update to the task run record ensures that the attempt count is properly persisted in the database. This maintains data consistency between the task run and its attempts, which is important for accurate event payloads and downstream processing.apps/webapp/app/utils/pathBuilder.ts (1)
236-245
: Well-implemented path builder function matching existing patterns.The new
v3RunsNextPath
function follows the established pattern of other path builder functions in this file. It correctly handles the optional filters parameter, properly converts it to search params, and constructs the path consistently with related functions.apps/webapp/app/v3/services/triggerScheduledTask.server.ts (1)
150-154
: Improved scheduling metadata propagationThis change embeds scheduling metadata directly in the initial trigger call, eliminating the need for subsequent updates to the taskRun record. The direct inclusion of
scheduleId
andscheduleInstanceId
in the trigger options ensures consistent propagation of scheduling context through the entire run pipeline.internal-packages/replication/vitest.config.ts (1)
3-19
: LGTM: Well-configured test environment for replication packageThis Vitest configuration is appropriately set up for testing replication functionality with:
- Proper test isolation
- Sequential test execution (via
fileParallelism: false
and single thread)- Extended timeout (60s) suitable for integration tests with databases
- V8 coverage provider enabled
These settings are ideal for integration tests involving database interactions, ensuring consistent and reliable test results.
docker/Dockerfile (3)
3-5
: LGTM: Properly configured goose builder stageThe goose builder stage uses an appropriate Golang Alpine image to install the goose migration tool that will handle ClickHouse migrations.
49-53
: LGTM: Proper integration of goose and ClickHouse schemas in builder stageThe setup correctly:
- Copies the goose binary from the goose_builder stage
- Sets executable permissions on the binary
- Copies ClickHouse schema files with proper ownership
81-84
: LGTM: Proper integration of goose and ClickHouse schemas in runner stageCorrectly copies both the goose binary and ClickHouse schema files from the builder stage to the final runner stage with appropriate permissions and ownership.
docker/scripts/entrypoint.sh (2)
9-11
: LGTM: Improved migration loggingAdded clear log statements to indicate the beginning and completion of Prisma migrations, improving observability during container startup.
13-23
: LGTM: Well-structured ClickHouse migration handlingThis change properly:
- Checks for the presence of the
CLICKHOUSE_URL
environment variable- Sets up the required goose environment variables (driver, connection string, and migration directory)
- Executes migrations using the goose binary
- Includes appropriate logging before and after the migration process
- Handles the case when ClickHouse is not configured
This approach enables conditional ClickHouse schema initialization that integrates seamlessly with existing database setup.
internal-packages/run-engine/src/engine/systems/ttlSystem.ts (2)
87-88
: Enhanced data selection for event emissionGood addition of fields (
updatedAt
,projectId
,id
,expiredAt
,status
) to the select statement, which will provide richer information for the replication pipeline to ClickHouse.Also applies to: 96-98, 104-105
120-126
: Improved event payload structureNice enhancement to the event payload. The explicit organization, project, and environment objects with their IDs will make it easier to query and analyze runs by these organizational dimensions in ClickHouse dashboards.
internal-packages/replication/tsconfig.build.json (1)
1-21
: Appropriate TypeScript configuration for the new replication packageThe TypeScript configuration is well-structured for a server-side package with proper module resolution settings, strict type checking, and declaration file generation.
apps/webapp/app/v3/services/enqueueDelayedRun.server.ts (2)
83-91
: Transaction removal for taskRun updatesThe update operation has been extracted from a transaction. This change seems appropriate for the replication pipeline as it allows the taskRun update to be immediately visible for replication without waiting for the TTL logic to complete.
93-99
:❓ Verification inconclusive
TTL handling moved outside transaction
Moving the TTL handling outside the transaction is consistent with the change above. Just be aware that there's no longer an atomic operation ensuring both the status update and TTL scheduling happen together. Consider adding error handling to recover from failures that might occur between these operations.
🏁 Script executed:
#!/bin/bash
# Look for any error handling patterns in similar code
rg -A 3 -B 3 "ExpireEnqueuedRunService.enqueue" --glob "*.ts" | grep -i "try|catch|finally"

Length of output: 91
To ensure we didn’t miss any existing error handling around the TTL enqueue call, let’s broaden and correct our search for
try
/catch
/finally
blocks that includeExpireEnqueuedRunService.enqueue
:
🏁 Script executed:
#!/bin/bash # Search within a 5-line window around the enqueue call for try/catch/finally rg --context 5 "ExpireEnqueuedRunService.enqueue" --glob '*.ts' \ | rg -E "(try\s*\{|catch\s*\(|finally\s*\{)"Length of output: 505
Manual verification needed: ensure TTL enqueue scheduling is error-handled
I wasn’t able to locate any
try
/catch
/finally
around theExpireEnqueuedRunService.enqueue
call—so a failure here could leave the run status updated without scheduling its expiration. Please confirm whether any higher-level error recovery covers this, or consider adding local error handling to maintain consistency.
- File: apps/webapp/app/v3/services/enqueueDelayedRun.server.ts (lines 93–99)
internal-packages/replication/tsconfig.src.json (1)
1-20
: TypeScript configuration looks good for the replication package.The configuration is well-structured with appropriate compiler options for a TypeScript package focused on replication. The settings align with modern TypeScript practices and provide a good balance of type safety and compatibility.
internal-packages/clickhouse/tsconfig.src.json (1)
1-20
: Configuration is consistent with the replication package.This TypeScript configuration for the ClickHouse package maintains consistency with the replication package configuration, which is good for maintainability. The only notable difference is the target ES version (ES2019 vs ES2020 in replication).
apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (4)
9-10
: Appropriate import changes for functionality.The import modifications correctly support the refactored timestamp handling and extraction of the maximum duration calculation.
15-16
: Good reorganization of imports.Separating the queue-related imports from the Redis client imports improves code organization.
443-445
: Good timestamp consistency implementation.Extracting timestamp creation into variables ensures consistency when the same timestamp is used in multiple places, avoiding subtle timing differences between operations.
451-459
: Consistent use of timestamp variables.Using the extracted timestamp variables here ensures that all references to the same logical timestamp have identical values.
internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (3)
146-164
: Well-structured event emission for status change.Adding the "runStatusChanged" event emission when transitioning to "WAITING_TO_RESUME" enhances system observability. The event payload is comprehensive, including all necessary context about the run, organization, project, and environment.
283-288
: Appropriate addition of fields to the query.The additional selected fields are necessary to support the event emission functionality, ensuring all required data is available without additional database queries.
299-317
: Consistent event emission implementation.This "runStatusChanged" event emission for the "EXECUTING" status follows the same pattern as the previous one, maintaining consistency in the codebase. The event includes all necessary context for downstream consumers.
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1)
710-720
: Good variable extraction for improved readability.The extraction of these values into local constants before the Prisma update call improves code readability and maintainability. This refactoring makes the code easier to understand and debug.
internal-packages/clickhouse/schema/002_create_smoke_test.sql (1)
1-11
: Well-structured ClickHouse migration for smoke testing.This schema creates a simple test table with appropriate column types and defaults. The MergeTree engine with ordering by timestamp and ID is suitable for timestamp-based data. The migration includes both up and down directions for easy application and rollback.
apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts (1)
7-9
: Good schema validation for the insert strategy.The Zod schema ensures that if an insert strategy is provided, it must be either "streaming" or "batching". Making it optional allows for default behavior in the replication service.
internal-packages/run-engine/src/engine/index.ts (5)
31-31
: LGTM! Import simplification.The import cleanup helps maintain a cleaner codebase by removing unused imports.
368-370
: LGTM! Extended parameters for scheduling support.Adding
scheduleId
andscheduleInstanceId
parameters allows capturing important scheduling context with each task run.
408-410
: LGTM! Added environment context to task runs.Adding
environmentType
andorganizationId
directly to the task run provides important context that will be useful for filtering and analytics in ClickHouse dashboards.
458-460
: LGTM! Persisting scheduling metadata.Storing the scheduling identifiers in the database ensures the scheduling context is preserved throughout the run's lifecycle.
557-561
: LGTM! Added run creation event emission.This event emission is a crucial building block for the replication system, enabling real-time tracking of run creation events.
apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts (1)
6-37
: LGTM! Well-structured admin API endpoint.The endpoint correctly handles authentication, authorization, and error cases with appropriate status codes.
internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts (3)
71-90
: LGTM! Comprehensive event emission for delayed run rescheduling.The event contains detailed metadata about the run, organization, project, and environment, which will be valuable for replication and monitoring.
124-124
: LGTM! Consistent timestamp usage.Storing the
queuedAt
timestamp in a variable before using it in both the database update and event emission ensures consistency between stored and reported values.Also applies to: 126-133
134-152
: LGTM! Detailed event emission for run enqueuing after delay.The event payload includes comprehensive metadata that will enable accurate tracking and replication of the run status change.
internal-packages/replication/package.json (3)
1-30
: LGTM! Well-structured package for replication.The package includes appropriate dependencies for replication functionality (Redis, PostgreSQL client, distributed locking) and follows project conventions for testing and building.
12-14
: Consider pinning exact versions for all dependencies.While most dependencies use workspace references, external dependencies like
pg
andredlock
have exact version specifications. This is good practice for ensuring reproducible builds.
23-29
: LGTM! Comprehensive script setup.The package includes all necessary scripts for development, testing, and building, ensuring a smooth development workflow.
internal-packages/clickhouse/package.json (1)
9-9
:❓ Verification inconclusive
Verify ClickHouse client version for security and compatibility
The caret (^) allows updates to any compatible version. It's good to verify this is the latest stable version without known vulnerabilities.
🏁 Script executed:
#!/bin/bash
# Check latest version of @clickhouse/client
npm view @clickhouse/client version

# Check for known vulnerabilities
npm audit --package @clickhouse/client

Length of output: 434
Confirm @clickhouse/client Version and Audit for Vulnerabilities
File: internal-packages/clickhouse/package.json
Lines: 9-9"@clickhouse/client": "^1.11.1",
- We’ve verified that
^1.11.1
resolves to the latest stable@clickhouse/client@1.11.1
.- To check for known vulnerabilities, generate a lockfile and rerun the audit:
npm i --package-lock-only npm audit --package @clickhouse/client- After confirming there are no security issues, you may keep the caret range. If you require stricter control, consider pinning to the exact version.
internal-packages/clickhouse/README.md (2)
3-3
: LGTM: Proper attribution

Good practice to attribute the inspiration source for the naming conventions.
42-63
: Well-structured examples section

The examples section clearly illustrates the naming conventions with concrete examples, making it easy for developers to understand and follow the guidelines.
internal-packages/clickhouse/tsconfig.test.json (1)
1-21
: LGTM: Well-structured test configuration

The test configuration is well-structured with appropriate settings for the Vitest testing framework. The inclusion of strict type checking and isolated modules is particularly good for maintaining code quality.
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (3)
332-337
: Good extraction of timing variables.Extracting these variables before they're used in the update query improves clarity and ensures consistent timestamps across the operation.
363-392
: Well-structured event emission for run locking.The detailed "runLocked" event contains comprehensive metadata about the locked run, including all relevant IDs, timestamps, and configuration details. This will provide the necessary data for the ClickHouse replication pipeline indicated in the PR objectives.
614-631
: Good addition of run status change event.The "runStatusChanged" event emission follows the same pattern as other events in the system and includes the essential metadata for tracking status changes in ClickHouse.
apps/webapp/app/v3/services/completeAttempt.server.ts (1)
562-573
: Well-defined error parameter addition.Adding the
error: TaskRunError
parameter provides valuable context during the retry process and maintains type safety through the TaskRunError discriminated union. This aligns with the broader changes for enriched error handling observed in the PR.internal-packages/clickhouse/src/taskRuns.test.ts (3)
1-125
: Well-structured test suite with comprehensive coverage.This test file thoroughly validates the ClickHouse client for task runs insertion and querying. The use of the
clickhouseTest
helper ensures proper test isolation with containers. Both insert and query functionality are covered, with attention to synchronous behavior for reliable testing.🧰 Tools
🪛 Gitleaks (8.21.2)
58-58: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
127-252
: Good test for deduplication logic using ClickHouse's versioning.The second test effectively verifies the version-based deduplication using the
FINAL
modifier in ClickHouse, ensuring only the latest version (with status "COMPLETED_SUCCESSFULLY") is returned when multiple versions of the same run exist.
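To illustrate the idea, a query along these lines (a sketch only; the run_id column name and connection URL are assumptions, while trigger_dev.task_runs_v1 and the _version column appear elsewhere in this PR) reads back only the surviving row version:

```typescript
import { createClient } from "@clickhouse/client";

// Sketch: read the latest version of a run from a versioned (ReplacingMergeTree-style)
// table. FINAL collapses older row versions at query time, so only the newest
// _version for each key survives in the result.
async function latestRunStatus(runId: string) {
  const client = createClient({ url: "http://localhost:8123" }); // assumed local dev URL

  const resultSet = await client.query({
    query: `
      SELECT run_id, status, _version
      FROM trigger_dev.task_runs_v1 FINAL
      WHERE run_id = {runId: String}
    `,
    query_params: { runId },
    format: "JSONEachRow",
  });

  return resultSet.json();
}
```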
58-58
:✅ Verification successful
Verify this is not an actual API key.
Static analysis flagged this line as containing a potential API key. While this appears to be test data, it's good practice to use clearly fake values for test identifiers.
🏁 Script executed:
#!/bin/bash
# Check if this value appears in other test files or if it follows a specific pattern
rg "idempotency_key_1234" --type ts

Length of output: 134
No API key leak detected

The value
The valueidempotency_key_1234
is mock data used exclusively in internal-packages/clickhouse/src/taskRuns.test.ts. No real key is exposed and no changes are required.🧰 Tools
🪛 Gitleaks (8.21.2)
58-58: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
apps/webapp/app/services/runsReplicationInstance.server.ts (1)
14-70
: Strong implementation with good error handling and graceful shutdown.The singleton pattern for the replication service is well implemented with proper error handling, environment variable checks, and graceful shutdown hooks. The comprehensive configuration allows for flexible deployment options.
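As a rough sketch of that pattern (the names and option handling here are illustrative, not the file's actual code), a lazily created singleton with shutdown hooks can look like this:

```typescript
// Sketch: lazily create a single replication service and stop it cleanly on exit.
type ReplicationLike = { start(): Promise<void>; stop(): Promise<void> };

let instance: ReplicationLike | undefined;

export function getReplicationInstance(
  create: () => ReplicationLike | undefined // returns undefined when env vars are missing
): ReplicationLike | undefined {
  if (!instance) {
    instance = create();

    if (instance) {
      // Graceful shutdown: release the replication slot before the process exits
      const stop = () => instance?.stop().catch((err) => console.error(err));
      process.once("SIGTERM", stop);
      process.once("SIGINT", stop);
    }
  }

  return instance;
}
```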
docker/docker-compose.yml (2)
63-96
: Well-configured ClickHouse service with proper health check.The ClickHouse container is well configured with appropriate environment variables, port mappings, volume mounts, and a thorough health check to ensure the service is truly ready before dependent services start.
97-106
: Good use of conditional migrator command.The migrator service is properly configured to depend on ClickHouse readiness and uses a parameterized command that can be overridden with environment variables, allowing for different migration strategies (up, down, etc.).
internal-packages/clickhouse/src/client/noop.ts (1)
8-62
: Well-implemented NoopClient with proper schema validation

This implementation of the NoopClient follows the null object pattern well, providing safe fallback behavior when an actual ClickHouse connection isn't available. The validation logic is properly implemented for both query and insert operations, ensuring type safety even when not connecting to the database.
The mock insert result with detailed fields will help maintain consistent behavior in tests and development environments.
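A stripped-down version of the same idea (a sketch, not the package's actual NoopClient, whose insert result carries more fields) might look like:

```typescript
import { z } from "zod";

// Sketch of a null-object insert: inputs are still validated against the schema,
// but nothing is sent to ClickHouse and a canned "success" result is returned.
function noopInsert<TSchema extends z.ZodTypeAny>(schema: TSchema) {
  return async (rows: z.input<TSchema>[]) => {
    for (const row of rows) {
      const parsed = schema.safeParse(row);
      if (!parsed.success) {
        return [parsed.error, null] as const;
      }
    }

    // Loosely mirrors the shape of a real insert acknowledgement
    return [null, { executed: true, summary: { written_rows: String(rows.length) } }] as const;
  };
}
```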
internal-packages/run-engine/src/engine/eventBus.ts (2)
11-110
: Well-structured event types for comprehensive run lifecycle tracking

The new event types (
runCreated
,runEnqueuedAfterDelay
,runDelayRescheduled
,runLocked
, andrunStatusChanged
) provide a complete picture of run lifecycle events. The consistent structure with standardized metadata for organization, project, and environment will facilitate filtering and analysis in ClickHouse dashboards.
116-269
: Improved consistency in existing event types

The enhancements to existing event types with additional fields for timestamps, status information, metrics, and standardized metadata ensure consistency across all events. This will make it easier to join and analyze related events in the ClickHouse analytics system.
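The common shape behind these payloads is roughly the following (a sketch; field names beyond the organization/project/environment identifiers are illustrative):

```typescript
// Sketch: shared metadata carried by each run lifecycle event so downstream
// consumers can filter by organization, project, and environment.
type RunEventMetadata = {
  organization: { id: string };
  project: { id: string };
  environment: { id: string };
};

// Example of one event payload built on that metadata.
type RunStatusChangedEvent = RunEventMetadata & {
  time: Date;
  run: {
    id: string;
    status: string; // e.g. "EXECUTING" or "WAITING_TO_RESUME"
    updatedAt: Date;
  };
};
```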
internal-packages/clickhouse/src/index.ts (3)
10-28
: Well-designed configuration type with mutual exclusivity

The
ClickHouseConfig
union type effectively enforces mutually exclusive configuration options (single URL vs separate reader/writer URLs) using TypeScript's discriminated union pattern. This prevents configuration mistakes and clearly communicates the expected configuration format.
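In outline, the mutual exclusivity can be expressed with optional never fields, roughly like this (a sketch of the pattern, not the package's exact type):

```typescript
// Sketch: either a single shared URL, or explicit reader/writer URLs (never both).
type ClickHouseConfigSketch =
  | { url: string; readerUrl?: never; writerUrl?: never }
  | { url?: never; readerUrl: string; writerUrl: string };

// Accepted by the compiler:
const single: ClickHouseConfigSketch = { url: "http://localhost:8123" };
const separate: ClickHouseConfigSketch = {
  readerUrl: "http://reader:8123",
  writerUrl: "http://writer:8123",
};

// Mixing both styles in one object would be a type error.
```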
30-69
: Good client initialization with URL security consideration

The
ClickHouse
class constructor properly handles different configuration scenarios and falls back toNoopClient
when needed. The redaction of password from URLs in logs is a good security practice.
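For reference, redacting the password portion before logging can be done with the standard URL API, along these lines (a sketch):

```typescript
// Sketch: strip the password from a connection URL before writing it to logs.
function redactUrlPassword(rawUrl: string): string {
  try {
    const url = new URL(rawUrl);
    if (url.password) {
      url.password = "****";
    }
    return url.toString();
  } catch {
    // Not a parseable URL: return a constant rather than risk leaking a secret
    return "[unparseable url]";
  }
}

// redactUrlPassword("http://default:password@clickhouse:8123/")
//   => "http://default:****@clickhouse:8123/"
```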
90-95
: Clean API design for task run operations

The
taskRuns
getter provides a clean, focused API for task run operations. Binding the insert methods to the writer client in this way makes the API intuitive and easy to use.internal-packages/replication/src/client.ts (1)
91-92
: Typed EventEmitter usage may not compile on older@types/node
new EventEmitter<LogicalReplicationClientEvents>()
relies on generic support that only exists in@types/node
≥ 20.5.
If consumers are on an older version the compilation will fail. Two safer options:
- Use an explicit typed emitter library:
import { TypedEmitter } from "tiny-typed-emitter"; ... public readonly events = new TypedEmitter<LogicalReplicationClientEvents>();
- Stick to the built-in emitter and cast on emit:
import { EventEmitter } from "node:events"; public readonly events = new EventEmitter() as EventEmitter & { emit<E extends keyof LogicalReplicationClientEvents>( ev: E, ...args: LogicalReplicationClientEvents[E] ): boolean; };Verify the node types version in
package.json
/ lock-file to ensure this generic form is supported.apps/webapp/app/env.server.ts (1)
729-756
: Redis variable block mirrors existing patterns — looks good

The new
RUN_REPLICATION_REDIS_*
variables follow the established pattern of re-using the base Redis values via.transform
, which keeps configuration DRY and predictable.
No issues spotted.internal-packages/replication/src/pgoutput.ts (1)
220-237
: Potential unbounded relation/type caches
_relationCache
and_typeCache
grow indefinitely. For long-running replication processes this could become a memory leak when relations are dropped or types re-created.Consider:
• Evicting entries on
truncate
/drop
messages, or
• Capping the cache size with an LRU.Not urgent for short-lived services, but worth tracking.
internal-packages/replication/src/stream.ts (1)
173-176
:filterTags
option is ignored when emitting events
filterTags
is forwarded toLogicalReplicationClient
for publication setup, but the runtime filter inisRelevantTag()
is hard-coded.
If the caller suppliesfilterTags?: ["insert"]
, the stream will still forwardupdate
anddelete
messages.Expose the option here as well or document that only the publication controls filtering.
internal-packages/testcontainers/src/clickhouse.ts (1)
22-26
:Wait.forHttp
predicate is fragile across ClickHouse versionsClickHouse may respond with
"Ok.\n"
or"Ok."
(no newline) depending on version/container.
Loosen the predicate or switch toWait.forHealthCheck()
to avoid intermittent startup failures.- Wait.forHttp("/", CLICKHOUSE_HTTP_PORT).forResponsePredicate( - (response) => response === "Ok.\n" - ) + Wait.forHttp("/", CLICKHOUSE_HTTP_PORT).forResponsePredicate( + (response) => response.trim() === "Ok." + )apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.next.runs._index/route.tsx (1)
344-349
: Potential double decoding of tag values
tags: url.searchParams.getAll("tags").map((t) => decodeURIComponent(t)),
When the query string is parsed,URLSearchParams
already performs decoding. CallingdecodeURIComponent
again mis-handles legitimately encoded values (e.g.%25
→%
). Safe to drop the manual decode:- tags: url.searchParams.getAll("tags").map((t) => decodeURIComponent(t)), + tags: url.searchParams.getAll("tags"),
@@ -0,0 +1 @@
# Replication
🛠️ Refactor suggestion
Enhance documentation for the replication package.
The README is minimal and lacks essential information about this complex package. Consider expanding it to include an overview of functionality, usage examples, API documentation, configuration options, and requirements.
Example of a more comprehensive README structure:
# Replication
## Overview
This package implements logical replication from PostgreSQL to ClickHouse with Redis-based leader election.
## Installation
...
## Usage
```typescript
// Example of initializing and using the LogicalReplicationClient
```

## API Reference
...

## Configuration
...

## Troubleshooting
...
ENV GOOSE_DRIVER=clickhouse
ENV GOOSE_DBSTRING="tcp://default:password@clickhouse:9000"
ENV GOOSE_MIGRATION_DIR=./schema
Do not hard-code database credentials in the Dockerfile
Storing tcp://default:password@clickhouse:9000
directly in ENV
exposes credentials and violates security best practices. Use build-time ARG
or Docker secrets to inject the GOOSE_DBSTRING
at runtime instead of hard-coding it.
🧰 Tools
🪛 Checkov (3.2.334)
[MEDIUM] 10-11: Basic Auth Credentials
(CKV_SECRET_4)
#!/bin/bash

set -e

docker build -t local-triggerdotdev:latest -f docker/Dockerfile .
image=local-triggerdotdev:latest
src=/triggerdotdev
dst=$(mktemp -d)

mkdir -p $dst

echo -e "Extracting image into $dst..."

container=$(docker create "$image")
docker cp "$container:$src" "$dst"
docker rm "$container"
/Applications/Visual\ Studio\ Code.app/Contents/Resources/app/bin/code "$dst/triggerdotdev"
🛠️ Refactor suggestion
Script works but has portability and cleanup issues.
The script successfully builds and extracts Docker image contents, but has several issues:
- It uses a hardcoded macOS path to VS Code, making it non-portable
- No cleanup for the temporary directory is implemented
- Limited error handling beyond the basic
set -e
Consider these improvements:
#!/bin/bash
set -e
docker build -t local-triggerdotdev:latest -f docker/Dockerfile .
image=local-triggerdotdev:latest
src=/triggerdotdev
-dst=$(mktemp -d)
+dst=$(mktemp -d)
+
+# Clean up temp directory on exit
+function cleanup {
+ echo "Cleaning up temporary directory: $dst"
+ rm -rf "$dst"
+}
+trap cleanup EXIT
mkdir -p $dst
echo -e "Extracting image into $dst..."
container=$(docker create "$image")
docker cp "$container:$src" "$dst"
docker rm "$container"
-/Applications/Visual\ Studio\ Code.app/Contents/Resources/app/bin/code "$dst/triggerdotdev"
+
+# Use 'code' command if available, otherwise try platform-specific paths
+if command -v code &> /dev/null; then
+ code "$dst/triggerdotdev"
+elif [ -f "/Applications/Visual Studio Code.app/Contents/Resources/app/bin/code" ]; then
+ "/Applications/Visual Studio Code.app/Contents/Resources/app/bin/code" "$dst/triggerdotdev"
+else
+ echo "VS Code not found in path or at default macOS location."
+ echo "Extracted files are available at: $dst/triggerdotdev"
+ # Don't exit immediately to allow the cleanup trap to run
+ exit 1
+fi
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
#!/bin/bash
set -e

docker build -t local-triggerdotdev:latest -f docker/Dockerfile .
image=local-triggerdotdev:latest
src=/triggerdotdev
dst=$(mktemp -d)

# Clean up temp directory on exit
function cleanup {
  echo "Cleaning up temporary directory: $dst"
  rm -rf "$dst"
}
trap cleanup EXIT

mkdir -p $dst

echo -e "Extracting image into $dst..."

container=$(docker create "$image")
docker cp "$container:$src" "$dst"
docker rm "$container"

# Use 'code' command if available, otherwise try platform-specific paths
if command -v code &> /dev/null; then
  code "$dst/triggerdotdev"
elif [ -f "/Applications/Visual Studio Code.app/Contents/Resources/app/bin/code" ]; then
  "/Applications/Visual Studio Code.app/Contents/Resources/app/bin/code" "$dst/triggerdotdev"
else
  echo "VS Code not found in PATH or at default macOS location."
  echo "Extracted files are available at: $dst/triggerdotdev"
  # Allow cleanup trap to run
  exit 1
fi
export async function createClickHouseContainer(network: StartedNetwork) {
  const container = await new ClickHouseContainer().withNetwork(network).start();

  const client = createClient({
    url: container.getConnectionUrl(),
  });

  await client.ping();

  // Now we run the migrations
  const migrationsPath = path.resolve(__dirname, "../../clickhouse/schema");

  await runClickhouseMigrations(client, migrationsPath);

  return {
    url: container.getConnectionUrl(),
    container,
    network,
  };
}
🛠️ Refactor suggestion
Add error handling for ClickHouse connection and migration failures
Unlike the Redis container setup which has explicit error handling, the ClickHouse container setup lacks robust error handling for connection failures or migration errors.
export async function createClickHouseContainer(network: StartedNetwork) {
const container = await new ClickHouseContainer().withNetwork(network).start();
const client = createClient({
url: container.getConnectionUrl(),
});
- await client.ping();
+ try {
+ await client.ping();
+ } catch (error) {
+ await container.stop({ timeout: 10 });
+ throw new Error("Failed to connect to ClickHouse container", { cause: error });
+ }
// Now we run the migrations
const migrationsPath = path.resolve(__dirname, "../../clickhouse/schema");
- await runClickhouseMigrations(client, migrationsPath);
+ try {
+ await runClickhouseMigrations(client, migrationsPath);
+ } catch (error) {
+ await container.stop({ timeout: 10 });
+ throw new Error("Failed to run ClickHouse migrations", { cause: error });
+ }
return {
url: container.getConnectionUrl(),
container,
network,
};
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
export async function createClickHouseContainer(network: StartedNetwork) {
  const container = await new ClickHouseContainer().withNetwork(network).start();

  const client = createClient({
    url: container.getConnectionUrl(),
  });

  try {
    await client.ping();
  } catch (error) {
    await container.stop({ timeout: 10 });
    throw new Error("Failed to connect to ClickHouse container", { cause: error });
  }

  // Now we run the migrations
  const migrationsPath = path.resolve(__dirname, "../../clickhouse/schema");

  try {
    await runClickhouseMigrations(client, migrationsPath);
  } catch (error) {
    await container.stop({ timeout: 10 });
    throw new Error("Failed to run ClickHouse migrations", { cause: error });
  }

  return {
    url: container.getConnectionUrl(),
    container,
    network,
  };
}
const runsReplicationServiceA = new RunsReplicationService({
  clickhouse,
  pgConnectionUrl: postgresContainer.getConnectionUri(),
  serviceName: "runs-replication-shutdown-handover",
  slotName: "task_runs_to_clickhouse_v1",
  publicationName: "task_runs_to_clickhouse_v1_publication",
  redisOptions,
  maxFlushConcurrency: 1,
  flushIntervalMs: 100,
  flushBatchSize: 1,
  insertStrategy: "batching",
  leaderLockTimeoutMs: 5000,
  leaderLockExtendIntervalMs: 1000,
  ackIntervalSeconds: 5,
});

await runsReplicationServiceA.start("batching");
Ensure services are always stopped even when an assertion fails
await runsReplicationService.stop()
is executed at the end of each happy-path, but if an assertion throws first the ClickHouse writer and the logical replication slot keep running, leaking resources and occasionally blocking subsequent tests.
Wrap the critical section in try … finally
:
- await runsReplicationService.start();
- … assertions …
- await runsReplicationService.stop();
+ await runsReplicationService.start();
+ try {
+ … assertions …
+ } finally {
+ await runsReplicationService.stop();
+ }
containerTest
does not automatically guarantee this shutdown.
Also applies to: 876-893
await setTimeout(1000);

// Check that the row was replicated to clickhouse
const queryRuns = clickhouse.reader.query({
  name: "runs-replication",
  query: "SELECT * FROM trigger_dev.task_runs_v1",
  schema: z.any(),
});

const [queryError, result] = await queryRuns({});

expect(queryError).toBeNull();
🛠️ Refactor suggestion
Replace fixed setTimeout
sleeps with polling to avoid flaky tests
Using hard-coded sleeps (await setTimeout(1000)
) assumes the replication will always finish within the timeout. On a slower CI node, this may introduce intermittent failures, while on a fast laptop it needlessly elongates the test run.
Consider polling ClickHouse until the expected rows appear (or a global test-timeout is hit). A helper such as:
async function waitForRun(
query: (args?: any) => Promise<[unknown, any[] | null]>,
expected: number,
timeoutMs = 10_000
) {
const start = Date.now();
for (;;) {
const [, rows] = await query({});
if ((rows?.length ?? 0) === expected) return rows;
if (Date.now() - start > timeoutMs) {
throw new Error(`Timed out after ${timeoutMs} ms`);
}
await setTimeout(250);
}
}
would let the assertions become deterministic and faster on average.
Also applies to: 190-203, 297-310
  clickhouse:
    image: bitnami/clickhouse:latest
    container_name: clickhouse-dev
    environment:
      CLICKHOUSE_ADMIN_USER: default
      CLICKHOUSE_ADMIN_PASSWORD: password
    ports:
      - "8123:8123"
      - "9000:9000"
    volumes:
      - clickhouse:/bitnami/clickhouse
Pin image tags and avoid committing plaintext credentials
-
image: bitnami/clickhouse:latest
latest
drifts over time and may introduce breaking changes.- Pin to a known-good minor (
bitnami/clickhouse:23.10.4
) and bump deliberately.
-
CLICKHOUSE_ADMIN_PASSWORD: password
&CLICKHOUSE_URL: http://default:password@clickhouse:8123
- Even for dev, checked-in default passwords are picked up by secret-scanners and create noise (see CKV_SECRET_4).
- Move to an
.env
file ignored by Git or use Docker secrets.
Example:
clickhouse:
image: bitnami/clickhouse:23.10.4
env_file: .env.dev # contains CLICKHOUSE_ADMIN_PASSWORD
Also applies to: 54-56, 111-112
const url = new URL(request.url);

let rootOnlyValue = false;
if (url.searchParams.has("rootOnly")) {
  rootOnlyValue = url.searchParams.get("rootOnly") === "true";
} else {
  rootOnlyValue = await getRootOnlyFilterPreference(request);
}

const project = await findProjectBySlug(organizationSlug, projectParam, userId);
if (!project) {
  throw new Error("Project not found");
🛠️ Refactor suggestion
Prefer early‐return for missing project/environment
Throwing generic Error
s here returns a 500 response, leaking stack traces in prod. Remix allows returning throw new Response("Not Found", { status: 404 })
for a proper HTTP 404 and a user-friendly error boundary.
if (!project) {
- throw new Error("Project not found");
+ throw new Response("Project not found", { status: 404 });
}
Same for environment.
Also applies to: 90-98
<DialogContent key="replay">
  <DialogHeader>Cancel {selectedItems.size} runs?</DialogHeader>
  <DialogDescription className="pt-2">
DialogContent
keys are duplicated/copy-pasted
CancelRuns
sets <DialogContent key="replay">
, same as ReplayRuns
. The key determines React’s element identity; re-using the same value prevents React from unmounting stateful contents when switching between dialogs and can surface subtle bugs.
- <DialogContent key="replay">
+ <DialogContent key="cancel">
Apply to the Cancel dialog only.
Also applies to: 405-407
Actionable comments posted: 1
♻️ Duplicate comments (1)
internal-packages/testcontainers/src/clickhouse.ts (1)
158-185
: Execute migrations in deterministic order
readdir
returns filenames in arbitrary order which can break schema migrations if they're not executed in sequence. This was previously flagged but not addressed.async function getAllClickhouseMigrationQueries(migrationsPath: string) { const queries: string[] = []; // Get all the *.sql files in the migrations path - const migrations = await readdir(migrationsPath); + const migrations = (await readdir(migrationsPath)) + .filter((f) => f.endsWith(".sql")) + .sort(); for (const migration of migrations) { const migrationPath = resolve(migrationsPath, migration);Additionally, consider adding validation for migration filenames to ensure they follow a consistent versioning format.
🧹 Nitpick comments (2)
internal-packages/testcontainers/src/clickhouse.ts (2)
174-174
: Avoid non-null assertion operator

Replace the non-null assertion with a defensive check to make the code more robust.
- const upQueries = parts[2]!.trim(); + const upQueries = parts[2] ? parts[2].trim() : "";
174-181
: Consider logging migration execution for better observability

Adding logging for each migration being executed would improve debugging and observability.
This could be implemented in the
runClickhouseMigrations
function to log each migration file and query being executed, especially useful in test environments to track migration progress.
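A minimal way to add that visibility (a sketch; the file filtering and console logging are illustrative, and the goose up/down splitting done by the real helper is omitted here):

```typescript
import { readdir, readFile } from "node:fs/promises";
import { resolve } from "node:path";

// Sketch: log each migration file before and after executing its SQL,
// delegating the actual execution to a caller-supplied command function.
async function runMigrationsWithLogging(
  command: (query: string) => Promise<unknown>,
  migrationsPath: string
) {
  const files = (await readdir(migrationsPath)).filter((f) => f.endsWith(".sql")).sort();

  for (const file of files) {
    console.log(`[clickhouse] applying migration ${file}`);
    const sql = await readFile(resolve(migrationsPath, file), "utf-8");
    await command(sql);
    console.log(`[clickhouse] applied migration ${file}`);
  }
}
```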
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal-packages/testcontainers/src/clickhouse.ts
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: units / 🧪 Unit Tests
🔇 Additional comments (4)
internal-packages/testcontainers/src/clickhouse.ts (4)
1-13
: Imports and constants are appropriately definedThe imports are correctly set up for the required functionality, and constants for ClickHouse ports are properly defined at the top of the file for better readability and maintainability.
14-68
: Well-structured container configuration with good defaultsThe
ClickHouseContainer
class is well implemented with:
- Appropriate default credentials and database name
- Properly exposed ports
- A specific wait strategy for ClickHouse
- Reasonable startup timeout
- High ulimits to prevent "Too many open files" errors
- Fluent API for configuration options
The implementation follows testcontainers patterns correctly.
29-37
: Good proactive error prevention with ulimitsThe high ulimits setting is a good practice to prevent "Too many open files" errors during testing, especially with database containers that may open many connections.
70-145
: Well-designed container interface with comprehensive helper methodsThe
StartedClickHouseContainer
class provides a complete API with:
- Proper getters for connection details
- Well-documented HTTP URL methods
- Client options compatible with
@clickhouse/client
- A correctly formatted connection URL that handles database paths appropriately
The URL handling in
getConnectionUrl()
correctly handles database paths with or without leading slashes.
export async function runClickhouseMigrations(client: ClickHouseClient, migrationsPath: string) {
  // Get all the *.sql files in the migrations path
  const queries = await getAllClickhouseMigrationQueries(migrationsPath);

  for (const query of queries) {
    await client.command({
      query,
    });
  }
}
🛠️ Refactor suggestion
Add error handling to migration execution
The migration execution function should include error handling to provide better diagnostics when migrations fail.
export async function runClickhouseMigrations(client: ClickHouseClient, migrationsPath: string) {
// Get all the *.sql files in the migrations path
const queries = await getAllClickhouseMigrationQueries(migrationsPath);
for (const query of queries) {
- await client.command({
- query,
- });
+ try {
+ await client.command({
+ query,
+ });
+ } catch (error) {
+ // Include the query in the error message for easier debugging
+ const truncatedQuery = query.length > 100 ? `${query.substring(0, 100)}...` : query;
+ throw new Error(`Failed to execute ClickHouse migration query: ${truncatedQuery}\nError: ${error}`);
+ }
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
export async function runClickhouseMigrations(client: ClickHouseClient, migrationsPath: string) {
  // Get all the *.sql files in the migrations path
  const queries = await getAllClickhouseMigrationQueries(migrationsPath);

  for (const query of queries) {
    try {
      await client.command({
        query,
      });
    } catch (error) {
      // Include the query in the error message for easier debugging
      const truncatedQuery = query.length > 100 ? `${query.substring(0, 100)}...` : query;
      throw new Error(`Failed to execute ClickHouse migration query: ${truncatedQuery}\nError: ${error}`);
    }
  }
}
…to the replication service
Actionable comments posted: 2
♻️ Duplicate comments (3)
internal-packages/replication/src/client.ts (3)
131-137
: 🛠️ Refactor suggestionGuard against
undefined
in Redis key prefix
options.redisOptions.keyPrefix
is interpolated directly.
When the caller omits the option the resulting prefix literally starts with the string"undefined"
, polluting the key-space.- keyPrefix: `${options.redisOptions.keyPrefix}logical-replication-client:`, + keyPrefix: `${options.redisOptions.keyPrefix ?? ""}logical-replication-client:`,
152-207
:⚠️ Potential issueRedis connection is never closed → resource leak
stop()
cleans up PG and timers but the dedicated Redis client created in the constructor is left open.
Long-running services that repeatedly start/stop the client will accumulate idle Redis connections.@@ if (this.client) { ... this.client = null; } + + // Close the Redis connection + if (this.redis.status !== "end") { + await this.redis.quit(); + }
420-426
:⚠️ Potential issueInterpolating identifiers opens room for SQL-injection / invalid identifiers
Table, slot and publication names are embedded straight into the SQL strings.
Any value containing"
or back-slashes will break the statement or execute unintended SQL.
Use parameterised queries orpg-format
’s%I
placeholder to safely quote identifiers.Example:
-import format from "pg-format"; -... -this.client.query( - `CREATE PUBLICATION "${this.options.publicationName}" FOR TABLE "${this.options.table}" ...` -) +import format from "pg-format"; +... +await this.client.query( + format( + 'CREATE PUBLICATION %I FOR TABLE %I %s', + this.options.publicationName, + this.options.table, + publishClause + ) +);Also applies to: 453-456, 470-474, 498-503, 511-514
🧹 Nitpick comments (2)
internal-packages/replication/src/client.ts (1)
644-659
:ackIntervalTimer
fires every second regardless of configured interval
setInterval
is hard-coded to1000 ms
, but the option is documented as “interval in seconds”.
For largeackIntervalSeconds
this wakes the event loop 60× more often than necessary.- }, 1000); + }, this.ackIntervalSeconds * 1000);apps/webapp/app/services/runsReplicationService.server.ts (1)
700-707
: Micro-optimisation: avoid spread in hot path
push(...items)
causes the array to re-compute its length on every element.
Array.prototype.push
can take multiple arguments; passing the array viaapply
(or usingfor…of
) avoids the extra spread-operator overhead.- this.currentBatch.push(...items); + this.currentBatch.push.apply(this.currentBatch, items);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (5)
apps/webapp/app/services/runsReplicationService.server.ts
(1 hunks)apps/webapp/package.json
(4 hunks)internal-packages/clickhouse/package.json
(1 hunks)internal-packages/replication/package.json
(1 hunks)internal-packages/replication/src/client.ts
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- apps/webapp/package.json
- internal-packages/clickhouse/package.json
- internal-packages/replication/package.json
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal-packages/replication/src/client.ts (4)
internal-packages/redis/src/index.ts (2)
RedisOptions
(4-4)Redis
(4-4)internal-packages/tracing/src/index.ts (1)
Tracer
(7-7)internal-packages/replication/src/pgoutput.ts (3)
PgoutputMessage
(11-21)PgoutputParser
(164-391)getPgoutputStartReplicationSQL
(393-404)internal-packages/replication/src/errors.ts (1)
LogicalReplicationClientError
(1-5)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
if (this._insertStrategy === "batching") {
  this._concurrentFlushScheduler
    .addToBatch(
      transaction.events.map((event) => ({
        _version,
        run: event.data,
        event: event.tag,
      }))
    )
    .catch((error) => {
      this.logger.error("Error adding to batch", {
        error,
      });
    });
} else {
Acknowledging LSN before ClickHouse durability breaks at-least-once guarantees
In “batching” mode the code pushes events to the scheduler without awaiting the flush and immediately acknowledges the LSN.
If the process crashes before the batch is persisted, events are lost and PostgreSQL won’t resend them.
- if (this._insertStrategy === "batching") {
- this._concurrentFlushScheduler
- .addToBatch(/* … */)
- .catch(/* log */);
- } else {
+ if (this._insertStrategy === "batching") {
+ await this._concurrentFlushScheduler.addToBatch(/* … */);
+ } else {
…
}
- if (!alreadyAcknowledged) {
- await this._replicationClient.acknowledge(transaction.commitEndLsn);
- }
+ if (!alreadyAcknowledged) {
+ await this._replicationClient.acknowledge(transaction.commitEndLsn);
+ }
Also applies to: 380-387
const failedBatches = results.filter((result) => result.status === "rejected").length;
this.failedBatchCount += failedBatches;

this.logger.info("Batch flush complete", {
  totalBatches: batches.length,
  successfulBatches: batches.length - failedBatches,
  failedBatches,
  totalFailedBatches: this.failedBatchCount,
});
🛠️ Refactor suggestion
Failed batches are counted but never retried
flushNextBatch()
increments failedBatchCount
yet silently drops the data, risking permanent loss.
Consider re-queuing failed batches or halting replication until the operator intervenes, e.g.:
- const failedBatches = results.filter((r) => r.status === "rejected").length;
- this.failedBatchCount += failedBatches;
+ const failed = results
+ .map((r, i) => ({ r, i }))
+ .filter(({ r }) => r.status === "rejected")
+ .map(({ i }) => batches[i]);
+ this.failedBatchCount += failed.length;
+
+ // Re-queue failed batches for another attempt
+ for (const b of failed) {
+ this.currentBatch.unshift(...b);
+ }
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 4
♻️ Duplicate comments (3)
apps/webapp/app/services/runsReplicationService.server.ts (1)
735-744
: 🛠️ Refactor suggestionFailed batches are dropped instead of retried
#flushNextBatch()
incrementsfailedBatchCount
but the records are
not re-queued. A transient ClickHouse error would therefore cause
permanent data loss and replication slot divergence.A simple fix is to push the failed batch back onto
currentBatch
(or a dedicated retry queue) so it will be retried on the next flush:- if (error) { - this.logger.error("Error flushing batch", { error }); - this.failedBatchCount++; - } + if (error) { + this.logger.error("Error flushing batch", { error }); + this.failedBatchCount++; + // re-queue for another attempt + this.currentBatch.unshift(...batch); + }apps/webapp/test/runsReplicationService.test.ts (2)
41-130
: Always stop the replication service infinally{}
to avoid resource leaksIf one of the
expect
assertions throws,runsReplicationService.stop()
is never invoked, leaving
the logical replication slot active and the ClickHouse writer running.
This can block subsequent tests and occasionally dead-lock CI runners
(as noted in a previous review).await runsReplicationService.start(); - … assertions … -await runsReplicationService.stop(); +try { + … assertions … +} finally { + await runsReplicationService.stop(); +}
88-99
: Replace fixed sleeps with polling to make the test deterministicUsing
await setTimeout(1000)
assumes replication will always complete
within 1 s on every CI node. Introduce a helper that polls ClickHouse
until the expected rows appear (or a global timeout is hit) to eliminate
flakiness and speed up fast runs.This concern was raised earlier but is still present throughout the file.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
apps/webapp/app/env.server.ts
(1 hunks)apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts
(1 hunks)apps/webapp/app/services/runsReplicationInstance.server.ts
(1 hunks)apps/webapp/app/services/runsReplicationService.server.ts
(1 hunks)apps/webapp/test/runsReplicationService.test.ts
(1 hunks)apps/webapp/test/utils/tracing.ts
(1 hunks)internal-packages/replication/src/client.ts
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- apps/webapp/app/services/runsReplicationInstance.server.ts
- internal-packages/replication/src/client.ts
- apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts
- apps/webapp/app/env.server.ts
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: units / 🧪 Unit Tests
export function createInMemoryTracing() {
  // Initialize the tracer provider and exporter
  const provider = new NodeTracerProvider();
  const exporter = new InMemorySpanExporter();
  provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
  provider.register();

  // Retrieve the tracer
  const tracer = trace.getTracer("test-tracer");

  return {
    exporter,
    tracer,
  };
🛠️ Refactor suggestion
Ensure the tracer provider is properly torn-down between tests
NodeTracerProvider.register()
installs a global provider.
If multiple tests call createInMemoryTracing()
the old provider remains active and continues to buffer spans, which can:
• leak memory,
• make assertions unreliable (spans from previous suites bleed into the next),
• break other OpenTelemetry instrumentation that expects only one provider.
Consider returning the provider
so callers can invoke await provider.shutdown()
(or at least provider.activeSpanProcessor?.forceFlush()
) in afterEach
.
A minimal change:
export function createInMemoryTracing() {
…
- const tracer = trace.getTracer("test-tracer");
-
- return {
- exporter,
- tracer,
- };
+ const tracer = trace.getTracer("test-tracer");
+
+ return {
+ exporter,
+ tracer,
+ provider, // expose for shutdown
+ };
}
and in the test:
const { provider } = createInMemoryTracing();
afterEach(() => provider.shutdown());
async #prepareJson(
  data: string | undefined | null,
  dataType: string
): Promise<{ data: unknown }> {
  if (!data) {
    return { data: undefined };
  }

  if (dataType !== "application/json" && dataType !== "application/super+json") {
    return { data: undefined };
  }
🛠️ Refactor suggestion
#prepareJson
should accept undefined / null MIME type
dataType
is declared as string
but callers pass
run.outputType
/ run.payloadType
, which can be null | undefined
.
This violates TypeScript strict-null-checks and may crash at runtime when
.startsWith
/ comparisons are performed.
- dataType: string
+ dataType?: string | null
Also adjust the two equality checks accordingly.
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 3
♻️ Duplicate comments (6)
apps/webapp/app/services/runsReplicationService.server.ts (4)
149-162
:⚠️ Potential issuePotential data loss in shutdown sequence
The shutdown method doesn't flush buffered data when there's no current transaction. This can lead to data loss if there are pending items in the scheduler's batch that haven't been flushed yet.
Fix the shutdown sequence to always flush pending data:
public async shutdown() { this._isShuttingDown = true; this.logger.info("Initiating shutdown of runs replication service"); if (!this._currentTransaction) { this.logger.info("No transaction to commit, shutting down immediately"); - await this._replicationClient.stop(); - this._isShutDownComplete = true; - return; + this.logger.info("No transaction to commit, flushing any buffered rows"); } this._concurrentFlushScheduler.shutdown(); + await this._replicationClient.stop(); + this._isShutDownComplete = true; }
731-769
: 🛠️ Refactor suggestionFailed batches are counted but never retried
When a batch fails, it's counted but the data is permanently lost. Consider implementing a retry mechanism to ensure data durability.
Implement a retry mechanism for failed batches:
async #flushNextBatch(): Promise<void> { if (this.currentBatch.length === 0) return; const batch = this.currentBatch; this.currentBatch = []; const callback = this.config.callback; const promise = this.concurrencyLimiter(async () => { await startSpan(this._tracer, "flushNextBatch", async (span) => { const batchId = nanoid(); span.setAttribute("batch_id", batchId); span.setAttribute("batch_size", batch.length); span.setAttribute("concurrency_active_count", this.concurrencyLimiter.activeCount); span.setAttribute("concurrency_pending_count", this.concurrencyLimiter.pendingCount); span.setAttribute("concurrency_concurrency", this.concurrencyLimiter.concurrency); await callback(batchId, batch); }); }); const [error] = await tryCatch(promise); if (error) { this.logger.error("Error flushing batch", { error, }); this.failedBatchCount++; + // Re-queue failed batch items + this.currentBatch = [...batch, ...this.currentBatch]; + this.logger.info("Re-queued failed batch items", { count: batch.length }); } this.logger.debug("Batch flush complete", { totalBatches: 1, successfulBatches: 1, failedBatches: 0, totalFailedBatches: this.failedBatchCount, }); }
604-607
:⚠️ Potential issueType safety issue in
#prepareJson
methodThe
dataType
parameter is declared asstring
but callers passrun.outputType
orrun.payloadType
which might benull
orundefined
.Update the parameter type to match actual usage:
async #prepareJson( data: string | undefined | null, - dataType: string + dataType: string | undefined | null ): Promise<{ data: unknown }> {
337-343
:⚠️ Potential issueMissing await could cause data loss during transaction handling
The service adds events to the batch but doesn't wait for them to be processed before moving on, which could lead to data loss if a crash occurs before flushing.
Add an await to ensure data is added to the batch before proceeding:
- this._concurrentFlushScheduler.addToBatch( + await this._concurrentFlushScheduler.addToBatch( transaction.events.map((event) => ({ _version, run: event.data, event: event.tag, })) );internal-packages/replication/src/pgoutput.ts (2)
333-354
: 🛠️ Refactor suggestionWrap each
case
in braces to prevent variable scope leakageVariables declared in switch cases without block scoping can lead to subtle bugs. Static analysis correctly flagged this issue.
Add block scoping to each case:
switch (kind) { - case 0x62: // 'b' binary - const bsize = reader.readInt32(); - const bval = reader.read(bsize); - tuple[name] = bval; - break; + case 0x62: { // 'b' binary + const bsize = reader.readInt32(); + const bval = reader.read(bsize); + tuple[name] = bval; + break; + } - case 0x74: // 't' text - const valsize = reader.readInt32(); - const valbuf = reader.read(valsize); - const valtext = reader.decodeText(valbuf); - tuple[name] = parser(valtext); - break; + case 0x74: { // 't' text + const valsize = reader.readInt32(); + const valbuf = reader.read(valsize); + const valtext = reader.decodeText(valbuf); + tuple[name] = parser(valtext); + break; + } case 0x6e: // 'n' null tuple[name] = null; break; case 0x75: // 'u' unchanged toast datum tuple[name] = unchangedToastFallback?.[name]; break; default: throw Error(`unknown attribute kind ${String.fromCharCode(kind)}`); }🧰 Tools
🪛 Biome (1.9.4)
[error] 335-335: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 336-336: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 340-340: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 341-341: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 342-342: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
156-160
:⚠️ Potential issueTimestamp unit conversion inconsistency
The comment implies converting to milliseconds for JS Date compatibility, but the function returns microseconds. This leads to timestamp values that are 1000× larger than expected.
Fix the timestamp conversion:
 readTime(): bigint {
   // (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY == 946684800000000
   const microsSinceUnixEpoch = this.readUint64() + 946684800000000n;
-  return microsSinceUnixEpoch;
+  // Convert to milliseconds for JS Date compatibility
+  return microsSinceUnixEpoch / 1000n;
 }
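To make the unit mismatch concrete, here is a standalone sketch (not the PR's code) of how a caller typically turns a pgoutput commit timestamp, which Postgres sends as microseconds since the 2000-01-01 epoch, into a JS Date; the helper name and shape are illustrative assumptions.

// Postgres epoch (2000-01-01) to Unix epoch (1970-01-01) offset, in microseconds
const PG_TO_UNIX_EPOCH_MICROS = 946_684_800_000_000n;

function pgCommitTimeToDate(microsSincePgEpoch: bigint): Date {
  const microsSinceUnixEpoch = microsSincePgEpoch + PG_TO_UNIX_EPOCH_MICROS;
  // Date expects milliseconds; passing microseconds yields dates roughly 1000x too far in the future
  return new Date(Number(microsSinceUnixEpoch / 1000n));
}

// pgCommitTimeToDate(0n) corresponds to 2000-01-01T00:00:00.000Z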
🧹 Nitpick comments (2)
apps/webapp/app/services/runsReplicationService.server.ts (1)
388-394
: Improved error handling for acknowledgment failures
The error handling for acknowledgment failures is good, but could be improved with exponential backoff for retries.
Consider implementing a backoff strategy for acknowledgment failures:
 const [ackError] = await tryCatch(
   this._replicationClient.acknowledge(this._latestCommitEndLsn)
 );

 if (ackError) {
   this.logger.error("Error acknowledging transaction", { ackError });
+  // Reset acknowledgment state to retry in the next interval
+  this._lastAcknowledgedLsn = null;
 }
internal-packages/replication/src/pgoutput.ts (1)
1-4
: Add TypeScript triple-slash directive for BigInt support
Since this file explicitly requires ES2020+ for BigInt literals, add a TypeScript reference directive to enforce this requirement at the type level.
 // NOTE: This file requires ES2020 or higher for BigInt literals (used in BinaryReader.readTime)
+// @ts-check
+/// <reference lib="es2020.bigint" />
 import { Client } from "pg";
 import { types } from "pg";
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
apps/webapp/app/services/runsReplicationService.server.ts (1 hunks)
internal-packages/replication/src/pgoutput.ts (1 hunks)
🧰 Additional context used
🪛 Biome (1.9.4)
internal-packages/replication/src/pgoutput.ts
[error] 335-335: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 336-336: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 340-340: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 341-341: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
[error] 342-342: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
🔇 Additional comments (1)
internal-packages/replication/src/pgoutput.ts (1)
315-322
:❓ Verification inconclusive
Potential issue with handling of null values in key columns
The key tuple processing converts null values to undefined, which might lead to unexpected behavior depending on how this data is later processed or compared.
Verify this behavior is intentional and consider keeping null values as-is to maintain semantic consistency:

 private readKeyTuple(reader: BinaryReader, relation: MessageRelation): Record<string, any> {
   const tuple = this.readTuple(reader, relation);
   const key = Object.create(null);
   for (const k of relation.keyColumns) {
-    key[k] = tuple[k] === null ? undefined : tuple[k];
+    key[k] = tuple[k];
   }
   return key;
 }

🏁 Script executed:

#!/bin/bash
# Check if null key values appear and how they're used in the codebase
echo "Searching for code that uses key tuples from pgoutput messages..."
rg -A 10 -B 10 "key\s*=\s*this\.readKeyTuple" --type ts

echo "Searching for code that compares key values to undefined..."
rg "key\[\w+\]\s*===\s*undefined" --type ts

Length of output: 4053

Confirm intentional null → undefined mapping in readKeyTuple
I ran searches for any code that compares key values to undefined and found none—so converting SQL null to JS undefined here will silently drop those properties when serialized (e.g. via JSON.stringify) and may not be intentional. If the goal is simply to forward raw tuple values, preserving null is safer:

 private readKeyTuple(reader: BinaryReader, relation: MessageRelation): Record<string, any> {
   const tuple = this.readTuple(reader, relation);
   const key = Object.create(null);
   for (const k of relation.keyColumns) {
-    key[k] = tuple[k] === null ? undefined : tuple[k];
+    key[k] = tuple[k];
   }
   return key;
 }

• File: internal-packages/replication/src/pgoutput.ts
• Method: readKeyTuple (around line 315)

Please verify whether any downstream logic or tests rely on these keys being undefined; if not, let’s preserve null for semantic consistency.
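A quick standalone illustration of the serialization point (not from the PR): undefined properties disappear when stringified, while null properties survive.

JSON.stringify({ id: 1, key: undefined }); // => '{"id":1}'
JSON.stringify({ id: 1, key: null });      // => '{"id":1,"key":null}'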
shutdown(): void {
  this.logger.info("Shutting down ConcurrentFlushScheduler");

  this._isShutDown = true;

  this.#clearTimer();
  this.#flushNextBatchIfNeeded();
}
🛠️ Refactor suggestion
Ensure scheduler flushes data on shutdown
The shutdown method should await the final flush to ensure all data is processed before shutdown completes.
Update the shutdown method to await the flush:
- shutdown(): void {
+ async shutdown(): Promise<void> {
this.logger.info("Shutting down ConcurrentFlushScheduler");
this._isShutDown = true;
this.#clearTimer();
- this.#flushNextBatchIfNeeded();
+ await this.#flushNextBatchIfNeeded();
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async shutdown(): Promise<void> {
  this.logger.info("Shutting down ConcurrentFlushScheduler");
  this._isShutDown = true;
  this.#clearTimer();
  await this.#flushNextBatchIfNeeded();
}
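If the async version is adopted, callers can wait for the final flush during process shutdown. A hypothetical wiring sketch (the scheduler shape is assumed, not the service's real API):

function registerGracefulShutdown(scheduler: { shutdown(): Promise<void> }) {
  process.on("SIGTERM", async () => {
    await scheduler.shutdown(); // final flush completes before the process exits
    process.exit(0);
  });
}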
addToBatch(items: T[]): void {
  this.currentBatch = this.currentBatch.concat(items);
  this.#flushNextBatchIfNeeded();
}
Method should return Promise for consistency
The addToBatch method should return a Promise to allow callers to await it, ensuring data durability.
Modify the method to return a Promise:
- addToBatch(items: T[]): void {
+ addToBatch(items: T[]): Promise<void> {
this.currentBatch = this.currentBatch.concat(items);
- this.#flushNextBatchIfNeeded();
+ return this.#flushNextBatchIfNeeded();
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
addToBatch(items: T[]): Promise<void> {
  this.currentBatch = this.currentBatch.concat(items);
  return this.#flushNextBatchIfNeeded();
}
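A hypothetical caller sketch showing what the returned Promise buys: the replication position is only acknowledged after the events have been handed to the batch and any size-triggered flush has started (names and shapes here are illustrative, not the service's real API).

async function handleCommittedTransaction(
  scheduler: { addToBatch(items: unknown[]): Promise<void> },
  events: unknown[],
  acknowledge: () => Promise<void>
) {
  await scheduler.addToBatch(events); // hand-off (and any triggered flush) has started
  await acknowledge();                // only then advance the replication LSN
}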
#flushNextBatchIfNeeded(): void {
  if (this.currentBatch.length >= this.BATCH_SIZE || this._isShutDown) {
    this.logger.debug("Batch size threshold reached, initiating flush", {
      batchSize: this.BATCH_SIZE,
      currentSize: this.currentBatch.length,
      isShutDown: this._isShutDown,
    });

    this.#flushNextBatch().catch((error) => {
      this.logger.error("Error flushing next batch", {
        error,
      });
    });
  }
}
🛠️ Refactor suggestion
Flush method should return Promise for awaiting
The #flushNextBatchIfNeeded method should return a Promise to allow proper awaiting in the chain.
Modify the method to return a Promise:
- #flushNextBatchIfNeeded(): void {
+ #flushNextBatchIfNeeded(): Promise<void> {
if (this.currentBatch.length >= this.BATCH_SIZE || this._isShutDown) {
this.logger.debug("Batch size threshold reached, initiating flush", {
batchSize: this.BATCH_SIZE,
currentSize: this.currentBatch.length,
isShutDown: this._isShutDown,
});
- this.#flushNextBatch().catch((error) => {
- this.logger.error("Error flushing next batch", {
- error,
- });
- });
+ return this.#flushNextBatch().catch((error) => {
+ this.logger.error("Error flushing next batch", {
+ error,
+ });
+ });
}
+ return Promise.resolve();
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
#flushNextBatchIfNeeded(): Promise<void> {
  if (this.currentBatch.length >= this.BATCH_SIZE || this._isShutDown) {
    this.logger.debug("Batch size threshold reached, initiating flush", {
      batchSize: this.BATCH_SIZE,
      currentSize: this.currentBatch.length,
      isShutDown: this._isShutDown,
    });

    return this.#flushNextBatch().catch((error) => {
      this.logger.error("Error flushing next batch", {
        error,
      });
    });
  }

  return Promise.resolve();
}
Using PostgreSQL logical replication, sync TaskRun changes to ClickHouse so we can improve the efficiency of our dashboard, starting with the runs list. This PR doesn't implement the new dashboard yet; that will come in a later release. For now, this PR implements the replication only.
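For readers new to logical replication, a minimal sketch of the Postgres side of such a pipeline; the publication, slot, and table names are assumptions for illustration, not the PR's actual identifiers.

import { Client } from "pg";

// Publish TaskRun changes and create a pgoutput slot that a consumer (the
// replication service) can stream WAL changes from and acknowledge LSNs against.
async function setUpReplication(connectionString: string) {
  const client = new Client({ connectionString });
  await client.connect();

  await client.query(`CREATE PUBLICATION task_runs_pub FOR TABLE "TaskRun"`);
  await client.query(
    `SELECT pg_create_logical_replication_slot('task_runs_slot', 'pgoutput')`
  );

  await client.end();
}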
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests
Chores