Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat/extract) Move extract to a queue system #1044

Merged
merged 12 commits into from
Jan 7, 2025
40 changes: 40 additions & 0 deletions apps/api/src/controllers/v1/extract-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Response } from "express";
import { supabaseGetJobsById } from "../../lib/supabase-jobs";
import { RequestWithAuth } from "./types";
import { getExtract, getExtractExpiry } from "../../lib/extract/extract-redis";

/**
 * GET /v1/extract/:jobId — report the status of a queued extract job.
 *
 * Responds 404 if the job is unknown to Redis, or if it is marked completed
 * but no logged job row exists. On success returns the job status, any stored
 * error, the result docs (only when completed), and the Redis-derived expiry.
 */
export async function extractStatusController(
  req: RequestWithAuth<{ jobId: string }, any, any>,
  res: Response,
) {
  const extract = await getExtract(req.params.jobId);

  if (!extract) {
    return res.status(404).json({
      success: false,
      error: "Extract job not found",
    });
  }

  let data: any[] = [];

  if (extract.status === "completed") {
    const jobData = await supabaseGetJobsById([req.params.jobId]);
    if (!jobData || jobData.length === 0) {
      return res.status(404).json({
        success: false,
        error: "Job not found",
      });
    }

    // Guard against job rows logged without docs so `data` stays an array.
    data = jobData[0].docs ?? [];
  }

  return res.status(200).json({
    success: true,
    data: data,
    status: extract.status,
    // `extract` is known non-null here; only the error field is optional.
    error: extract.error ?? undefined,
    expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(),
  });
}
84 changes: 73 additions & 11 deletions apps/api/src/controllers/v1/extract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,31 @@ import {
extractRequestSchema,
ExtractResponse,
} from "./types";
import { getExtractQueue } from "../../services/queue-service";
import * as Sentry from "@sentry/node";
import { saveExtract } from "../../lib/extract/extract-redis";
import { getTeamIdSyncB } from "../../lib/extract/team-id-sync";
import { performExtraction } from "../../lib/extract/extraction-service";

