Skip to content

Commit

Permalink
fix: Minor optimizations to decrease the job latencies (#200)
Browse files Browse the repository at this point in the history
- Change job polling frequency based on the activity level of the
cluster
- Change the results polling frequency to start from a low base interval (100 ms) and back off linearly, capped at 1 second
  • Loading branch information
nadeesha authored Mar 29, 2024
1 parent 504fd0e commit 743cb5f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 34 deletions.
11 changes: 11 additions & 0 deletions control-plane/src/modules/cluster-activity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import Cache from "node-cache";

// In-memory store of per-cluster activity flags. Entries get a default TTL of
// 60 (node-cache expresses stdTTL in seconds) and the cache is limited to
// 1000 keys.
const cache = new Cache({
  stdTTL: 60,
  maxKeys: 1000,
});

/**
 * Flags a cluster as having high activity.
 *
 * The flag is written to the module-level cache under `clusterId` using the
 * cache's configured default TTL, so it lapses on its own unless this function
 * is called again for the same cluster.
 *
 * The write is best-effort: the flag is only a polling-frequency hint, so a
 * failure to record it is deliberately not surfaced to the caller.
 *
 * @param clusterId - Identifier of the cluster to flag.
 */
export const setClusterActivityToHigh = (clusterId: string) => {
  try {
    cache.set(clusterId, true);
  } catch {
    // node-cache throws from `set` once the configured `maxKeys` limit is
    // reached. Losing the hint only means the cluster is treated as not
    // highly active; it must not abort the operation that triggered it.
  }
};

/**
 * Reports whether a cluster is currently flagged as having high activity.
 *
 * @param clusterId - Identifier of the cluster to look up.
 * @returns `true` only when the cached value for `clusterId` is strictly
 *   `true`; any other cached value, or no value at all, yields `false`.
 */
export const isClusterActivityHigh = (clusterId: string) =>
  cache.get(clusterId) === true;
3 changes: 3 additions & 0 deletions control-plane/src/modules/jobs/create-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as clusters from "../cluster";
import * as data from "../data";
import * as events from "../observability/events";
import { jobDurations } from "./job-metrics";
import * as clusterActivity from "../cluster-activity";

type CreateJobParams = {
service: string;
Expand Down Expand Up @@ -40,6 +41,8 @@ export const createJob = async (params: {
}) => {
const end = jobDurations.startTimer({ operation: "createJob" });

clusterActivity.setClusterActivityToHigh(params.owner.clusterId);

const cluster = await clusters.operationalCluster(params.owner.clusterId);

const callConfigParams = {
Expand Down
44 changes: 36 additions & 8 deletions control-plane/src/modules/jobs/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
} from "../service-definitions";
import { jobDurations } from "./job-metrics";
import { selfHealJobs } from "./persist-result";
import * as clusterActivity from "../cluster-activity";

export { createJob } from "./create-job";
export { persistJobResult } from "./persist-result";
Expand All @@ -20,6 +21,7 @@ export const nextJobs = async ({
deploymentId,
ip,
definition,
ttl = 1_000,
}: {
service: string;
owner: { clusterId: string };
Expand All @@ -28,11 +30,30 @@ export const nextJobs = async ({
deploymentId?: string;
ip: string;
definition?: ServiceDefinition;
ttl?: number;
}) => {
const start = Date.now();
const end = jobDurations.startTimer({ operation: "nextJobs" });

const results = await data.db.execute(
sql`UPDATE
type Result = {
id: string;
target_fn: string;
target_args: string;
};

let results: { rowCount: number | null; rows: Result[] } = {
rowCount: null,
rows: [],
};

await Promise.all([
storeMachineInfo(machineId, ip, owner, deploymentId),
definition ? storeServiceDefinition(service, definition, owner) : undefined,
]);

do {
results = await data.db.execute<Result>(
sql`UPDATE
jobs SET status = 'running',
remaining_attempts = remaining_attempts - 1,
last_retrieved_at=${new Date().toISOString()},
Expand All @@ -43,12 +64,14 @@ export const nextJobs = async ({
AND service = ${service}
LIMIT ${limit})
RETURNING id, target_fn, target_args`,
);
);

await Promise.all([
storeMachineInfo(machineId, ip, owner, deploymentId),
definition ? storeServiceDefinition(service, definition, owner) : undefined,
]);
const timeout = clusterActivity.isClusterActivityHigh(owner.clusterId)
? 100
: 1000;

await new Promise((resolve) => setTimeout(resolve, timeout));
} while (!results.rowCount && Date.now() - start < ttl);

if (results.rowCount === 0) {
end();
Expand Down Expand Up @@ -149,7 +172,12 @@ export const getJobStatuses = async ({
resultType: InferModel<typeof data.jobs>["result_type"];
}>;

let attempt = 0;

do {
attempt++;
const backoff = Math.min(100 * attempt, 1000);

jobs = await data.db
.select({
id: data.jobs.id,
Expand All @@ -171,7 +199,7 @@ export const getJobStatuses = async ({
);

if (!hasResolved) {
await new Promise((resolve) => setTimeout(resolve, 500));
await new Promise((resolve) => setTimeout(resolve, backoff));
}
} while (!hasResolved && Date.now() - start < longPollTimeout);

Expand Down
39 changes: 13 additions & 26 deletions control-plane/src/modules/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,19 @@ export const router = s.router(contract, {

const limit = request.body.limit ?? 1;

let collection: {
id: string;
targetFn: string;
targetArgs: string;
}[];

const start = Date.now();

do {
collection = await jobs.nextJobs({
owner,
limit,
machineId: request.headers["x-machine-id"],
deploymentId: request.headers["x-deployment-id"],
ip: request.request.ip,
service: request.body.service,
definition: {
name: request.body.service,
functions: request.body.functions,
},
});

if (collection.length === 0) {
await new Promise((resolve) => setTimeout(resolve, 500));
}
} while (collection.length === 0 && Date.now() - start < request.body.ttl);
const collection = await jobs.nextJobs({
owner,
limit,
machineId: request.headers["x-machine-id"],
deploymentId: request.headers["x-deployment-id"],
ip: request.request.ip,
service: request.body.service,
definition: {
name: request.body.service,
functions: request.body.functions,
},
ttl: request.body.ttl,
});

return {
status: 200,
Expand Down

0 comments on commit 743cb5f

Please sign in to comment.