Skip to content

Commit

Permalink
fix: handle outdated message in channel queue (#184)
Browse files Browse the repository at this point in the history
Co-authored-by: JounQin <admin@1stg.me>
  • Loading branch information
jedlikowski and JounQin authored Oct 7, 2024
1 parent 35a89ea commit 30d28ae
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changeset/violet-laws-compare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"synckit": patch
---

fix: handle outdated message in channel queue
79 changes: 58 additions & 21 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import path from 'node:path'
import { fileURLToPath, pathToFileURL } from 'node:url'
import {
MessageChannel,
MessagePort,
type TransferListItem,
Worker,
parentPort,
Expand All @@ -19,6 +20,7 @@ import type {
AnyAsyncFn,
AnyFn,
GlobalShim,
MainToWorkerCommandMessage,
MainToWorkerMessage,
Syncify,
ValueOf,
Expand Down Expand Up @@ -522,36 +524,59 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(

let nextID = 0

const syncFn = (...args: Parameters<T>): R => {
const id = nextID++

const msg: MainToWorkerMessage<Parameters<T>> = { id, args }

worker.postMessage(msg)

const status = Atomics.wait(sharedBufferView!, 0, 0, timeout)

// Reset SharedArrayBuffer for next call
const receiveMessageWithId = (
port: MessagePort,
expectedId: number,
waitingTimeout?: number,
): WorkerToMainMessage<R> => {
const start = Date.now()
const status = Atomics.wait(sharedBufferView!, 0, 0, waitingTimeout)
Atomics.store(sharedBufferView!, 0, 0)

/* istanbul ignore if */
if (!['ok', 'not-equal'].includes(status)) {
const abortMsg: MainToWorkerCommandMessage = {
id: expectedId,
cmd: 'abort',
}
port.postMessage(abortMsg)
throw new Error('Internal error: Atomics.wait() failed: ' + status)
}

const {
id: id2,
result,
error,
properties,
} = (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> })
.message
const { id, ...message } = (
receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> }
).message

/* istanbul ignore if */
if (id !== id2) {
throw new Error(`Internal error: Expected id ${id} but got id ${id2}`)
if (id < expectedId) {
const waitingTime = Date.now() - start
return receiveMessageWithId(
port,
expectedId,
waitingTimeout ? waitingTimeout - waitingTime : undefined,
)
}

if (expectedId !== id) {
throw new Error(
`Internal error: Expected id ${expectedId} but got id ${id}`,
)
}

return { id, ...message }
}

const syncFn = (...args: Parameters<T>): R => {
const id = nextID++

const msg: MainToWorkerMessage<Parameters<T>> = { id, args }

worker.postMessage(msg)

const { result, error, properties } = receiveMessageWithId(
mainPort,
id,
timeout,
)

if (error) {
throw Object.assign(error as object, properties)
}
Expand Down Expand Up @@ -587,12 +612,24 @@ export function runAsWorker<
({ id, args }: MainToWorkerMessage<Parameters<T>>) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
;(async () => {
let isAborted = false
const handleAbortMessage = (msg: MainToWorkerCommandMessage) => {
if (msg.id === id && msg.cmd === 'abort') {
isAborted = true
}
}
workerPort.on('message', handleAbortMessage)
let msg: WorkerToMainMessage<R>
try {
msg = { id, result: await fn(...args) }
} catch (error: unknown) {
msg = { id, error, properties: extractProperties(error) }
}
workerPort.off('message', handleAbortMessage)
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (isAborted) {
return
}
workerPort.postMessage(msg)
Atomics.add(sharedBufferView, 0, 1)
Atomics.notify(sharedBufferView, 0)
Expand Down
5 changes: 5 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ export interface MainToWorkerMessage<T extends unknown[]> {
args: T
}

export interface MainToWorkerCommandMessage {
id: number
cmd: string
}

export interface WorkerData {
sharedBuffer: SharedArrayBuffer
workerPort: MessagePort
Expand Down
116 changes: 115 additions & 1 deletion test/fn.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ import path from 'node:path'

import { jest } from '@jest/globals'

import { _dirname, testIf, tsUseEsmSupported } from './helpers.js'
import {
_dirname,
setupReceiveMessageOnPortMock,
testIf,
tsUseEsmSupported,
} from './helpers.js'
import type { AsyncWorkerFn } from './types.js'

import { createSyncFn } from 'synckit'
Expand All @@ -12,6 +17,7 @@ const { SYNCKIT_TIMEOUT } = process.env

beforeEach(() => {
jest.resetModules()
jest.restoreAllMocks()

delete process.env.SYNCKIT_GLOBAL_SHIMS

Expand Down Expand Up @@ -104,6 +110,114 @@ test('timeout', async () => {
)
})

test('subsequent executions after timeout', async () => {
const executionTimeout = 30
const longRunningTaskDuration = executionTimeout * 10
process.env.SYNCKIT_TIMEOUT = executionTimeout.toString()

const { createSyncFn } = await import('synckit')
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)

// start an execution in worker that will definitely time out
expect(() => syncFn(1, longRunningTaskDuration)).toThrow()

// wait for timed out execution to finish inside worker
await new Promise(resolve => setTimeout(resolve, longRunningTaskDuration))

// subsequent executions should work correctly
expect(syncFn(2, 1)).toBe(2)
expect(syncFn(3, 1)).toBe(3)
})

test('handling of outdated message from worker', async () => {
const executionTimeout = 60
process.env.SYNCKIT_TIMEOUT = executionTimeout.toString()
const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock()

jest.spyOn(Atomics, 'wait').mockReturnValue('ok')

receiveMessageOnPortMock
.mockReturnValueOnce({ message: { id: -1 } })
.mockReturnValueOnce({ message: { id: 0, result: 1 } })

const { createSyncFn } = await import('synckit')
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)
expect(syncFn(1)).toBe(1)
expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2)
})

test('propagation of undefined timeout', async () => {
delete process.env.SYNCKIT_TIMEOUT
const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock()

const atomicsWaitSpy = jest.spyOn(Atomics, 'wait').mockReturnValue('ok')

receiveMessageOnPortMock
.mockReturnValueOnce({ message: { id: -1 } })
.mockReturnValueOnce({ message: { id: 0, result: 1 } })

const { createSyncFn } = await import('synckit')
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)
expect(syncFn(1)).toBe(1)
expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2)

const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] =
atomicsWaitSpy.mock.calls
const [, , , firstAtomicsWaitCallTimeout] = firstAtomicsWaitArgs
const [, , , secondAtomicsWaitCallTimeout] = secondAtomicsWaitArgs

expect(typeof firstAtomicsWaitCallTimeout).toBe('undefined')
expect(typeof secondAtomicsWaitCallTimeout).toBe('undefined')
})

test('reduction of waiting time', async () => {
const synckitTimeout = 60
process.env.SYNCKIT_TIMEOUT = synckitTimeout.toString()
const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock()

const atomicsWaitSpy = jest.spyOn(Atomics, 'wait').mockImplementation(() => {
const start = Date.now()
// simulate waiting 10ms for worker to respond
while (Date.now() - start < 10) {
continue
}

return 'ok'
})

receiveMessageOnPortMock
.mockReturnValueOnce({ message: { id: -1 } })
.mockReturnValueOnce({ message: { id: 0, result: 1 } })

const { createSyncFn } = await import('synckit')
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)
expect(syncFn(1)).toBe(1)
expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2)

const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] =
atomicsWaitSpy.mock.calls
const [, , , firstAtomicsWaitCallTimeout] = firstAtomicsWaitArgs
const [, , , secondAtomicsWaitCallTimeout] = secondAtomicsWaitArgs

expect(typeof firstAtomicsWaitCallTimeout).toBe('number')
expect(firstAtomicsWaitCallTimeout).toBe(synckitTimeout)
expect(typeof secondAtomicsWaitCallTimeout).toBe('number')
expect(secondAtomicsWaitCallTimeout).toBeLessThan(synckitTimeout)
})

