diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 6fd895c456..8800686236 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -891,7 +891,7 @@ const EnvironmentSchema = z.object({ RUN_REPLICATION_ENABLED: z.string().default("0"), RUN_REPLICATION_SLOT_NAME: z.string().default("task_runs_to_clickhouse_v1"), RUN_REPLICATION_PUBLICATION_NAME: z.string().default("task_runs_to_clickhouse_v1_publication"), - RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(100), + RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(2), RUN_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), RUN_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100), RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000), diff --git a/apps/webapp/app/presenters/RunFilters.server.ts b/apps/webapp/app/presenters/RunFilters.server.ts index db19e65656..8d70b4d3bd 100644 --- a/apps/webapp/app/presenters/RunFilters.server.ts +++ b/apps/webapp/app/presenters/RunFilters.server.ts @@ -3,7 +3,7 @@ import { TaskRunListSearchFilters, } from "~/components/runs/v3/RunFilters"; import { getRootOnlyFilterPreference } from "~/services/preferences/uiPreferences.server"; -import { type ParsedRunFilters } from "~/services/runsRepository.server"; +import { type ParsedRunFilters } from "~/services/runsRepository/runsRepository.server"; type FiltersFromRequest = ParsedRunFilters & Required>; diff --git a/apps/webapp/app/presenters/v3/BulkActionPresenter.server.ts b/apps/webapp/app/presenters/v3/BulkActionPresenter.server.ts index 0434045ea4..f98d0819cb 100644 --- a/apps/webapp/app/presenters/v3/BulkActionPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/BulkActionPresenter.server.ts @@ -1,7 +1,7 @@ import { getUsername } from "~/utils/username"; import { BasePresenter } from "./basePresenter.server"; import { type BulkActionMode } from "~/components/BulkActionFilterSummary"; -import { parseRunListInputOptions } from "~/services/runsRepository.server"; +import { parseRunListInputOptions } from "~/services/runsRepository/runsRepository.server"; import { TaskRunListSearchFilters } from "~/components/runs/v3/RunFilters"; type BulkActionOptions = { diff --git a/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts b/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts index e80c7df42b..acf511f0f5 100644 --- a/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts @@ -1,7 +1,7 @@ import { type PrismaClient } from "@trigger.dev/database"; import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction"; import { clickhouseClient } from "~/services/clickhouseInstance.server"; -import { RunsRepository } from "~/services/runsRepository.server"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { getRunFiltersFromRequest } from "../RunFilters.server"; import { BasePresenter } from "./basePresenter.server"; diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index 960bdfc10a..2375ea161a 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -9,7 +9,7 @@ import { type Direction } from "~/components/ListPagination"; import { timeFilters } from "~/components/runs/v3/SharedFilters"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { getAllTaskIdentifiers } from "~/models/task.server"; -import { RunsRepository } from "~/services/runsRepository.server"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus"; diff --git a/apps/webapp/app/routes/admin.api.v1.feature-flags.ts b/apps/webapp/app/routes/admin.api.v1.feature-flags.ts new file mode 100644 index 0000000000..cd7958c5b8 --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.feature-flags.ts @@ -0,0 +1,71 @@ +import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import { prisma } from "~/db.server"; +import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server"; +import { runsReplicationInstance } from "~/services/runsReplicationInstance.server"; +import { + makeSetFlags, + setFlags, + FeatureFlagCatalogSchema, + validateAllFeatureFlags, + validatePartialFeatureFlags, + makeSetMultipleFlags, +} from "~/v3/featureFlags.server"; +import { z } from "zod"; + +export async function action({ request }: ActionFunctionArgs) { + // Next authenticate the request + const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); + + if (!authenticationResult) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + const user = await prisma.user.findUnique({ + where: { + id: authenticationResult.userId, + }, + }); + + if (!user) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + if (!user.admin) { + return json({ error: "You must be an admin to perform this action" }, { status: 403 }); + } + + try { + // Parse the request body + const body = await request.json(); + + // Validate the input using the partial schema + const validationResult = validatePartialFeatureFlags(body as Record); + if (!validationResult.success) { + return json( + { + error: "Invalid feature flags data", + details: validationResult.error.issues, + }, + { status: 400 } + ); + } + + const featureFlags = validationResult.data; + const setMultipleFlags = makeSetMultipleFlags(prisma); + const updatedFlags = await setMultipleFlags(featureFlags); + + return json({ + success: true, + updatedFlags, + message: `Updated ${updatedFlags.length} feature flag(s)`, + }); + } catch (error) { + return json( + { + error: error instanceof Error ? error.message : String(error), + }, + { status: 400 } + ); + } +} diff --git a/apps/webapp/app/services/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts similarity index 61% rename from apps/webapp/app/services/runsRepository.server.ts rename to apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index 3196c436b3..56a42c751d 100644 --- a/apps/webapp/app/services/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -1,79 +1,26 @@ -import { type ClickHouse, type ClickhouseQueryBuilder } from "@internal/clickhouse"; -import { type Tracer } from "@internal/tracing"; -import { type Logger, type LogLevel } from "@trigger.dev/core/logger"; -import { MachinePresetName } from "@trigger.dev/core/v3"; -import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; -import { TaskRunStatus } from "@trigger.dev/database"; -import parseDuration from "parse-duration"; -import { z } from "zod"; -import { timeFilters } from "~/components/runs/v3/SharedFilters"; -import { type PrismaClient } from "~/db.server"; - -export type RunsRepositoryOptions = { - clickhouse: ClickHouse; - prisma: PrismaClient; - logger?: Logger; - logLevel?: LogLevel; - tracer?: Tracer; -}; - -const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]); - -const RunListInputOptionsSchema = z.object({ - organizationId: z.string(), - projectId: z.string(), - environmentId: z.string(), - //filters - tasks: z.array(z.string()).optional(), - versions: z.array(z.string()).optional(), - statuses: z.array(RunStatus).optional(), - tags: z.array(z.string()).optional(), - scheduleId: z.string().optional(), - period: z.string().optional(), - from: z.number().optional(), - to: z.number().optional(), - isTest: z.boolean().optional(), - rootOnly: z.boolean().optional(), - batchId: z.string().optional(), - runId: z.array(z.string()).optional(), - bulkId: z.string().optional(), - queues: z.array(z.string()).optional(), - machines: MachinePresetName.array().optional(), -}); - -export type RunListInputOptions = z.infer; -export type RunListInputFilters = Omit< - RunListInputOptions, - "organizationId" | "projectId" | "environmentId" ->; - -export type ParsedRunFilters = RunListInputFilters & { - cursor?: string; - direction?: "forward" | "backward"; -}; - -type FilterRunsOptions = Omit & { - period: number | undefined; -}; - -type Pagination = { - page: { - size: number; - cursor?: string; - direction?: "forward" | "backward"; - }; -}; - -export type ListRunsOptions = RunListInputOptions & Pagination; - -export class RunsRepository { +import { type ClickhouseQueryBuilder } from "@internal/clickhouse"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; +import { + type FilterRunsOptions, + type IRunsRepository, + type ListRunsOptions, + type RunListInputOptions, + type RunsRepositoryOptions, + convertRunListInputOptionsToFilterRunsOptions, +} from "./runsRepository.server"; + +export class ClickHouseRunsRepository implements IRunsRepository { constructor(private readonly options: RunsRepositoryOptions) {} + get name() { + return "clickhouse"; + } + async listRunIds(options: ListRunsOptions) { const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder(); applyRunFiltersToQueryBuilder( queryBuilder, - await this.#convertRunListInputOptionsToFilterRunsOptions(options) + await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma) ); if (options.page.cursor) { @@ -200,7 +147,7 @@ export class RunsRepository { const queryBuilder = this.options.clickhouse.taskRuns.countQueryBuilder(); applyRunFiltersToQueryBuilder( queryBuilder, - await this.#convertRunListInputOptionsToFilterRunsOptions(options) + await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma) ); const [queryError, result] = await queryBuilder.execute(); @@ -215,73 +162,6 @@ export class RunsRepository { return result[0].count; } - - async #convertRunListInputOptionsToFilterRunsOptions( - options: RunListInputOptions - ): Promise { - const convertedOptions: FilterRunsOptions = { - ...options, - period: undefined, - }; - - // Convert time period to ms - const time = timeFilters({ - period: options.period, - from: options.from, - to: options.to, - }); - convertedOptions.period = time.period ? parseDuration(time.period) ?? undefined : undefined; - - // batch friendlyId to id - if (options.batchId && options.batchId.startsWith("batch_")) { - const batch = await this.options.prisma.batchTaskRun.findFirst({ - select: { - id: true, - }, - where: { - friendlyId: options.batchId, - runtimeEnvironmentId: options.environmentId, - }, - }); - - if (batch) { - convertedOptions.batchId = batch.id; - } - } - - // scheduleId can be a friendlyId - if (options.scheduleId && options.scheduleId.startsWith("sched_")) { - const schedule = await this.options.prisma.taskSchedule.findFirst({ - select: { - id: true, - }, - where: { - friendlyId: options.scheduleId, - projectId: options.projectId, - }, - }); - - if (schedule) { - convertedOptions.scheduleId = schedule?.id; - } - } - - if (options.bulkId && options.bulkId.startsWith("bulk_")) { - convertedOptions.bulkId = BulkActionId.toId(options.bulkId); - } - - if (options.runId) { - //convert to friendlyId - convertedOptions.runId = options.runId.map((r) => RunId.toFriendlyId(r)); - } - - // Show all runs if we are filtering by batchId or runId - if (options.batchId || options.runId?.length || options.scheduleId || options.tasks?.length) { - convertedOptions.rootOnly = false; - } - - return convertedOptions; - } } function applyRunFiltersToQueryBuilder( @@ -373,7 +253,3 @@ function applyRunFiltersToQueryBuilder( }); } } - -export function parseRunListInputOptions(data: any): RunListInputOptions { - return RunListInputOptionsSchema.parse(data); -} diff --git a/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts new file mode 100644 index 0000000000..ec9b5be69b --- /dev/null +++ b/apps/webapp/app/services/runsRepository/postgresRunsRepository.server.ts @@ -0,0 +1,290 @@ +import { RunId } from "@trigger.dev/core/v3/isomorphic"; +import { Prisma } from "@trigger.dev/database"; +import { sqlDatabaseSchema } from "~/db.server"; +import { + type FilterRunsOptions, + type IRunsRepository, + type ListRunsOptions, + type ListedRun, + type RunListInputOptions, + type RunsRepositoryOptions, + convertRunListInputOptionsToFilterRunsOptions, +} from "./runsRepository.server"; + +export class PostgresRunsRepository implements IRunsRepository { + constructor(private readonly options: RunsRepositoryOptions) {} + + get name() { + return "postgres"; + } + + async listRunIds(options: ListRunsOptions) { + const filterOptions = await convertRunListInputOptionsToFilterRunsOptions( + options, + this.options.prisma + ); + + const query = this.#buildRunIdsQuery(filterOptions, options.page); + const runs = await this.options.prisma.$queryRaw<{ id: string }[]>(query); + + return runs.map((run) => run.id); + } + + async listRuns(options: ListRunsOptions) { + const filterOptions = await convertRunListInputOptionsToFilterRunsOptions( + options, + this.options.prisma + ); + + const query = this.#buildRunsQuery(filterOptions, options.page); + const runs = await this.options.prisma.$queryRaw(query); + + // If there are more runs than the page size, we need to fetch the next page + const hasMore = runs.length > options.page.size; + + let nextCursor: string | null = null; + let previousCursor: string | null = null; + + // Get cursors for next and previous pages + const direction = options.page.direction ?? "forward"; + switch (direction) { + case "forward": { + previousCursor = options.page.cursor ? runs.at(0)?.id ?? null : null; + if (hasMore) { + // The next cursor should be the last run ID from this page + nextCursor = runs[options.page.size - 1]?.id ?? null; + } + break; + } + case "backward": { + const reversedRuns = [...runs].reverse(); + if (hasMore) { + previousCursor = reversedRuns.at(1)?.id ?? null; + nextCursor = reversedRuns.at(options.page.size)?.id ?? null; + } else { + nextCursor = reversedRuns.at(options.page.size - 1)?.id ?? null; + } + break; + } + } + + const runsToReturn = + options.page.direction === "backward" && hasMore + ? runs.slice(1, options.page.size + 1) + : runs.slice(0, options.page.size); + + // ClickHouse is slightly delayed, so we're going to do in-memory status filtering too + let filteredRuns = runsToReturn; + if (options.statuses && options.statuses.length > 0) { + filteredRuns = runsToReturn.filter((run) => options.statuses!.includes(run.status)); + } + + return { + runs: filteredRuns, + pagination: { + nextCursor, + previousCursor, + }, + }; + } + + async countRuns(options: RunListInputOptions) { + const filterOptions = await convertRunListInputOptionsToFilterRunsOptions( + options, + this.options.prisma + ); + + const query = this.#buildCountQuery(filterOptions); + const result = await this.options.prisma.$queryRaw<{ count: bigint }[]>(query); + + if (result.length === 0) { + throw new Error("No count rows returned"); + } + + return Number(result[0].count); + } + + #buildRunIdsQuery( + filterOptions: FilterRunsOptions, + page: { size: number; cursor?: string; direction?: "forward" | "backward" } + ) { + const whereConditions = this.#buildWhereConditions(filterOptions, page.cursor, page.direction); + + return Prisma.sql` + SELECT tr.id + FROM ${sqlDatabaseSchema}."TaskRun" tr + WHERE ${whereConditions} + ORDER BY ${page.direction === "backward" ? Prisma.sql`tr.id ASC` : Prisma.sql`tr.id DESC`} + LIMIT ${page.size + 1} + `; + } + + #buildRunsQuery( + filterOptions: FilterRunsOptions, + page: { size: number; cursor?: string; direction?: "forward" | "backward" } + ) { + const whereConditions = this.#buildWhereConditions(filterOptions, page.cursor, page.direction); + + return Prisma.sql` + SELECT + tr.id, + tr."friendlyId", + tr."taskIdentifier", + tr."taskVersion", + tr."runtimeEnvironmentId", + tr.status, + tr."createdAt", + tr."startedAt", + tr."lockedAt", + tr."delayUntil", + tr."updatedAt", + tr."completedAt", + tr."isTest", + tr."spanId", + tr."idempotencyKey", + tr."ttl", + tr."expiredAt", + tr."costInCents", + tr."baseCostInCents", + tr."usageDurationMs", + tr."runTags", + tr."depth", + tr."rootTaskRunId", + tr."batchId", + tr."metadata", + tr."metadataType", + tr."machinePreset", + tr."queue" + FROM ${sqlDatabaseSchema}."TaskRun" tr + WHERE ${whereConditions} + ORDER BY ${page.direction === "backward" ? Prisma.sql`tr.id ASC` : Prisma.sql`tr.id DESC`} + LIMIT ${page.size + 1} + `; + } + + #buildCountQuery(filterOptions: FilterRunsOptions) { + const whereConditions = this.#buildWhereConditions(filterOptions); + + return Prisma.sql` + SELECT COUNT(*) as count + FROM ${sqlDatabaseSchema}."TaskRun" tr + WHERE ${whereConditions} + `; + } + + #buildWhereConditions( + filterOptions: FilterRunsOptions, + cursor?: string, + direction?: "forward" | "backward" + ) { + const conditions: Prisma.Sql[] = []; + + // Environment filter + conditions.push(Prisma.sql`tr."runtimeEnvironmentId" = ${filterOptions.environmentId}`); + + // Cursor pagination + if (cursor) { + if (direction === "forward" || !direction) { + conditions.push(Prisma.sql`tr.id < ${cursor}`); + } else { + conditions.push(Prisma.sql`tr.id > ${cursor}`); + } + } + + // Task filters + if (filterOptions.tasks && filterOptions.tasks.length > 0) { + conditions.push(Prisma.sql`tr."taskIdentifier" IN (${Prisma.join(filterOptions.tasks)})`); + } + + // Version filters + if (filterOptions.versions && filterOptions.versions.length > 0) { + conditions.push(Prisma.sql`tr."taskVersion" IN (${Prisma.join(filterOptions.versions)})`); + } + + // Status filters + if (filterOptions.statuses && filterOptions.statuses.length > 0) { + conditions.push( + Prisma.sql`tr.status = ANY(ARRAY[${Prisma.join( + filterOptions.statuses + )}]::"TaskRunStatus"[])` + ); + } + + // Tag filters + if (filterOptions.tags && filterOptions.tags.length > 0) { + conditions.push( + Prisma.sql`tr."runTags" && ARRAY[${Prisma.join(filterOptions.tags)}]::text[]` + ); + } + + // Schedule filter + if (filterOptions.scheduleId) { + conditions.push(Prisma.sql`tr."scheduleId" = ${filterOptions.scheduleId}`); + } + + // Time period filter + if (filterOptions.period) { + conditions.push( + Prisma.sql`tr."createdAt" >= NOW() - INTERVAL '1 millisecond' * ${filterOptions.period}` + ); + } + + // From date filter + if (filterOptions.from) { + conditions.push( + Prisma.sql`tr."createdAt" >= ${new Date(filterOptions.from).toISOString()}::timestamp` + ); + } + + // To date filter + if (filterOptions.to) { + const toDate = new Date(filterOptions.to); + const now = new Date(); + const clampedDate = toDate > now ? now : toDate; + conditions.push(Prisma.sql`tr."createdAt" <= ${clampedDate.toISOString()}::timestamp`); + } + + // Test filter + if (typeof filterOptions.isTest === "boolean") { + conditions.push(Prisma.sql`tr."isTest" = ${filterOptions.isTest}`); + } + + // Root only filter + if (filterOptions.rootOnly) { + conditions.push(Prisma.sql`tr."rootTaskRunId" IS NULL`); + } + + // Batch filter + if (filterOptions.batchId) { + conditions.push(Prisma.sql`tr."batchId" = ${filterOptions.batchId}`); + } + + // Bulk action filter + if (filterOptions.bulkId) { + conditions.push( + Prisma.sql`tr."bulkActionGroupIds" && ARRAY[${filterOptions.bulkId}]::text[]` + ); + } + + // Run ID filter + if (filterOptions.runId && filterOptions.runId.length > 0) { + const friendlyIds = filterOptions.runId.map((runId) => RunId.toFriendlyId(runId)); + conditions.push(Prisma.sql`tr."friendlyId" IN (${Prisma.join(friendlyIds)})`); + } + + // Queue filter + if (filterOptions.queues && filterOptions.queues.length > 0) { + conditions.push(Prisma.sql`tr."queue" IN (${Prisma.join(filterOptions.queues)})`); + } + + // Machine preset filter + if (filterOptions.machines && filterOptions.machines.length > 0) { + conditions.push(Prisma.sql`tr."machinePreset" IN (${Prisma.join(filterOptions.machines)})`); + } + + // Combine all conditions with AND + return conditions.reduce((acc, condition) => + acc === null ? condition : Prisma.sql`${acc} AND ${condition}` + ); + } +} diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts new file mode 100644 index 0000000000..7b9bf2a368 --- /dev/null +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -0,0 +1,366 @@ +import { type ClickHouse } from "@internal/clickhouse"; +import { type Tracer } from "@internal/tracing"; +import { type Logger, type LogLevel } from "@trigger.dev/core/logger"; +import { MachinePresetName } from "@trigger.dev/core/v3"; +import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic"; +import { type Prisma, TaskRunStatus } from "@trigger.dev/database"; +import parseDuration from "parse-duration"; +import { z } from "zod"; +import { timeFilters } from "~/components/runs/v3/SharedFilters"; +import { type PrismaClient } from "~/db.server"; +import { FEATURE_FLAG, makeFlags } from "~/v3/featureFlags.server"; +import { startActiveSpan } from "~/v3/tracer.server"; +import { logger } from "../logger.server"; +import { ClickHouseRunsRepository } from "./clickhouseRunsRepository.server"; +import { PostgresRunsRepository } from "./postgresRunsRepository.server"; + +export type RunsRepositoryOptions = { + clickhouse: ClickHouse; + prisma: PrismaClient; + logger?: Logger; + logLevel?: LogLevel; + tracer?: Tracer; +}; + +const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]); + +const RunListInputOptionsSchema = z.object({ + organizationId: z.string(), + projectId: z.string(), + environmentId: z.string(), + //filters + tasks: z.array(z.string()).optional(), + versions: z.array(z.string()).optional(), + statuses: z.array(RunStatus).optional(), + tags: z.array(z.string()).optional(), + scheduleId: z.string().optional(), + period: z.string().optional(), + from: z.number().optional(), + to: z.number().optional(), + isTest: z.boolean().optional(), + rootOnly: z.boolean().optional(), + batchId: z.string().optional(), + runId: z.array(z.string()).optional(), + bulkId: z.string().optional(), + queues: z.array(z.string()).optional(), + machines: MachinePresetName.array().optional(), +}); + +export type RunListInputOptions = z.infer; +export type RunListInputFilters = Omit< + RunListInputOptions, + "organizationId" | "projectId" | "environmentId" +>; + +export type ParsedRunFilters = RunListInputFilters & { + cursor?: string; + direction?: "forward" | "backward"; +}; + +export type FilterRunsOptions = Omit & { + period: number | undefined; +}; + +type Pagination = { + page: { + size: number; + cursor?: string; + direction?: "forward" | "backward"; + }; +}; + +export type ListedRun = Prisma.TaskRunGetPayload<{ + select: { + id: true; + friendlyId: true; + taskIdentifier: true; + taskVersion: true; + runtimeEnvironmentId: true; + status: true; + createdAt: true; + startedAt: true; + lockedAt: true; + delayUntil: true; + updatedAt: true; + completedAt: true; + isTest: true; + spanId: true; + idempotencyKey: true; + ttl: true; + expiredAt: true; + costInCents: true; + baseCostInCents: true; + usageDurationMs: true; + runTags: true; + depth: true; + rootTaskRunId: true; + batchId: true; + metadata: true; + metadataType: true; + machinePreset: true; + queue: true; + }; +}>; + +export type ListRunsOptions = RunListInputOptions & Pagination; + +export interface IRunsRepository { + name: string; + listRunIds(options: ListRunsOptions): Promise; + listRuns(options: ListRunsOptions): Promise<{ + runs: ListedRun[]; + pagination: { + nextCursor: string | null; + previousCursor: string | null; + }; + }>; + countRuns(options: RunListInputOptions): Promise; +} + +export class RunsRepository implements IRunsRepository { + private readonly clickHouseRunsRepository: ClickHouseRunsRepository; + private readonly postgresRunsRepository: PostgresRunsRepository; + private readonly defaultRepository: "clickhouse" | "postgres"; + private readonly logger: Logger; + + constructor( + private readonly options: RunsRepositoryOptions & { + defaultRepository?: "clickhouse" | "postgres"; + } + ) { + this.clickHouseRunsRepository = new ClickHouseRunsRepository(options); + this.postgresRunsRepository = new PostgresRunsRepository(options); + this.defaultRepository = options.defaultRepository ?? "clickhouse"; + this.logger = options.logger ?? logger; + } + + get name() { + return "runsRepository"; + } + + async #getRepository(): Promise { + return startActiveSpan("runsRepository.getRepository", async (span) => { + const getFlag = makeFlags(this.options.prisma); + const runsListRepository = await getFlag({ + key: FEATURE_FLAG.runsListRepository, + defaultValue: this.defaultRepository, + }); + + span.setAttribute("repository.name", runsListRepository); + + logger.log("runsListRepository", { runsListRepository }); + + switch (runsListRepository) { + case "postgres": + return this.postgresRunsRepository; + case "clickhouse": + default: + return this.clickHouseRunsRepository; + } + }); + } + + async listRunIds(options: ListRunsOptions): Promise { + const repository = await this.#getRepository(); + return startActiveSpan( + "runsRepository.listRunIds", + async () => { + try { + return await repository.listRunIds(options); + } catch (error) { + // If ClickHouse fails, retry with Postgres + if (repository.name === "clickhouse") { + this.logger?.warn("ClickHouse failed, retrying with Postgres", { error }); + return startActiveSpan( + "runsRepository.listRunIds.fallback", + async () => { + return await this.postgresRunsRepository.listRunIds(options); + }, + { + attributes: { + "repository.name": "postgres", + "fallback.reason": "clickhouse_error", + "fallback.error": error instanceof Error ? error.message : String(error), + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } + throw error; + } + }, + { + attributes: { + "repository.name": repository.name, + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } + + async listRuns(options: ListRunsOptions): Promise<{ + runs: ListedRun[]; + pagination: { + nextCursor: string | null; + previousCursor: string | null; + }; + }> { + const repository = await this.#getRepository(); + return startActiveSpan( + "runsRepository.listRuns", + async () => { + try { + return await repository.listRuns(options); + } catch (error) { + // If ClickHouse fails, retry with Postgres + if (repository.name === "clickhouse") { + this.logger?.warn("ClickHouse failed, retrying with Postgres", { error }); + return startActiveSpan( + "runsRepository.listRuns.fallback", + async () => { + return await this.postgresRunsRepository.listRuns(options); + }, + { + attributes: { + "repository.name": "postgres", + "fallback.reason": "clickhouse_error", + "fallback.error": error instanceof Error ? error.message : String(error), + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } + throw error; + } + }, + { + attributes: { + "repository.name": repository.name, + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } + + async countRuns(options: RunListInputOptions): Promise { + const repository = await this.#getRepository(); + return startActiveSpan( + "runsRepository.countRuns", + async () => { + try { + return await repository.countRuns(options); + } catch (error) { + // If ClickHouse fails, retry with Postgres + if (repository.name === "clickhouse") { + this.logger?.warn("ClickHouse failed, retrying with Postgres", { error }); + return startActiveSpan( + "runsRepository.countRuns.fallback", + async () => { + return await this.postgresRunsRepository.countRuns(options); + }, + { + attributes: { + "repository.name": "postgres", + "fallback.reason": "clickhouse_error", + "fallback.error": error instanceof Error ? error.message : String(error), + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } + throw error; + } + }, + { + attributes: { + "repository.name": repository.name, + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } +} + +export function parseRunListInputOptions(data: any): RunListInputOptions { + return RunListInputOptionsSchema.parse(data); +} + +export async function convertRunListInputOptionsToFilterRunsOptions( + options: RunListInputOptions, + prisma: RunsRepositoryOptions["prisma"] +): Promise { + const convertedOptions: FilterRunsOptions = { + ...options, + period: undefined, + }; + + // Convert time period to ms + const time = timeFilters({ + period: options.period, + from: options.from, + to: options.to, + }); + convertedOptions.period = time.period ? parseDuration(time.period) ?? undefined : undefined; + + // Batch friendlyId to id + if (options.batchId && options.batchId.startsWith("batch_")) { + const batch = await prisma.batchTaskRun.findFirst({ + select: { + id: true, + }, + where: { + friendlyId: options.batchId, + runtimeEnvironmentId: options.environmentId, + }, + }); + + if (batch) { + convertedOptions.batchId = batch.id; + } + } + + // ScheduleId can be a friendlyId + if (options.scheduleId && options.scheduleId.startsWith("sched_")) { + const schedule = await prisma.taskSchedule.findFirst({ + select: { + id: true, + }, + where: { + friendlyId: options.scheduleId, + projectId: options.projectId, + }, + }); + + if (schedule) { + convertedOptions.scheduleId = schedule?.id; + } + } + + if (options.bulkId && options.bulkId.startsWith("bulk_")) { + convertedOptions.bulkId = BulkActionId.toId(options.bulkId); + } + + if (options.runId) { + // Convert to friendlyId + convertedOptions.runId = options.runId.map((r) => RunId.toFriendlyId(r)); + } + + // Show all runs if we are filtering by batchId or runId + if (options.batchId || options.runId?.length || options.scheduleId || options.tasks?.length) { + convertedOptions.rootOnly = false; + } + + return convertedOptions; +} diff --git a/apps/webapp/app/v3/featureFlags.server.ts b/apps/webapp/app/v3/featureFlags.server.ts index 6f7c3edce5..f1bc913c42 100644 --- a/apps/webapp/app/v3/featureFlags.server.ts +++ b/apps/webapp/app/v3/featureFlags.server.ts @@ -1,23 +1,32 @@ import { z } from "zod"; -import { prisma, PrismaClientOrTransaction } from "~/db.server"; +import { prisma, type PrismaClientOrTransaction } from "~/db.server"; export const FEATURE_FLAG = { defaultWorkerInstanceGroupId: "defaultWorkerInstanceGroupId", + runsListRepository: "runsListRepository", } as const; const FeatureFlagCatalog = { [FEATURE_FLAG.defaultWorkerInstanceGroupId]: z.string(), + [FEATURE_FLAG.runsListRepository]: z.enum(["clickhouse", "postgres"]), }; type FeatureFlagKey = keyof typeof FeatureFlagCatalog; -export type FlagsOptions = { - key: FeatureFlagKey; +export type FlagsOptions = { + key: T; + defaultValue?: z.infer<(typeof FeatureFlagCatalog)[T]>; }; export function makeFlags(_prisma: PrismaClientOrTransaction = prisma) { - return async function flags( - opts: FlagsOptions + function flags( + opts: FlagsOptions & { defaultValue: z.infer<(typeof FeatureFlagCatalog)[T]> } + ): Promise>; + function flags( + opts: FlagsOptions + ): Promise | undefined>; + async function flags( + opts: FlagsOptions ): Promise | undefined> { const value = await _prisma.featureFlag.findUnique({ where: { @@ -28,16 +37,18 @@ export function makeFlags(_prisma: PrismaClientOrTransaction = prisma) { const parsed = FeatureFlagCatalog[opts.key].safeParse(value?.value); if (!parsed.success) { - return; + return opts.defaultValue; } return parsed.data; - }; + } + + return flags; } export function makeSetFlags(_prisma: PrismaClientOrTransaction = prisma) { return async function setFlags( - opts: FlagsOptions & { value: z.infer<(typeof FeatureFlagCatalog)[T]> } + opts: FlagsOptions & { value: z.infer<(typeof FeatureFlagCatalog)[T]> } ): Promise { await _prisma.featureFlag.upsert({ where: { @@ -56,3 +67,46 @@ export function makeSetFlags(_prisma: PrismaClientOrTransaction = prisma) { export const flags = makeFlags(); export const setFlags = makeSetFlags(); + +// Create a Zod schema from the existing catalog +export const FeatureFlagCatalogSchema = z.object(FeatureFlagCatalog); + +// Utility function to validate a feature flag value +export function validateFeatureFlagValue( + key: T, + value: unknown +): z.SafeParseReturnType> { + return FeatureFlagCatalog[key].safeParse(value); +} + +// Utility function to validate all feature flags at once +export function validateAllFeatureFlags(values: Record) { + return FeatureFlagCatalogSchema.safeParse(values); +} + +// Utility function to validate partial feature flags (all keys optional) +export function validatePartialFeatureFlags(values: Record) { + return FeatureFlagCatalogSchema.partial().safeParse(values); +} + +// Utility function to set multiple feature flags at once +export function makeSetMultipleFlags(_prisma: PrismaClientOrTransaction = prisma) { + return async function setMultipleFlags( + flags: Partial> + ): Promise<{ key: string; value: any }[]> { + const setFlag = makeSetFlags(_prisma); + const updatedFlags: { key: string; value: any }[] = []; + + for (const [key, value] of Object.entries(flags)) { + if (value !== undefined) { + await setFlag({ + key: key as any, + value: value as any, + }); + updatedFlags.push({ key, value }); + } + } + + return updatedFlags; + }; +} diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index a68d6d330f..98b6079c10 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -12,7 +12,7 @@ import { parseRunListInputOptions, type RunListInputFilters, RunsRepository, -} from "~/services/runsRepository.server"; +} from "~/services/runsRepository/runsRepository.server"; import { BaseService } from "../baseService.server"; import { commonWorker } from "~/v3/commonWorker.server"; import { env } from "~/env.server"; diff --git a/apps/webapp/test/runsRepository.test.ts b/apps/webapp/test/runsRepository.test.ts index 56e6bf92d6..37f83d7f0c 100644 --- a/apps/webapp/test/runsRepository.test.ts +++ b/apps/webapp/test/runsRepository.test.ts @@ -1,6 +1,6 @@ import { containerTest } from "@internal/testcontainers"; import { setTimeout } from "node:timers/promises"; -import { RunsRepository } from "~/services/runsRepository.server"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { setupClickhouseReplication } from "./utils/replicationUtils"; vi.setConfig({ testTimeout: 60_000 });