Skip to content

Commit

Permalink
fix(crawl-status): consider concurrency limited jobs as prioritized (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mogery authored Feb 16, 2025
1 parent 7ac2b99 commit fd8b389
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 33 deletions.
14 changes: 1 addition & 13 deletions apps/api/src/controllers/v1/crawl-errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,15 @@ import { Response } from "express";
import {
CrawlErrorsResponse,
CrawlStatusParams,
CrawlStatusResponse,
ErrorResponse,
RequestWithAuth,
} from "./types";
import {
getCrawl,
getCrawlExpiry,
getCrawlJobs,
getDoneJobsOrdered,
getDoneJobsOrderedLength,
getThrottledJobs,
isCrawlFinished,
} from "../../lib/crawl-redis";
import { getScrapeQueue, redisConnection } from "../../services/queue-service";
import {
supabaseGetJobById,
supabaseGetJobsById,
} from "../../lib/supabase-jobs";
import { configDotenv } from "dotenv";
import { Job, JobState } from "bullmq";
import { logger } from "../../lib/logger";
import { Job } from "bullmq";
configDotenv();

export async function getJob(id: string) {
Expand Down
14 changes: 7 additions & 7 deletions apps/api/src/controllers/v1/crawl-status-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import {
getCrawlJobs,
getDoneJobsOrdered,
getDoneJobsOrderedLength,
getThrottledJobs,
isCrawlFinished,
isCrawlFinishedLocked,
} from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { getJob, getJobs } from "./crawl-status";
import * as Sentry from "@sentry/node";
import { Job, JobState } from "bullmq";
import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";

type ErrorMessage = {
type: "error";
Expand Down Expand Up @@ -127,16 +127,16 @@ async function crawlStatusWS(
async (x) => [x, await getScrapeQueue().getJobState(x)] as const,
),
);
const throttledJobs = new Set(...(await getThrottledJobs(req.auth.team_id)));

const throttledJobsSet = new Set(throttledJobs);

const throttledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);

const validJobStatuses: [string, JobState | "unknown"][] = [];
const validJobIDs: string[] = [];

for (const [id, status] of jobStatuses) {
if (
!throttledJobsSet.has(id) &&
if (throttledJobsSet.has(id)) {
validJobStatuses.push([id, "prioritized"]);
validJobIDs.push(id);
} else if (
status !== "failed" &&
status !== "unknown"
) {
Expand Down
11 changes: 6 additions & 5 deletions apps/api/src/controllers/v1/crawl-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
getCrawlJobs,
getDoneJobsOrdered,
getDoneJobsOrderedLength,
getThrottledJobs,
isCrawlKickoffFinished,
} from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
Expand All @@ -23,6 +22,7 @@ import { configDotenv } from "dotenv";
import type { Job, JobState } from "bullmq";
import { logger } from "../../lib/logger";
import { supabase_service } from "../../services/supabase";
import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
configDotenv();

export type PseudoJob<T> = {
Expand Down Expand Up @@ -137,16 +137,17 @@ export async function crawlStatusController(
async (x) => [x, await getScrapeQueue().getJobState(x)] as const,
),
);
const throttledJobs = new Set(...(await getThrottledJobs(req.auth.team_id)));

const throttledJobsSet = new Set(throttledJobs);
const throttledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);

const validJobStatuses: [string, JobState | "unknown"][] = [];
const validJobIDs: string[] = [];

for (const [id, status] of jobStatuses) {
if (
!throttledJobsSet.has(id) &&
if (throttledJobsSet.has(id)) {
validJobStatuses.push([id, "prioritized"]);
validJobIDs.push(id);
} else if (
status !== "failed" &&
status !== "unknown"
) {
Expand Down
5 changes: 5 additions & 0 deletions apps/api/src/lib/concurrency-limit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ export async function pushConcurrencyLimitedJob(
);
}

/**
 * Returns the IDs of all jobs currently parked in the team's
 * concurrency-limit queue (jobs waiting because the team is at its
 * concurrency cap).
 *
 * Each sorted-set member is a JSON-serialized job record; only its `id`
 * field is extracted here.
 *
 * @param team_id - Team whose concurrency queue to inspect.
 * @returns Set of job IDs queued behind the concurrency limit.
 */
export async function getConcurrencyLimitedJobs(
  team_id: string,
): Promise<Set<string>> {
  // zrange 0..-1 fetches every member of the queue, regardless of score.
  const members = await redisConnection.zrange(
    constructQueueKey(team_id),
    0,
    -1,
  );
  return new Set(members.map((x) => (JSON.parse(x) as { id: string }).id));
}

export async function getConcurrencyQueueJobsCount(team_id: string): Promise<number> {
const count = await redisConnection.zcard(constructQueueKey(team_id));
Expand Down
8 changes: 0 additions & 8 deletions apps/api/src/lib/crawl-redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,6 @@ export async function getCrawlJobCount(id: string): Promise<number> {
return await redisConnection.scard("crawl:" + id + ":jobs");
}

/**
 * Lists the job IDs in the team's throttle sorted set with a score of
 * Date.now() or later (presumably the score is a throttle timestamp or
 * expiry — confirm against the writer of this key).
 *
 * @param teamId - Team whose throttled jobs to list.
 * @returns Job IDs currently recorded as throttled for the team.
 */
export async function getThrottledJobs(teamId: string): Promise<string[]> {
  const throttledKey = `concurrency-limiter:${teamId}:throttled`;
  // Only members scored from "now" upward are returned.
  return await redisConnection.zrangebyscore(throttledKey, Date.now(), Infinity);
}

export function normalizeURL(url: string, sc: StoredCrawl): string {
const urlO = new URL(url);
if (!sc.crawlerOptions || sc.crawlerOptions.ignoreQueryParameters) {
Expand Down

0 comments on commit fd8b389

Please sign in to comment.