test('unexpected message from worker', async () => {
jest.spyOn(Atomics, 'wait').mockReturnValue('ok')

const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock()
receiveMessageOnPortMock.mockReturnValueOnce({ message: { id: 100 } })

const { createSyncFn } = await import('synckit')
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)
expect(() => syncFn(1)).toThrow(
'Internal error: Expected id 0 but got id 100',
)
})

test('globalShims env', async () => {
process.env.SYNCKIT_GLOBAL_SHIMS = '1'

Expand Down
24 changes: 24 additions & 0 deletions test/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import path from 'node:path'
import { fileURLToPath } from 'node:url'
import WorkerThreads from 'node:worker_threads'

import { jest } from '@jest/globals'

import { MTS_SUPPORTED_NODE_VERSION } from 'synckit'

Expand All @@ -13,3 +16,24 @@ export const tsUseEsmSupported =
nodeVersion >= MTS_SUPPORTED_NODE_VERSION && nodeVersion <= 18.18

export const testIf = (condition: boolean) => (condition ? it : it.skip)

type ReceiveMessageOnPortMock = jest.Mock<
typeof WorkerThreads.receiveMessageOnPort
>
export const setupReceiveMessageOnPortMock =
async (): Promise<ReceiveMessageOnPortMock> => {
jest.unstable_mockModule('node:worker_threads', () => {
return {
...WorkerThreads,
receiveMessageOnPort: jest.fn(WorkerThreads.receiveMessageOnPort),
}
})

const { receiveMessageOnPort: receiveMessageOnPortMock } = (await import(
'node:worker_threads'
)) as unknown as {
receiveMessageOnPort: ReceiveMessageOnPortMock
}

return receiveMessageOnPortMock
}

0 comments on commit 30d28ae

Please sign in to comment.