Skip to content

Commit

Permalink
workerを整理
Browse files Browse the repository at this point in the history
cron/workerを分離
  • Loading branch information
kgtkr committed Feb 5, 2024
1 parent 3e49fe1 commit 9a19f82
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 86 deletions.
12 changes: 6 additions & 6 deletions packages/server/src/adapters/NotificationQueueImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { NotificationQueue } from "../ports/NotificationQueue";
import webpush from "web-push";
import { PushSubscriptionsRepo } from "../ports/PushSubscriptionsRepo";
import { Client as FaktoryClient } from "faktory-worker";
import * as SendPushNotificationJob from "../jobs/SendPushNotification";
import { SendPushNotificationJob } from "../jobs/SendPushNotificationJob";
import { createJob } from "../jobs/JobType";

export class NotificationQueueImpl implements NotificationQueue {
constructor(
Expand Down Expand Up @@ -30,14 +31,13 @@ export class NotificationQueueImpl implements NotificationQueue {
await this.faktory.pushBulk(
payloads.flatMap(({ userId, payload }) => {
const pushSubscriptions = pushSubscriptionsByUserId.get(userId) ?? [];
return pushSubscriptions.map((pushSubscription) => {
const arg: SendPushNotificationJob.Arg = {
return pushSubscriptions.map((pushSubscription) =>
createJob(this.faktory, SendPushNotificationJob, {
pushSubscription,
payload,
userId,
};
return this.faktory.job(SendPushNotificationJob.JobType, arg);
});
})
);
})
);
}
Expand Down
46 changes: 46 additions & 0 deletions packages/server/src/cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { CronJob } from "cron";
import { ResWaitCountKey } from "./entities";
import { CheckDeadTopicJob } from "./jobs/CheckDeadTopicJob";
import { faktoryClient } from "./faktoryClient";
import { createJob } from "./jobs/JobType";
import { UserPointResetJob } from "./jobs/UserPointResetJob";
import { UserCountResetJob } from "./jobs/UserCountResetJob";

export function startCron() {
const startUserCountResetCron = (cronTime: string, key: ResWaitCountKey) => {
new CronJob({
cronTime,
onTick: () => {
void faktoryClient.push(
createJob(faktoryClient, UserCountResetJob, { key })
);
},
start: false,
timeZone: "Asia/Tokyo",
}).start();
};
startUserCountResetCron("00 00,10,20,30,40,50 * * * *", "m10");
startUserCountResetCron("00 00,30 * * * *", "m30");
startUserCountResetCron("00 00 * * * *", "h1");
startUserCountResetCron("00 00 00,06,12,18 * * *", "h6");
startUserCountResetCron("00 00 00,12 * * *", "h12");
startUserCountResetCron("00 00 00 * * *", "d1");

new CronJob({
cronTime: "00 00 00 * * *",
onTick: () => {
void faktoryClient.push(createJob(faktoryClient, UserPointResetJob, {}));
},
start: false,
timeZone: "Asia/Tokyo",
}).start();

new CronJob({
cronTime: "00 00 * * * *",
onTick: () => {
void faktoryClient.push(createJob(faktoryClient, CheckDeadTopicJob, {}));
},
start: false,
timeZone: "Asia/Tokyo",
}).start();
}
7 changes: 7 additions & 0 deletions packages/server/src/jobs/CheckDeadTopicJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { z } from "zod";
import { JobType } from "./JobType";

export const CheckDeadTopicJob = JobType({
type: "CheckDeadTopic",
arg: z.object({}),
});
27 changes: 27 additions & 0 deletions packages/server/src/jobs/JobType.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { z } from "zod";
import { Job, Client, Worker } from "faktory-worker";

export interface JobType<A> {
type: string;
arg: z.ZodType<A>;
}

// 型推論のためのヘルパー関数
export function JobType<A>(value: JobType<A>): JobType<A> {
return value;
}

export function createJob<A>(client: Client, type: JobType<A>, arg: A): Job {
return client.job(type.type, arg);
}

export function registerWorker<A>(
worker: Worker,
type: JobType<A>,
handler: (arg: A) => Promise<void>
): void {
worker.register(type.type, async (raw) => {
const arg = type.arg.parse(raw);
await handler(arg);
});
}
15 changes: 0 additions & 15 deletions packages/server/src/jobs/SendPushNotification.ts

This file was deleted.

17 changes: 17 additions & 0 deletions packages/server/src/jobs/SendPushNotificationJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { z } from "zod";
import { JobType } from "./JobType";

export const SendPushNotificationJob = JobType({
type: "SendPushNotification",
arg: z.object({
pushSubscription: z.object({
endpoint: z.string(),
keys: z.object({
p256dh: z.string(),
auth: z.string(),
}),
}),
payload: z.string(),
userId: z.string(),
}),
});
16 changes: 16 additions & 0 deletions packages/server/src/jobs/UserCountResetJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { JobType } from "./JobType";
import { z } from "zod";

export const UserCountResetJob = JobType({
type: "UserCountReset",
arg: z.object({
key: z.union([
z.literal("m10"),
z.literal("m30"),
z.literal("h1"),
z.literal("h6"),
z.literal("h12"),
z.literal("d1"),
]),
}),
});
7 changes: 7 additions & 0 deletions packages/server/src/jobs/UserPointResetJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { z } from "zod";
import { JobType } from "./JobType";

export const UserPointResetJob = JobType({
type: "UserPointReset",
arg: z.object({}),
});
6 changes: 4 additions & 2 deletions packages/server/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Config } from "../config";
import { resolvers } from "../schema/resolvers.generated";
import { resolveTypes } from "../schema/resolveTypes";
import { typeDefs } from "../schema/typeDefs.generated";
import { runWorker } from "../worker";
import { startWorker } from "../worker";
import { AppContext, createContext } from "./context";
import Router from "@koa/router";
import { ApolloServer } from "@apollo/server";
Expand All @@ -21,6 +21,7 @@ import { koaMiddleware } from "@as-integrations/koa";
import { makeExecutableSchema } from "@graphql-tools/schema";
import bodyParser from "koa-bodyparser";
import { GraphQLError } from "graphql";
import { startCron } from "../cron";

export async function serverRun() {
const app = new Koa();
Expand Down Expand Up @@ -86,7 +87,8 @@ export async function serverRun() {
});
await server.start();

await runWorker();
startCron();
await startWorker();

router.get("/ping", (ctx, _next) => (ctx.body = "OK"));
router.get("/livez", (ctx, _next) => (ctx.body = "OK"));
Expand Down
82 changes: 19 additions & 63 deletions packages/server/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,82 +1,38 @@
import { CronJob } from "cron";
import { Logger, TopicRepo, UserRepo } from "./adapters";
import { ResWaitCountKey } from "./entities";
import { prisma } from "./prisma-client";
import faktory from "faktory-worker";
import { Config } from "./config";
import * as SendPushNotificationJob from "./jobs/SendPushNotification";
import { SendPushNotificationJob } from "./jobs/SendPushNotificationJob";
import { CheckDeadTopicJob } from "./jobs/CheckDeadTopicJob";
import { createPorts, PortsConfig } from "./createPorts";
import { registerWorker } from "./jobs/JobType";
import { UserPointResetJob } from "./jobs/UserPointResetJob";
import { UserCountResetJob } from "./jobs/UserCountResetJob";

export async function runWorker(): Promise<void> {
export async function startWorker(): Promise<void> {
const worker = await faktory.work({
url: Config.faktory.url,
});

worker.register(SendPushNotificationJob.JobType, async (raw) => {
registerWorker(worker, SendPushNotificationJob, async (arg) => {
const ports = createPorts(PortsConfig);
const arg = SendPushNotificationJob.Arg.parse(raw);
await ports.notificationSender.sendNotification(
arg.userId,
arg.pushSubscription,
arg.payload
);
});

runTopicWorker();
runUserWorker();
}

function runTopicWorker() {
// 毎時間トピ落ちチェック
new CronJob({
cronTime: "00 00 * * * *",
onTick: () => {
void (async () => {
const logger = new Logger();
const topicRepo = new TopicRepo(prisma);

logger.info("TopicCron");
await topicRepo.cronTopicCheck(new Date());
})();
},
start: false,
timeZone: "Asia/Tokyo",
}).start();
}

function runUserWorker() {
const start = (cronTime: string, key: ResWaitCountKey) => {
new CronJob({
cronTime,
onTick: () => {
void (async () => {
const logger = new Logger();
const userRepo = new UserRepo(prisma);
registerWorker(worker, CheckDeadTopicJob, async (_arg) => {
const ports = createPorts(PortsConfig);
await ports.topicRepo.cronTopicCheck(ports.clock.now());
});

logger.info(`UserCron ${key}`);
await userRepo.cronCountReset(key);
})();
},
start: false,
timeZone: "Asia/Tokyo",
}).start();
};
registerWorker(worker, UserPointResetJob, async (_arg) => {
const ports = createPorts(PortsConfig);
await ports.userRepo.cronPointReset();
});

start("00 00,10,20,30,40,50 * * * *", "m10");
start("00 00,30 * * * *", "m30");
start("00 00 * * * *", "h1");
start("00 00 00,06,12,18 * * *", "h6");
start("00 00 00,12 * * *", "h12");
start("00 00 00 * * *", "d1");
new CronJob({
cronTime: "00 00 00 * * *",
onTick: () => {
void (async () => {
const userRepo = new UserRepo(prisma);
await userRepo.cronPointReset();
})();
},
start: false,
timeZone: "Asia/Tokyo",
}).start();
registerWorker(worker, UserCountResetJob, async (arg) => {
const ports = createPorts(PortsConfig);
await ports.userRepo.cronCountReset(arg.key);
});
}

0 comments on commit 9a19f82

Please sign in to comment.