Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: viz and quantel callback leak #344

Merged
merged 3 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
QuantelTrackedState,
QuantelTrackedStatePort,
} from './types'
import { WaitGroup } from '../../waitGroup'

const SOFT_JUMP_WAIT_TIME = 250

Expand All @@ -30,9 +31,7 @@ export class QuantelManager extends EventEmitter {
port: {},
}
private _cache = new Cache()
private _waitWithPorts: {
[portId: string]: Function[]
} = {}
private _waitWithPorts = new WaitGroup()
private _retryLoadFragmentsTimeout: { [portId: string]: NodeJS.Timeout } = {}
private _failedAction: {
[portId: string]: {
Expand Down Expand Up @@ -591,23 +590,13 @@ export class QuantelManager extends EventEmitter {
})
}
public clearAllWaitWithPort(portId: string) {
if (!this._waitWithPorts[portId]) {
_.each(this._waitWithPorts[portId], (fcn) => {
fcn(true)
})
}
this._waitWithPorts.clearAllForKey(portId)
}
/**
* Returns true if the wait was cleared from someone else
*/
private async waitWithPort(portId: string, delay: number): Promise<boolean> {
return new Promise((resolve) => {
if (!this._waitWithPorts[portId]) this._waitWithPorts[portId] = []
this._waitWithPorts[portId].push(resolve)
setTimeout(() => {
resolve(false)
}, delay || 0)
})
return this._waitWithPorts.waitOnKey(portId, delay)
}
}
class Cache {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
import { VizEngineTcpSender } from './vizEngineTcpSender'
import * as crypto from 'crypto'
import * as path from 'path'
import { WaitGroup } from '../../waitGroup'

/** Minimum time to wait before removing an element after an expectedPlayoutItem has been removed */
const DELETE_TIME_WAIT = 20 * 1000
Expand Down Expand Up @@ -82,9 +83,7 @@ export class VizMSEManager extends EventEmitter {
private _mseConnected: boolean | undefined = undefined // undefined: first connection not established yet
private _msePingConnected = false
private _loadingAllElements = false
private _waitWithLayers: {
[portId: string]: Function[]
} = {}
private _waitWithLayers = new WaitGroup()
public ignoreAllWaits = false // Only to be used in tests
private _terminated = false
private _activeRundownPlaylistId: string | undefined
Expand Down Expand Up @@ -1240,24 +1239,15 @@ export class VizMSEManager extends EventEmitter {
this.emit('connectionChanged', this._mseConnected && this._msePingConnected)
}

public clearAllWaitWithLayer(portId: string) {
if (!this._waitWithLayers[portId]) {
_.each(this._waitWithLayers[portId], (fcn) => {
fcn(true)
})
}
public clearAllWaitWithLayer(_portId: string) {
// HACK: Prior to #344 this was broken. This has been left in the broken state until it can be tested that the 'fix' doesn't cause other issues SOFIE-3419
// this._waitWithLayers.clearAllForKey(portId)
}
/**
* Returns true if the wait was cleared from someone else
*/
private async waitWithLayer(layerId: string, delay: number): Promise<boolean> {
return new Promise((resolve) => {
if (!this._waitWithLayers[layerId]) this._waitWithLayers[layerId] = []
this._waitWithLayers[layerId].push(resolve)
setTimeout(() => {
resolve(false)
}, delay || 0)
})
return this._waitWithLayers.waitOnKey(layerId, delay)
}
private getElementsToKeep(): VIZMSEPlayoutItemContentExternal[] {
return this._expectedPlayoutItems
Expand Down
48 changes: 48 additions & 0 deletions packages/timeline-state-resolver/src/waitGroup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
type ResolveFn = (value: boolean) => void

/**
* A WaitGroup is used to wait for a number of operations to complete, or timeout
*/
export class WaitGroup {
Julusian marked this conversation as resolved.
Show resolved Hide resolved
#store: Map<string, Map<number, ResolveFn>> = new Map()
#nextId = 0

/**
* Resolve all waiting operations for a key, with success
*/
clearAllForKey(key: string): void {
const callbacks = this.#store.get(key)
if (!callbacks) return

this.#store.delete(key)

for (const resolve of callbacks.values()) {
resolve(true)
}
}

/**
* Wait for a key to be resolved (true), or timeout (false)
*/
async waitOnKey(key: string, delay: number): Promise<boolean> {
let callbacks = this.#store.get(key)
if (!callbacks) {
callbacks = new Map()
this.#store.set(key, callbacks)
}
const callbacks2 = callbacks

const id = this.#nextId++

return new Promise((resolve) => {
const callbackWithCleanup = (value: boolean) => {
callbacks2.delete(id)

resolve(value)
}

callbacks2.set(id, callbackWithCleanup)
setTimeout(() => callbackWithCleanup(false), delay || 0)
})
}
}
Loading