Skip to content

Commit

Permalink
Fix handle leak for killed worker checker (#2197)
Browse files Browse the repository at this point in the history
* Cleanup killed worker

* Handle one more case of leak in error case
  • Loading branch information
cmdcolin authored Aug 10, 2021
1 parent ba12c74 commit f6f719e
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions packages/core/rpc/BaseRpcDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ export default abstract class BaseRpcDriver {
if (!sessionId) {
throw new TypeError('sessionId is required')
}
let done = false
const worker = this.getWorker(sessionId, pluginManager)
const rpcMethod = pluginManager.getRpcMethodType(functionName)
const serializedArgs = await rpcMethod.serializeArguments(args, this.name)
Expand All @@ -237,17 +238,21 @@ export default abstract class BaseRpcDriver {
)

// now actually call the worker
const callP = worker.call(functionName, filteredAndSerializedArgs, {
timeout: 5 * 60 * 1000, // 5 minutes
statusCallback: args.statusCallback,
rpcDriverClassName: this.name,
...options,
})
const callP = worker
.call(functionName, filteredAndSerializedArgs, {
timeout: 5 * 60 * 1000, // 5 minutes
statusCallback: args.statusCallback,
rpcDriverClassName: this.name,
...options,
})
.finally(() => {
done = true
})

// check every 5 seconds to see if the worker has been killed, and
// reject the killedP promise if it has
let killedCheckInterval: ReturnType<typeof setInterval>
const killedP = new Promise((_resolve, reject) => {
const killedP = new Promise((resolve, reject) => {
killedCheckInterval = setInterval(() => {
// must've been killed
if (worker.status === 'killed') {
Expand All @@ -256,6 +261,8 @@ export default abstract class BaseRpcDriver {
`operation timed out, worker process stopped responding, ${worker.error}`,
),
)
} else if (done) {
resolve()
}
}, this.workerCheckFrequency)
}).finally(() => {
Expand All @@ -265,8 +272,8 @@ export default abstract class BaseRpcDriver {
// the result is a race between the actual result promise, and the "killed"
// promise. the killed promise will only actually win if the worker was
// killed before the call could return
const resultP = Promise.race([callP, killedP])
const result = await Promise.race([callP, killedP])

return rpcMethod.deserializeReturn(await resultP, args, this.name)
return rpcMethod.deserializeReturn(result, args, this.name)
}
}

0 comments on commit f6f719e

Please sign in to comment.