Skip to content

Commit

Permalink
Task Namespace in realtime-api (#471)
Browse files Browse the repository at this point in the history
* new playground for task

* add support for contexts

* prettier

* add task namespace

* e2e tests for task

* changesets

* cleanup

* add task types

* type task worker

* remove Proxy usage in createTaskObject

* rm debug in e2e tasks

* solved fixme

* rename to task.received

* drop Job and expose send

* fixes

* update changeset from patch to minor
  • Loading branch information
edolix authored Mar 25, 2022
1 parent 4f66680 commit cf84560
Show file tree
Hide file tree
Showing 21 changed files with 366 additions and 8 deletions.
7 changes: 7 additions & 0 deletions .changeset/khaki-buses-hear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@sw-internal/e2e-realtime-api': minor
'@sw-internal/playground-realtime-api': minor
'@signalwire/core': minor
---

[internal] Add playground and e2e tests for Task namespace
5 changes: 5 additions & 0 deletions .changeset/thin-oranges-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@signalwire/realtime-api': minor
---

Add `Task` namespace
46 changes: 46 additions & 0 deletions internal/e2e-realtime-api/src/task.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Task } from '@signalwire/realtime-api'
import { createTestRunner } from './utils'

const handler = () => {
return new Promise<number>(async (resolve, reject) => {
const context = 'task-e2e'
const jobPayload = {
id: Date.now(),
item: 'foo',
}

const client = new Task.Client({
host: process.env.RELAY_HOST as string,
project: process.env.RELAY_PROJECT as string,
token: process.env.RELAY_TOKEN as string,
contexts: [context],
})

client.on('task.received', (payload) => {
if (payload.id === jobPayload.id && payload.item === 'foo') {
return resolve(0)
}
console.error('Invalid payload on `task.received`', payload)
return reject(4)
})

await Task.send({
host: process.env.RELAY_HOST as string,
project: process.env.RELAY_PROJECT as string,
token: process.env.RELAY_TOKEN as string,
context,
message: jobPayload,
})
})
}

async function main() {
const runner = createTestRunner({
name: 'Task E2E',
testHandler: handler,
})

await runner.run()
}

main()
28 changes: 28 additions & 0 deletions internal/playground-realtime-api/src/task/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Task } from '@signalwire/realtime-api'

const client = new Task.Client({
host: process.env.HOST || 'relay.swire.io',
project: process.env.PROJECT as string,
token: process.env.TOKEN as string,

contexts: ['office'],

debug: {
logWsTraffic: true,
},
})

client.on('task.received', (payload) => {
console.log('Task Received', payload)
})

setTimeout(async () => {
console.log('Sending to the client..')
await Task.send({
host: process.env.HOST || 'relay.swire.io',
project: process.env.PROJECT as string,
token: process.env.TOKEN as string,
context: 'office',
message: { yo: ['bro', 1, true] },
})
}, 2000)
3 changes: 3 additions & 0 deletions packages/core/src/BaseSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ export class BaseSession {
if (this._relayProtocolIsValid()) {
params.protocol = this.relayProtocol
}
if (this.options.contexts?.length) {
params.contexts = this.options.contexts
}
this._rpcConnectResult = await this.execute(RPCConnect(params))
}

Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export type {
CustomSaga,
PubSubChannel,
MapToPubSubShape,
SDKActions,
} from './redux/interfaces'
export * as actions from './redux/actions'
export * as sagaHelpers from './redux/utils/sagaHelpers'
Expand All @@ -77,4 +78,4 @@ export type { RoomSessionRecording, RoomSessionPlayback } from './rooms'
export const selectors = {
...sessionSelectors,
}
export { ChatMember, ChatMessage } from './chat'
export { ChatMember, ChatMessage } from './chat'
5 changes: 1 addition & 4 deletions packages/core/src/redux/features/pubSub/pubSubSaga.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ import {
getLogger,
} from '../../../utils'
import type { EventEmitter } from '../../../utils/EventEmitter'
import type {
PubSubChannel,
PubSubAction,
} from '../../interfaces'
import type { PubSubChannel, PubSubAction } from '../../interfaces'
import { findNamespaceInPayload } from '../shared/namespace'