export async function oldExtract(req: RequestWithAuth<{}, ExtractResponse, ExtractRequest>, res: Response<ExtractResponse>, extractId: string){
// Means that are in the non-queue system
// TODO: Remove this once all teams have transitioned to the new system
try {
const result = await performExtraction(extractId, {
request: req.body,
teamId: req.auth.team_id,
plan: req.auth.plan ?? "free",
subId: req.acuc?.sub_id ?? undefined,
});

return res.status(200).json(result);
} catch (error) {
return res.status(500).json({
success: false,
error: "Internal server error",
});
}
}
/**
* Extracts data from the provided URLs based on the request parameters.
* Currently in beta.
Expand All @@ -21,20 +44,59 @@ export async function extractController(
const selfHosted = process.env.USE_DB_AUTHENTICATION !== "true";
req.body = extractRequestSchema.parse(req.body);

if (!req.auth.plan) {
return res.status(400).json({
success: false,
error: "No plan specified",
urlTrace: [],
});
}

const result = await performExtraction({
const extractId = crypto.randomUUID();
const jobData = {
request: req.body,
teamId: req.auth.team_id,
plan: req.auth.plan,
subId: req.acuc?.sub_id || undefined,
subId: req.acuc?.sub_id,
extractId,
};

if(await getTeamIdSyncB(req.auth.team_id) && req.body.origin !== "api-sdk") {
return await oldExtract(req, res, extractId);
}

await saveExtract(extractId, {
id: extractId,
team_id: req.auth.team_id,
plan: req.auth.plan,
createdAt: Date.now(),
status: "processing",
});

return res.status(result.success ? 200 : 400).json(result);
if (Sentry.isInitialized()) {
const size = JSON.stringify(jobData).length;
await Sentry.startSpan(
{
name: "Add extract job",
op: "queue.publish",
attributes: {
"messaging.message.id": extractId,
"messaging.destination.name": getExtractQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await getExtractQueue().add(extractId, {
...jobData,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
});
},
);
} else {
await getExtractQueue().add(extractId, jobData, {
jobId: extractId,
});
}

return res.status(200).json({
success: true,
id: extractId,
urlTrace: [],
});
}
22 changes: 20 additions & 2 deletions apps/api/src/controllers/v1/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,31 @@ export const scrapeOptions = z

export type ScrapeOptions = z.infer<typeof scrapeOptions>;

import Ajv from "ajv";

const ajv = new Ajv();

export const extractV1Options = z
.object({
urls: url
.array()
.max(10, "Maximum of 10 URLs allowed per request while in beta."),
prompt: z.string().optional(),
systemPrompt: z.string().optional(),
schema: z.any().optional(),
schema: z
.any()
.optional()
.refine((val) => {
if (!val) return true; // Allow undefined schema
try {
const validate = ajv.compile(val);
return typeof validate === "function";
} catch (e) {
return false;
}
}, {
message: "Invalid JSON schema.",
}),
limit: z.number().int().positive().finite().safe().optional(),
ignoreSitemap: z.boolean().default(false),
includeSubdomains: z.boolean().default(true),
Expand Down Expand Up @@ -478,10 +495,11 @@ export interface URLTrace {

// Response shape for the v1 extract endpoints. In the queued flow only
// `success` and `id` are populated on submission; `data`/`warning`/`error`
// are filled by the status endpoint or the legacy synchronous path.
export interface ExtractResponse {
  success: boolean;
  data?: any;
  scrape_id?: string;
  id?: string;
  warning?: string;
  error?: string;
  urlTrace?: URLTrace[];
}

Expand Down
4 changes: 2 additions & 2 deletions apps/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node";
import express, { NextFunction, Request, Response } from "express";
import bodyParser from "body-parser";
import cors from "cors";
import { getScrapeQueue } from "./services/queue-service";
import { getExtractQueue, getScrapeQueue } from "./services/queue-service";
import { v0Router } from "./routes/v0";
import os from "os";
import { logger } from "./lib/logger";
Expand Down Expand Up @@ -45,7 +45,7 @@ const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);

const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
  // Expose both the scrape queue and the extract queue in the Bull Board admin UI.
  queues: [new BullAdapter(getScrapeQueue()), new BullAdapter(getExtractQueue())],
  serverAdapter: serverAdapter,
});

Expand Down
38 changes: 38 additions & 0 deletions apps/api/src/lib/extract/extract-redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { redisConnection } from "../../services/queue-service";
import { logger as _logger } from "../logger";

// Shape of an extract job record as persisted in Redis under "extract:<id>"
// (see saveExtract / getExtract / updateExtract below).
export type StoredExtract = {
  id: string;
  team_id: string;
  plan?: string;
  // Epoch milliseconds (Date.now()) at job creation.
  createdAt: number;
  status: "processing" | "completed" | "failed" | "cancelled";
  error?: any;
};

/**
 * Persist an extract job record to Redis under "extract:<id>".
 * The key gets a 24-hour TTL; "NX" only applies the expiry when the key
 * does not already have one.
 */
export async function saveExtract(id: string, extract: StoredExtract) {
  _logger.debug(`Saving extract ${id} to Redis...`);
  const key = `extract:${id}`;
  await redisConnection.set(key, JSON.stringify(extract));
  await redisConnection.expire(key, 24 * 60 * 60, "NX");
}

/**
 * Load a stored extract job from Redis.
 * Returns null when the key is missing (never stored, or expired).
 */
export async function getExtract(id: string): Promise<StoredExtract | null> {
  const raw = await redisConnection.get(`extract:${id}`);
  if (!raw) {
    return null;
  }
  return JSON.parse(raw);
}

/**
 * Merge partial fields into a stored extract record and refresh its TTL.
 * No-op when the record does not exist.
 * NOTE(review): this is a read-modify-write, so concurrent updaters can
 * clobber each other's fields — acceptable for single-writer status updates.
 */
export async function updateExtract(id: string, extract: Partial<StoredExtract>) {
  const existing = await getExtract(id);
  if (!existing) return;
  const merged = { ...existing, ...extract };
  const key = `extract:${id}`;
  await redisConnection.set(key, JSON.stringify(merged));
  await redisConnection.expire(key, 24 * 60 * 60, "NX");
}


/**
 * Compute the wall-clock expiry time of a stored extract from its Redis TTL,
 * truncated to whole seconds.
 *
 * Redis PTTL returns -1 (key exists but has no expiry) or -2 (key does not
 * exist); clamp those to 0 so we report "expires now" rather than producing
 * a timestamp in the past.
 */
export async function getExtractExpiry(id: string): Promise<Date> {
  const d = new Date();
  const ttl = await redisConnection.pttl("extract:" + id);
  d.setMilliseconds(d.getMilliseconds() + Math.max(ttl, 0));
  d.setMilliseconds(0);
  return d;
}
22 changes: 15 additions & 7 deletions apps/api/src/lib/extract/extraction-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { billTeam } from "../../services/billing/credit_billing";
import { logJob } from "../../services/logging/log_job";
import { _addScrapeJobToBullMQ } from "../../services/queue-jobs";
import { saveCrawl, StoredCrawl } from "../crawl-redis";
import { updateExtract } from "./extract-redis";

