From e23522ae51c3b92b2d3f6080763f439c269010a6 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 23 Jul 2025 11:43:43 +0100 Subject: [PATCH 1/8] Set the default replication concurrency to 2 for self-hosters 100 was a bit crazy --- apps/webapp/app/env.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 6fd895c456..8800686236 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -891,7 +891,7 @@ const EnvironmentSchema = z.object({ RUN_REPLICATION_ENABLED: z.string().default("0"), RUN_REPLICATION_SLOT_NAME: z.string().default("task_runs_to_clickhouse_v1"), RUN_REPLICATION_PUBLICATION_NAME: z.string().default("task_runs_to_clickhouse_v1_publication"), - RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(100), + RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(2), RUN_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), RUN_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100), RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000), From 5800cb4d5e47d708d3735124f6324b796163fa49 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 23 Jul 2025 12:37:44 +0100 Subject: [PATCH 2/8] Made the run repository an interface, deferring just to CH for now --- .../app/presenters/RunFilters.server.ts | 2 +- .../v3/BulkActionPresenter.server.ts | 2 +- .../v3/CreateBulkActionPresenter.server.ts | 2 +- .../v3/NextRunListPresenter.server.ts | 2 +- .../clickhouseRunsRepository.server.ts} | 84 ++--------- .../runsRepository/runsRepository.server.ts | 142 ++++++++++++++++++ .../v3/services/bulk/BulkActionV2.server.ts | 2 +- apps/webapp/test/runsRepository.test.ts | 2 +- 8 files changed, 160 insertions(+), 78 deletions(-) rename apps/webapp/app/services/{runsRepository.server.ts => runsRepository/clickhouseRunsRepository.server.ts} (81%) create mode 100644 apps/webapp/app/services/runsRepository/runsRepository.server.ts diff --git a/apps/webapp/app/presenters/RunFilters.server.ts b/apps/webapp/app/presenters/RunFilters.server.ts index db19e65656..8d70b4d3bd 100644 --- a/apps/webapp/app/presenters/RunFilters.server.ts +++ b/apps/webapp/app/presenters/RunFilters.server.ts @@ -3,7 +3,7 @@ import { TaskRunListSearchFilters, } from "~/components/runs/v3/RunFilters"; import { getRootOnlyFilterPreference } from "~/services/preferences/uiPreferences.server"; -import { type ParsedRunFilters } from "~/services/runsRepository.server"; +import { type ParsedRunFilters } from "~/services/runsRepository/runsRepository.server"; type FiltersFromRequest = ParsedRunFilters & Required>; diff --git a/apps/webapp/app/presenters/v3/BulkActionPresenter.server.ts b/apps/webapp/app/presenters/v3/BulkActionPresenter.server.ts index 0434045ea4..f98d0819cb 100644 --- a/apps/webapp/app/presenters/v3/BulkActionPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/BulkActionPresenter.server.ts @@ -1,7 +1,7 @@ import { getUsername } from "~/utils/username"; import { BasePresenter } from "./basePresenter.server"; import { type BulkActionMode } from "~/components/BulkActionFilterSummary"; -import { parseRunListInputOptions } from "~/services/runsRepository.server"; +import { parseRunListInputOptions } from "~/services/runsRepository/runsRepository.server"; import { TaskRunListSearchFilters } from "~/components/runs/v3/RunFilters"; type BulkActionOptions = { diff --git a/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts b/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts index e80c7df42b..acf511f0f5 100644 --- a/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts @@ -1,7 +1,7 @@ import { type PrismaClient } from "@trigger.dev/database"; import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction"; import { clickhouseClient } from "~/services/clickhouseInstance.server"; -import { RunsRepository } from "~/services/runsRepository.server"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { getRunFiltersFromRequest } from "../RunFilters.server"; import { BasePresenter } from "./basePresenter.server"; diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index 960bdfc10a..2375ea161a 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -9,7 +9,7 @@ import { type Direction } from "~/components/ListPagination"; import { timeFilters } from "~/components/runs/v3/SharedFilters"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { getAllTaskIdentifiers } from "~/models/task.server"; -import { RunsRepository } from "~/services/runsRepository.server"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus"; diff --git a/apps/webapp/app/services/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts similarity index 81% rename from apps/webapp/app/services/runsRepository.server.ts rename to apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index 3196c436b3..9bcd0b1753 100644 --- a/apps/webapp/app/services/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -1,72 +1,16 @@ -import { type ClickHouse, type ClickhouseQueryBuilder } from "@internal/clickhouse"; -import { type Tracer } from "@internal/tracing"; -import { type Logger, type LogLevel } from "@trigger.dev/core/logger"; -import { MachinePresetName } from "@trigger.dev/core/v3"; -import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; -import { TaskRunStatus } from "@trigger.dev/database"; -import parseDuration from "parse-duration"; -import { z } from "zod"; import { timeFilters } from "~/components/runs/v3/SharedFilters"; -import { type PrismaClient } from "~/db.server"; - -export type RunsRepositoryOptions = { - clickhouse: ClickHouse; - prisma: PrismaClient; - logger?: Logger; - logLevel?: LogLevel; - tracer?: Tracer; -}; - -const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]); - -const RunListInputOptionsSchema = z.object({ - organizationId: z.string(), - projectId: z.string(), - environmentId: z.string(), - //filters - tasks: z.array(z.string()).optional(), - versions: z.array(z.string()).optional(), - statuses: z.array(RunStatus).optional(), - tags: z.array(z.string()).optional(), - scheduleId: z.string().optional(), - period: z.string().optional(), - from: z.number().optional(), - to: z.number().optional(), - isTest: z.boolean().optional(), - rootOnly: z.boolean().optional(), - batchId: z.string().optional(), - runId: z.array(z.string()).optional(), - bulkId: z.string().optional(), - queues: z.array(z.string()).optional(), - machines: MachinePresetName.array().optional(), -}); - -export type RunListInputOptions = z.infer; -export type RunListInputFilters = Omit< - RunListInputOptions, - "organizationId" | "projectId" | "environmentId" ->; - -export type ParsedRunFilters = RunListInputFilters & { - cursor?: string; - direction?: "forward" | "backward"; -}; - -type FilterRunsOptions = Omit & { - period: number | undefined; -}; - -type Pagination = { - page: { - size: number; - cursor?: string; - direction?: "forward" | "backward"; - }; -}; - -export type ListRunsOptions = RunListInputOptions & Pagination; - -export class RunsRepository { +import { + type FilterRunsOptions, + type RunListInputOptions, + type IRunsRepository, + type ListRunsOptions, + type RunsRepositoryOptions, +} from "./runsRepository.server"; +import parseDuration from "parse-duration"; +import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; +import { type ClickhouseQueryBuilder } from "@internal/clickhouse"; + +export class ClickHouseRunsRepository implements IRunsRepository { constructor(private readonly options: RunsRepositoryOptions) {} async listRunIds(options: ListRunsOptions) { @@ -373,7 +317,3 @@ function applyRunFiltersToQueryBuilder( }); } } - -export function parseRunListInputOptions(data: any): RunListInputOptions { - return RunListInputOptionsSchema.parse(data); -} diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts new file mode 100644 index 0000000000..512e6bf671 --- /dev/null +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -0,0 +1,142 @@ +import { type ClickHouse, type ClickhouseQueryBuilder } from "@internal/clickhouse"; +import { type Tracer } from "@internal/tracing"; +import { type Logger, type LogLevel } from "@trigger.dev/core/logger"; +import { MachinePresetName } from "@trigger.dev/core/v3"; +import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; +import { Prisma, TaskRunStatus } from "@trigger.dev/database"; +import parseDuration from "parse-duration"; +import { z } from "zod"; +import { timeFilters } from "~/components/runs/v3/SharedFilters"; +import { type PrismaClient } from "~/db.server"; +import { ClickHouseRunsRepository } from "./clickhouseRunsRepository.server"; + +export type RunsRepositoryOptions = { + clickhouse: ClickHouse; + prisma: PrismaClient; + logger?: Logger; + logLevel?: LogLevel; + tracer?: Tracer; +}; + +const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]); + +const RunListInputOptionsSchema = z.object({ + organizationId: z.string(), + projectId: z.string(), + environmentId: z.string(), + //filters + tasks: z.array(z.string()).optional(), + versions: z.array(z.string()).optional(), + statuses: z.array(RunStatus).optional(), + tags: z.array(z.string()).optional(), + scheduleId: z.string().optional(), + period: z.string().optional(), + from: z.number().optional(), + to: z.number().optional(), + isTest: z.boolean().optional(), + rootOnly: z.boolean().optional(), + batchId: z.string().optional(), + runId: z.array(z.string()).optional(), + bulkId: z.string().optional(), + queues: z.array(z.string()).optional(), + machines: MachinePresetName.array().optional(), +}); + +export type RunListInputOptions = z.infer; +export type RunListInputFilters = Omit< + RunListInputOptions, + "organizationId" | "projectId" | "environmentId" +>; + +export type ParsedRunFilters = RunListInputFilters & { + cursor?: string; + direction?: "forward" | "backward"; +}; + +export type FilterRunsOptions = Omit & { + period: number | undefined; +}; + +type Pagination = { + page: { + size: number; + cursor?: string; + direction?: "forward" | "backward"; + }; +}; + +type ListedRun = Prisma.TaskRunGetPayload<{ + select: { + id: true; + friendlyId: true; + taskIdentifier: true; + taskVersion: true; + runtimeEnvironmentId: true; + status: true; + createdAt: true; + startedAt: true; + lockedAt: true; + delayUntil: true; + updatedAt: true; + completedAt: true; + isTest: true; + spanId: true; + idempotencyKey: true; + ttl: true; + expiredAt: true; + costInCents: true; + baseCostInCents: true; + usageDurationMs: true; + runTags: true; + depth: true; + rootTaskRunId: true; + batchId: true; + metadata: true; + metadataType: true; + machinePreset: true; + queue: true; + }; +}>; + +export type ListRunsOptions = RunListInputOptions & Pagination; + +export interface IRunsRepository { + listRunIds(options: ListRunsOptions): Promise; + listRuns(options: ListRunsOptions): Promise<{ + runs: ListedRun[]; + pagination: { + nextCursor: string | null; + previousCursor: string | null; + }; + }>; + countRuns(options: RunListInputOptions): Promise; +} + +export class RunsRepository implements IRunsRepository { + private readonly clickHouseRunsRepository: ClickHouseRunsRepository; + constructor(private readonly options: RunsRepositoryOptions) { + this.clickHouseRunsRepository = new ClickHouseRunsRepository(options); + } + + listRunIds(options: ListRunsOptions): Promise { + return this.clickHouseRunsRepository.listRunIds(options); + } + + listRuns(options: ListRunsOptions): Promise<{ + runs: ListedRun[]; + pagination: { + nextCursor: string | null; + previousCursor: string | null; + }; + }> { + return this.clickHouseRunsRepository.listRuns(options); + } + + countRuns(options: RunListInputOptions): Promise { + return this.clickHouseRunsRepository.countRuns(options); + } +} + +export function parseRunListInputOptions(data: any): RunListInputOptions { + return RunListInputOptionsSchema.parse(data); +} diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index a68d6d330f..98b6079c10 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -12,7 +12,7 @@ import { parseRunListInputOptions, type RunListInputFilters, RunsRepository, -} from "~/services/runsRepository.server"; +} from "~/services/runsRepository/runsRepository.server"; import { BaseService } from "../baseService.server"; import { commonWorker } from "~/v3/commonWorker.server"; import { env } from "~/env.server"; diff --git a/apps/webapp/test/runsRepository.test.ts b/apps/webapp/test/runsRepository.test.ts index 56e6bf92d6..37f83d7f0c 100644 --- a/apps/webapp/test/runsRepository.test.ts +++ b/apps/webapp/test/runsRepository.test.ts @@ -1,6 +1,6 @@ import { containerTest } from "@internal/testcontainers"; import { setTimeout } from "node:timers/promises"; -import { RunsRepository } from "~/services/runsRepository.server"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { setupClickhouseReplication } from "./utils/replicationUtils"; vi.setConfig({ testTimeout: 60_000 }); From 8076a44ed2472de9ebadad774a71e797cc6b7866 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 23 Jul 2025 14:26:35 +0100 Subject: [PATCH 3/8] Added run repository feature flag, allowing passing a default when getting a flag --- apps/webapp/app/v3/featureFlags.server.ts | 27 ++++++++++++++++------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/apps/webapp/app/v3/featureFlags.server.ts b/apps/webapp/app/v3/featureFlags.server.ts index 6f7c3edce5..3a54465c0e 100644 --- a/apps/webapp/app/v3/featureFlags.server.ts +++ b/apps/webapp/app/v3/featureFlags.server.ts @@ -1,23 +1,32 @@ import { z } from "zod"; -import { prisma, PrismaClientOrTransaction } from "~/db.server"; +import { prisma, type PrismaClientOrTransaction } from "~/db.server"; export const FEATURE_FLAG = { defaultWorkerInstanceGroupId: "defaultWorkerInstanceGroupId", + runsListRepository: "runsListRepository", } as const; const FeatureFlagCatalog = { [FEATURE_FLAG.defaultWorkerInstanceGroupId]: z.string(), + [FEATURE_FLAG.runsListRepository]: z.enum(["clickhouse", "postgres"]), }; type FeatureFlagKey = keyof typeof FeatureFlagCatalog; -export type FlagsOptions = { - key: FeatureFlagKey; +export type FlagsOptions = { + key: T; + defaultValue?: z.infer<(typeof FeatureFlagCatalog)[T]>; }; export function makeFlags(_prisma: PrismaClientOrTransaction = prisma) { - return async function flags( - opts: FlagsOptions + function flags( + opts: FlagsOptions & { defaultValue: z.infer<(typeof FeatureFlagCatalog)[T]> } + ): Promise>; + function flags( + opts: FlagsOptions + ): Promise | undefined>; + async function flags( + opts: FlagsOptions ): Promise | undefined> { const value = await _prisma.featureFlag.findUnique({ where: { @@ -28,16 +37,18 @@ export function makeFlags(_prisma: PrismaClientOrTransaction = prisma) { const parsed = FeatureFlagCatalog[opts.key].safeParse(value?.value); if (!parsed.success) { - return; + return opts.defaultValue; } return parsed.data; - }; + } + + return flags; } export function makeSetFlags(_prisma: PrismaClientOrTransaction = prisma) { return async function setFlags( - opts: FlagsOptions & { value: z.infer<(typeof FeatureFlagCatalog)[T]> } + opts: FlagsOptions & { value: z.infer<(typeof FeatureFlagCatalog)[T]> } ): Promise { await _prisma.featureFlag.upsert({ where: { From fa1fa0260ed18b8d17084dd62d10a14931a2513b Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 23 Jul 2025 14:27:16 +0100 Subject: [PATCH 4/8] Switch run repository using a feature flag --- .../clickhouseRunsRepository.server.ts | 72 +---- .../postgresRunsRepository.server.ts | 289 ++++++++++++++++++ .../runsRepository/runsRepository.server.ts | 106 ++++++- 3 files changed, 391 insertions(+), 76 deletions(-) create mode 100644 apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts diff --git a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index 9bcd0b1753..ccd01c3360 100644 --- a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -5,6 +5,7 @@ import { type IRunsRepository, type ListRunsOptions, type RunsRepositoryOptions, + convertRunListInputOptionsToFilterRunsOptions, } from "./runsRepository.server"; import parseDuration from "parse-duration"; import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; @@ -17,7 +18,7 @@ export class ClickHouseRunsRepository implements IRunsRepository { const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder(); applyRunFiltersToQueryBuilder( queryBuilder, - await this.#convertRunListInputOptionsToFilterRunsOptions(options) + await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma) ); if (options.page.cursor) { @@ -144,7 +145,7 @@ export class ClickHouseRunsRepository implements IRunsRepository { const queryBuilder = this.options.clickhouse.taskRuns.countQueryBuilder(); applyRunFiltersToQueryBuilder( queryBuilder, - await this.#convertRunListInputOptionsToFilterRunsOptions(options) + await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma) ); const [queryError, result] = await queryBuilder.execute(); @@ -159,73 +160,6 @@ export class ClickHouseRunsRepository implements IRunsRepository { return result[0].count; } - - async #convertRunListInputOptionsToFilterRunsOptions( - options: RunListInputOptions - ): Promise { - const convertedOptions: FilterRunsOptions = { - ...options, - period: undefined, - }; - - // Convert time period to ms - const time = timeFilters({ - period: options.period, - from: options.from, - to: options.to, - }); - convertedOptions.period = time.period ? parseDuration(time.period) ?? undefined : undefined; - - // batch friendlyId to id - if (options.batchId && options.batchId.startsWith("batch_")) { - const batch = await this.options.prisma.batchTaskRun.findFirst({ - select: { - id: true, - }, - where: { - friendlyId: options.batchId, - runtimeEnvironmentId: options.environmentId, - }, - }); - - if (batch) { - convertedOptions.batchId = batch.id; - } - } - - // scheduleId can be a friendlyId - if (options.scheduleId && options.scheduleId.startsWith("sched_")) { - const schedule = await this.options.prisma.taskSchedule.findFirst({ - select: { - id: true, - }, - where: { - friendlyId: options.scheduleId, - projectId: options.projectId, - }, - }); - - if (schedule) { - convertedOptions.scheduleId = schedule?.id; - } - } - - if (options.bulkId && options.bulkId.startsWith("bulk_")) { - convertedOptions.bulkId = BulkActionId.toId(options.bulkId); - } - - if (options.runId) { - //convert to friendlyId - convertedOptions.runId = options.runId.map((r) => RunId.toFriendlyId(r)); - } - - // Show all runs if we are filtering by batchId or runId - if (options.batchId || options.runId?.length || options.scheduleId || options.tasks?.length) { - convertedOptions.rootOnly = false; - } - - return convertedOptions; - } } function applyRunFiltersToQueryBuilder( diff --git a/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts new file mode 100644 index 0000000000..1b4afddd05 --- /dev/null +++ b/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts @@ -0,0 +1,289 @@ +import { Prisma, type TaskRunStatus } from "@trigger.dev/database"; +import parseDuration from "parse-duration"; +import { MachinePresetName } from "@trigger.dev/core/v3"; +import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; +import { timeFilters } from "~/components/runs/v3/SharedFilters"; +import { sqlDatabaseSchema } from "~/db.server"; +import { + type FilterRunsOptions, + type RunListInputOptions, + type IRunsRepository, + type ListRunsOptions, + type RunsRepositoryOptions, + type ListedRun, + convertRunListInputOptionsToFilterRunsOptions, +} from "./runsRepository.server"; + +export class PostgresRunsRepository implements IRunsRepository { + constructor(private readonly options: RunsRepositoryOptions) {} + + async listRunIds(options: ListRunsOptions) { + const filterOptions = await convertRunListInputOptionsToFilterRunsOptions( + options, + this.options.prisma + ); + + const query = this.#buildRunIdsQuery(filterOptions, options.page); + const runs = await this.options.prisma.$queryRaw<{ id: string }[]>(query); + + return runs.map((run) => run.id); + } + + async listRuns(options: ListRunsOptions) { + const filterOptions = await convertRunListInputOptionsToFilterRunsOptions( + options, + this.options.prisma + ); + + const query = this.#buildRunsQuery(filterOptions, options.page); + const runs = await this.options.prisma.$queryRaw(query); + + // If there are more runs than the page size, we need to fetch the next page + const hasMore = runs.length > options.page.size; + + let nextCursor: string | null = null; + let previousCursor: string | null = null; + + // Get cursors for next and previous pages + const direction = options.page.direction ?? "forward"; + switch (direction) { + case "forward": { + previousCursor = options.page.cursor ? runs.at(0)?.id ?? null : null; + if (hasMore) { + // The next cursor should be the last run ID from this page + nextCursor = runs[options.page.size - 1]?.id ?? null; + } + break; + } + case "backward": { + const reversedRuns = [...runs].reverse(); + if (hasMore) { + previousCursor = reversedRuns.at(1)?.id ?? null; + nextCursor = reversedRuns.at(options.page.size)?.id ?? null; + } else { + nextCursor = reversedRuns.at(options.page.size - 1)?.id ?? null; + } + break; + } + } + + const runsToReturn = + options.page.direction === "backward" && hasMore + ? runs.slice(1, options.page.size + 1) + : runs.slice(0, options.page.size); + + // ClickHouse is slightly delayed, so we're going to do in-memory status filtering too + let filteredRuns = runsToReturn; + if (options.statuses && options.statuses.length > 0) { + filteredRuns = runsToReturn.filter((run) => options.statuses!.includes(run.status)); + } + + return { + runs: filteredRuns, + pagination: { + nextCursor, + previousCursor, + }, + }; + } + + async countRuns(options: RunListInputOptions) { + const filterOptions = await convertRunListInputOptionsToFilterRunsOptions( + options, + this.options.prisma + ); + + const query = this.#buildCountQuery(filterOptions); + const result = await this.options.prisma.$queryRaw<{ count: bigint }[]>(query); + + if (result.length === 0) { + throw new Error("No count rows returned"); + } + + return Number(result[0].count); + } + + #buildRunIdsQuery( + filterOptions: FilterRunsOptions, + page: { size: number; cursor?: string; direction?: "forward" | "backward" } + ) { + const whereConditions = this.#buildWhereConditions(filterOptions, page.cursor, page.direction); + + return Prisma.sql` + SELECT tr.id + FROM ${sqlDatabaseSchema}."TaskRun" tr + WHERE ${whereConditions} + ORDER BY ${page.direction === "backward" ? Prisma.sql`tr.id ASC` : Prisma.sql`tr.id DESC`} + LIMIT ${page.size + 1} + `; + } + + #buildRunsQuery( + filterOptions: FilterRunsOptions, + page: { size: number; cursor?: string; direction?: "forward" | "backward" } + ) { + const whereConditions = this.#buildWhereConditions(filterOptions, page.cursor, page.direction); + + return Prisma.sql` + SELECT + tr.id, + tr."friendlyId", + tr."taskIdentifier", + tr."taskVersion", + tr."runtimeEnvironmentId", + tr.status, + tr."createdAt", + tr."startedAt", + tr."lockedAt", + tr."delayUntil", + tr."updatedAt", + tr."completedAt", + tr."isTest", + tr."spanId", + tr."idempotencyKey", + tr."ttl", + tr."expiredAt", + tr."costInCents", + tr."baseCostInCents", + tr."usageDurationMs", + tr."runTags", + tr."depth", + tr."rootTaskRunId", + tr."batchId", + tr."metadata", + tr."metadataType", + tr."machinePreset", + tr."queue" + FROM ${sqlDatabaseSchema}."TaskRun" tr + WHERE ${whereConditions} + ORDER BY ${page.direction === "backward" ? Prisma.sql`tr.id ASC` : Prisma.sql`tr.id DESC`} + LIMIT ${page.size + 1} + `; + } + + #buildCountQuery(filterOptions: FilterRunsOptions) { + const whereConditions = this.#buildWhereConditions(filterOptions); + + return Prisma.sql` + SELECT COUNT(*) as count + FROM ${sqlDatabaseSchema}."TaskRun" tr + WHERE ${whereConditions} + `; + } + + #buildWhereConditions( + filterOptions: FilterRunsOptions, + cursor?: string, + direction?: "forward" | "backward" + ) { + const conditions: Prisma.Sql[] = []; + + // Environment filter + conditions.push(Prisma.sql`tr."runtimeEnvironmentId" = ${filterOptions.environmentId}`); + + // Cursor pagination + if (cursor) { + if (direction === "forward" || !direction) { + conditions.push(Prisma.sql`tr.id < ${cursor}`); + } else { + conditions.push(Prisma.sql`tr.id > ${cursor}`); + } + } + + // Task filters + if (filterOptions.tasks && filterOptions.tasks.length > 0) { + conditions.push(Prisma.sql`tr."taskIdentifier" IN (${Prisma.join(filterOptions.tasks)})`); + } + + // Version filters + if (filterOptions.versions && filterOptions.versions.length > 0) { + conditions.push(Prisma.sql`tr."taskVersion" IN (${Prisma.join(filterOptions.versions)})`); + } + + // Status filters + if (filterOptions.statuses && filterOptions.statuses.length > 0) { + conditions.push( + Prisma.sql`tr.status = ANY(ARRAY[${Prisma.join( + filterOptions.statuses + )}]::"TaskRunStatus"[])` + ); + } + + // Tag filters + if (filterOptions.tags && filterOptions.tags.length > 0) { + conditions.push( + Prisma.sql`tr."runTags" && ARRAY[${Prisma.join(filterOptions.tags)}]::text[]` + ); + } + + // Schedule filter + if (filterOptions.scheduleId) { + conditions.push(Prisma.sql`tr."scheduleId" = ${filterOptions.scheduleId}`); + } + + // Time period filter + if (filterOptions.period) { + conditions.push( + Prisma.sql`tr."createdAt" >= NOW() - INTERVAL '1 millisecond' * ${filterOptions.period}` + ); + } + + // From date filter + if (filterOptions.from) { + conditions.push( + Prisma.sql`tr."createdAt" >= ${new Date(filterOptions.from).toISOString()}::timestamp` + ); + } + + // To date filter + if (filterOptions.to) { + const toDate = new Date(filterOptions.to); + const now = new Date(); + const clampedDate = toDate > now ? now : toDate; + conditions.push(Prisma.sql`tr."createdAt" <= ${clampedDate.toISOString()}::timestamp`); + } + + // Test filter + if (typeof filterOptions.isTest === "boolean") { + conditions.push(Prisma.sql`tr."isTest" = ${filterOptions.isTest}`); + } + + // Root only filter + if (filterOptions.rootOnly) { + conditions.push(Prisma.sql`tr."rootTaskRunId" IS NULL`); + } + + // Batch filter + if (filterOptions.batchId) { + conditions.push(Prisma.sql`tr."batchId" = ${filterOptions.batchId}`); + } + + // Bulk action filter + if (filterOptions.bulkId) { + conditions.push( + Prisma.sql`tr."bulkActionGroupIds" && ARRAY[${filterOptions.bulkId}]::text[]` + ); + } + + // Run ID filter + if (filterOptions.runId && filterOptions.runId.length > 0) { + const friendlyIds = filterOptions.runId.map((runId) => RunId.toFriendlyId(runId)); + conditions.push(Prisma.sql`tr."friendlyId" IN (${Prisma.join(friendlyIds)})`); + } + + // Queue filter + if (filterOptions.queues && filterOptions.queues.length > 0) { + conditions.push(Prisma.sql`tr."queue" IN (${Prisma.join(filterOptions.queues)})`); + } + + // Machine preset filter + if (filterOptions.machines && filterOptions.machines.length > 0) { + conditions.push(Prisma.sql`tr."machinePreset" IN (${Prisma.join(filterOptions.machines)})`); + } + + // Combine all conditions with AND + return conditions.reduce((acc, condition) => + acc === null ? condition : Prisma.sql`${acc} AND ${condition}` + ); + } +} diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts index 512e6bf671..c4372e26de 100644 --- a/apps/webapp/app/services/runsRepository/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -9,6 +9,8 @@ import { z } from "zod"; import { timeFilters } from "~/components/runs/v3/SharedFilters"; import { type PrismaClient } from "~/db.server"; import { ClickHouseRunsRepository } from "./clickhouseRunsRepository.server"; +import { PostgresRunsRepository } from "./postgresRunsRepository.server"; +import { FEATURE_FLAG, makeFlags } from "~/v3/featureFlags.server"; export type RunsRepositoryOptions = { clickhouse: ClickHouse; @@ -65,7 +67,7 @@ type Pagination = { }; }; -type ListedRun = Prisma.TaskRunGetPayload<{ +export type ListedRun = Prisma.TaskRunGetPayload<{ select: { id: true; friendlyId: true; @@ -114,29 +116,119 @@ export interface IRunsRepository { export class RunsRepository implements IRunsRepository { private readonly clickHouseRunsRepository: ClickHouseRunsRepository; + private readonly postgresRunsRepository: PostgresRunsRepository; + constructor(private readonly options: RunsRepositoryOptions) { this.clickHouseRunsRepository = new ClickHouseRunsRepository(options); + this.postgresRunsRepository = new PostgresRunsRepository(options); } - listRunIds(options: ListRunsOptions): Promise { - return this.clickHouseRunsRepository.listRunIds(options); + async #getRepository() { + const getFlag = makeFlags(this.options.prisma); + const runsListRepository = await getFlag({ + key: FEATURE_FLAG.runsListRepository, + defaultValue: "clickhouse", + }); + + switch (runsListRepository) { + case "postgres": + return this.postgresRunsRepository; + case "clickhouse": + default: + return this.clickHouseRunsRepository; + } } - listRuns(options: ListRunsOptions): Promise<{ + async listRunIds(options: ListRunsOptions): Promise { + const repository = await this.#getRepository(); + return repository.listRunIds(options); + } + + async listRuns(options: ListRunsOptions): Promise<{ runs: ListedRun[]; pagination: { nextCursor: string | null; previousCursor: string | null; }; }> { - return this.clickHouseRunsRepository.listRuns(options); + const repository = await this.#getRepository(); + return repository.listRuns(options); } - countRuns(options: RunListInputOptions): Promise { - return this.clickHouseRunsRepository.countRuns(options); + async countRuns(options: RunListInputOptions): Promise { + const repository = await this.#getRepository(); + return repository.countRuns(options); } } export function parseRunListInputOptions(data: any): RunListInputOptions { return RunListInputOptionsSchema.parse(data); } + +export async function convertRunListInputOptionsToFilterRunsOptions( + options: RunListInputOptions, + prisma: RunsRepositoryOptions["prisma"] +): Promise { + const convertedOptions: FilterRunsOptions = { + ...options, + period: undefined, + }; + + // Convert time period to ms + const time = timeFilters({ + period: options.period, + from: options.from, + to: options.to, + }); + convertedOptions.period = time.period ? parseDuration(time.period) ?? undefined : undefined; + + // Batch friendlyId to id + if (options.batchId && options.batchId.startsWith("batch_")) { + const batch = await prisma.batchTaskRun.findFirst({ + select: { + id: true, + }, + where: { + friendlyId: options.batchId, + runtimeEnvironmentId: options.environmentId, + }, + }); + + if (batch) { + convertedOptions.batchId = batch.id; + } + } + + // ScheduleId can be a friendlyId + if (options.scheduleId && options.scheduleId.startsWith("sched_")) { + const schedule = await prisma.taskSchedule.findFirst({ + select: { + id: true, + }, + where: { + friendlyId: options.scheduleId, + projectId: options.projectId, + }, + }); + + if (schedule) { + convertedOptions.scheduleId = schedule?.id; + } + } + + if (options.bulkId && options.bulkId.startsWith("bulk_")) { + convertedOptions.bulkId = BulkActionId.toId(options.bulkId); + } + + if (options.runId) { + // Convert to friendlyId + convertedOptions.runId = options.runId.map((r) => RunId.toFriendlyId(r)); + } + + // Show all runs if we are filtering by batchId or runId + if (options.batchId || options.runId?.length || options.scheduleId || options.tasks?.length) { + convertedOptions.rootOnly = false; + } + + return convertedOptions; +} From e1c936731a8e22892dd59a5e3701757edddab1c2 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 23 Jul 2025 14:55:15 +0100 Subject: [PATCH 5/8] Added spans --- .../clickhouseRunsRepository.server.ts | 12 ++-- .../postgresRunsRepository.server.ts | 15 ++-- .../runsRepository/runsRepository.server.ts | 72 ++++++++++++++----- 3 files changed, 71 insertions(+), 28 deletions(-) diff --git a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index ccd01c3360..56a42c751d 100644 --- a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -1,19 +1,21 @@ -import { timeFilters } from "~/components/runs/v3/SharedFilters"; +import { type ClickhouseQueryBuilder } from "@internal/clickhouse"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; import { type FilterRunsOptions, - type RunListInputOptions, type IRunsRepository, type ListRunsOptions, + type RunListInputOptions, type RunsRepositoryOptions, convertRunListInputOptionsToFilterRunsOptions, } from "./runsRepository.server"; -import parseDuration from "parse-duration"; -import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; -import { type ClickhouseQueryBuilder } from "@internal/clickhouse"; export class ClickHouseRunsRepository implements IRunsRepository { constructor(private readonly options: RunsRepositoryOptions) {} + get name() { + return "clickhouse"; + } + async listRunIds(options: ListRunsOptions) { const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder(); applyRunFiltersToQueryBuilder( diff --git a/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts index 1b4afddd05..ec9b5be69b 100644 --- a/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts @@ -1,22 +1,23 @@ -import { Prisma, type TaskRunStatus } from "@trigger.dev/database"; -import parseDuration from "parse-duration"; -import { MachinePresetName } from "@trigger.dev/core/v3"; -import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; -import { timeFilters } from "~/components/runs/v3/SharedFilters"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; +import { Prisma } from "@trigger.dev/database"; import { sqlDatabaseSchema } from "~/db.server"; import { type FilterRunsOptions, - type RunListInputOptions, type IRunsRepository, type ListRunsOptions, - type RunsRepositoryOptions, type ListedRun, + type RunListInputOptions, + type RunsRepositoryOptions, convertRunListInputOptionsToFilterRunsOptions, } from "./runsRepository.server"; export class PostgresRunsRepository implements IRunsRepository { constructor(private readonly options: RunsRepositoryOptions) {} + get name() { + return "postgres"; + } + async listRunIds(options: ListRunsOptions) { const filterOptions = await convertRunListInputOptionsToFilterRunsOptions( options, diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts index c4372e26de..38ee7e4812 100644 --- a/apps/webapp/app/services/runsRepository/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -11,6 +11,7 @@ import { type PrismaClient } from "~/db.server"; import { ClickHouseRunsRepository } from "./clickhouseRunsRepository.server"; import { PostgresRunsRepository } from "./postgresRunsRepository.server"; import { FEATURE_FLAG, makeFlags } from "~/v3/featureFlags.server"; +import { startActiveSpan } from "~/v3/tracer.server"; export type RunsRepositoryOptions = { clickhouse: ClickHouse; @@ -103,6 +104,7 @@ export type ListedRun = Prisma.TaskRunGetPayload<{ export type ListRunsOptions = RunListInputOptions & Pagination; export interface IRunsRepository { + name: string; listRunIds(options: ListRunsOptions): Promise; listRuns(options: ListRunsOptions): Promise<{ runs: ListedRun[]; @@ -123,25 +125,43 @@ export class RunsRepository implements IRunsRepository { this.postgresRunsRepository = new PostgresRunsRepository(options); } - async #getRepository() { - const getFlag = makeFlags(this.options.prisma); - const runsListRepository = await getFlag({ - key: FEATURE_FLAG.runsListRepository, - defaultValue: "clickhouse", - }); + get name() { + return "runsRepository"; + } - switch (runsListRepository) { - case "postgres": - return this.postgresRunsRepository; - case "clickhouse": - default: - return this.clickHouseRunsRepository; - } + async #getRepository(): Promise { + return startActiveSpan("runsRepository.getRepository", async (span) => { + const getFlag = makeFlags(this.options.prisma); + const runsListRepository = await getFlag({ + key: FEATURE_FLAG.runsListRepository, + defaultValue: "clickhouse", + }); + + span.setAttribute("repository.name", runsListRepository); + + switch (runsListRepository) { + case "postgres": + return this.postgresRunsRepository; + case "clickhouse": + default: + return this.clickHouseRunsRepository; + } + }); } async listRunIds(options: ListRunsOptions): Promise { const repository = await this.#getRepository(); - return repository.listRunIds(options); + return startActiveSpan( + "runsRepository.listRunIds", + async () => { + return repository.listRunIds(options); + }, + { + attributes: { + "repository.name": repository.name, + }, + } + ); } async listRuns(options: ListRunsOptions): Promise<{ @@ -152,12 +172,32 @@ export class RunsRepository implements IRunsRepository { }; }> { const repository = await this.#getRepository(); - return repository.listRuns(options); + return startActiveSpan( + "runsRepository.listRuns", + async () => { + return repository.listRuns(options); + }, + { + attributes: { + "repository.name": repository.name, + }, + } + ); } async countRuns(options: RunListInputOptions): Promise { const repository = await this.#getRepository(); - return repository.countRuns(options); + return startActiveSpan( + "runsRepository.countRuns", + async () => { + return repository.countRuns(options); + }, + { + attributes: { + "repository.name": repository.name, + }, + } + ); } } From e62b119fc03adf998d8179d707462eb870ca56cc Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 23 Jul 2025 15:24:26 +0100 Subject: [PATCH 6/8] Pass the default repository in, so we can try Postgres in the tests --- .../runsRepository/runsRepository.server.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts index 38ee7e4812..0603642d63 100644 --- a/apps/webapp/app/services/runsRepository/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -12,6 +12,7 @@ import { ClickHouseRunsRepository } from "./clickhouseRunsRepository.server"; import { PostgresRunsRepository } from "./postgresRunsRepository.server"; import { FEATURE_FLAG, makeFlags } from "~/v3/featureFlags.server"; import { startActiveSpan } from "~/v3/tracer.server"; +import { logger } from "../logger.server"; export type RunsRepositoryOptions = { clickhouse: ClickHouse; @@ -119,10 +120,16 @@ export interface IRunsRepository { export class RunsRepository implements IRunsRepository { private readonly clickHouseRunsRepository: ClickHouseRunsRepository; private readonly postgresRunsRepository: PostgresRunsRepository; + private readonly defaultRepository: "clickhouse" | "postgres"; - constructor(private readonly options: RunsRepositoryOptions) { + constructor( + private readonly options: RunsRepositoryOptions & { + defaultRepository?: "clickhouse" | "postgres"; + } + ) { this.clickHouseRunsRepository = new ClickHouseRunsRepository(options); this.postgresRunsRepository = new PostgresRunsRepository(options); + this.defaultRepository = options.defaultRepository ?? "clickhouse"; } get name() { @@ -134,11 +141,13 @@ export class RunsRepository implements IRunsRepository { const getFlag = makeFlags(this.options.prisma); const runsListRepository = await getFlag({ key: FEATURE_FLAG.runsListRepository, - defaultValue: "clickhouse", + defaultValue: this.defaultRepository, }); span.setAttribute("repository.name", runsListRepository); + logger.log("runsListRepository", { runsListRepository }); + switch (runsListRepository) { case "postgres": return this.postgresRunsRepository; From 2b2003b56b3cc32970dc8b345874e74fbabb9f53 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 23 Jul 2025 16:05:37 +0100 Subject: [PATCH 7/8] Fallback to Postgres if ClickHouse errors --- .../runsRepository/runsRepository.server.ts | 97 +++++++++++++++++-- 1 file changed, 90 insertions(+), 7 deletions(-) diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts index 0603642d63..7b9bf2a368 100644 --- a/apps/webapp/app/services/runsRepository/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -1,18 +1,18 @@ -import { type ClickHouse, type ClickhouseQueryBuilder } from "@internal/clickhouse"; +import { type ClickHouse } from "@internal/clickhouse"; import { type Tracer } from "@internal/tracing"; import { type Logger, type LogLevel } from "@trigger.dev/core/logger"; import { MachinePresetName } from "@trigger.dev/core/v3"; import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; -import { Prisma, TaskRunStatus } from "@trigger.dev/database"; +import { type Prisma, TaskRunStatus } from "@trigger.dev/database"; import parseDuration from "parse-duration"; import { z } from "zod"; import { timeFilters } from "~/components/runs/v3/SharedFilters"; import { type PrismaClient } from "~/db.server"; -import { ClickHouseRunsRepository } from "./clickhouseRunsRepository.server"; -import { PostgresRunsRepository } from "./postgresRunsRepository.server"; import { FEATURE_FLAG, makeFlags } from "~/v3/featureFlags.server"; import { startActiveSpan } from "~/v3/tracer.server"; import { logger } from "../logger.server"; +import { ClickHouseRunsRepository } from "./clickhouseRunsRepository.server"; +import { PostgresRunsRepository } from "./postgresRunsRepository.server"; export type RunsRepositoryOptions = { clickhouse: ClickHouse; @@ -121,6 +121,7 @@ export class RunsRepository implements IRunsRepository { private readonly clickHouseRunsRepository: ClickHouseRunsRepository; private readonly postgresRunsRepository: PostgresRunsRepository; private readonly defaultRepository: "clickhouse" | "postgres"; + private readonly logger: Logger; constructor( private readonly options: RunsRepositoryOptions & { @@ -130,6 +131,7 @@ export class RunsRepository implements IRunsRepository { this.clickHouseRunsRepository = new ClickHouseRunsRepository(options); this.postgresRunsRepository = new PostgresRunsRepository(options); this.defaultRepository = options.defaultRepository ?? "clickhouse"; + this.logger = options.logger ?? logger; } get name() { @@ -163,11 +165,38 @@ export class RunsRepository implements IRunsRepository { return startActiveSpan( "runsRepository.listRunIds", async () => { - return repository.listRunIds(options); + try { + return await repository.listRunIds(options); + } catch (error) { + // If ClickHouse fails, retry with Postgres + if (repository.name === "clickhouse") { + this.logger?.warn("ClickHouse failed, retrying with Postgres", { error }); + return startActiveSpan( + "runsRepository.listRunIds.fallback", + async () => { + return await this.postgresRunsRepository.listRunIds(options); + }, + { + attributes: { + "repository.name": "postgres", + "fallback.reason": "clickhouse_error", + "fallback.error": error instanceof Error ? error.message : String(error), + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } + throw error; + } }, { attributes: { "repository.name": repository.name, + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, }, } ); @@ -184,11 +213,38 @@ export class RunsRepository implements IRunsRepository { return startActiveSpan( "runsRepository.listRuns", async () => { - return repository.listRuns(options); + try { + return await repository.listRuns(options); + } catch (error) { + // If ClickHouse fails, retry with Postgres + if (repository.name === "clickhouse") { + this.logger?.warn("ClickHouse failed, retrying with Postgres", { error }); + return startActiveSpan( + "runsRepository.listRuns.fallback", + async () => { + return await this.postgresRunsRepository.listRuns(options); + }, + { + attributes: { + "repository.name": "postgres", + "fallback.reason": "clickhouse_error", + "fallback.error": error instanceof Error ? error.message : String(error), + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } + throw error; + } }, { attributes: { "repository.name": repository.name, + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, }, } ); @@ -199,11 +255,38 @@ export class RunsRepository implements IRunsRepository { return startActiveSpan( "runsRepository.countRuns", async () => { - return repository.countRuns(options); + try { + return await repository.countRuns(options); + } catch (error) { + // If ClickHouse fails, retry with Postgres + if (repository.name === "clickhouse") { + this.logger?.warn("ClickHouse failed, retrying with Postgres", { error }); + return startActiveSpan( + "runsRepository.countRuns.fallback", + async () => { + return await this.postgresRunsRepository.countRuns(options); + }, + { + attributes: { + "repository.name": "postgres", + "fallback.reason": "clickhouse_error", + "fallback.error": error instanceof Error ? error.message : String(error), + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } + throw error; + } }, { attributes: { "repository.name": repository.name, + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, }, } ); From f476f78654830aa494f01ec4e97f6455fdf3e36a Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 23 Jul 2025 16:29:38 +0100 Subject: [PATCH 8/8] Update feature flags API endpoint --- .../app/routes/admin.api.v1.feature-flags.ts | 71 +++++++++++++++++++ apps/webapp/app/v3/featureFlags.server.ts | 43 +++++++++++ 2 files changed, 114 insertions(+) create mode 100644 apps/webapp/app/routes/admin.api.v1.feature-flags.ts diff --git a/apps/webapp/app/routes/admin.api.v1.feature-flags.ts b/apps/webapp/app/routes/admin.api.v1.feature-flags.ts new file mode 100644 index 0000000000..cd7958c5b8 --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.feature-flags.ts @@ -0,0 +1,71 @@ +import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import { prisma } from "~/db.server"; +import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server"; +import { runsReplicationInstance } from "~/services/runsReplicationInstance.server"; +import { + makeSetFlags, + setFlags, + FeatureFlagCatalogSchema, + validateAllFeatureFlags, + validatePartialFeatureFlags, + makeSetMultipleFlags, +} from "~/v3/featureFlags.server"; +import { z } from "zod"; + +export async function action({ request }: ActionFunctionArgs) { + // Next authenticate the request + const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); + + if (!authenticationResult) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + const user = await prisma.user.findUnique({ + where: { + id: authenticationResult.userId, + }, + }); + + if (!user) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + if (!user.admin) { + return json({ error: "You must be an admin to perform this action" }, { status: 403 }); + } + + try { + // Parse the request body + const body = await request.json(); + + // Validate the input using the partial schema + const validationResult = validatePartialFeatureFlags(body as Record); + if (!validationResult.success) { + return json( + { + error: "Invalid feature flags data", + details: validationResult.error.issues, + }, + { status: 400 } + ); + } + + const featureFlags = validationResult.data; + const setMultipleFlags = makeSetMultipleFlags(prisma); + const updatedFlags = await setMultipleFlags(featureFlags); + + return json({ + success: true, + updatedFlags, + message: `Updated ${updatedFlags.length} feature flag(s)`, + }); + } catch (error) { + return json( + { + error: error instanceof Error ? error.message : String(error), + }, + { status: 400 } + ); + } +} diff --git a/apps/webapp/app/v3/featureFlags.server.ts b/apps/webapp/app/v3/featureFlags.server.ts index 3a54465c0e..f1bc913c42 100644 --- a/apps/webapp/app/v3/featureFlags.server.ts +++ b/apps/webapp/app/v3/featureFlags.server.ts @@ -67,3 +67,46 @@ export function makeSetFlags(_prisma: PrismaClientOrTransaction = prisma) { export const flags = makeFlags(); export const setFlags = makeSetFlags(); + +// Create a Zod schema from the existing catalog +export const FeatureFlagCatalogSchema = z.object(FeatureFlagCatalog); + +// Utility function to validate a feature flag value +export function validateFeatureFlagValue( + key: T, + value: unknown +): z.SafeParseReturnType> { + return FeatureFlagCatalog[key].safeParse(value); +} + +// Utility function to validate all feature flags at once +export function validateAllFeatureFlags(values: Record) { + return FeatureFlagCatalogSchema.safeParse(values); +} + +// Utility function to validate partial feature flags (all keys optional) +export function validatePartialFeatureFlags(values: Record) { + return FeatureFlagCatalogSchema.partial().safeParse(values); +} + +// Utility function to set multiple feature flags at once +export function makeSetMultipleFlags(_prisma: PrismaClientOrTransaction = prisma) { + return async function setMultipleFlags( + flags: Partial> + ): Promise<{ key: string; value: any }[]> { + const setFlag = makeSetFlags(_prisma); + const updatedFlags: { key: string; value: any }[] = []; + + for (const [key, value] of Object.entries(flags)) { + if (value !== undefined) { + await setFlag({ + key: key as any, + value: value as any, + }); + updatedFlags.push({ key, value }); + } + } + + return updatedFlags; + }; +}