type PubSubSagaParams = {
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/redux/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import type {
VideoAPIEventParams,
InternalVideoAPIEvent,
ChatAction,
TaskAction,
SwEventParams,
} from '../types'
import { SDKRunSaga } from '.'
import { MulticastChannel } from '@redux-saga/core'
import { END, MulticastChannel } from '@redux-saga/core'

interface SWComponent {
id: string
Expand Down Expand Up @@ -106,6 +107,9 @@ export type PubSubAction =
payload: Error | undefined
}
| ChatAction
| TaskAction

export type PubSubChannel = MulticastChannel<PubSubAction>
export type SwEventChannel = MulticastChannel<MapToPubSubShape<SwEventParams>>

export type SDKActions = MapToPubSubShape<SwEventParams> | END
5 changes: 4 additions & 1 deletion packages/core/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { VideoAPIEventParams, InternalVideoEventNames } from './video'
import type { SessionEvents, JSONRPCRequest } from '../utils/interfaces'
import type { CantinaEvent } from './cantina'
import type { ChatEvent } from './chat'
import type { TaskEvent } from './task'

export interface SwEvent {
event_channel: string
Expand All @@ -28,7 +29,7 @@ export interface EmitterContract<
): EmitterContract<EventTypes>

removeAllListeners<T extends EventEmitter.EventNames<EventTypes>>(
event?: T,
event?: T
): EmitterContract<EventTypes>
}

Expand Down Expand Up @@ -115,6 +116,7 @@ export type SwEventParams =
| WebRTCMessageParams
| CantinaEvent
| ChatEvent
| TaskEvent

// prettier-ignore
export type PubSubChannelEvents =
Expand All @@ -125,3 +127,4 @@ export * from './video'
export * from './utils'
export * from './cantina'
export * from './chat'
export * from './task'
45 changes: 45 additions & 0 deletions packages/core/src/types/task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type { OnlyStateProperties, OnlyFunctionProperties } from '..'

export type TaskReceivedEventName = 'task.received'

export type TaskEventNames = TaskReceivedEventName

export interface TaskContract {}

export type TaskEntity = OnlyStateProperties<TaskContract>
export type TaskMethods = Omit<
OnlyFunctionProperties<TaskContract>,
'subscribe' | 'unsubscribe' | 'updateToken'
>

/**
* ==========
* ==========
* Server-Side Events
* ==========
* ==========
*/

/**
* 'queuing.relay.tasks'
*/
export interface TaskInboundEvent {
event_type: 'queuing.relay.tasks'
context: string
message: Record<string, unknown>
timestamp: number
space_id: string
project_id: string
}

export type TaskEvent = TaskInboundEvent

/**
* TODO: update MapToPubSubShape in another PR
* not used MapToPubSubShape because queuing.relay.tasks
* has a different shape
*/
export type TaskAction = {
type: TaskReceivedEventName
payload: TaskInboundEvent
}
2 changes: 2 additions & 0 deletions packages/core/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ export const SYNTHETIC_EVENT_PREFIX = '__synthetic__'
export const PRODUCT_PREFIX_VIDEO = 'video'
export const PRODUCT_PREFIX_CANTINA = 'cantina-manager'
export const PRODUCT_PREFIX_CHAT = 'chat'
export const PRODUCT_PREFIX_TASK = 'tasking'

export const GLOBAL_VIDEO_EVENTS = ['room.started', 'room.ended'] as const