interface ExtractServiceOptions {
request: ExtractRequest;
Expand All @@ -20,7 +21,7 @@ interface ExtractServiceOptions {
interface ExtractResult {
success: boolean;
data?: any;
scrapeId: string;
extractId: string;
warning?: string;
urlTrace?: URLTrace[];
error?: string;
Expand All @@ -38,9 +39,8 @@ function getRootDomain(url: string): string {
}
}

export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
export async function performExtraction(extractId: string, options: ExtractServiceOptions): Promise<ExtractResult> {
const { request, teamId, plan, subId } = options;
const scrapeId = crypto.randomUUID();
const urlTraces: URLTrace[] = [];
let docs: Document[] = [];

Expand All @@ -65,7 +65,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
return {
success: false,
error: "No valid URLs found to scrape. Try adjusting your search criteria or including more URLs.",
scrapeId,
extractId,
urlTrace: urlTraces,
};
}
Expand All @@ -89,7 +89,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
return {
success: false,
error: error.message,
scrapeId,
extractId,
urlTrace: urlTraces,
};
}
Expand Down Expand Up @@ -191,7 +191,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise

// Log job
logJob({
job_id: scrapeId,
job_id: extractId,
success: true,
message: "Extract completed",
num_docs: 1,
Expand All @@ -203,12 +203,20 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
scrapeOptions: request,
origin: request.origin ?? "api",
num_tokens: completions.numTokens ?? 0,
}).then(() => {
updateExtract(extractId, {
status: "completed",
}).catch((error) => {
logger.error(`Failed to update extract ${extractId} status to completed: ${error}`);
});
});



return {
success: true,
data: completions.extract ?? {},
scrapeId,
extractId,
warning: completions.warning,
urlTrace: request.urlTrace ? urlTraces : undefined,
};
Expand Down
19 changes: 19 additions & 0 deletions apps/api/src/lib/extract/team-id-sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { supabase_service } from "../../services/supabase";
import { logger } from "../logger";

/**
 * Look up a team in the "eb-sync" table — presumably the opt-in list for the
 * legacy synchronous extract path (the caller gates on a truthy result).
 *
 * @returns the matching row ({ team_id }) or null when the team has no row
 *          or the lookup fails. Errors are logged, never thrown to callers.
 */
export async function getTeamIdSyncB(teamId: string): Promise<{ team_id: string } | null> {
  try {
    const { data, error } = await supabase_service
      .from("eb-sync")
      .select("team_id")
      .eq("team_id", teamId)
      .limit(1);
    if (error) {
      // Preserve the underlying Supabase error message for the log below.
      throw new Error("Error getting team id (sync b): " + error.message);
    }
    return data[0] ?? null;
  } catch (error) {
    logger.error("Error getting team id (sync b)", error);
    return null;
  }
}
14 changes: 7 additions & 7 deletions apps/api/src/routes/v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,7 @@ import { scrapeStatusController } from "../controllers/v1/scrape-status";
import { concurrencyCheckController } from "../controllers/v1/concurrency-check";
import { batchScrapeController } from "../controllers/v1/batch-scrape";
import { extractController } from "../controllers/v1/extract";
// import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview";
// import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status";
// import { searchController } from "../../src/controllers/v1/search";
// import { crawlCancelController } from "../../src/controllers/v1/crawl-cancel";
// import { keyAuthController } from "../../src/controllers/v1/keyAuth";
// import { livenessController } from "../controllers/v1/liveness";
// import { readinessController } from "../controllers/v1/readiness";
import { extractStatusController } from "../controllers/v1/extract-status";
import { creditUsageController } from "../controllers/v1/credit-usage";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
import { searchController } from "../controllers/v1/search";
Expand Down Expand Up @@ -215,6 +209,12 @@ v1Router.post(
wrap(extractController),
);

// Poll the status/result of a queued extract job (handled by extractStatusController).
v1Router.get(
  "/extract/:jobId",
  authMiddleware(RateLimiterMode.CrawlStatus),
  wrap(extractStatusController),
);

// v1Router.post("/crawlWebsitePreview", crawlPreviewController);

v1Router.delete(
Expand Down
Loading
Loading