From 9a19f8225e30a14f791bf4a9e90a24a8f4fdbaab Mon Sep 17 00:00:00 2001 From: kgtkr Date: Tue, 6 Feb 2024 02:00:24 +0900 Subject: [PATCH] =?UTF-8?q?worker=E3=82=92=E6=95=B4=E7=90=86=20cron/worker?= =?UTF-8?q?=E3=82=92=E5=88=86=E9=9B=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/adapters/NotificationQueueImpl.ts | 12 +-- packages/server/src/cron.ts | 46 +++++++++++ packages/server/src/jobs/CheckDeadTopicJob.ts | 7 ++ packages/server/src/jobs/JobType.ts | 27 ++++++ .../server/src/jobs/SendPushNotification.ts | 15 ---- .../src/jobs/SendPushNotificationJob.ts | 17 ++++ packages/server/src/jobs/UserCountResetJob.ts | 16 ++++ packages/server/src/jobs/UserPointResetJob.ts | 7 ++ packages/server/src/server/server.ts | 6 +- packages/server/src/worker.ts | 82 +++++-------------- 10 files changed, 149 insertions(+), 86 deletions(-) create mode 100644 packages/server/src/cron.ts create mode 100644 packages/server/src/jobs/CheckDeadTopicJob.ts create mode 100644 packages/server/src/jobs/JobType.ts delete mode 100644 packages/server/src/jobs/SendPushNotification.ts create mode 100644 packages/server/src/jobs/SendPushNotificationJob.ts create mode 100644 packages/server/src/jobs/UserCountResetJob.ts create mode 100644 packages/server/src/jobs/UserPointResetJob.ts diff --git a/packages/server/src/adapters/NotificationQueueImpl.ts b/packages/server/src/adapters/NotificationQueueImpl.ts index f5569b0..948300d 100644 --- a/packages/server/src/adapters/NotificationQueueImpl.ts +++ b/packages/server/src/adapters/NotificationQueueImpl.ts @@ -2,7 +2,8 @@ import { NotificationQueue } from "../ports/NotificationQueue"; import webpush from "web-push"; import { PushSubscriptionsRepo } from "../ports/PushSubscriptionsRepo"; import { Client as FaktoryClient } from "faktory-worker"; -import * as SendPushNotificationJob from "../jobs/SendPushNotification"; +import { SendPushNotificationJob } from "../jobs/SendPushNotificationJob"; +import { createJob } from "../jobs/JobType"; export class NotificationQueueImpl implements NotificationQueue { constructor( @@ -30,14 +31,13 @@ export class NotificationQueueImpl implements NotificationQueue { await this.faktory.pushBulk( payloads.flatMap(({ userId, payload }) => { const pushSubscriptions = pushSubscriptionsByUserId.get(userId) ?? []; - return pushSubscriptions.map((pushSubscription) => { - const arg: SendPushNotificationJob.Arg = { + return pushSubscriptions.map((pushSubscription) => + createJob(this.faktory, SendPushNotificationJob, { pushSubscription, payload, userId, - }; - return this.faktory.job(SendPushNotificationJob.JobType, arg); - }); + }) + ); }) ); } diff --git a/packages/server/src/cron.ts b/packages/server/src/cron.ts new file mode 100644 index 0000000..1821919 --- /dev/null +++ b/packages/server/src/cron.ts @@ -0,0 +1,46 @@ +import { CronJob } from "cron"; +import { ResWaitCountKey } from "./entities"; +import { CheckDeadTopicJob } from "./jobs/CheckDeadTopicJob"; +import { faktoryClient } from "./faktoryClient"; +import { createJob } from "./jobs/JobType"; +import { UserPointResetJob } from "./jobs/UserPointResetJob"; +import { UserCountResetJob } from "./jobs/UserCountResetJob"; + +export function startCron() { + const startUserCountResetCron = (cronTime: string, key: ResWaitCountKey) => { + new CronJob({ + cronTime, + onTick: () => { + void faktoryClient.push( + createJob(faktoryClient, UserCountResetJob, { key }) + ); + }, + start: false, + timeZone: "Asia/Tokyo", + }).start(); + }; + startUserCountResetCron("00 00,10,20,30,40,50 * * * *", "m10"); + startUserCountResetCron("00 00,30 * * * *", "m30"); + startUserCountResetCron("00 00 * * * *", "h1"); + startUserCountResetCron("00 00 00,06,12,18 * * *", "h6"); + startUserCountResetCron("00 00 00,12 * * *", "h12"); + startUserCountResetCron("00 00 00 * * *", "d1"); + + new CronJob({ + cronTime: "00 00 00 * * *", + onTick: () => { + void faktoryClient.push(createJob(faktoryClient, UserPointResetJob, {})); + }, + start: false, + timeZone: "Asia/Tokyo", + }).start(); + + new CronJob({ + cronTime: "00 00 * * * *", + onTick: () => { + void faktoryClient.push(createJob(faktoryClient, CheckDeadTopicJob, {})); + }, + start: false, + timeZone: "Asia/Tokyo", + }).start(); +} diff --git a/packages/server/src/jobs/CheckDeadTopicJob.ts b/packages/server/src/jobs/CheckDeadTopicJob.ts new file mode 100644 index 0000000..7fe7975 --- /dev/null +++ b/packages/server/src/jobs/CheckDeadTopicJob.ts @@ -0,0 +1,7 @@ +import { z } from "zod"; +import { JobType } from "./JobType"; + +export const CheckDeadTopicJob = JobType({ + type: "CheckDeadTopic", + arg: z.object({}), +}); diff --git a/packages/server/src/jobs/JobType.ts b/packages/server/src/jobs/JobType.ts new file mode 100644 index 0000000..341dc0c --- /dev/null +++ b/packages/server/src/jobs/JobType.ts @@ -0,0 +1,27 @@ +import { z } from "zod"; +import { Job, Client, Worker } from "faktory-worker"; + +export interface JobType { + type: string; + arg: z.ZodType; +} + +// 型推論のためのヘルパー関数 +export function JobType(value: JobType): JobType { + return value; +} + +export function createJob(client: Client, type: JobType, arg: A): Job { + return client.job(type.type, arg); +} + +export function registerWorker( + worker: Worker, + type: JobType, + handler: (arg: A) => Promise +): void { + worker.register(type.type, async (raw) => { + const arg = type.arg.parse(raw); + await handler(arg); + }); +} diff --git a/packages/server/src/jobs/SendPushNotification.ts b/packages/server/src/jobs/SendPushNotification.ts deleted file mode 100644 index fdd8c72..0000000 --- a/packages/server/src/jobs/SendPushNotification.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { z } from "zod"; - -export const JobType = "SendPushNotification"; -export const Arg = z.object({ - pushSubscription: z.object({ - endpoint: z.string(), - keys: z.object({ - p256dh: z.string(), - auth: z.string(), - }), - }), - payload: z.string(), - userId: z.string(), -}); -export type Arg = z.infer; diff --git a/packages/server/src/jobs/SendPushNotificationJob.ts b/packages/server/src/jobs/SendPushNotificationJob.ts new file mode 100644 index 0000000..3ceb098 --- /dev/null +++ b/packages/server/src/jobs/SendPushNotificationJob.ts @@ -0,0 +1,17 @@ +import { z } from "zod"; +import { JobType } from "./JobType"; + +export const SendPushNotificationJob = JobType({ + type: "SendPushNotification", + arg: z.object({ + pushSubscription: z.object({ + endpoint: z.string(), + keys: z.object({ + p256dh: z.string(), + auth: z.string(), + }), + }), + payload: z.string(), + userId: z.string(), + }), +}); diff --git a/packages/server/src/jobs/UserCountResetJob.ts b/packages/server/src/jobs/UserCountResetJob.ts new file mode 100644 index 0000000..8ce307c --- /dev/null +++ b/packages/server/src/jobs/UserCountResetJob.ts @@ -0,0 +1,16 @@ +import { JobType } from "./JobType"; +import { z } from "zod"; + +export const UserCountResetJob = JobType({ + type: "UserCountReset", + arg: z.object({ + key: z.union([ + z.literal("m10"), + z.literal("m30"), + z.literal("h1"), + z.literal("h6"), + z.literal("h12"), + z.literal("d1"), + ]), + }), +}); diff --git a/packages/server/src/jobs/UserPointResetJob.ts b/packages/server/src/jobs/UserPointResetJob.ts new file mode 100644 index 0000000..41d2c43 --- /dev/null +++ b/packages/server/src/jobs/UserPointResetJob.ts @@ -0,0 +1,7 @@ +import { z } from "zod"; +import { JobType } from "./JobType"; + +export const UserPointResetJob = JobType({ + type: "UserPointReset", + arg: z.object({}), +}); diff --git a/packages/server/src/server/server.ts b/packages/server/src/server/server.ts index af97b09..399b57a 100644 --- a/packages/server/src/server/server.ts +++ b/packages/server/src/server/server.ts @@ -6,7 +6,7 @@ import { Config } from "../config"; import { resolvers } from "../schema/resolvers.generated"; import { resolveTypes } from "../schema/resolveTypes"; import { typeDefs } from "../schema/typeDefs.generated"; -import { runWorker } from "../worker"; +import { startWorker } from "../worker"; import { AppContext, createContext } from "./context"; import Router from "@koa/router"; import { ApolloServer } from "@apollo/server"; @@ -21,6 +21,7 @@ import { koaMiddleware } from "@as-integrations/koa"; import { makeExecutableSchema } from "@graphql-tools/schema"; import bodyParser from "koa-bodyparser"; import { GraphQLError } from "graphql"; +import { startCron } from "../cron"; export async function serverRun() { const app = new Koa(); @@ -86,7 +87,8 @@ export async function serverRun() { }); await server.start(); - await runWorker(); + startCron(); + await startWorker(); router.get("/ping", (ctx, _next) => (ctx.body = "OK")); router.get("/livez", (ctx, _next) => (ctx.body = "OK")); diff --git a/packages/server/src/worker.ts b/packages/server/src/worker.ts index c017384..b78d07c 100644 --- a/packages/server/src/worker.ts +++ b/packages/server/src/worker.ts @@ -1,20 +1,19 @@ -import { CronJob } from "cron"; -import { Logger, TopicRepo, UserRepo } from "./adapters"; -import { ResWaitCountKey } from "./entities"; -import { prisma } from "./prisma-client"; import faktory from "faktory-worker"; import { Config } from "./config"; -import * as SendPushNotificationJob from "./jobs/SendPushNotification"; +import { SendPushNotificationJob } from "./jobs/SendPushNotificationJob"; +import { CheckDeadTopicJob } from "./jobs/CheckDeadTopicJob"; import { createPorts, PortsConfig } from "./createPorts"; +import { registerWorker } from "./jobs/JobType"; +import { UserPointResetJob } from "./jobs/UserPointResetJob"; +import { UserCountResetJob } from "./jobs/UserCountResetJob"; -export async function runWorker(): Promise { +export async function startWorker(): Promise { const worker = await faktory.work({ url: Config.faktory.url, }); - worker.register(SendPushNotificationJob.JobType, async (raw) => { + registerWorker(worker, SendPushNotificationJob, async (arg) => { const ports = createPorts(PortsConfig); - const arg = SendPushNotificationJob.Arg.parse(raw); await ports.notificationSender.sendNotification( arg.userId, arg.pushSubscription, @@ -22,61 +21,18 @@ export async function runWorker(): Promise { ); }); - runTopicWorker(); - runUserWorker(); -} - -function runTopicWorker() { - // 毎時間トピ落ちチェック - new CronJob({ - cronTime: "00 00 * * * *", - onTick: () => { - void (async () => { - const logger = new Logger(); - const topicRepo = new TopicRepo(prisma); - - logger.info("TopicCron"); - await topicRepo.cronTopicCheck(new Date()); - })(); - }, - start: false, - timeZone: "Asia/Tokyo", - }).start(); -} - -function runUserWorker() { - const start = (cronTime: string, key: ResWaitCountKey) => { - new CronJob({ - cronTime, - onTick: () => { - void (async () => { - const logger = new Logger(); - const userRepo = new UserRepo(prisma); + registerWorker(worker, CheckDeadTopicJob, async (_arg) => { + const ports = createPorts(PortsConfig); + await ports.topicRepo.cronTopicCheck(ports.clock.now()); + }); - logger.info(`UserCron ${key}`); - await userRepo.cronCountReset(key); - })(); - }, - start: false, - timeZone: "Asia/Tokyo", - }).start(); - }; + registerWorker(worker, UserPointResetJob, async (_arg) => { + const ports = createPorts(PortsConfig); + await ports.userRepo.cronPointReset(); + }); - start("00 00,10,20,30,40,50 * * * *", "m10"); - start("00 00,30 * * * *", "m30"); - start("00 00 * * * *", "h1"); - start("00 00 00,06,12,18 * * *", "h6"); - start("00 00 00,12 * * *", "h12"); - start("00 00 00 * * *", "d1"); - new CronJob({ - cronTime: "00 00 00 * * *", - onTick: () => { - void (async () => { - const userRepo = new UserRepo(prisma); - await userRepo.cronPointReset(); - })(); - }, - start: false, - timeZone: "Asia/Tokyo", - }).start(); + registerWorker(worker, UserCountResetJob, async (arg) => { + const ports = createPorts(PortsConfig); + await ports.userRepo.cronCountReset(arg.key); + }); }