export const PRODUCT_PREFIXES = [
PRODUCT_PREFIX_VIDEO,
PRODUCT_PREFIX_CANTINA,
PRODUCT_PREFIX_CHAT,
PRODUCT_PREFIX_TASK,
] as const

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/utils/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ export interface SessionOptions {
project?: string
/** SignalWire project token, e.g. `PT9e5660c101cd140a1c93a0197640a369cf5f16975a0079c9` */
token: string
/** Relay contexts, e.g. 'home' or 'office' */
contexts?: string[]
// From `LogLevelDesc` of loglevel to simplify our docs
/** logging level */
logLevel?: 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'silent'
Expand Down
13 changes: 12 additions & 1 deletion packages/core/src/utils/toInternalAction.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
import { MapToPubSubShape } from '../redux/interfaces'

export const toInternalAction = <
T extends { event_type: unknown; params: unknown }
T extends { event_type: unknown; params?: unknown }
>(
event: T
) => {
const { event_type, params } = event

/**
* queuing.relay.tasks has a slightly different shape:
* no nested "params" so we return the whole event.
*/
if (event_type === 'queuing.relay.tasks') {
return {
type: event_type,
payload: event,
} as MapToPubSubShape<T>
}

return {
type: event_type,
payload: params,
Expand Down
3 changes: 3 additions & 0 deletions packages/realtime-api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,6 @@ export * as Chat from './chat/Chat'

/** @ignore */
export * from './configure'

/** @ignore */
export * as Task from './task/Task'
42 changes: 42 additions & 0 deletions packages/realtime-api/src/task/Task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import type { ConsumerContract, BaseComponentOptions } from '@signalwire/core'
import { connect, BaseConsumer } from '@signalwire/core'
import type { RealTimeTaskApiEvents } from '../types'
import { RealtimeClient } from '../client/index'
import { taskWorker } from './workers'

export interface Task extends ConsumerContract<RealTimeTaskApiEvents> {
/** @internal */
_session: RealtimeClient
}

/** @internal */
class TaskAPI extends BaseConsumer<RealTimeTaskApiEvents> {
constructor(options: BaseComponentOptions<RealTimeTaskApiEvents>) {
super(options)

this.setWorker('taskWorker', {
worker: taskWorker,
})
this.attachWorkers()
this._attachListeners('')
}
}

/** @internal */
export const createTaskObject = (
params: BaseComponentOptions<RealTimeTaskApiEvents>
): Task => {
const task = connect<RealTimeTaskApiEvents, TaskAPI, Task>({
store: params.store,
Component: TaskAPI,
componentListeners: {
errors: 'onError',
responses: 'onSuccess',
},
})(params)

return task
}

export * from './TaskClient'
export * from './send'
12 changes: 12 additions & 0 deletions packages/realtime-api/src/task/TaskClient.docs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { Task } from './Task'

export interface TaskClientDocs extends Task {
new (opts: {
/** SignalWire project id, e.g. `a10d8a9f-2166-4e82-56ff-118bc3a4840f` */
project: string
/** SignalWire project token, e.g. `PT9e5660c101cd140a1c93a0197640a369cf5f16975a0079c9` */
token: string
/** SignalWire contexts, e.g. 'home', 'office'... */
contexts: string[]
}): this
}
58 changes: 58 additions & 0 deletions packages/realtime-api/src/task/TaskClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import type { AssertSameType, UserOptions } from '@signalwire/core'
import { setupClient, clientConnect } from '../client/index'
import { TaskClientDocs } from './TaskClient.docs'
import type { Task } from './Task'
import { createTaskObject } from './Task'

interface TaskClientMain extends Task {
new (opts: TaskClientOptions): this
}

interface TaskClient extends AssertSameType<TaskClientMain, TaskClientDocs> {}

/** @ignore */
export interface TaskClientOptions
extends Omit<UserOptions, '_onRefreshToken'> {
contexts: string[]
}

/** @ignore */
const TaskClient = function (options?: TaskClientOptions) {
const { client, store, emitter } = setupClient(options)

const task = createTaskObject({
store,
emitter,
})

const taskOn: TaskClient['on'] = (...args) => {
clientConnect(client)

return task.on(...args)
}
const taskOnce: TaskClient['once'] = (...args) => {
clientConnect(client)

return task.once(...args)
}

const interceptors = {
on: taskOn,
once: taskOnce,
_session: client,
} as const

return new Proxy<Omit<TaskClient, 'new'>>(task, {
get(target, prop, receiver) {
if (prop in interceptors) {
// @ts-expect-error
return interceptors[prop]
}

return Reflect.get(target, prop, receiver)
},
})
// For consistency with other constructors we'll make TS force the use of `new`
} as unknown as { new (options?: TaskClientOptions): TaskClient }

export { TaskClient as Client }
Loading

0 comments on commit cf84560

Please sign in to comment.