Skip to content

Commit

Permalink
New Workers API + removal of Synthetic events for dial/answer/hangup (#…
Browse files Browse the repository at this point in the history
…496)

* add new apis for handling workers

* wip for moving to the new api, remove synthetic events

* add option to skip caching the base instance

* remove last call of attachWorkers

* add dirty debugging files

* fix hangup logic

* add logs for callId/tag in voiceCallStateWorker

* do override for "tag" in calling.call.state" events

* kill the voiceCallStateWorker on ended

* prettier

* fix params access

* remove todos

* update comment in voiceCalStateWorker

* add comment on voiceCallPlayWorker

* remove commented import

* fix casing

* format comments

* add changeset

Co-authored-by: Edoardo Gallo <edo91.gallo@gmail.com>
  • Loading branch information
framini and edolix committed Apr 20, 2022
1 parent 5b58ae6 commit 143b574
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 142 deletions.
5 changes: 5 additions & 0 deletions .changeset/few-colts-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@signalwire/core': patch
---

[internal] add `runWorker` api to replace setWorker/attachWorker combo
5 changes: 5 additions & 0 deletions .changeset/modern-terms-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@signalwire/realtime-api': patch
---

[internal] remove usage of synthetic events for dial/answer/hangup
5 changes: 5 additions & 0 deletions .changeset/real-bikes-travel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@signalwire/core': patch
---

[internal] add option to skip caching the base instance when using the event emitter transform pipeline.
73 changes: 55 additions & 18 deletions packages/core/src/BaseComponent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
SDKWorker,
SDKWorkerDefinition,
SessionAuthStatus,
SDKWorkerParams,
AttachSDKWorkerParams,
} from './utils/interfaces'
import { EventEmitter } from './utils/EventEmitter'
import { SDKState } from './redux/interfaces'
Expand Down Expand Up @@ -288,7 +288,11 @@ export class BaseComponent<
transform: EventTransform
payload: unknown
}): BaseComponent<EventTypes> {
if (!this._eventsTransformsCache.has(internalEvent)) {
if (transform.mode === "no-cache") {
const instance = transform.instanceFactory(payload)

return instance
} else if (!this._eventsTransformsCache.has(internalEvent)) {
const instance = transform.instanceFactory(payload)
this._eventsTransformsCache.set(internalEvent, instance)

Expand Down Expand Up @@ -884,29 +888,62 @@ export class BaseComponent<
})
}

