Skip to content

Commit

Permalink
feat(medusa): Alternative Subscriber API and new ScheduledJobs API (#…
Browse files Browse the repository at this point in the history
…5624)

* workking subscribers API

* progress

* update registrar args

* cleanup

* progress

* progress

* tests

* rename to loaders

* rm build artifacts

* improve validation and change jobs args to object

* spread context on subscribe

* add changeset

* address comments

* fix spelling of warning and add warning on Redis not enabled for Scheduled Jobs

* fix tests

---------

Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
  • Loading branch information
kasperkristensen and olivermrbl authored Nov 16, 2023
1 parent 4f0bea4 commit 57573ed
Show file tree
Hide file tree
Showing 16 changed files with 804 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .changeset/gold-suits-march.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@medusajs/medusa": patch
---

feat(medusa): Adds a new alternative syntax for creating Subscribers in Medusa, as well as adding a new API for creating scheduled jobs.
10 changes: 6 additions & 4 deletions packages/medusa/src/index.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
export * from "./api"
export * from "./api/middlewares"
export * from "./interfaces"
export * from "./joiner-config"
export * from "./models"
export * from "./modules-config"
export * from "./services"
export * from "./types/batch-job"
export * from "./types/common"
export * from "./types/middlewares"
export * from "./types/routing"
export * from "./types/global"
export * from "./types/middlewares"
export * from "./types/price-list"
export * from "./types/routing"
export * from "./types/scheduled-jobs"
export * from "./types/subscribers"
export * from "./utils"
export * from "./joiner-config"
export * from "./modules-config"
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { ScheduledJobArgs } from "../../../../../types/scheduled-jobs"

export default async function ({ container, pluginOptions }: ScheduledJobArgs) {
// noop
return {}
}

