Skip to content

Commit

Permalink
Allow downloading more content from a webpage and index it hoarder-app#215
Browse files Browse the repository at this point in the history

Rebased onto master
replaced redis queue with new db queue
fixed some async/await issues
  • Loading branch information
kamtschatka committed Jul 21, 2024
1 parent 62b9350 commit 1a5f1bd
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 65 deletions.
16 changes: 3 additions & 13 deletions apps/workers/crawlerWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import * as path from "node:path";
import type { Browser } from "puppeteer";
import { Readability } from "@mozilla/readability";
import { Mutex } from "async-mutex";
import Database from "better-sqlite3";
import DOMPurify from "dompurify";
import { eq } from "drizzle-orm";
import { execa } from "execa";
Expand All @@ -24,18 +23,9 @@ import AdblockerPlugin from "puppeteer-extra-plugin-adblocker";
import StealthPlugin from "puppeteer-extra-plugin-stealth";
import { withTimeout } from "utils";

import type { ZCrawlLinkRequest } from "@hoarder/shared/queues";
import { db, HoarderDBTransaction } from "@hoarder/db";
import {
assets,
AssetTypes,
bookmarkAssets,
bookmarkLinks,
bookmarks,
} from "@hoarder/db/schema";
import { DequeuedJob, Runner } from "@hoarder/queue";
import { db } from "@hoarder/db";
import { bookmarkAssets, bookmarkLinks, bookmarks } from "@hoarder/db/schema";
import { DequeuedJob, Runner } from "@hoarder/queue";
import {
ASSET_TYPES,
IMAGE_ASSET_TYPES,
Expand Down Expand Up @@ -634,9 +624,9 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
}

// Update the search index
triggerSearchReindex(bookmarkId);
await triggerSearchReindex(bookmarkId);
// Trigger a potential download of a video from the URL
triggerVideoWorker(bookmarkId, url);
await triggerVideoWorker(bookmarkId, url);

// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
Expand Down
2 changes: 1 addition & 1 deletion apps/workers/openaiWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,5 +397,5 @@ async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
await connectTags(bookmarkId, tags, bookmark.userId);

// Update the search index
triggerSearchReindex(bookmarkId);
await triggerSearchReindex(bookmarkId);
}
59 changes: 27 additions & 32 deletions apps/workers/videoWorker.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
import fs from "fs";
import path from "path";
import type { Job } from "bullmq";
import { Worker } from "bullmq";
import YTDlpWrap from "yt-dlp-wrap";

import { db } from "@hoarder/db";
import { DequeuedJob, Runner } from "@hoarder/queue";
import { newAssetId, saveAssetFromFile } from "@hoarder/shared/assetdb";
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import {
queueConnectionDetails,
VideoWorkerQueue,
ZVideoRequest,
} from "@hoarder/shared/queues";
import { VideoWorkerQueue, ZVideoRequest } from "@hoarder/shared/queues";
import { DBAssetTypes } from "@hoarder/shared/utils/bookmarkUtils";

import { withTimeout } from "./utils";
Expand All @@ -36,34 +31,34 @@ export class VideoWorker {
return;
}

const worker = new Worker<ZVideoRequest, void>(
VideoWorkerQueue.name,
withTimeout(
runCrawler,
/* timeoutSec */ serverConfig.crawler.downloadVideoTimeout,
),
return new Runner<ZVideoRequest>(
VideoWorkerQueue,
{
run: withTimeout(
runCrawler,
/* timeoutSec */ serverConfig.crawler.downloadVideoTimeout,
),
onComplete: async (job) => {
const jobId = job?.id ?? "unknown";
logger.info(
`[VideoCrawler][${jobId}] Video Download Completed successfully`,
);
return Promise.resolve();
},
onError: async (job) => {
const jobId = job?.id ?? "unknown";
logger.error(
`[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`,
);
return Promise.resolve();
},
},
{
pollIntervalMs: 1000,
timeoutSecs: serverConfig.crawler.downloadVideoTimeout,
concurrency: 1,
connection: queueConnectionDetails,
autorun: false,
},
);

worker.on("completed", (job) => {
const jobId = job?.id ?? "unknown";
logger.info(
`[VideoCrawler][${jobId}] Video Download Completed successfully`,
);
});

worker.on("failed", (job, error) => {
const jobId = job?.id ?? "unknown";
logger.error(
`[VideoCrawler][${jobId}] Video Download job failed: ${error}`,
);
});

return worker;
}
}