/** @internal */
protected setWorker(name: string, def: SDKWorkerDefinition) {
this._workers.set(name, def)
protected runWorker(name: string, def: SDKWorkerDefinition) {
if (this._workers.has(name)) {
getLogger().warn(
`[runWorker] Worker with name ${name} has already been registerd.`
)
} else {
this._setWorker(name, def)
}

this._attachWorker(name, def)
}

/** @internal */
protected attachWorkers(params: Partial<SDKWorkerParams<any>> = {}) {
return this._workers.forEach(({ worker }, name) => {
const task = this.store.runSaga(worker, {
instance: this,
runSaga: this.store.runSaga,
/**
* @internal
* @deprecated use {@link runWorker} instead
*/
protected setWorker(name: string, def: SDKWorkerDefinition) {
this._setWorker(name, def)
}

/**
* @internal
* @deprecated use {@link runWorker} instead
*/
protected attachWorkers(params: AttachSDKWorkerParams<any> = {}) {
return this._workers.forEach(({ worker, ...workerOptions }, name) => {
this._attachWorker(name, {
worker,
...workerOptions,
...params,
})
this._runningWorkers.push(task)
/**
* Attaching workers is a one-time op for instances so
* the moment we attach one we'll remove it from the
* queue.
*/
this._workers.delete(name)
})
}

private _setWorker(name: string, def: SDKWorkerDefinition) {
this._workers.set(name, def)
}

private _attachWorker(
name: string,
{ worker, ...params }: SDKWorkerDefinition
) {
const task = this.store.runSaga(worker, {
instance: this,
runSaga: this.store.runSaga,
...params,
})
this._runningWorkers.push(task)
/**
* Attaching workers is a one-time op for instances so
* the moment we attach one we'll remove it from the
* queue.
*/
this._workers.delete(name)
}

private detachWorkers() {
this._runningWorkers.forEach((task) => {
task.cancel()
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/redux/features/session/sessionSaga.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export function* executeActionWatcher(session: BaseSession): SagaIterator {
)
}
} catch (error) {
getLogger().warn('worker error', componentId, error)
getLogger().warn('worker error', componentId, JSON.stringify(error))
if (componentId && requestId) {
yield put(
componentActions.executeFailure({
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/types/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,11 @@ export type DeepReadonly<T> = T extends Builtin
: IsUnknown<T> extends true
? unknown
: Readonly<T>

/**
* If one property is present then all properties should be
* present.
*/
export type AllOrNone<T extends Record<any, any>> =
| T
| Partial<Record<keyof T, never>>
22 changes: 19 additions & 3 deletions packages/core/src/utils/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type {
} from '../redux/interfaces'
import type { URL as NodeURL } from 'node:url'
import {
AllOrNone,
CallingTransformType,
ChatJSONRPCMethod,
ChatTransformType,
Expand Down Expand Up @@ -438,6 +439,12 @@ export interface EventTransform {
* Allow us to define the `event_channel` for the Proxy.
*/
getInstanceEventChannel?: (payload: any) => string
/**
* Determines if the instance created by `instanceFactory`
* should be cached per event. This is the instance that
* will be passed to our event handlers
*/
mode?: 'cache' | 'no-cache'
}

export type BaseEventHandler = (...args: any[]) => void
Expand All @@ -447,7 +454,12 @@ export type InternalChannels = {
swEventChannel: SwEventChannel
}

export interface SDKWorkerParams<T> {
type SDKWorkerHooks<T> = AllOrNone<{
onDone: (options?: Partial<SDKWorkerParams<T>>) => void
onFail: (options?: Partial<SDKWorkerParams<T>>) => void
}>

type SDKWorkerBaseParams<T> = {
channels: InternalChannels
instance: T
runSaga: any
Expand All @@ -458,11 +470,15 @@ export interface SDKWorkerParams<T> {
payload?: any
}

export type SDKWorkerParams<T> = SDKWorkerBaseParams<T> & SDKWorkerHooks<any>

export type AttachSDKWorkerParams<T> = Partial<SDKWorkerBaseParams<T>>

export type SDKWorker<T> = (params: SDKWorkerParams<T>) => SagaIterator<any>

export interface SDKWorkerDefinition {
export type SDKWorkerDefinition = {
worker: SDKWorker<any>
}
} & SDKWorkerHooks<any>

interface LogFn {
<T extends object>(obj: T, msg?: string, ...args: any[]): void
Expand Down
70 changes: 14 additions & 56 deletions packages/realtime-api/src/voice/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,18 @@ import {
CallingCallStateEventParams,
VoiceCallConnectMethodParams,
CallingCallConnectEventParams,
CallingCallDialEvent,
} from '@signalwire/core'
import { RealTimeCallApiEvents } from '../types'
import { AutoApplyTransformsConsumer } from '../AutoApplyTransformsConsumer'
import { toInternalDevices, toInternalPlayParams } from './utils'
import {
SYNTHETIC_CALL_STATE_ANSWERED_EVENT,
SYNTHETIC_CALL_STATE_ENDED_EVENT,
voiceCallStateWorker,
voiceCallPlayWorker,
voiceCallRecordWorker,
voiceCallPromptWorker,
voiceCallTapWorker,
voiceCallConnectWorker,
voiceCallDialWorker,
SYNTHETIC_CALL_DIAL_ANSWERED_EVENT,
SYNTHETIC_CALL_DIAL_FAILED_EVENT,
} from './workers'
import { createCallPlaybackObject } from './CallPlayback'
import { CallRecording, createCallRecordingObject } from './CallRecording'
Expand Down Expand Up @@ -137,7 +132,7 @@ export class CallConsumer extends AutoApplyTransformsConsumer<RealTimeCallApiEve
* server. Changes will be available to the consumer via
* our Proxy API.
*/
this.setWorker('voiceCallStateWorker', {
this.runWorker('voiceCallStateWorker', {
worker: voiceCallStateWorker,
})
}
Expand Down Expand Up @@ -305,37 +300,11 @@ export class CallConsumer extends AutoApplyTransformsConsumer<RealTimeCallApiEve

dial(params: VoiceCallDialMethodParams) {
return new Promise((resolve, reject) => {
// TODO: pass resolve/reject to the worker instead of use synthetic events?
this.setWorker('voiceCallDialWorker', {
this.runWorker('voiceCallDialWorker', {
worker: voiceCallDialWorker,
onDone: resolve,
onFail: reject,
})
this.attachWorkers()

const dialAnswerHandler = (
/**
* This event implies that dial_state === "answered",
* which implies `call` to be defined.
*/
payload: Required<CallingCallDialEvent['params']>
) => {
// @ts-expect-error
this.off(SYNTHETIC_CALL_DIAL_FAILED_EVENT, dialFailHandler)
this.callId = payload.call.call_id
this.nodeId = payload.node_id
resolve(this)
}
const dialFailHandler = () => {
// @ts-expect-error
this.off(SYNTHETIC_CALL_DIAL_ANSWERED_EVENT, dialAnswerHandler)
reject(new Error('Failed to establish the call.'))
}
this.once(
// @ts-expect-error
SYNTHETIC_CALL_DIAL_ANSWERED_EVENT,
dialAnswerHandler
)
// @ts-expect-error
this.once(SYNTHETIC_CALL_DIAL_FAILED_EVENT, dialFailHandler)

this.execute({
method: 'calling.dial',
Expand All @@ -360,12 +329,11 @@ export class CallConsumer extends AutoApplyTransformsConsumer<RealTimeCallApiEve
)
}

// TODO: pass resolve/reject to the worker instead of use synthetic events?
this.attachWorkers()

// @ts-expect-error
this.once(SYNTHETIC_CALL_STATE_ENDED_EVENT, () => {
resolve(undefined)
this.on('call.state', (params) => {
if (params.callState === 'ended') {
resolve(new Error('Failed to hangup the call.'))
}
})

this.execute({
Expand All @@ -387,25 +355,15 @@ export class CallConsumer extends AutoApplyTransformsConsumer<RealTimeCallApiEve
reject(new Error(`Can't call answer() on a call without callId.`))
}

const errorHandler = () => {
reject(new Error('Failed to answer the call.'))
}

// TODO: pass resolve/reject to the worker instead of
// use synthetic events?
this.attachWorkers()

// @ts-expect-error
this.once(SYNTHETIC_CALL_STATE_ANSWERED_EVENT, () => {
// @ts-expect-error
this.off(SYNTHETIC_CALL_STATE_ENDED_EVENT, errorHandler)

resolve(this)
this.on('call.state', (params) => {
if (params.callState === 'answered') {
resolve(this)
} else if (params.callState === 'ended') {
reject(new Error('Failed to answer the call.'))
}
})

// @ts-expect-error
this.once(SYNTHETIC_CALL_STATE_ENDED_EVENT, errorHandler)

this.execute({
method: 'calling.answer',
params: {
Expand Down
16 changes: 0 additions & 16 deletions packages/realtime-api/src/voice/workers/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { toSyntheticEvent } from '@signalwire/core'
export * from './voiceCallStateWorker'
export * from './voiceCallReceiveWorker'
export * from './voiceCallPlayWorker'
Expand All @@ -7,18 +6,3 @@ export * from './voiceCallPromptWorker'
export * from './voiceCallTapWorker'
export * from './voiceCallConnectWorker'
export * from './voiceCallDialWorker'

export const SYNTHETIC_CALL_STATE_ANSWERED_EVENT = toSyntheticEvent(
'calling.call.answered'
)

export const SYNTHETIC_CALL_STATE_ENDED_EVENT =
toSyntheticEvent('calling.call.ended')

export const SYNTHETIC_CALL_DIAL_ANSWERED_EVENT = toSyntheticEvent(
'calling.call.dial.answered'
)

export const SYNTHETIC_CALL_DIAL_FAILED_EVENT = toSyntheticEvent(
'calling.call.dial.failed'
)
22 changes: 4 additions & 18 deletions packages/realtime-api/src/voice/workers/voiceCallDialWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import {
CallingCallDialEvent,
} from '@signalwire/core'
import type { Call } from '../Call'
import {
SYNTHETIC_CALL_DIAL_ANSWERED_EVENT,
SYNTHETIC_CALL_DIAL_FAILED_EVENT,
} from './'

const TARGET_DIAL_STATES: CallingCallDialEvent['params']['dial_state'][] = [
'answered',
Expand All @@ -21,8 +17,8 @@ const TARGET_DIAL_STATES: CallingCallDialEvent['params']['dial_state'][] = [
export const voiceCallDialWorker: SDKWorker<Call> = function* (
options
): SagaIterator {
const { channels, instance } = options
const { swEventChannel, pubSubChannel } = channels
const { channels, instance, onDone, onFail } = options
const { swEventChannel } = channels
getLogger().trace('voiceCallDialWorker started')

const action: MapToPubSubShape<CallingCallDialEvent> = yield sagaEffects.take(
Expand All @@ -39,19 +35,9 @@ export const voiceCallDialWorker: SDKWorker<Call> = function* (
)

if (action.payload.dial_state === 'answered') {
yield sagaEffects.put(pubSubChannel, {
// @ts-expect-error
type: SYNTHETIC_CALL_DIAL_ANSWERED_EVENT,
// @ts-expect-error
payload: action.payload,
})
onDone?.()
} else if (action.payload.dial_state === 'failed') {
yield sagaEffects.put(pubSubChannel, {
// @ts-expect-error
type: SYNTHETIC_CALL_DIAL_FAILED_EVENT,
// @ts-expect-error
payload: action.payload,
})
onFail?.()
} else {
throw new Error('[voiceCallDialWorker] unhandled call_state')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export const voiceCallPlayWorker: SDKWorker<Call> = function* (
)
})

/** Add `tag` to the payload to allow pubSubSaga to match it with the Call namespace */
const payloadWithTag = {
tag: instance.tag,
...action.payload,
Expand Down
Loading

0 comments on commit 143b574

Please sign in to comment.