From dde37527d7df2cbebc88b438497598c4dcc63ba8 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 23 Aug 2024 17:20:23 +0100 Subject: [PATCH 1/3] feat: refactor viz `waitWithLayer` and quantel `waitWithPort` to avoid leaking callbacks --- .../src/integrations/quantel/connection.ts | 19 ++------- .../src/integrations/vizMSE/vizMSEManager.ts | 19 ++------- .../timeline-state-resolver/src/waitGroup.ts | 39 +++++++++++++++++++ 3 files changed, 47 insertions(+), 30 deletions(-) create mode 100644 packages/timeline-state-resolver/src/waitGroup.ts diff --git a/packages/timeline-state-resolver/src/integrations/quantel/connection.ts b/packages/timeline-state-resolver/src/integrations/quantel/connection.ts index 7c5fce368..937fa8b4a 100644 --- a/packages/timeline-state-resolver/src/integrations/quantel/connection.ts +++ b/packages/timeline-state-resolver/src/integrations/quantel/connection.ts @@ -15,6 +15,7 @@ import { QuantelTrackedState, QuantelTrackedStatePort, } from './types' +import { WaitGroup } from '../../waitGroup' const SOFT_JUMP_WAIT_TIME = 250 @@ -30,9 +31,7 @@ export class QuantelManager extends EventEmitter { port: {}, } private _cache = new Cache() - private _waitWithPorts: { - [portId: string]: Function[] - } = {} + private _waitWithPorts = new WaitGroup() private _retryLoadFragmentsTimeout: { [portId: string]: NodeJS.Timeout } = {} private _failedAction: { [portId: string]: { @@ -591,23 +590,13 @@ export class QuantelManager extends EventEmitter { }) } public clearAllWaitWithPort(portId: string) { - if (!this._waitWithPorts[portId]) { - _.each(this._waitWithPorts[portId], (fcn) => { - fcn(true) - }) - } + this._waitWithPorts.clearAllForKey(portId) } /** * Returns true if the wait was cleared from someone else */ private async waitWithPort(portId: string, delay: number): Promise { - return new Promise((resolve) => { - if (!this._waitWithPorts[portId]) this._waitWithPorts[portId] = [] - this._waitWithPorts[portId].push(resolve) - setTimeout(() => { - resolve(false) - }, delay || 0) - }) + return this._waitWithPorts.waitOnKey(portId, delay) } } class Cache { diff --git a/packages/timeline-state-resolver/src/integrations/vizMSE/vizMSEManager.ts b/packages/timeline-state-resolver/src/integrations/vizMSE/vizMSEManager.ts index 2b2b83b8a..51090459b 100644 --- a/packages/timeline-state-resolver/src/integrations/vizMSE/vizMSEManager.ts +++ b/packages/timeline-state-resolver/src/integrations/vizMSE/vizMSEManager.ts @@ -41,6 +41,7 @@ import { import { VizEngineTcpSender } from './vizEngineTcpSender' import * as crypto from 'crypto' import * as path from 'path' +import { WaitGroup } from '../../waitGroup' /** Minimum time to wait before removing an element after an expectedPlayoutItem has been removed */ const DELETE_TIME_WAIT = 20 * 1000 @@ -82,9 +83,7 @@ export class VizMSEManager extends EventEmitter { private _mseConnected: boolean | undefined = undefined // undefined: first connection not established yet private _msePingConnected = false private _loadingAllElements = false - private _waitWithLayers: { - [portId: string]: Function[] - } = {} + private _waitWithLayers = new WaitGroup() public ignoreAllWaits = false // Only to be used in tests private _terminated = false private _activeRundownPlaylistId: string | undefined @@ -1241,23 +1240,13 @@ export class VizMSEManager extends EventEmitter { } public clearAllWaitWithLayer(portId: string) { - if (!this._waitWithLayers[portId]) { - _.each(this._waitWithLayers[portId], (fcn) => { - fcn(true) - }) - } + this._waitWithLayers.clearAllForKey(portId) } /** * Returns true if the wait was cleared from someone else */ private async waitWithLayer(layerId: string, delay: number): Promise { - return new Promise((resolve) => { - if (!this._waitWithLayers[layerId]) this._waitWithLayers[layerId] = [] - this._waitWithLayers[layerId].push(resolve) - setTimeout(() => { - resolve(false) - }, delay || 0) - }) + return this._waitWithLayers.waitOnKey(layerId, delay) } private getElementsToKeep(): VIZMSEPlayoutItemContentExternal[] { return this._expectedPlayoutItems diff --git a/packages/timeline-state-resolver/src/waitGroup.ts b/packages/timeline-state-resolver/src/waitGroup.ts new file mode 100644 index 000000000..58bb97ab3 --- /dev/null +++ b/packages/timeline-state-resolver/src/waitGroup.ts @@ -0,0 +1,39 @@ +type ResolveFn = (value: boolean) => void + +export class WaitGroup { + #store: Map> = new Map() + #nextId = 0 + + clearAllForKey(key: string): void { + const callbacks = this.#store.get(key) + if (!callbacks) return + + this.#store.delete(key) + + for (const resolve of callbacks.values()) { + resolve(true) + } + } + + async waitOnKey(portId: string, delay: number): Promise { + let callbacks = this.#store.get(portId) + if (!callbacks) { + callbacks = new Map() + this.#store.set(portId, callbacks) + } + const callbacks2 = callbacks + + const id = this.#nextId++ + + return new Promise((resolve) => { + const callbackWithCleanup = (value: boolean) => { + callbacks2.delete(id) + + resolve(value) + } + + callbacks2.set(id, callbackWithCleanup) + setTimeout(() => callbackWithCleanup(false), delay || 0) + }) + } +} From a1bff7fc4bed4a8aabacb6f8b16675e7b7f1d5b8 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Mon, 26 Aug 2024 14:23:53 +0100 Subject: [PATCH 2/3] chore: review comment --- packages/timeline-state-resolver/src/waitGroup.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/timeline-state-resolver/src/waitGroup.ts b/packages/timeline-state-resolver/src/waitGroup.ts index 58bb97ab3..a2619ff0b 100644 --- a/packages/timeline-state-resolver/src/waitGroup.ts +++ b/packages/timeline-state-resolver/src/waitGroup.ts @@ -1,9 +1,15 @@ type ResolveFn = (value: boolean) => void +/** + * A WaitGroup is used to wait for a number of operations to complete, or timeout + */ export class WaitGroup { #store: Map> = new Map() #nextId = 0 + /** + * Resolve all waiting operations for a key, with success + */ clearAllForKey(key: string): void { const callbacks = this.#store.get(key) if (!callbacks) return @@ -15,11 +21,14 @@ export class WaitGroup { } } - async waitOnKey(portId: string, delay: number): Promise { - let callbacks = this.#store.get(portId) + /** + * Wait for a key to be resolved (true), or timeout (false) + */ + async waitOnKey(key: string, delay: number): Promise { + let callbacks = this.#store.get(key) if (!callbacks) { callbacks = new Map() - this.#store.set(portId, callbacks) + this.#store.set(key, callbacks) } const callbacks2 = callbacks From e0bcfee645ac5aeb8fd358d1882161913d2fe153 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Tue, 27 Aug 2024 09:58:15 +0100 Subject: [PATCH 3/3] chore: add comment and avoid 'breaking' viz --- .../src/integrations/vizMSE/vizMSEManager.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/timeline-state-resolver/src/integrations/vizMSE/vizMSEManager.ts b/packages/timeline-state-resolver/src/integrations/vizMSE/vizMSEManager.ts index 51090459b..d73f0b662 100644 --- a/packages/timeline-state-resolver/src/integrations/vizMSE/vizMSEManager.ts +++ b/packages/timeline-state-resolver/src/integrations/vizMSE/vizMSEManager.ts @@ -1239,8 +1239,9 @@ export class VizMSEManager extends EventEmitter { this.emit('connectionChanged', this._mseConnected && this._msePingConnected) } - public clearAllWaitWithLayer(portId: string) { - this._waitWithLayers.clearAllForKey(portId) + public clearAllWaitWithLayer(_portId: string) { + // HACK: Prior to #344 this was broken. This has been left in the broken state until it can be tested that the 'fix' doesn't cause other issues SOFIE-3419 + // this._waitWithLayers.clearAllForKey(portId) } /** * Returns true if the wait was cleared from someone else