Expand Down Expand Up @@ -114,7 +109,7 @@ function prepareYtDlpArguments(url: string, assetPath: string) {
return ytDlpArguments;
}

async function runCrawler(job: Job<ZVideoRequest, void>) {
async function runCrawler(job: DequeuedJob<ZVideoRequest>) {
const jobId = job.id ?? "unknown";
const { bookmarkId } = job.data;

Expand Down
27 changes: 13 additions & 14 deletions packages/shared/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
},
);

export function triggerSearchReindex(bookmarkId: string) {
SearchIndexingQueue.enqueue({
export async function triggerSearchReindex(bookmarkId: string) {
await SearchIndexingQueue.enqueue({
bookmarkId,
type: "index",
});
}

export function triggerSearchDeletion(bookmarkId: string) {
SearchIndexingQueue.enqueue({
export async function triggerSearchDeletion(bookmarkId: string) {
await SearchIndexingQueue.enqueue({
bookmarkId: bookmarkId,
type: "delete",
});
Expand All @@ -84,19 +84,18 @@ export const zvideoRequestSchema = z.object({
});
export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>;

export const VideoWorkerQueue = new Queue<ZVideoRequest, void>("video_queue", {
connection: queueConnectionDetails,
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 500,
export const VideoWorkerQueue = new SqliteQueue<ZVideoRequest>(
"video_queue",
queueDB,
{
defaultJobArgs: {
numRetries: 5,
},
},
});
);

export function triggerVideoWorker(bookmarkId: string, url: string) {
VideoWorkerQueue.add("video_queue", {
export async function triggerVideoWorker(bookmarkId: string, url: string) {
await VideoWorkerQueue.enqueue({
bookmarkId,
url,
});
Expand Down
10 changes: 5 additions & 5 deletions packages/trpc/routers/bookmarks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ export const bookmarksAppRouter = router({
break;
}
}
triggerSearchReindex(bookmark.id);
await triggerSearchReindex(bookmark.id);
return bookmark;
}),

Expand Down Expand Up @@ -334,7 +334,7 @@ export const bookmarksAppRouter = router({
message: "Bookmark not found",
});
}
triggerSearchReindex(input.bookmarkId);
await triggerSearchReindex(input.bookmarkId);
return res[0];
}),

Expand All @@ -360,7 +360,7 @@ export const bookmarksAppRouter = router({
message: "Bookmark not found",
});
}
triggerSearchReindex(input.bookmarkId);
await triggerSearchReindex(input.bookmarkId);
}),

deleteBookmark: authedProcedure
Expand All @@ -386,7 +386,7 @@ export const bookmarksAppRouter = router({
eq(bookmarks.id, input.bookmarkId),
),
);
triggerSearchDeletion(input.bookmarkId);
await triggerSearchDeletion(input.bookmarkId);
if (deleted.changes > 0 && bookmark) {
await cleanupAssetForBookmark({
asset: bookmark.asset,
Expand Down Expand Up @@ -728,7 +728,7 @@ export const bookmarksAppRouter = router({
})),
)
.onConflictDoNothing();
triggerSearchReindex(input.bookmarkId);
await triggerSearchReindex(input.bookmarkId);
return {
bookmarkId: input.bookmarkId,
attached: allIds,
Expand Down

0 comments on commit 1a5f1bd

Please sign in to comment.