export const config = {
name: "every-hour",
schedule: "0 * * * *",
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { ScheduledJobArgs } from "../../../../../types/scheduled-jobs"

export default async function ({ container, pluginOptions }: ScheduledJobArgs) {
// noop
return {}
}

export const config = {
name: "every-minute",
schedule: "* * * * *",
}
16 changes: 16 additions & 0 deletions packages/medusa/src/loaders/helpers/jobs/__mocks__/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const jobSchedulerServiceMock = {
create: jest.fn().mockImplementation((...args) => {
return Promise.resolve(args)
}),
}

export const containerMock = {
// mock .resolve method so if its called with "jobSchedulerService" it returns the mock
resolve: jest.fn().mockImplementation((name: string) => {
if (name === "jobSchedulerService") {
return jobSchedulerServiceMock
} else {
return {}
}
}),
}
49 changes: 49 additions & 0 deletions packages/medusa/src/loaders/helpers/jobs/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { MedusaContainer } from "@medusajs/types"
import { join } from "path"
import { containerMock, jobSchedulerServiceMock } from "../__mocks__"
import ScheduledJobsLoader from "../index"

describe("ScheduledJobsLoader", () => {
const rootDir = join(__dirname, "../__fixtures__", "jobs")

const pluginOptions = {
important_data: {
enabled: true,
},
}

beforeAll(async () => {
jest.clearAllMocks()

await new ScheduledJobsLoader(
rootDir,
containerMock as unknown as MedusaContainer,
pluginOptions
).load()
})

it("should register every job in '/jobs'", async () => {
// As '/jobs' contains 2 jobs, we expect the create method to be called twice
expect(jobSchedulerServiceMock.create).toHaveBeenCalledTimes(2)
})

it("should register every job with the correct props", async () => {
// Registering every-hour.ts
expect(jobSchedulerServiceMock.create).toHaveBeenCalledWith(
"every-hour",
undefined,
"0 * * * *",
expect.any(Function),
{ keepExisting: false }
)

// Registering every-minute.ts
expect(jobSchedulerServiceMock.create).toHaveBeenCalledWith(
"every-minute",
undefined,
"* * * * *",
expect.any(Function),
{ keepExisting: false }
)
})
})
174 changes: 174 additions & 0 deletions packages/medusa/src/loaders/helpers/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { MedusaContainer } from "@medusajs/types"
import { readdir } from "fs/promises"
import { join } from "path"
import JobSchedulerService from "../../../services/job-scheduler"
import {
ScheduledJobArgs,
ScheduledJobConfig,
} from "../../../types/scheduled-jobs"
import logger from "../../logger"

type ScheduledJobHandler = (args: ScheduledJobArgs) => Promise<void>

type ScheduledJobModule = {
config: ScheduledJobConfig
handler: ScheduledJobHandler
}

export default class ScheduledJobsLoader {
protected container_: MedusaContainer
protected pluginOptions_: Record<string, unknown>
protected rootDir_: string
protected excludes: RegExp[] = [
/\.DS_Store/,
/(\.ts\.map|\.js\.map|\.d\.ts)/,
/^_[^/\\]*(\.[^/\\]+)?$/,
]

protected jobDescriptors_: Map<string, ScheduledJobModule> = new Map()

constructor(
rootDir: string,
container: MedusaContainer,
options: Record<string, unknown> = {}
) {
this.rootDir_ = rootDir
this.pluginOptions_ = options
this.container_ = container
}

private validateJob(
job: any,
path: string
): job is {
default: ScheduledJobHandler
config: ScheduledJobConfig
} {
const handler = job.default

if (!handler || typeof handler !== "function") {
logger.warn(`The job in ${path} is not a function.`)
return false
}

const config = job.config

if (!config) {
logger.warn(`The job in ${path} is missing a config.`)
return false
}

if (!config.schedule) {
logger.warn(`The job in ${path} is missing a schedule.`)
return false
}

if (!config.name) {
logger.warn(`The job in ${path} is missing a name.`)
return false
}

if (config.data && typeof config.data !== "object") {
logger.warn(`The job data in ${path} is not an object.`)
return false
}

return true
}

private async createDescriptor(absolutePath: string, entry: string) {
return await import(absolutePath).then((module_) => {
const isValid = this.validateJob(module_, absolutePath)

if (!isValid) {
return
}

this.jobDescriptors_.set(absolutePath, {
config: module_.config,
handler: module_.default,
})
})
}

private async createMap(dirPath: string) {
await Promise.all(
await readdir(dirPath, { withFileTypes: true }).then(async (entries) => {
return entries
.filter((entry) => {
if (
this.excludes.length &&
this.excludes.some((exclude) => exclude.test(entry.name))
) {
return false
}

return true
})
.map(async (entry) => {
const fullPath = join(dirPath, entry.name)

if (entry.isDirectory()) {
return this.createMap(fullPath)
}

return await this.createDescriptor(fullPath, entry.name)
})
})
)
}

private async createScheduledJobs() {
const jobs = Array.from(this.jobDescriptors_.values())

if (!jobs.length) {
return
}

const jobSchedulerService: JobSchedulerService = this.container_.resolve(
"jobSchedulerService"
)

for (const job of jobs) {
try {
const { name, data, schedule } = job.config

const handler = async () => {
await job.handler({
container: this.container_,
data,
pluginOptions: this.pluginOptions_,
})
}

await jobSchedulerService.create(name, data, schedule, handler, {
keepExisting: false, // For now, we do not support changing this flag
})
} catch (err) {
logger.error(
`An error occurred while registering job ${job.config.name}`,
err
)
}
}
}

async load(): Promise<void> {
let hasJobsDir = false

try {
await readdir(this.rootDir_)
hasJobsDir = true
} catch (_err) {
hasJobsDir = false
}

if (!hasJobsDir) {
return
}

await this.createMap(this.rootDir_)

await this.createScheduledJobs()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { OrderService } from "../../../../../services"
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"

export default async function orderNotifier({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}

export const config: SubscriberConfig = {
event: [
OrderService.Events.PLACED,
OrderService.Events.CANCELED,
OrderService.Events.COMPLETED,
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { ProductService } from "../../../../../services"
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"

export default async function productUpdater({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}

export const config: SubscriberConfig = {
event: ProductService.Events.UPDATED,
context: {
subscriberId: "product-updater",
},
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { ProductVariantService } from "../../../../../services"
import {
SubscriberArgs,
SubscriberConfig,
} from "../../../../../types/subscribers"

export default async function ({
data,
eventName,
container,
pluginOptions,
}: SubscriberArgs) {
return Promise.resolve()
}

export const config: SubscriberConfig = {
event: ProductVariantService.Events.CREATED,
}
16 changes: 16 additions & 0 deletions packages/medusa/src/loaders/helpers/subscribers/__mocks__/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const eventBusServiceMock = {
subscribe: jest.fn().mockImplementation((...args) => {
return Promise.resolve(args)
}),
}

export const containerMock = {
// mock .resolve method so if its called with "eventBusService" it returns the mock
resolve: jest.fn().mockImplementation((name: string) => {
if (name === "eventBusService") {
return eventBusServiceMock
} else {
return {}
}
}),
}
Loading

0 comments on commit 57573ed

Please sign in to comment.