Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Gracefully shutdown the embedder #9693

Merged
merged 6 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/embedder/EmbeddingsJobQueueStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
}

orchestrator: WorkflowOrchestrator
done: boolean

constructor(orchestrator: WorkflowOrchestrator) {
this.orchestrator = orchestrator
this.done = false
}
async next(): Promise<IteratorResult<DBJob>> {
if (this.done) {
return {done: true as const, value: undefined}
}
const pg = getKysely()
const getJob = (isFailed: boolean) => {
return pg
Expand Down Expand Up @@ -57,9 +63,11 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
}
}
return() {
this.done = true
return Promise.resolve({done: true as const, value: undefined})
}
throw(error: any) {
this.done = true
return Promise.resolve({done: true as const, value: error})
}
}
10 changes: 6 additions & 4 deletions packages/embedder/embedder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ const run = async () => {
const streams = mergeAsyncIterators(jobQueueStreams)

const kill: NodeJS.SignalsListener = (signal) => {
Logger.log(`Kill signal received: ${signal}`)
primaryLock?.release()
Logger.log(
`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`
)
primaryLock?.release().catch(() => {})
streams.return?.()
process.exit()
}
process.on('SIGTERM', kill)
process.on('SIGINT', kill)
Expand All @@ -66,7 +67,8 @@ const run = async () => {
}

// On graceful shutdown
Logger.log('Streaming Complete. Goodbye!')
Logger.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`)
process.exit()
}

run()
5 changes: 5 additions & 0 deletions packages/embedder/lint-staged.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = {
'*.{ts,tsx}': ['eslint --fix', 'prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'],
'*.graphql': ['prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'],
'**/*.{ts,tsx}': () => 'tsc --noEmit -p tsconfig.json'
}
2 changes: 1 addition & 1 deletion packages/embedder/logMemoryUse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ export const logMemoryUse = () => {
const {rss} = memoryUsage
const usedMB = Math.floor(rss / MB)
console.log('Memory use:', usedMB, 'MB')
}, 10000)
}, 10000).unref()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an attempt to regularly end the event loop. It will continue if there are still timers running unless these are unrefed. This did not allow proper shutdown as there are many things in dependencies still running, but it's good practice nonetheless if we're not keeping a reference to the timer to allow cleanup.

}
2 changes: 1 addition & 1 deletion packages/embedder/logPerformance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ export const logPerformance = (logEvery: number, resetEvery: number) => {
logs = 0
start = performance.now()
}
}, logEvery * 1000)
}, logEvery * 1000).unref()
return counter
}
135 changes: 65 additions & 70 deletions packages/embedder/mergeAsyncIterators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,85 +12,80 @@ type Result<T extends AsyncIterator<any>> = UnYield<Awaited<ReturnType<T['next']

// Promise.race has a memory leak
// To avoid: https://github.com/tc39/proposal-async-iterator-helpers/issues/15#issuecomment-1937011820
export function mergeAsyncIterators<T extends AsyncIterator<any>[] | []>(
iterators: T
): AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result<T[P]>]}[number]> {
return (async function* () {
type ResultThunk = () => [number, Result<T[number]>]
let count = iterators.length as number
let capability: PromiseCapability<ResultThunk | null> | undefined
const queuedResults: ResultThunk[] = []
const getNext = async (idx: number, iterator: T[number]) => {
try {
const next = await iterator.next()
if (next.done) {
if (--count === 0 && capability !== undefined) {
capability.resolve(null)
}
} else {
resolveResult(() => {
void getNext(idx, iterator)
return [idx, next.value]
})
export function mergeAsyncIterators<T extends AsyncIterator<any>[] | []>(iterators: T) {
type ResultThunk = () => [number, Result<T[number]>]
let count = iterators.length as number
let capability: PromiseCapability<ResultThunk | null> | undefined
const queuedResults: ResultThunk[] = []
const getNext = async (idx: number, iterator: T[number]) => {
try {
const next = await iterator.next()
if (next.done) {
if (--count === 0 && capability !== undefined) {
capability.resolve(null)
}
} catch (error) {
} else {
resolveResult(() => {
throw error
void getNext(idx, iterator)
return [idx, next.value]
})
}
} catch (error) {
resolveResult(() => {
throw error
})
}
const resolveResult = (resultThunk: ResultThunk) => {
if (capability === undefined) {
queuedResults.push(resultThunk)
} else {
capability.resolve(resultThunk)
}
}
const resolveResult = (resultThunk: ResultThunk) => {
if (capability === undefined) {
queuedResults.push(resultThunk)
} else {
capability.resolve(resultThunk)
}
}

try {
// Begin all iterators
for (const [idx, iterable] of iterators.entries()) {
void getNext(idx, iterable)
// Begin all iterators
for (const [idx, iterable] of iterators.entries()) {
void getNext(idx, iterable)
}

const it: AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result<T[P]>]}[number]> = {
[Symbol.asyncIterator]: () => it,
next: async () => {
const nextQueuedResult = queuedResults.shift()
if (nextQueuedResult !== undefined) {
return {done: false as const, value: nextQueuedResult()}
}
if (count === 0) {
return {done: true as const, value: undefined}
}

// Delegate to iterables as results complete
while (true) {
while (true) {
const nextQueuedResult = queuedResults.shift()
if (nextQueuedResult === undefined) {
break
} else {
yield nextQueuedResult()
}
}
if (count === 0) {
break
} else {
// Promise.withResolvers() is not yet implemented in node
capability = {
resolve: undefined as any,
reject: undefined as any,
promise: undefined as any
}
capability.promise = new Promise((res, rej) => {
capability!.resolve = res
capability!.reject = rej
})
const nextResult = await capability.promise
if (nextResult === null) {
break
} else {
capability = undefined
yield nextResult()
}
}
// Promise.withResolvers() is not yet implemented in node
capability = {
resolve: undefined as any,
reject: undefined as any,
promise: undefined as any
}
capability.promise = new Promise((res, rej) => {
capability!.resolve = res
capability!.reject = rej
})
const nextResult = await capability.promise
if (nextResult === null) {
return {done: true as const, value: undefined}
} else {
capability = undefined
return {done: false as const, value: nextResult()}
}
} catch (err) {
// Unwind remaining iterators on failure
try {
await Promise.all(iterators.map((iterator) => iterator.return?.()))
} catch {}
throw err
},
return: async () => {
await Promise.allSettled(iterators.map((iterator) => iterator.return?.()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This refactoring was done so we can define this return function. This is necessary so all streams are notified of the return. The async generator function we had previously would hang in await capability.promise and handle the return only afterwards. If all embedders are idle, the promise would never resolve.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting! really good find having to define our own return here

return {done: true as const, value: undefined}
},
throw: async () => {
await Promise.allSettled(iterators.map((iterator) => iterator.return?.()))
return {done: true as const, value: undefined}
}
})()
}
return it
}
1 change: 1 addition & 0 deletions packages/embedder/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"url": "git+https://github.com/ParabolInc/parabol.git"
},
"scripts": {
"precommit": "lint-staged",
"lint": "eslint --fix . --ext .ts,.tsx",
"lint:check": "eslint . --ext .ts,.tsx",
"prettier": "prettier --config ../../.prettierrc --write \"**/*.{ts,tsx}\"",
Expand Down
2 changes: 1 addition & 1 deletion packages/embedder/resetStalledJobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ export const resetStalledJobs = () => {
}))
.where('startAt', '<', new Date(Date.now() - ms('5m')))
.execute()
}, ms('5m'))
}, ms('5m')).unref()
}
4 changes: 3 additions & 1 deletion packages/gql-executor/gqlExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ const run = async () => {
const executorChannel = GQLExecutorChannelId.join(SERVER_ID!)

// on shutdown, remove consumer from the group
process.on('SIGTERM', async () => {
process.on('SIGTERM', async (signal) => {
console.log(`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`)
await publisher.xgroup(
'DELCONSUMER',
ServerChannel.GQL_EXECUTOR_STREAM,
ServerChannel.GQL_EXECUTOR_CONSUMER_GROUP,
executorChannel
)
console.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`)
process.exit()
})

