diff --git a/package.json b/package.json index 1a180c0fc..8e15f02ee 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "build-gulp": "gulp build", "export": "pnpm build-ts && node ./built/migration/migrateAyuskeyNext.js", "worker": "node ./built/migration/worker.js", + "bnote-worker": "node ./built/migration/bNoteWorker.js", "note-worker": "node ./built/migration/noteWorker.js", "user-worker": "node ./built/migration/userAfterHookWorker.js", "bull-board": "node ./built/migration/bullDashboard.js", diff --git a/src/migration/bNoteWorker.ts b/src/migration/bNoteWorker.ts new file mode 100644 index 000000000..26c8cc3be --- /dev/null +++ b/src/migration/bNoteWorker.ts @@ -0,0 +1,31 @@ +import { logger } from "./common"; +import { noteQueue } from "./jobqueue"; + + +const cluster = require('cluster'); + +async function main() { + const numWorkers = 64; + + if (cluster.isPrimary) { + for (let i = 0; i < numWorkers; i++) { + cluster.fork(); + } + cluster.on("exit", (worker: any, code :any, signal:any) => { + console.log(`worker ${worker.process.pid} died`); + }); + } else { + + + noteQueue.process(__dirname + "/processor/note.processor.js"); + noteQueue.on("completed", (job) => { + logger.succ(`Note: ${job.data.note.id} の処理が完了しました`); + }); + } +} + + +main().catch((e) => { + console.warn(e); + process.exit(1); +}); diff --git a/src/migration/jobqueue.ts b/src/migration/jobqueue.ts index ac54de51f..ec7135d0a 100644 --- a/src/migration/jobqueue.ts +++ b/src/migration/jobqueue.ts @@ -10,7 +10,7 @@ const queueRedisConf: Queue.QueueOptions = { prefix: "ayuskey_next", limiter: { max: 1000, - duration: 1000, + duration: 500, }, }; export const hashtagQueue = new Queue("hashtag", queueRedisConf); diff --git a/src/migration/noteWorker.ts b/src/migration/noteWorker.ts index 8727f0add..cb54cc70b 100644 --- a/src/migration/noteWorker.ts +++ b/src/migration/noteWorker.ts @@ -1,15 +1,11 @@ -import { initDb } from "@/db/postgre"; import { logger } from "./common"; import { noteQueue } from "./jobqueue"; import noteProcessor from "./processor/note.processor"; -import { createConnection } from "typeorm"; -import config from "@/config"; -import { AyuskeyNextEntities } from "@/v13/models"; const cluster = require('cluster'); async function main() { - const numWorkers = 20; + const numWorkers = 32; if (cluster.isPrimary) { for (let i = 0; i < numWorkers; i++) { @@ -19,17 +15,7 @@ async function main() { console.log(`worker ${worker.process.pid} died`); }); } else { - await initDb(); - await createConnection({ - name: "nextDb", - type: "postgres", - host: config.db.nextDb.host, - port: config.db.nextDb.port, - username: config.db.nextDb.user, - password: config.db.nextDb.pass, - database: config.db.nextDb.db, - entities: AyuskeyNextEntities, - }); + noteQueue.process(noteProcessor); noteQueue.on("completed", (job) => { diff --git a/src/migration/processor/note.processor.ts b/src/migration/processor/note.processor.ts index b138e09a9..e603d2415 100644 --- a/src/migration/processor/note.processor.ts +++ b/src/migration/processor/note.processor.ts @@ -1,9 +1,26 @@ import { Job } from "bull"; import { migrateNote } from "../note"; import { Note } from "@/models/entities/note"; +import { initDb } from "@/db/postgre"; +import { createConnection } from "typeorm"; +import config from "@/config"; +import { AyuskeyNextEntities } from "@/v13/models"; export default async (job: Job<{noteId: string, note: Note}>, done: any) => { + await initDb(); + const con = await createConnection({ + name: "nextDb", + type: "postgres", + host: config.db.nextDb.host, + port: config.db.nextDb.port, + username: config.db.nextDb.user, + password: config.db.nextDb.pass, + database: config.db.nextDb.db, + entities: AyuskeyNextEntities, + }); job.progress(job.data.noteId); await migrateNote(job.data.noteId, job.data.note); done(); -} \ No newline at end of file + await con.close(); + +}