diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index a74d5d64f0..e2325e4694 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -771,6 +771,7 @@ const EnvironmentSchema = z.object({ RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"), RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"), RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().default(9_000), + RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10), }); export type Environment = z.infer; diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts new file mode 100644 index 0000000000..483c2d219a --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts @@ -0,0 +1,122 @@ +import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import { prisma } from "~/db.server"; +import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { z } from "zod"; +import { ClickHouse } from "@internal/clickhouse"; +import { env } from "~/env.server"; +import { RunsReplicationService } from "~/services/runsReplicationService.server"; +import { + getRunsReplicationGlobal, + setRunsReplicationGlobal, +} from "~/services/runsReplicationGlobal.server"; + +const CreateRunReplicationServiceParams = z.object({ + name: z.string(), + keepAliveEnabled: z.boolean(), + keepAliveIdleSocketTtl: z.number(), + maxOpenConnections: z.number(), + maxFlushConcurrency: z.number(), + flushIntervalMs: z.number(), + flushBatchSize: z.number(), + leaderLockTimeoutMs: z.number(), + leaderLockExtendIntervalMs: z.number(), + leaderLockAcquireAdditionalTimeMs: z.number(), + leaderLockRetryIntervalMs: z.number(), + ackIntervalSeconds: z.number(), + waitForAsyncInsert: z.boolean(), +}); + +type CreateRunReplicationServiceParams = z.infer; + +export async function action({ request }: ActionFunctionArgs) { + // Next authenticate the request + const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); + + if (!authenticationResult) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + const user = await prisma.user.findUnique({ + where: { + id: authenticationResult.userId, + }, + }); + + if (!user) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + if (!user.admin) { + return json({ error: "You must be an admin to perform this action" }, { status: 403 }); + } + + try { + const globalService = getRunsReplicationGlobal(); + + if (globalService) { + return json( + { error: "Global runs replication service already exists. Stop it first." }, + { status: 400 } + ); + } + + const params = CreateRunReplicationServiceParams.parse(await request.json()); + + const service = createRunReplicationService(params); + + setRunsReplicationGlobal(service); + + await service.start(); + + return json({ + success: true, + }); + } catch (error) { + return json({ error: error instanceof Error ? error.message : error }, { status: 400 }); + } +} + +function createRunReplicationService(params: CreateRunReplicationServiceParams) { + const clickhouse = new ClickHouse({ + url: env.RUN_REPLICATION_CLICKHOUSE_URL, + name: params.name, + keepAlive: { + enabled: params.keepAliveEnabled, + idleSocketTtl: params.keepAliveIdleSocketTtl, + }, + logLevel: "debug", + compression: { + request: true, + }, + maxOpenConnections: params.maxOpenConnections, + }); + + const service = new RunsReplicationService({ + clickhouse: clickhouse, + pgConnectionUrl: env.DATABASE_URL, + serviceName: params.name, + slotName: env.RUN_REPLICATION_SLOT_NAME, + publicationName: env.RUN_REPLICATION_PUBLICATION_NAME, + redisOptions: { + keyPrefix: "runs-replication:", + port: env.RUN_REPLICATION_REDIS_PORT ?? undefined, + host: env.RUN_REPLICATION_REDIS_HOST ?? undefined, + username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined, + password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined, + enableAutoPipelining: true, + ...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }, + maxFlushConcurrency: params.maxFlushConcurrency, + flushIntervalMs: params.flushIntervalMs, + flushBatchSize: params.flushBatchSize, + leaderLockTimeoutMs: params.leaderLockTimeoutMs, + leaderLockExtendIntervalMs: params.leaderLockExtendIntervalMs, + leaderLockAcquireAdditionalTimeMs: params.leaderLockAcquireAdditionalTimeMs, + leaderLockRetryIntervalMs: params.leaderLockRetryIntervalMs, + ackIntervalSeconds: params.ackIntervalSeconds, + logLevel: "debug", + waitForAsyncInsert: params.waitForAsyncInsert, + }); + + return service; +} diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.start-monitor.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.start-monitor.ts new file mode 100644 index 0000000000..510c363526 --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.start-monitor.ts @@ -0,0 +1,59 @@ +import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { prisma } from "~/db.server"; +import { startTcpBufferMonitor } from "~/services/monitorTcpBuffers.server"; +import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { getTcpMonitorGlobal, setTcpMonitorGlobal } from "~/services/runsReplicationGlobal.server"; + +const schema = z.object({ + intervalMs: z.number().min(1000).max(60_000).default(5_000), +}); + +export async function action({ request }: ActionFunctionArgs) { + // Next authenticate the request + const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); + + if (!authenticationResult) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + const user = await prisma.user.findUnique({ + where: { + id: authenticationResult.userId, + }, + }); + + if (!user) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + if (!user.admin) { + return json({ error: "You must be an admin to perform this action" }, { status: 403 }); + } + + try { + const body = await request.json(); + const { intervalMs } = schema.parse(body); + + const globalMonitor = getTcpMonitorGlobal(); + + if (globalMonitor) { + return json( + { + error: "Tcp buffer monitor already running, you must stop it before starting a new one", + }, + { + status: 400, + } + ); + } + + setTcpMonitorGlobal(startTcpBufferMonitor(intervalMs)); + + return json({ + success: true, + }); + } catch (error) { + return json({ error: error instanceof Error ? error.message : error }, { status: 400 }); + } +} diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts index 182b264000..a700c4d4f1 100644 --- a/apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts @@ -1,6 +1,7 @@ import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; import { prisma } from "~/db.server"; import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server"; import { runsReplicationInstance } from "~/services/runsReplicationInstance.server"; export async function action({ request }: ActionFunctionArgs) { @@ -26,7 +27,13 @@ export async function action({ request }: ActionFunctionArgs) { } try { - await runsReplicationInstance?.start(); + const globalService = getRunsReplicationGlobal(); + + if (globalService) { + await globalService.start(); + } else { + await runsReplicationInstance?.start(); + } return json({ success: true, diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.stop-monitor.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.stop-monitor.ts new file mode 100644 index 0000000000..fb2a3daecd --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.stop-monitor.ts @@ -0,0 +1,47 @@ +import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import { prisma } from "~/db.server"; +import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { + getTcpMonitorGlobal, + unregisterTcpMonitorGlobal, +} from "~/services/runsReplicationGlobal.server"; + +export async function action({ request }: ActionFunctionArgs) { + // Next authenticate the request + const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); + + if (!authenticationResult) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + const user = await prisma.user.findUnique({ + where: { + id: authenticationResult.userId, + }, + }); + + if (!user) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + if (!user.admin) { + return json({ error: "You must be an admin to perform this action" }, { status: 403 }); + } + + try { + const globalMonitor = getTcpMonitorGlobal(); + + if (!globalMonitor) { + return json({ error: "Tcp buffer monitor not running" }, { status: 400 }); + } + + clearInterval(globalMonitor); + unregisterTcpMonitorGlobal(); + + return json({ + success: true, + }); + } catch (error) { + return json({ error: error instanceof Error ? error.message : error }, { status: 400 }); + } +} diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts index 6163ff5f70..1dc53833d8 100644 --- a/apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts @@ -1,6 +1,7 @@ import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; import { prisma } from "~/db.server"; import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server"; import { runsReplicationInstance } from "~/services/runsReplicationInstance.server"; export async function action({ request }: ActionFunctionArgs) { @@ -26,7 +27,13 @@ export async function action({ request }: ActionFunctionArgs) { } try { - await runsReplicationInstance?.stop(); + const globalService = getRunsReplicationGlobal(); + + if (globalService) { + await globalService.stop(); + } else { + await runsReplicationInstance?.stop(); + } return json({ success: true, diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts index f32b76383d..f4a1223dfc 100644 --- a/apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts @@ -1,6 +1,10 @@ import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; import { prisma } from "~/db.server"; import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { + getRunsReplicationGlobal, + unregisterRunsReplicationGlobal, +} from "~/services/runsReplicationGlobal.server"; import { runsReplicationInstance } from "~/services/runsReplicationInstance.server"; export async function action({ request }: ActionFunctionArgs) { @@ -26,7 +30,14 @@ export async function action({ request }: ActionFunctionArgs) { } try { - await runsReplicationInstance?.teardown(); + const globalService = getRunsReplicationGlobal(); + + if (globalService) { + await globalService.teardown(); + unregisterRunsReplicationGlobal(); + } else { + await runsReplicationInstance?.teardown(); + } return json({ success: true, diff --git a/apps/webapp/app/services/monitorTcpBuffers.server.ts b/apps/webapp/app/services/monitorTcpBuffers.server.ts new file mode 100644 index 0000000000..418ff9e4c8 --- /dev/null +++ b/apps/webapp/app/services/monitorTcpBuffers.server.ts @@ -0,0 +1,57 @@ +// monitorTcpBuffers.ts +import fs from "fs/promises"; +import os from "os"; +import { logger } from "./logger.server"; + +/** + * Parse /proc/net/sockstat and /proc/sys/net/* every `intervalMs` + * and log the numbers. You can pivot these logs into CloudWatch + * metrics with a filter pattern if you like. + */ +export function startTcpBufferMonitor(intervalMs = 5_000) { + async function sampleOnce() { + try { + const [sockstat, wmemMax, tcpMem] = await Promise.all([ + fs.readFile("/proc/net/sockstat", "utf8"), + fs.readFile("/proc/sys/net/core/wmem_max", "utf8"), + fs.readFile("/proc/sys/net/ipv4/tcp_mem", "utf8"), + ]); + + logger.debug("tcp-buffer-monitor", { + sockstat, + wmemMax, + tcpMem, + }); + + // /proc/net/sockstat has lines like: + // TCP: inuse 5 orphan 0 tw 0 alloc 6 mem 409 + const tcpLine = sockstat.split("\n").find((l) => l.startsWith("TCP:")) ?? ""; + const fields = tcpLine.trim().split(/\s+/); + const inUse = Number(fields[2]); // open sockets + const alloc = Number(fields[8]); // total sockets with buffers + const memPages = Number(fields[10]); // pages (4 kB each) + const memBytes = memPages * 4096; + + const wmemMaxBytes = Number(wmemMax.trim()); + const [low, pressure, high] = tcpMem + .trim() + .split(/\s+/) + .map((n) => Number(n) * 4096); // pages โ†’ bytes + + logger.debug("tcp-buffer-monitor", { + t: Date.now(), + host: os.hostname(), + sockets_in_use: inUse, + sockets_alloc: alloc, + tcp_mem_bytes: memBytes, + tcp_mem_high: high, + wmem_max: wmemMaxBytes, + }); + } catch (err) { + // Log and keep going; most errors are โ€œfile disappeared for a momentโ€ + console.error("tcp-buffer-monitor error", err); + } + } + + return setInterval(sampleOnce, intervalMs); +} diff --git a/apps/webapp/app/services/runsReplicationGlobal.server.ts b/apps/webapp/app/services/runsReplicationGlobal.server.ts new file mode 100644 index 0000000000..c65a819da2 --- /dev/null +++ b/apps/webapp/app/services/runsReplicationGlobal.server.ts @@ -0,0 +1,36 @@ +import { RunsReplicationService } from "./runsReplicationService.server"; + +const GLOBAL_RUNS_REPLICATION_KEY = Symbol.for("dev.trigger.ts.runs-replication"); +const GLOBAL_TCP_MONITOR_KEY = Symbol.for("dev.trigger.ts.tcp-monitor"); + +type RunsReplicationGlobal = { + [GLOBAL_RUNS_REPLICATION_KEY]?: RunsReplicationService; + [GLOBAL_TCP_MONITOR_KEY]?: NodeJS.Timeout; +}; + +const _globalThis = typeof globalThis === "object" ? globalThis : global; +const _global = _globalThis as RunsReplicationGlobal; + +export function getRunsReplicationGlobal(): RunsReplicationService | undefined { + return _global[GLOBAL_RUNS_REPLICATION_KEY]; +} + +export function setRunsReplicationGlobal(service: RunsReplicationService) { + _global[GLOBAL_RUNS_REPLICATION_KEY] = service; +} + +export function unregisterRunsReplicationGlobal() { + delete _global[GLOBAL_RUNS_REPLICATION_KEY]; +} + +export function getTcpMonitorGlobal(): NodeJS.Timeout | undefined { + return _global[GLOBAL_TCP_MONITOR_KEY]; +} + +export function setTcpMonitorGlobal(timeout: NodeJS.Timeout) { + _global[GLOBAL_TCP_MONITOR_KEY] = timeout; +} + +export function unregisterTcpMonitorGlobal() { + delete _global[GLOBAL_TCP_MONITOR_KEY]; +} diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index c51defe214..57c5754e2a 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -16,10 +16,12 @@ function initializeRunsReplicationInstance() { invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set"); if (!env.RUN_REPLICATION_CLICKHOUSE_URL) { - logger.info("๐Ÿ—ƒ๏ธ Runs replication service not enabled"); + console.log("๐Ÿ—ƒ๏ธ Runs replication service not enabled"); return; } + console.log("๐Ÿ—ƒ๏ธ Runs replication service enabled"); + const clickhouse = new ClickHouse({ url: env.RUN_REPLICATION_CLICKHOUSE_URL, name: "runs-replication", @@ -28,6 +30,10 @@ function initializeRunsReplicationInstance() { idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, }, logLevel: env.RUN_REPLICATION_LOG_LEVEL, + compression: { + request: true, + }, + maxOpenConnections: env.RUN_REPLICATION_MAX_OPEN_CONNECTIONS, }); const service = new RunsReplicationService({ @@ -62,10 +68,10 @@ function initializeRunsReplicationInstance() { service .start() .then(() => { - logger.info("๐Ÿ—ƒ๏ธ Runs replication service started"); + console.log("๐Ÿ—ƒ๏ธ Runs replication service started"); }) .catch((error) => { - logger.error("๐Ÿ—ƒ๏ธ Runs replication service failed to start", { + console.error("๐Ÿ—ƒ๏ธ Runs replication service failed to start", { error, }); }); diff --git a/apps/webapp/test/runsReplicationService.test.ts b/apps/webapp/test/runsReplicationService.test.ts index 4fa173563a..1494b8bd70 100644 --- a/apps/webapp/test/runsReplicationService.test.ts +++ b/apps/webapp/test/runsReplicationService.test.ts @@ -18,6 +18,9 @@ describe("RunsReplicationService", () => { const clickhouse = new ClickHouse({ url: clickhouseContainer.getConnectionUrl(), name: "runs-replication", + compression: { + request: true, + }, }); const { tracer, exporter } = createInMemoryTracing(); @@ -1605,8 +1608,8 @@ describe("RunsReplicationService", () => { }); }, 500); - // Wait for 4 minutes - await setTimeout(4 * 60 * 1000); + // Wait for 1 minute + await setTimeout(1 * 60 * 1000); // Stop the interval clearInterval(interval); @@ -1624,9 +1627,7 @@ describe("RunsReplicationService", () => { const [queryError, result] = await queryRuns({}); expect(queryError).toBeNull(); - // Check that there are between 200 and 480 runs in ClickHouse - expect(result?.length).toBeGreaterThanOrEqual(200); - expect(result?.length).toBeLessThanOrEqual(480); + expect(result?.length).toBeGreaterThanOrEqual(50); await runsReplicationService.stop(); }, diff --git a/internal-packages/clickhouse/src/client/client.ts b/internal-packages/clickhouse/src/client/client.ts index 96f04d547e..d63aabf262 100644 --- a/internal-packages/clickhouse/src/client/client.ts +++ b/internal-packages/clickhouse/src/client/client.ts @@ -1,6 +1,7 @@ import { type ClickHouseClient, ClickHouseError, + ClickHouseLogLevel, type ClickHouseSettings, createClient, } from "@clickhouse/client"; @@ -30,7 +31,12 @@ export type ClickhouseConfig = { httpAgent?: HttpAgent | HttpsAgent; clickhouseSettings?: ClickHouseSettings; logger?: Logger; + maxOpenConnections?: number; logLevel?: LogLevel; + compression?: { + request?: boolean; + response?: boolean; + }; }; export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { @@ -47,11 +53,16 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { url: config.url, keep_alive: config.keepAlive, http_agent: config.httpAgent, + compression: config.compression, + max_open_connections: config.maxOpenConnections, clickhouse_settings: { ...config.clickhouseSettings, output_format_json_quote_64bit_integers: 0, output_format_json_quote_64bit_floats: 0, }, + log: { + level: convertLogLevelToClickhouseLogLevel(config.logLevel), + }, }); this.tracer = config.tracer ?? trace.getTracer("@internal/clickhouse"); @@ -288,3 +299,22 @@ function recordClickhouseError(span: Span, error: Error) { recordSpanError(span, error); } } + +function convertLogLevelToClickhouseLogLevel(logLevel?: LogLevel) { + if (!logLevel) { + return ClickHouseLogLevel.INFO; + } + + switch (logLevel) { + case "debug": + return ClickHouseLogLevel.DEBUG; + case "info": + return ClickHouseLogLevel.INFO; + case "warn": + return ClickHouseLogLevel.WARN; + case "error": + return ClickHouseLogLevel.ERROR; + default: + return ClickHouseLogLevel.INFO; + } +} diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index 51066ffed8..f4ea368ffd 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -18,6 +18,11 @@ export type ClickhouseCommonConfig = { clickhouseSettings?: ClickHouseSettings; logger?: Logger; logLevel?: LogLevel; + compression?: { + request?: boolean; + response?: boolean; + }; + maxOpenConnections?: number; }; export type ClickHouseConfig = @@ -59,6 +64,8 @@ export class ClickHouse { logLevel: config.logLevel, keepAlive: config.keepAlive, httpAgent: config.httpAgent, + maxOpenConnections: config.maxOpenConnections, + compression: config.compression, }); this.reader = client; this.writer = client; @@ -73,6 +80,8 @@ export class ClickHouse { logLevel: config.logLevel, keepAlive: config.keepAlive, httpAgent: config.httpAgent, + maxOpenConnections: config.maxOpenConnections, + compression: config.compression, }); this.writer = new ClickhouseClient({ name: config.writerName ?? "clickhouse-writer", @@ -82,6 +91,8 @@ export class ClickHouse { logLevel: config.logLevel, keepAlive: config.keepAlive, httpAgent: config.httpAgent, + maxOpenConnections: config.maxOpenConnections, + compression: config.compression, }); this._splitClients = true; diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts index d49f753358..0dfbe1c026 100644 --- a/internal-packages/replication/src/client.ts +++ b/internal-packages/replication/src/client.ts @@ -673,7 +673,6 @@ export class LogicalReplicationClient { publicationName: this.options.publicationName, lockTimeoutMs: this.leaderLockTimeoutMs, lockExtendIntervalMs: this.leaderLockExtendIntervalMs, - lock: this.leaderLock, }); } catch (err) { this.logger.error("Failed to extend leader lock", { @@ -683,7 +682,6 @@ export class LogicalReplicationClient { error: err, lockTimeoutMs: this.leaderLockTimeoutMs, lockExtendIntervalMs: this.leaderLockExtendIntervalMs, - lock: this.leaderLock, }); // Optionally emit an error or handle loss of leadership this.events.emit("error", err instanceof Error ? err : new Error(String(err)));