Expand Down
18 changes: 12 additions & 6 deletions packages/server/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import tracer from 'dd-trace'
import {r} from 'rethinkdb-ts'
import uws, {SHARED_COMPRESSOR} from 'uWebSockets.js'
import sleep from '../client/utils/sleep'
import ICSHandler from './ICSHandler'
import PWAHandler from './PWAHandler'
import activeClients from './activeClients'
Expand Down Expand Up @@ -37,14 +38,19 @@ if (!__PRODUCTION__) {
})
}

process.on('SIGTERM', () => {
process.on('SIGTERM', async (signal) => {
console.log(
`Server ID: ${process.env.SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`
)
const RECONNECT_WINDOW = 60_000 // ms
Object.values(activeClients.store).forEach((connectionContext) => {
const disconnectIn = ~~(Math.random() * RECONNECT_WINDOW)
setTimeout(() => {
await Promise.allSettled(
Object.values(activeClients.store).map(async (connectionContext) => {
const disconnectIn = Math.floor(Math.random() * RECONNECT_WINDOW)
await sleep(disconnectIn)
handleDisconnect(connectionContext)
}, disconnectIn)
})
})
)
console.log(`Server ID: ${process.env.SERVER_ID}. Graceful shutdown complete, exiting.`)
})

const PORT = Number(__PRODUCTION__ ? process.env.PORT : process.env.SOCKET_PORT)
Expand Down
Loading