diff --git a/control-plane/src/modules/cluster-activity.ts b/control-plane/src/modules/cluster-activity.ts new file mode 100644 index 00000000..b6f8067e --- /dev/null +++ b/control-plane/src/modules/cluster-activity.ts @@ -0,0 +1,11 @@ +import Cache from "node-cache"; + +const cache = new Cache({ stdTTL: 60, maxKeys: 1000 }); + +export const setClusterActivityToHigh = (clusterId: string) => { + cache.set(clusterId, true); +}; + +export const isClusterActivityHigh = (clusterId: string) => { + return cache.get(clusterId) === true; +}; diff --git a/control-plane/src/modules/jobs/create-job.ts b/control-plane/src/modules/jobs/create-job.ts index 9fff283a..92d5b151 100644 --- a/control-plane/src/modules/jobs/create-job.ts +++ b/control-plane/src/modules/jobs/create-job.ts @@ -4,6 +4,7 @@ import * as clusters from "../cluster"; import * as data from "../data"; import * as events from "../observability/events"; import { jobDurations } from "./job-metrics"; +import * as clusterActivity from "../cluster-activity"; type CreateJobParams = { service: string; @@ -40,6 +41,8 @@ export const createJob = async (params: { }) => { const end = jobDurations.startTimer({ operation: "createJob" }); + clusterActivity.setClusterActivityToHigh(params.owner.clusterId); + const cluster = await clusters.operationalCluster(params.owner.clusterId); const callConfigParams = { diff --git a/control-plane/src/modules/jobs/jobs.ts b/control-plane/src/modules/jobs/jobs.ts index c8a26b21..63f91624 100644 --- a/control-plane/src/modules/jobs/jobs.ts +++ b/control-plane/src/modules/jobs/jobs.ts @@ -8,6 +8,7 @@ import { } from "../service-definitions"; import { jobDurations } from "./job-metrics"; import { selfHealJobs } from "./persist-result"; +import * as clusterActivity from "../cluster-activity"; export { createJob } from "./create-job"; export { persistJobResult } from "./persist-result"; @@ -20,6 +21,7 @@ export const nextJobs = async ({ deploymentId, ip, definition, + ttl = 1_000, }: { service: string; owner: { clusterId: string }; @@ -28,11 +30,30 @@ export const nextJobs = async ({ deploymentId?: string; ip: string; definition?: ServiceDefinition; + ttl?: number; }) => { + const start = Date.now(); const end = jobDurations.startTimer({ operation: "nextJobs" }); - const results = await data.db.execute( - sql`UPDATE + type Result = { + id: string; + target_fn: string; + target_args: string; + }; + + let results: { rowCount: number | null; rows: Result[] } = { + rowCount: null, + rows: [], + }; + + await Promise.all([ + storeMachineInfo(machineId, ip, owner, deploymentId), + definition ? storeServiceDefinition(service, definition, owner) : undefined, + ]); + + do { + results = await data.db.execute( + sql`UPDATE jobs SET status = 'running', remaining_attempts = remaining_attempts - 1, last_retrieved_at=${new Date().toISOString()}, @@ -43,12 +64,14 @@ export const nextJobs = async ({ AND service = ${service} LIMIT ${limit}) RETURNING id, target_fn, target_args`, - ); + ); - await Promise.all([ - storeMachineInfo(machineId, ip, owner, deploymentId), - definition ? storeServiceDefinition(service, definition, owner) : undefined, - ]); + const timeout = clusterActivity.isClusterActivityHigh(owner.clusterId) + ? 100 + : 1000; + + await new Promise((resolve) => setTimeout(resolve, timeout)); + } while (!results.rowCount && Date.now() - start < ttl); if (results.rowCount === 0) { end(); @@ -149,7 +172,12 @@ export const getJobStatuses = async ({ resultType: InferModel["result_type"]; }>; + let attempt = 0; + do { + attempt++; + const backoff = Math.min(100 * attempt, 1000); + jobs = await data.db .select({ id: data.jobs.id, @@ -171,7 +199,7 @@ export const getJobStatuses = async ({ ); if (!hasResolved) { - await new Promise((resolve) => setTimeout(resolve, 500)); + await new Promise((resolve) => setTimeout(resolve, backoff)); } } while (!hasResolved && Date.now() - start < longPollTimeout); diff --git a/control-plane/src/modules/router.ts b/control-plane/src/modules/router.ts index 9c93493b..823d3174 100644 --- a/control-plane/src/modules/router.ts +++ b/control-plane/src/modules/router.ts @@ -51,32 +51,19 @@ export const router = s.router(contract, { const limit = request.body.limit ?? 1; - let collection: { - id: string; - targetFn: string; - targetArgs: string; - }[]; - - const start = Date.now(); - - do { - collection = await jobs.nextJobs({ - owner, - limit, - machineId: request.headers["x-machine-id"], - deploymentId: request.headers["x-deployment-id"], - ip: request.request.ip, - service: request.body.service, - definition: { - name: request.body.service, - functions: request.body.functions, - }, - }); - - if (collection.length === 0) { - await new Promise((resolve) => setTimeout(resolve, 500)); - } - } while (collection.length === 0 && Date.now() - start < request.body.ttl); + const collection = await jobs.nextJobs({ + owner, + limit, + machineId: request.headers["x-machine-id"], + deploymentId: request.headers["x-deployment-id"], + ip: request.request.ip, + service: request.body.service, + definition: { + name: request.body.service, + functions: request.body.functions, + }, + ttl: request.body.ttl, + }); return { status: 200,