diff --git a/.changeset/few-colts-mate.md b/.changeset/few-colts-mate.md new file mode 100644 index 0000000000..43be011449 --- /dev/null +++ b/.changeset/few-colts-mate.md @@ -0,0 +1,5 @@ +--- +'@signalwire/core': patch +--- + +[internal] add `runWorker` api to replace setWorker/attachWorker combo diff --git a/.changeset/modern-terms-wave.md b/.changeset/modern-terms-wave.md new file mode 100644 index 0000000000..75ea1eef81 --- /dev/null +++ b/.changeset/modern-terms-wave.md @@ -0,0 +1,5 @@ +--- +'@signalwire/realtime-api': patch +--- + +[internal] remove usage of synthetic events for dial/answer/hangup diff --git a/.changeset/real-bikes-travel.md b/.changeset/real-bikes-travel.md new file mode 100644 index 0000000000..5f28e32a05 --- /dev/null +++ b/.changeset/real-bikes-travel.md @@ -0,0 +1,5 @@ +--- +'@signalwire/core': patch +--- + +[internal] add option to skip caching the base instance when using the event emitter transform pipeline. diff --git a/packages/core/src/BaseComponent.ts b/packages/core/src/BaseComponent.ts index 892f12f215..3ab486f700 100644 --- a/packages/core/src/BaseComponent.ts +++ b/packages/core/src/BaseComponent.ts @@ -21,7 +21,7 @@ import { SDKWorker, SDKWorkerDefinition, SessionAuthStatus, - SDKWorkerParams, + AttachSDKWorkerParams, } from './utils/interfaces' import { EventEmitter } from './utils/EventEmitter' import { SDKState } from './redux/interfaces' @@ -289,7 +289,11 @@ export class BaseComponent< transform: EventTransform payload: unknown }): BaseComponent { - if (!this._eventsTransformsCache.has(internalEvent)) { + if (transform.mode === "no-cache") { + const instance = transform.instanceFactory(payload) + + return instance + } else if (!this._eventsTransformsCache.has(internalEvent)) { const instance = transform.instanceFactory(payload) this._eventsTransformsCache.set(internalEvent, instance) @@ -876,29 +880,62 @@ export class BaseComponent< }) } - /** @internal */ - protected setWorker(name: string, def: SDKWorkerDefinition) { - this._workers.set(name, def) + protected runWorker(name: string, def: SDKWorkerDefinition) { + if (this._workers.has(name)) { + getLogger().warn( + `[runWorker] Worker with name ${name} has already been registerd.` + ) + } else { + this._setWorker(name, def) + } + + this._attachWorker(name, def) } - /** @internal */ - protected attachWorkers(params: Partial> = {}) { - return this._workers.forEach(({ worker }, name) => { - const task = this.store.runSaga(worker, { - instance: this, - runSaga: this.store.runSaga, + /** + * @internal + * @deprecated use {@link runWorker} instead + */ + protected setWorker(name: string, def: SDKWorkerDefinition) { + this._setWorker(name, def) + } + + /** + * @internal + * @deprecated use {@link runWorker} instead + */ + protected attachWorkers(params: AttachSDKWorkerParams = {}) { + return this._workers.forEach(({ worker, ...workerOptions }, name) => { + this._attachWorker(name, { + worker, + ...workerOptions, ...params, }) - this._runningWorkers.push(task) - /** - * Attaching workers is a one-time op for instances so - * the moment we attach one we'll remove it from the - * queue. - */ - this._workers.delete(name) }) } + private _setWorker(name: string, def: SDKWorkerDefinition) { + this._workers.set(name, def) + } + + private _attachWorker( + name: string, + { worker, ...params }: SDKWorkerDefinition + ) { + const task = this.store.runSaga(worker, { + instance: this, + runSaga: this.store.runSaga, + ...params, + }) + this._runningWorkers.push(task) + /** + * Attaching workers is a one-time op for instances so + * the moment we attach one we'll remove it from the + * queue. + */ + this._workers.delete(name) + } + private detachWorkers() { this._runningWorkers.forEach((task) => { task.cancel() diff --git a/packages/core/src/redux/features/session/sessionSaga.ts b/packages/core/src/redux/features/session/sessionSaga.ts index e5d35331a9..577af8e408 100644 --- a/packages/core/src/redux/features/session/sessionSaga.ts +++ b/packages/core/src/redux/features/session/sessionSaga.ts @@ -88,7 +88,7 @@ export function* executeActionWatcher(session: BaseSession): SagaIterator { ) } } catch (error) { - getLogger().warn('worker error', componentId, error) + getLogger().warn('worker error', componentId, JSON.stringify(error)) if (componentId && requestId) { yield put( componentActions.executeFailure({ diff --git a/packages/core/src/types/utils.ts b/packages/core/src/types/utils.ts index 9dcd2dc042..db2c7756de 100644 --- a/packages/core/src/types/utils.ts +++ b/packages/core/src/types/utils.ts @@ -173,3 +173,11 @@ export type DeepReadonly = T extends Builtin : IsUnknown extends true ? unknown : Readonly + +/** + * If one property is present then all properties should be + * present. + */ +export type AllOrNone> = + | T + | Partial> diff --git a/packages/core/src/utils/interfaces.ts b/packages/core/src/utils/interfaces.ts index 90c5c17a39..773647f2c1 100644 --- a/packages/core/src/utils/interfaces.ts +++ b/packages/core/src/utils/interfaces.ts @@ -14,6 +14,7 @@ import type { } from '../redux/interfaces' import type { URL as NodeURL } from 'node:url' import { + AllOrNone, CallingTransformType, ChatJSONRPCMethod, ChatTransformType, @@ -410,6 +411,12 @@ export interface EventTransform { * Allow us to define the `event_channel` for the Proxy. */ getInstanceEventChannel?: (payload: any) => string + /** + * Determines if the instance created by `instanceFactory` + * should be cached per event. This is the instance that + * will be passed to our event handlers + */ + mode?: 'cache' | 'no-cache' } export type BaseEventHandler = (...args: any[]) => void @@ -419,7 +426,12 @@ export type InternalChannels = { swEventChannel: SwEventChannel } -export interface SDKWorkerParams { +type SDKWorkerHooks = AllOrNone<{ + onDone: (options?: Partial>) => void + onFail: (options?: Partial>) => void +}> + +type SDKWorkerBaseParams = { channels: InternalChannels instance: T runSaga: any @@ -430,11 +442,15 @@ export interface SDKWorkerParams { payload?: any } +export type SDKWorkerParams = SDKWorkerBaseParams & SDKWorkerHooks + +export type AttachSDKWorkerParams = Partial> + export type SDKWorker = (params: SDKWorkerParams) => SagaIterator -export interface SDKWorkerDefinition { +export type SDKWorkerDefinition = { worker: SDKWorker -} +} & SDKWorkerHooks interface LogFn { (obj: T, msg?: string, ...args: any[]): void diff --git a/packages/realtime-api/src/voice/Call.ts b/packages/realtime-api/src/voice/Call.ts index c20f421cb5..7d4fb1d44f 100644 --- a/packages/realtime-api/src/voice/Call.ts +++ b/packages/realtime-api/src/voice/Call.ts @@ -32,14 +32,11 @@ import { CallingCallStateEventParams, VoiceCallConnectMethodParams, CallingCallConnectEventParams, - CallingCallDialEvent, } from '@signalwire/core' import { RealTimeCallApiEvents } from '../types' import { AutoApplyTransformsConsumer } from '../AutoApplyTransformsConsumer' import { toInternalDevices, toInternalPlayParams } from './utils' import { - SYNTHETIC_CALL_STATE_ANSWERED_EVENT, - SYNTHETIC_CALL_STATE_ENDED_EVENT, voiceCallStateWorker, voiceCallPlayWorker, voiceCallRecordWorker, @@ -47,8 +44,6 @@ import { voiceCallTapWorker, voiceCallConnectWorker, voiceCallDialWorker, - SYNTHETIC_CALL_DIAL_ANSWERED_EVENT, - SYNTHETIC_CALL_DIAL_FAILED_EVENT, } from './workers' import { createCallPlaybackObject } from './CallPlayback' import { CallRecording, createCallRecordingObject } from './CallRecording' @@ -137,7 +132,7 @@ export class CallConsumer extends AutoApplyTransformsConsumer { - // TODO: pass resolve/reject to the worker instead of use synthetic events? - this.setWorker('voiceCallDialWorker', { + this.runWorker('voiceCallDialWorker', { worker: voiceCallDialWorker, + onDone: resolve, + onFail: reject, }) - this.attachWorkers() - - const dialAnswerHandler = ( - /** - * This event implies that dial_state === "answered", - * which implies `call` to be defined. - */ - payload: Required - ) => { - // @ts-expect-error - this.off(SYNTHETIC_CALL_DIAL_FAILED_EVENT, dialFailHandler) - this.callId = payload.call.call_id - this.nodeId = payload.node_id - resolve(this) - } - const dialFailHandler = () => { - // @ts-expect-error - this.off(SYNTHETIC_CALL_DIAL_ANSWERED_EVENT, dialAnswerHandler) - reject(new Error('Failed to establish the call.')) - } - this.once( - // @ts-expect-error - SYNTHETIC_CALL_DIAL_ANSWERED_EVENT, - dialAnswerHandler - ) - // @ts-expect-error - this.once(SYNTHETIC_CALL_DIAL_FAILED_EVENT, dialFailHandler) this.execute({ method: 'calling.dial', @@ -360,12 +329,11 @@ export class CallConsumer extends AutoApplyTransformsConsumer { - resolve(undefined) + this.on('call.state', (params) => { + if (params.callState === 'ended') { + resolve(new Error('Failed to hangup the call.')) + } }) this.execute({ @@ -387,25 +355,15 @@ export class CallConsumer extends AutoApplyTransformsConsumer { - reject(new Error('Failed to answer the call.')) - } - - // TODO: pass resolve/reject to the worker instead of - // use synthetic events? - this.attachWorkers() - // @ts-expect-error - this.once(SYNTHETIC_CALL_STATE_ANSWERED_EVENT, () => { - // @ts-expect-error - this.off(SYNTHETIC_CALL_STATE_ENDED_EVENT, errorHandler) - - resolve(this) + this.on('call.state', (params) => { + if (params.callState === 'answered') { + resolve(this) + } else if (params.callState === 'ended') { + reject(new Error('Failed to answer the call.')) + } }) - // @ts-expect-error - this.once(SYNTHETIC_CALL_STATE_ENDED_EVENT, errorHandler) - this.execute({ method: 'calling.answer', params: { diff --git a/packages/realtime-api/src/voice/workers/index.ts b/packages/realtime-api/src/voice/workers/index.ts index 5e8f3ce184..b7522d8718 100644 --- a/packages/realtime-api/src/voice/workers/index.ts +++ b/packages/realtime-api/src/voice/workers/index.ts @@ -1,4 +1,3 @@ -import { toSyntheticEvent } from '@signalwire/core' export * from './voiceCallStateWorker' export * from './voiceCallReceiveWorker' export * from './voiceCallPlayWorker' @@ -7,18 +6,3 @@ export * from './voiceCallPromptWorker' export * from './voiceCallTapWorker' export * from './voiceCallConnectWorker' export * from './voiceCallDialWorker' - -export const SYNTHETIC_CALL_STATE_ANSWERED_EVENT = toSyntheticEvent( - 'calling.call.answered' -) - -export const SYNTHETIC_CALL_STATE_ENDED_EVENT = - toSyntheticEvent('calling.call.ended') - -export const SYNTHETIC_CALL_DIAL_ANSWERED_EVENT = toSyntheticEvent( - 'calling.call.dial.answered' -) - -export const SYNTHETIC_CALL_DIAL_FAILED_EVENT = toSyntheticEvent( - 'calling.call.dial.failed' -) diff --git a/packages/realtime-api/src/voice/workers/voiceCallDialWorker.ts b/packages/realtime-api/src/voice/workers/voiceCallDialWorker.ts index 7f157d5eda..192b8d8c91 100644 --- a/packages/realtime-api/src/voice/workers/voiceCallDialWorker.ts +++ b/packages/realtime-api/src/voice/workers/voiceCallDialWorker.ts @@ -8,10 +8,6 @@ import { CallingCallDialEvent, } from '@signalwire/core' import type { Call } from '../Call' -import { - SYNTHETIC_CALL_DIAL_ANSWERED_EVENT, - SYNTHETIC_CALL_DIAL_FAILED_EVENT, -} from './' const TARGET_DIAL_STATES: CallingCallDialEvent['params']['dial_state'][] = [ 'answered', @@ -21,8 +17,8 @@ const TARGET_DIAL_STATES: CallingCallDialEvent['params']['dial_state'][] = [ export const voiceCallDialWorker: SDKWorker = function* ( options ): SagaIterator { - const { channels, instance } = options - const { swEventChannel, pubSubChannel } = channels + const { channels, instance, onDone, onFail } = options + const { swEventChannel } = channels getLogger().trace('voiceCallDialWorker started') const action: MapToPubSubShape = yield sagaEffects.take( @@ -39,19 +35,9 @@ export const voiceCallDialWorker: SDKWorker = function* ( ) if (action.payload.dial_state === 'answered') { - yield sagaEffects.put(pubSubChannel, { - // @ts-expect-error - type: SYNTHETIC_CALL_DIAL_ANSWERED_EVENT, - // @ts-expect-error - payload: action.payload, - }) + onDone?.() } else if (action.payload.dial_state === 'failed') { - yield sagaEffects.put(pubSubChannel, { - // @ts-expect-error - type: SYNTHETIC_CALL_DIAL_FAILED_EVENT, - // @ts-expect-error - payload: action.payload, - }) + onFail?.() } else { throw new Error('[voiceCallDialWorker] unhandled call_state') } diff --git a/packages/realtime-api/src/voice/workers/voiceCallPlayWorker.ts b/packages/realtime-api/src/voice/workers/voiceCallPlayWorker.ts index aabf631462..3a5211762b 100644 --- a/packages/realtime-api/src/voice/workers/voiceCallPlayWorker.ts +++ b/packages/realtime-api/src/voice/workers/voiceCallPlayWorker.ts @@ -33,6 +33,7 @@ export const voiceCallPlayWorker: SDKWorker = function* ( ) }) + /** Add `tag` to the payload to allow pubSubSaga to match it with the Call namespace */ const payloadWithTag = { tag: instance.tag, ...action.payload, diff --git a/packages/realtime-api/src/voice/workers/voiceCallStateWorker.ts b/packages/realtime-api/src/voice/workers/voiceCallStateWorker.ts index 2f6039198d..a7fd937985 100644 --- a/packages/realtime-api/src/voice/workers/voiceCallStateWorker.ts +++ b/packages/realtime-api/src/voice/workers/voiceCallStateWorker.ts @@ -8,27 +8,24 @@ import { MapToPubSubShape, } from '@signalwire/core' import type { Call } from '../Call' -import { - SYNTHETIC_CALL_STATE_ANSWERED_EVENT, - SYNTHETIC_CALL_STATE_ENDED_EVENT, -} from './' export const voiceCallStateWorker: SDKWorker = function* ( options ): SagaIterator { const { channels, instance } = options const { swEventChannel, pubSubChannel } = channels - getLogger().trace('voiceCallStateWorker started') + getLogger().trace('voiceCallStateWorker started', instance.id, instance.tag) let run = true + const done = () => (run = false) + while (run) { const action: MapToPubSubShape = yield sagaEffects.take(swEventChannel, (action: SDKActions) => { - if ( - action.type === 'calling.call.state' - ) { - // To avoid mixing events on `connect` we check for `instance.id` - // if there's already a callId value. + if (action.type === 'calling.call.state') { + // To avoid mixing events on `connect` we check + // for `instance.id` if there's already a callId + // value. if (instance.id) { return instance.id === action.payload.call_id } @@ -37,11 +34,22 @@ export const voiceCallStateWorker: SDKWorker = function* ( return false }) - // Inject `tag` to have our EE to work because inbound - // calls don't have tags. + /** + * Override (or inject) "tag" with `instance.tag` + * because we use it as namespace in the EE and: + * - all the inbound legs have no "tag" in the + * `calling.call.state` events + * - all the legs created by a "connect" RPC will share + * the same "tag" of the originator leg to allow the + * SDK to make a relation + * + * Since in the SDK each Call has its own "tag" + * (__uuid), we need to target them through the EE with + * the right "tag". + */ const newPayload = { - tag: instance.tag, ...action.payload, + tag: instance.tag, } /** @@ -52,23 +60,10 @@ export const voiceCallStateWorker: SDKWorker = function* ( payload: newPayload, }) - if (action.payload.call_state === 'answered') { - yield sagaEffects.put(pubSubChannel, { - // @ts-expect-error - type: SYNTHETIC_CALL_STATE_ANSWERED_EVENT, - // @ts-expect-error - payload: newPayload, - }) - } else if (action.payload.call_state === 'ended') { - run = false - - yield sagaEffects.put(pubSubChannel, { - // @ts-expect-error - type: SYNTHETIC_CALL_STATE_ENDED_EVENT, - // @ts-expect-error - payload: newPayload, - }) + if (newPayload.call_state === 'ended') { + done() } } - getLogger().trace('voiceCallStateWorker ended') + + getLogger().info('voiceCallStateWorker ended', instance.id, instance.tag) }