From b1e211cadc5eb4711b20cb9ea43c493c9cd636b3 Mon Sep 17 00:00:00 2001 From: Ammar Ansari Date: Mon, 28 Aug 2023 18:06:31 +0200 Subject: [PATCH] PubSub and Chat namespace with new interface (#814) * Task namespace with new interface * taskworker include * extend task from applyeventlisteners * base namespace class to handle the listen method * topic attach to event name * type update * remove older Task api * refactor and e2e test case * utility function to prefix the event * PubSub namespace with new interface * new interface for the Chat API * fix stack tests * include e2e test for PubSub API * e2e test case for Chat interface * test disconnected client * unit tests for Base classes * Unit tests for the Task class * fix TS for the Task class unit test * unit tests for PubSub and Chat API classes * include changeset * Update packages/realtime-api/src/chat/workers/chatWorker.ts Co-authored-by: Edoardo Gallo * Update packages/realtime-api/src/chat/workers/chatWorker.ts Co-authored-by: Edoardo Gallo * Update packages/realtime-api/src/pubSub/workers/pubSubWorker.ts Co-authored-by: Edoardo Gallo * fix typo * type in changeset --------- Co-authored-by: Edoardo Gallo --- .changeset/hip-bobcats-hear.md | 76 ++++++ internal/e2e-realtime-api/src/chat.test.ts | 246 ++++++++++------- .../src/disconnectClient.test.ts | 68 ----- internal/e2e-realtime-api/src/pubSub.test.ts | 152 +++++++---- .../playground-realtime-api/src/chat/index.ts | 72 +++-- .../src/pubSub/index.ts | 56 ++-- internal/stack-tests/src/chat/app.ts | 21 +- internal/stack-tests/src/pubSub/app.ts | 16 +- packages/core/src/BaseSession.ts | 6 + packages/core/src/chat/applyCommonMethods.ts | 109 ++++++++ packages/core/src/chat/index.ts | 1 + packages/core/src/types/chat.ts | 2 + .../realtime-api/src/BaseNamespace.test.ts | 249 ++++++++++++++++++ packages/realtime-api/src/BaseNamespace.ts | 29 +- packages/realtime-api/src/SWClient.test.ts | 66 +++++ packages/realtime-api/src/SWClient.ts | 18 ++ packages/realtime-api/src/SignalWire.ts | 5 + .../realtime-api/src/chat/BaseChat.test.ts | 110 ++++++++ packages/realtime-api/src/chat/BaseChat.ts | 139 ++++++++++ packages/realtime-api/src/chat/Chat.test.ts | 39 +++ packages/realtime-api/src/chat/Chat.ts | 49 +++- .../realtime-api/src/chat/ChatClient.test.ts | 115 -------- packages/realtime-api/src/chat/ChatClient.ts | 114 -------- .../src/chat/workers/chatWorker.ts | 65 +++++ .../realtime-api/src/chat/workers/index.ts | 1 + packages/realtime-api/src/index.ts | 46 +--- .../realtime-api/src/pubSub/PubSub.test.ts | 36 +++ packages/realtime-api/src/pubSub/PubSub.ts | 44 +++- .../realtime-api/src/pubSub/PubSubClient.ts | 96 ------- .../realtime-api/src/pubSub/workers/index.ts | 1 + .../src/pubSub/workers/pubSubWorker.ts | 66 +++++ packages/realtime-api/src/task/Task.test.ts | 78 ++++++ packages/realtime-api/src/task/Task.ts | 4 +- 33 files changed, 1506 insertions(+), 689 deletions(-) create mode 100644 .changeset/hip-bobcats-hear.md delete mode 100644 internal/e2e-realtime-api/src/disconnectClient.test.ts create mode 100644 packages/core/src/chat/applyCommonMethods.ts create mode 100644 packages/realtime-api/src/BaseNamespace.test.ts create mode 100644 packages/realtime-api/src/SWClient.test.ts create mode 100644 packages/realtime-api/src/chat/BaseChat.test.ts create mode 100644 packages/realtime-api/src/chat/BaseChat.ts create mode 100644 packages/realtime-api/src/chat/Chat.test.ts delete mode 100644 packages/realtime-api/src/chat/ChatClient.test.ts delete mode 100644 packages/realtime-api/src/chat/ChatClient.ts create mode 100644 packages/realtime-api/src/chat/workers/chatWorker.ts create mode 100644 packages/realtime-api/src/chat/workers/index.ts create mode 100644 packages/realtime-api/src/pubSub/PubSub.test.ts delete mode 100644 packages/realtime-api/src/pubSub/PubSubClient.ts create mode 100644 packages/realtime-api/src/pubSub/workers/index.ts create mode 100644 packages/realtime-api/src/pubSub/workers/pubSubWorker.ts create mode 100644 packages/realtime-api/src/task/Task.test.ts diff --git a/.changeset/hip-bobcats-hear.md b/.changeset/hip-bobcats-hear.md new file mode 100644 index 000000000..8ec756351 --- /dev/null +++ b/.changeset/hip-bobcats-hear.md @@ -0,0 +1,76 @@ +--- +'@signalwire/realtime-api': major +'@signalwire/core': major +--- + +New interface for PubSub and Chat APIs + +The new interface contains a single SW client with Chat and PubSub namespaces +```javascript +import { SignalWire } from '@signalwire/realtime-api' + +(async () => { + const client = await SignalWire({ + host: process.env.HOST, + project: process.env.PROJECT, + token: process.env.TOKEN, + }) + + // Attach pubSub listeners + const unsubHomePubSubListener = await client.pubSub.listen({ + channels: ['home'], + onMessageReceived: (message) => { + console.log('Message received under the "home" channel', message) + }, + }) + + // Publish on home channel + await client.pubSub.publish({ + content: 'Hello There', + channel: 'home', + meta: { + fooId: 'randomValue', + }, + }) + + // Attach chat listeners + const unsubOfficeChatListener = await client.chat.listen({ + channels: ['office'], + onMessageReceived: (message) => { + console.log('Message received on "office" channel', message) + }, + onMemberJoined: (member) => { + console.log('Member joined on "office" channel', member) + }, + onMemberUpdated: (member) => { + console.log('Member updated on "office" channel', member) + }, + onMemberLeft: (member) => { + console.log('Member left on "office" channel', member) + }, + }) + + // Publish a chat message on the office channel + const pubRes = await client.chat.publish({ + content: 'Hello There', + channel: 'office', + }) + + // Get channel messages + const messagesResult = await client.chat.getMessages({ + channel: 'office', + }) + + // Get channel members + const getMembersResult = await client.chat.getMembers({ channel: 'office' }) + + // Unsubscribe pubSub listener + await unsubHomePubSubListener() + + // Unsubscribe chat listener + await unsubOfficeChatListener() + + // Disconnect the client + client.disconnect() +})(); +``` \ No newline at end of file diff --git a/internal/e2e-realtime-api/src/chat.test.ts b/internal/e2e-realtime-api/src/chat.test.ts index 9a592b194..15eb4040c 100644 --- a/internal/e2e-realtime-api/src/chat.test.ts +++ b/internal/e2e-realtime-api/src/chat.test.ts @@ -6,6 +6,11 @@ */ import { timeoutPromise, SWCloseEvent } from '@signalwire/core' import { Chat as RealtimeAPIChat } from '@signalwire/realtime-api' +import { SignalWire as RealtimeSignalWire } from '@signalwire/realtime-api' +import type { + Chat as RealtimeChat, + SWClient as RealtimeSWClient, +} from '@signalwire/realtime-api' import { Chat as JSChat } from '@signalwire/js' import { WebSocket } from 'ws' import { randomUUID } from 'node:crypto' @@ -39,49 +44,52 @@ const params = { }, } -type ChatClient = RealtimeAPIChat.ChatClient | JSChat.Client -const testChatClientSubscribe = ( - firstClient: ChatClient, - secondClient: ChatClient -) => { +type ChatClient = RealtimeChat | JSChat.Client + +interface TestChatOptions { + jsChat: JSChat.Client + rtChat: RealtimeChat + publisher?: 'JS' | 'RT' +} + +const testSubscribe = ({ jsChat, rtChat }: TestChatOptions) => { const promise = new Promise(async (resolve) => { console.log('Running subscribe..') let events = 0 + const resolveIfDone = () => { // wait 4 events (rt and js receive their own events + the other member) if (events === 4) { - firstClient.off('member.joined') - secondClient.off('member.joined') + jsChat.off('member.joined') resolve(0) } } - firstClient.on('member.joined', (member) => { + jsChat.on('member.joined', (member) => { // TODO: Check the member payload console.log('jsChat member.joined') events += 1 resolveIfDone() }) - secondClient.on('member.joined', (member) => { - // TODO: Check the member payload - console.log('rtChat member.joined') - events += 1 - resolveIfDone() - }) - await Promise.all([ - firstClient.subscribe(channel), - secondClient.subscribe(channel), + const [unsubRTClient] = await Promise.all([ + rtChat.listen({ + channels: [channel], + onMemberJoined(member) { + // TODO: Check the member payload + console.log('rtChat member.joined') + events += 1 + resolveIfDone() + }, + }), + jsChat.subscribe(channel), ]) }) return timeoutPromise(promise, promiseTimeout, promiseException) } -const testChatClientPublish = ( - firstClient: ChatClient, - secondClient: ChatClient -) => { +const testPublish = ({ jsChat, rtChat, publisher }: TestChatOptions) => { const promise = new Promise(async (resolve) => { console.log('Running publish..') let events = 0 @@ -92,28 +100,32 @@ const testChatClientPublish = ( } const now = Date.now() - firstClient.once('message', (message) => { + jsChat.once('message', (message) => { console.log('jsChat message') if (message.meta.now === now) { events += 1 resolveIfDone() } }) - secondClient.once('message', (message) => { - console.log('rtChat message') - if (message.meta.now === now) { - events += 1 - resolveIfDone() - } - }) await Promise.all([ - firstClient.subscribe(channel), - secondClient.subscribe(channel), + jsChat.subscribe(channel), + rtChat.listen({ + channels: [channel], + onMessageReceived: (message) => { + console.log('rtChat message') + if (message.meta.now === now) { + events += 1 + resolveIfDone() + } + }, + }), ]) - await firstClient.publish({ - content: 'Hello There', + const publishClient = publisher === 'JS' ? jsChat : rtChat + + await publishClient.publish({ + content: 'Hello there!', channel, meta: { now, @@ -125,53 +137,48 @@ const testChatClientPublish = ( return timeoutPromise(promise, promiseTimeout, promiseException) } -const testChatClientUnsubscribe = ( - firstClient: ChatClient, - secondClient: ChatClient -) => { +const testUnsubscribe = ({ jsChat, rtChat }: TestChatOptions) => { const promise = new Promise(async (resolve) => { console.log('Running unsubscribe..') let events = 0 + const resolveIfDone = () => { - /** - * waits for 3 events: - * - first one generates 2 events on leave - * - second one generates only 1 event - */ - if (events === 3) { - firstClient.off('member.left') - secondClient.off('member.left') + // Both of these events will occur due to the JS chat + // RT chat will not trigger the `onMemberLeft` when we unsubscribe RT client + if (events === 2) { + jsChat.off('member.left') resolve(0) } } - firstClient.on('member.left', (member) => { + jsChat.on('member.left', (member) => { // TODO: Check the member payload console.log('jsChat member.left') events += 1 resolveIfDone() }) - secondClient.on('member.left', (member) => { - // TODO: Check the member payload - console.log('rtChat member.left') - events += 1 - resolveIfDone() - }) - await Promise.all([ - firstClient.subscribe(channel), - secondClient.subscribe(channel), + const [unsubRTClient] = await Promise.all([ + rtChat.listen({ + channels: [channel], + onMemberLeft(member) { + // TODO: Check the member payload + console.log('rtChat member.left') + events += 1 + resolveIfDone() + }, + }), + jsChat.subscribe(channel), ]) - await firstClient.unsubscribe(channel) - - await secondClient.unsubscribe(channel) + await jsChat.unsubscribe(channel) + await unsubRTClient() }) return timeoutPromise(promise, promiseTimeout, promiseException) } -const testChatClientMethods = async (client: ChatClient) => { +const testChatMethod = async (client: ChatClient) => { console.log('Get Messages..') const jsMessagesResult = await client.getMessages({ channel, @@ -184,10 +191,11 @@ const testChatClientMethods = async (client: ChatClient) => { return 0 } -const testChatClientSetAndGetMemberState = ( - firstClient: ChatClient, - secondClient: ChatClient -) => { +const testSetAndGetMemberState = ({ + jsChat, + rtChat, + publisher, +}: TestChatOptions) => { const promise = new Promise(async (resolve, reject) => { console.log('Set member state..') let events = 0 @@ -197,7 +205,7 @@ const testChatClientSetAndGetMemberState = ( } } - firstClient.once('member.updated', (member) => { + jsChat.once('member.updated', (member) => { // TODO: Check the member payload console.log('jsChat member.updated') if (member.state.email === 'e2e@example.com') { @@ -205,16 +213,9 @@ const testChatClientSetAndGetMemberState = ( resolveIfDone() } }) - secondClient.once('member.updated', (member) => { - console.log('rtChat member.updated') - if (member.state.email === 'e2e@example.com') { - events += 1 - resolveIfDone() - } - }) console.log('Get Member State..') - const getStateResult = await firstClient.getMemberState({ + const getStateResult = await jsChat.getMemberState({ channels: [channel], memberId: params.memberId, }) @@ -225,11 +226,22 @@ const testChatClientSetAndGetMemberState = ( } await Promise.all([ - firstClient.subscribe(channel), - secondClient.subscribe(channel), + jsChat.subscribe(channel), + rtChat.listen({ + channels: [channel], + onMemberUpdated(member) { + console.log('rtChat member.updated') + if (member.state.email === 'e2e@example.com') { + events += 1 + resolveIfDone() + } + }, + }), ]) - await firstClient.setMemberState({ + const publishClient = publisher === 'JS' ? jsChat : rtChat + + await publishClient.setMemberState({ channels: [channel], memberId: params.memberId, state: { @@ -241,6 +253,36 @@ const testChatClientSetAndGetMemberState = ( return timeoutPromise(promise, promiseTimeout, promiseException) } +const testDisconnectedRTClient = (rtClient: RealtimeSWClient) => { + const promise = new Promise(async (resolve, reject) => { + try { + await rtClient.chat.listen({ + channels: ['random'], + onMessageReceived: (message) => { + // Message should not be reached + throw undefined + }, + }) + + rtClient.disconnect() + + await rtClient.chat.publish({ + content: 'Unreached message!', + channel: 'random', + meta: { + foo: 'bar', + }, + }) + + reject(4) + } catch (e) { + resolve(0) + } + }) + + return timeoutPromise(promise, promiseTimeout, promiseException) +} + const handler = async () => { // Create JS Chat Client const CRT = await createCRT(params) @@ -250,64 +292,80 @@ const handler = async () => { token: CRT.token, }) - const jsChatResultCode = await testChatClientMethods(jsChat) + const jsChatResultCode = await testChatMethod(jsChat) if (jsChatResultCode !== 0) { return jsChatResultCode } console.log('Created jsChat') - // Create RT-API Chat Client - const rtChat = new RealtimeAPIChat.Client({ - // @ts-expect-error + // Create RT-API Client + const rtClient = await RealtimeSignalWire({ host: process.env.RELAY_HOST, project: process.env.RELAY_PROJECT as string, token: process.env.RELAY_TOKEN as string, }) + const rtChat = rtClient.chat - const rtChatResultCode = await testChatClientMethods(rtChat) + const rtChatResultCode = await testChatMethod(rtChat) if (rtChatResultCode !== 0) { return rtChatResultCode } console.log('Created rtChat') // Test Subscribe - const subscribeResultCode = await testChatClientSubscribe(jsChat, rtChat) + const subscribeResultCode = await testSubscribe({ jsChat, rtChat }) if (subscribeResultCode !== 0) { return subscribeResultCode } // Test Publish - const jsChatPublishCode = await testChatClientPublish(jsChat, rtChat) - if (jsChatPublishCode !== 0) { - return jsChatPublishCode + const jsPublishCode = await testPublish({ + jsChat, + rtChat, + publisher: 'JS', + }) + if (jsPublishCode !== 0) { + return jsPublishCode } - const rtChatPublishCode = await testChatClientPublish(rtChat, jsChat) - if (rtChatPublishCode !== 0) { - return rtChatPublishCode + const rtPublishCode = await testPublish({ + jsChat, + rtChat, + publisher: 'RT', + }) + if (rtPublishCode !== 0) { + return rtPublishCode } // Test Set/Get Member State - const jsChatGetSetStateCode = await testChatClientSetAndGetMemberState( + const jsChatGetSetStateCode = await testSetAndGetMemberState({ jsChat, - rtChat - ) + rtChat, + publisher: 'JS', + }) if (jsChatGetSetStateCode !== 0) { return jsChatGetSetStateCode } - const rtChatGetSetStateCode = await testChatClientSetAndGetMemberState( + const rtChatGetSetStateCode = await testSetAndGetMemberState({ + jsChat, rtChat, - jsChat - ) + publisher: 'RT', + }) if (rtChatGetSetStateCode !== 0) { return rtChatGetSetStateCode } // Test Unsubscribe - const unsubscribeResultCode = await testChatClientUnsubscribe(jsChat, rtChat) + const unsubscribeResultCode = await testUnsubscribe({ jsChat, rtChat }) if (unsubscribeResultCode !== 0) { return unsubscribeResultCode } + // Test diconnected client + const disconnectedRTClient = await testDisconnectedRTClient(rtClient) + if (disconnectedRTClient !== 0) { + return disconnectedRTClient + } + return 0 } diff --git a/internal/e2e-realtime-api/src/disconnectClient.test.ts b/internal/e2e-realtime-api/src/disconnectClient.test.ts deleted file mode 100644 index d21031d13..000000000 --- a/internal/e2e-realtime-api/src/disconnectClient.test.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { PubSub } from '@signalwire/realtime-api' -import { createTestRunner } from './utils' - -const handler = () => { - return new Promise(async (resolve) => { - const clientOptions = { - host: process.env.RELAY_HOST, - project: process.env.RELAY_PROJECT, - token: process.env.RELAY_TOKEN, - } - - const clientOne = new PubSub.Client({ - ...clientOptions, - debug: { - logWsTraffic: false, - }, - }) - - const channel = 'rw' - const meta = { foo: 'bar' } - const content = 'Hello World' - - await clientOne.publish({ - channel, - content, - meta, - }) - - clientOne.disconnect() - - const clientTwo = new PubSub.Client({ - ...clientOptions, - debug: { - logWsTraffic: false, - }, - }) - - await clientTwo.subscribe(channel) - clientTwo.on('message', (message) => { - if (message.meta.foo === 'bar' && message.content === 'Hello World') { - resolve(0) - } - }) - - const clientThree = new PubSub.Client({ - ...clientOptions, - debug: { - logWsTraffic: false, - }, - }) - await clientThree.publish({ - channel, - content, - meta, - }) - }) -} - -async function main() { - const runner = createTestRunner({ - name: 'Disconnect Client Tests', - testHandler: handler, - }) - - await runner.run() -} - -main() diff --git a/internal/e2e-realtime-api/src/pubSub.test.ts b/internal/e2e-realtime-api/src/pubSub.test.ts index 7da663124..956fb3776 100644 --- a/internal/e2e-realtime-api/src/pubSub.test.ts +++ b/internal/e2e-realtime-api/src/pubSub.test.ts @@ -7,7 +7,11 @@ * receive the proper events. */ import { timeoutPromise, SWCloseEvent } from '@signalwire/core' -import { PubSub as RealtimeAPIPubSub } from '@signalwire/realtime-api' +import { SignalWire as RealtimeSignalWire } from '@signalwire/realtime-api' +import type { + PubSub as RealtimePubSub, + SWClient as RealtimeSWClient, +} from '@signalwire/realtime-api' import { PubSub as JSPubSub } from '@signalwire/js' import { WebSocket } from 'ws' import { randomUUID } from 'node:crypto' @@ -41,20 +45,21 @@ const params = { }, } -type PubSubClient = RealtimeAPIPubSub.Client | JSPubSub.Client -const testPubSubClientSubscribe = ( - firstClient: PubSubClient, - secondClient: PubSubClient -) => { +interface TestPubSubOptions { + jsPubSub: JSPubSub.Client + rtPubSub: RealtimePubSub + publisher?: 'JS' | 'RT' +} + +const testSubscribe = ({ jsPubSub, rtPubSub }: TestPubSubOptions) => { const promise = new Promise(async (resolve, reject) => { console.log('Running subscribe..') - firstClient.once('message', () => {}) - secondClient.once('message', () => {}) + jsPubSub.once('message', () => {}) try { await Promise.all([ - firstClient.subscribe(channel), - secondClient.subscribe(channel), + jsPubSub.subscribe(channel), + rtPubSub.listen({ channels: [channel] }), ]) resolve(0) } catch (e) { @@ -65,10 +70,7 @@ const testPubSubClientSubscribe = ( return timeoutPromise(promise, promiseTimeout, promiseException) } -const testPubSubClientPublish = ( - firstClient: PubSubClient, - secondClient: PubSubClient -) => { +const testPublish = ({ jsPubSub, rtPubSub, publisher }: TestPubSubOptions) => { const promise = new Promise(async (resolve) => { console.log('Running publish..') let events = 0 @@ -79,28 +81,32 @@ const testPubSubClientPublish = ( } const now = Date.now() - firstClient.once('message', (message) => { + jsPubSub.once('message', (message) => { console.log('jsPubSub message') if (message.meta.now === now) { events += 1 resolveIfDone() } }) - secondClient.once('message', (message) => { - console.log('rtPubSub message') - if (message.meta.now === now) { - events += 1 - resolveIfDone() - } - }) await Promise.all([ - firstClient.subscribe(channel), - secondClient.subscribe(channel), + jsPubSub.subscribe(channel), + rtPubSub.listen({ + channels: [channel], + onMessageReceived: (message) => { + console.log('rtPubSub message') + if (message.meta.now === now) { + events += 1 + resolveIfDone() + } + }, + }), ]) - await firstClient.publish({ - content: 'Hello There', + const publishClient = publisher === 'JS' ? jsPubSub : rtPubSub + + await publishClient.publish({ + content: 'Hello there!', channel, meta: { now, @@ -112,22 +118,18 @@ const testPubSubClientPublish = ( return timeoutPromise(promise, promiseTimeout, promiseException) } -const testPubSubClientUnsubscribe = ( - firstClient: PubSubClient, - secondClient: PubSubClient -) => { +const testUnsubscribe = ({ jsPubSub, rtPubSub }: TestPubSubOptions) => { const promise = new Promise(async (resolve, reject) => { console.log('Running unsubscribe..') try { - await Promise.all([ - firstClient.subscribe(channel), - secondClient.subscribe(channel), + const [unsubRTClient] = await Promise.all([ + rtPubSub.listen({ channels: [channel] }), + jsPubSub.subscribe(channel), ]) - await firstClient.unsubscribe(channel) - - await secondClient.unsubscribe(channel) + await jsPubSub.unsubscribe(channel) + await unsubRTClient() resolve(0) } catch (e) { @@ -138,6 +140,36 @@ const testPubSubClientUnsubscribe = ( return timeoutPromise(promise, promiseTimeout, promiseException) } +const testDisconnectedRTClient = (rtClient: RealtimeSWClient) => { + const promise = new Promise(async (resolve, reject) => { + try { + await rtClient.pubSub.listen({ + channels: ['random'], + onMessageReceived: (message) => { + // Message should not be reached + throw undefined + }, + }) + + rtClient.disconnect() + + await rtClient.pubSub.publish({ + content: 'Unreached message!', + channel: 'random', + meta: { + foo: 'bar', + }, + }) + + reject(4) + } catch (e) { + resolve(0) + } + }) + + return timeoutPromise(promise, promiseTimeout, promiseException) +} + const handler = async () => { // Create JS PubSub Client const CRT = await createCRT(params) @@ -146,47 +178,55 @@ const handler = async () => { // @ts-expect-error token: CRT.token, }) - console.log('Created jsPubSub') // Create RT-API PubSub Client - const rtPubSub = new RealtimeAPIPubSub.Client({ - // @ts-expect-error + const rtClient = await RealtimeSignalWire({ host: process.env.RELAY_HOST, project: process.env.RELAY_PROJECT as string, token: process.env.RELAY_TOKEN as string, }) - - console.log('Created rtPubSub') + const rtPubSub = rtClient.pubSub + console.log('Created rtClient') // Test Subscribe - const subscribeResultCode = await testPubSubClientSubscribe( - jsPubSub, - rtPubSub - ) + const subscribeResultCode = await testSubscribe({ jsPubSub, rtPubSub }) if (subscribeResultCode !== 0) { return subscribeResultCode } - // Test Publish - const jsPubSubPublishCode = await testPubSubClientPublish(jsPubSub, rtPubSub) - if (jsPubSubPublishCode !== 0) { - return jsPubSubPublishCode + // Test Publish from JS + const jsPublishResultCode = await testPublish({ + jsPubSub, + rtPubSub, + publisher: 'JS', + }) + if (jsPublishResultCode !== 0) { + return jsPublishResultCode } - const rtPubSubPublishCode = await testPubSubClientPublish(rtPubSub, jsPubSub) - if (rtPubSubPublishCode !== 0) { - return rtPubSubPublishCode + + // Test Publish from RT + const rtPublishResultCode = await testPublish({ + jsPubSub, + rtPubSub, + publisher: 'RT', + }) + if (rtPublishResultCode !== 0) { + return rtPublishResultCode } // Test Unsubscribe - const unsubscribeResultCode = await testPubSubClientUnsubscribe( - jsPubSub, - rtPubSub - ) + const unsubscribeResultCode = await testUnsubscribe({ jsPubSub, rtPubSub }) if (unsubscribeResultCode !== 0) { return unsubscribeResultCode } + // Test diconnected client + const disconnectedRTClient = await testDisconnectedRTClient(rtClient) + if (disconnectedRTClient !== 0) { + return disconnectedRTClient + } + return 0 } diff --git a/internal/playground-realtime-api/src/chat/index.ts b/internal/playground-realtime-api/src/chat/index.ts index a824dde0f..1fabd3a22 100644 --- a/internal/playground-realtime-api/src/chat/index.ts +++ b/internal/playground-realtime-api/src/chat/index.ts @@ -1,72 +1,66 @@ -import { Chat } from '@signalwire/realtime-api' +import { SignalWire } from '@signalwire/realtime-api' async function run() { try { - const chat = new Chat.Client({ - // @ts-expect-error + const client = await SignalWire({ host: process.env.HOST || 'relay.swire.io', project: process.env.PROJECT as string, token: process.env.TOKEN as string, }) - const channel = 'channel-name-here' - - chat.on('member.joined', (member) => { - console.log('member.joined', member) - }) - - chat.on('member.updated', (member) => { - console.log('member.updated', member) - }) - - chat.on('member.left', (member) => { - console.log('member.left', member) - }) - - chat.on('message', (message) => { - console.log('message', message) + const unsubHome = await client.chat.listen({ + channels: ['home'], + onMessageReceived: (message) => { + console.log('Message received on "home" channel', message) + }, + onMemberJoined: (member) => { + console.log('Member joined on "home" channel', member) + }, + onMemberUpdated: (member) => { + console.log('Member updated on "home" channel', member) + }, + onMemberLeft: (member) => { + console.log('Member left on "home" channel', member) + }, }) - await chat.subscribe([channel]) - - const pubRes = await chat.publish({ + const pubRes = await client.chat.publish({ content: 'Hello There', - channel: channel, + channel: 'home', meta: { fooId: 'randomValue', }, }) - console.log('Publish Result --->', pubRes) - const messagesResult = await chat.getMessages({ - channel: channel, + const messagesResult = await client.chat.getMessages({ + channel: 'home', }) - console.log('Get Messages Result ---> ', messagesResult) - const setStateResult = await chat.setMemberState({ + const getMembersResult = await client.chat.getMembers({ channel: 'home' }) + console.log('Get Member Result --->', getMembersResult) + + const setStateResult = await client.chat.setMemberState({ state: { data: 'state data', }, - channels: [channel], - memberId: 'someMemberId', + channels: ['home'], + memberId: getMembersResult.members[0].id, }) - console.log('Set Member State Result --->', setStateResult) - const getStateResult = await chat.getMemberState({ - channels: [channel], - memberId: 'someMemberId', + const getStateResult = await client.chat.getMemberState({ + channels: 'home', + memberId: getMembersResult.members[0].id, }) - console.log('Get Member State Result --->', getStateResult) - const unsubscribeRes = await chat.unsubscribe(channel) - - console.log('Unsubscribe Result --->', unsubscribeRes) + console.log('Unsubscribing --->') + await unsubHome() - console.log('Client Running..') + console.log('Client disconnecting..') + client.disconnect() } catch (error) { console.log('', error) } diff --git a/internal/playground-realtime-api/src/pubSub/index.ts b/internal/playground-realtime-api/src/pubSub/index.ts index 9f2024483..3a002bbc3 100644 --- a/internal/playground-realtime-api/src/pubSub/index.ts +++ b/internal/playground-realtime-api/src/pubSub/index.ts @@ -1,41 +1,63 @@ -import { PubSub } from '@signalwire/realtime-api' +import { SignalWire } from '@signalwire/realtime-api' async function run() { try { - const pubSub = new PubSub.Client({ - // @ts-expect-error + const client = await SignalWire({ host: process.env.HOST || 'relay.swire.io', project: process.env.PROJECT as string, token: process.env.TOKEN as string, - logLevel: 'trace', - debug: { - logWsTraffic: true, - }, }) - const channel = 'channel-name-here' + const unsubHomeOffice = await client.pubSub.listen({ + channels: ['office', 'home'], + onMessageReceived: (payload) => { + console.log( + 'Message received under the "office" or "home" channels', + payload + ) + }, + }) - pubSub.on('message', (message) => { - console.log('message', message) + const unsubWorkplace = await client.pubSub.listen({ + channels: ['workplace'], + onMessageReceived: (payload) => { + console.log('Message received under the "workplace" channels', payload) + }, }) - await pubSub.subscribe([channel]) + const pubResOffice = await client.pubSub.publish({ + content: 'Hello There', + channel: 'office', + meta: { + fooId: 'randomValue', + }, + }) + console.log('Publish Result --->', pubResOffice) - const pubRes = await pubSub.publish({ + const pubResWorkplace = await client.pubSub.publish({ content: 'Hello There', - channel: channel, + channel: 'workplace', meta: { fooId: 'randomValue', }, }) + console.log('Publish Result --->', pubResWorkplace) - console.log('Publish Result --->', pubRes) + await unsubHomeOffice() - const unsubscribeRes = await pubSub.unsubscribe(channel) + const pubResHome = await client.pubSub.publish({ + content: 'Hello There', + channel: 'home', + meta: { + fooId: 'randomValue', + }, + }) + console.log('Publish Result --->', pubResHome) - console.log('Unsubscribe Result --->', unsubscribeRes) + await unsubWorkplace() - console.log('Client Running..') + console.log('Disconnect the client..') + client.disconnect() } catch (error) { console.log('', error) } diff --git a/internal/stack-tests/src/chat/app.ts b/internal/stack-tests/src/chat/app.ts index 8b28b6cbe..b7dfa5244 100644 --- a/internal/stack-tests/src/chat/app.ts +++ b/internal/stack-tests/src/chat/app.ts @@ -1,24 +1,21 @@ -import { Chat } from '@signalwire/realtime-api' +import { SignalWire } from '@signalwire/realtime-api' import tap from 'tap' async function run() { try { - const chat = new Chat.Client({ - // @ts-expect-error + const client = await SignalWire({ host: process.env.RELAY_HOST || 'relay.swire.io', project: process.env.RELAY_PROJECT as string, token: process.env.RELAY_TOKEN as string, }) - tap.ok(chat.on, 'chat.on is defined') - tap.ok(chat.once, 'chat.once is defined') - tap.ok(chat.off, 'chat.off is defined') - tap.ok(chat.subscribe, 'chat.subscribe is defined') - tap.ok(chat.removeAllListeners, 'chat.removeAllListeners is defined') - tap.ok(chat.getMemberState, 'chat.getMemberState is defined') - tap.ok(chat.getMembers, 'chat.getMembers is defined') - tap.ok(chat.getMessages, 'chat.getMessages is defined') - tap.ok(chat.setMemberState, 'chat.setMemberState is defined') + tap.ok(client.chat, 'client.chat is defined') + tap.ok(client.chat.listen, 'client.chat.listen is defined') + tap.ok(client.chat.publish, 'client.chat.publish is defined') + tap.ok(client.chat.getMessages, 'client.chat.getMessages is defined') + tap.ok(client.chat.getMembers, 'client.chat.getMembers is defined') + tap.ok(client.chat.getMemberState, 'client.chat.getMemberState is defined') + tap.ok(client.chat.setMemberState, 'client.chat.setMemberState is defined') process.exit(0) } catch (error) { diff --git a/internal/stack-tests/src/pubSub/app.ts b/internal/stack-tests/src/pubSub/app.ts index 8693a24ef..837497239 100644 --- a/internal/stack-tests/src/pubSub/app.ts +++ b/internal/stack-tests/src/pubSub/app.ts @@ -1,23 +1,17 @@ -import { PubSub } from '@signalwire/realtime-api' +import { SignalWire } from '@signalwire/realtime-api' import tap from 'tap' async function run() { try { - const pubSub = new PubSub.Client({ - // @ts-expect-error + const client = await SignalWire({ host: process.env.RELAY_HOST || 'relay.swire.io', project: process.env.RELAY_PROJECT as string, token: process.env.RELAY_TOKEN as string, - contexts: [process.env.RELAY_CONTEXT as string], }) - tap.ok(pubSub.on, 'pubSub.on is defined') - tap.ok(pubSub.once, 'pubSub.once is defined') - tap.ok(pubSub.off, 'pubSub.off is defined') - tap.ok(pubSub.removeAllListeners, 'pubSub.removeAllListeners is defined') - tap.ok(pubSub.publish, 'pubSub.publish is defined') - tap.ok(pubSub.subscribe, 'pubSub.subscribe is defined') - tap.ok(pubSub.unsubscribe, 'pubSub.unsubscribe is defined') + tap.ok(client.pubSub, 'client.pubSub is defined') + tap.ok(client.pubSub.listen, 'client.pubSub.listen is defined') + tap.ok(client.pubSub.publish, 'client.pubSub.publish is defined') process.exit(0) } catch (error) { diff --git a/packages/core/src/BaseSession.ts b/packages/core/src/BaseSession.ts index 922eba8af..e95a7867a 100644 --- a/packages/core/src/BaseSession.ts +++ b/packages/core/src/BaseSession.ts @@ -285,6 +285,12 @@ export class BaseSession { message: 'The SDK session is disconnecting', }) } + if (this._status === 'disconnected') { + return Promise.reject({ + code: '400', + message: 'The SDK is disconnected', + }) + } // In case of a response don't wait for a result let promise: Promise = Promise.resolve() if ('params' in msg) { diff --git a/packages/core/src/chat/applyCommonMethods.ts b/packages/core/src/chat/applyCommonMethods.ts new file mode 100644 index 000000000..956cf4a51 --- /dev/null +++ b/packages/core/src/chat/applyCommonMethods.ts @@ -0,0 +1,109 @@ +import { + InternalChatMemberEntity, + InternalChatMessageEntity, + PaginationCursor, +} from '../types' +import { toExternalJSON } from '../utils' +import { BaseRPCResult } from '../utils/interfaces' +import { isValidChannels, toInternalChatChannels } from './utils' + +export interface GetMembersInput extends BaseRPCResult { + members: InternalChatMemberEntity[] +} + +export interface GetMessagesInput extends BaseRPCResult { + messages: InternalChatMessageEntity[] + cursor: PaginationCursor +} + +interface ChatMemberMethodParams extends Record { + memberId?: string +} + +interface GetMemberStateOutput { + channels: any +} + +const transformParamChannels = (params: ChatMemberMethodParams) => { + const channels = isValidChannels(params?.channels) + ? toInternalChatChannels(params.channels) + : undefined + + return { + ...params, + channels, + } +} + +const baseCodeTransform = () => {} + +export function applyCommonMethods any>( + targetClass: T +) { + return class extends targetClass { + getMembers(params: GetMembersInput) { + return this._client.execute( + { + method: 'chat.members.get', + params, + }, + { + transformResolve: (payload: GetMembersInput) => ({ + members: payload.members.map((member) => toExternalJSON(member)), + }), + } + ) + } + + getMessages(params: GetMessagesInput) { + return this._client.execute( + { + method: 'chat.messages.get', + params, + }, + { + transformResolve: (payload: GetMessagesInput) => ({ + messages: payload.messages.map((message) => + toExternalJSON(message) + ), + cursor: payload.cursor, + }), + } + ) + } + + setMemberState({ memberId, ...rest }: Record = {}) { + return this._client.execute( + { + method: 'chat.member.set_state', + params: { + member_id: memberId, + ...rest, + }, + }, + { + transformResolve: baseCodeTransform, + transformParams: transformParamChannels, + } + ) + } + + getMemberState({ memberId, ...rest }: Record = {}) { + return this._client.execute( + { + method: 'chat.member.get_state', + params: { + member_id: memberId, + ...rest, + }, + }, + { + transformResolve: (payload: GetMemberStateOutput) => ({ + channels: payload.channels, + }), + transformParams: transformParamChannels, + } + ) + } + } +} diff --git a/packages/core/src/chat/index.ts b/packages/core/src/chat/index.ts index 7ac4bd3a1..8b0f6eb89 100644 --- a/packages/core/src/chat/index.ts +++ b/packages/core/src/chat/index.ts @@ -2,3 +2,4 @@ export * from './methods' export * from './BaseChat' export * from './ChatMessage' export * from './ChatMember' +export * from './applyCommonMethods' diff --git a/packages/core/src/types/chat.ts b/packages/core/src/types/chat.ts index 269ec72f9..018aace1a 100644 --- a/packages/core/src/types/chat.ts +++ b/packages/core/src/types/chat.ts @@ -31,6 +31,8 @@ export type ChatMemberEventNames = export type ChatEventNames = ChatMessageEventName | ChatMemberEventNames +export type ChatEvents = ToInternalChatEvent + export type ChatChannel = string | string[] export interface ChatSetMemberStateParams { diff --git a/packages/realtime-api/src/BaseNamespace.test.ts b/packages/realtime-api/src/BaseNamespace.test.ts new file mode 100644 index 000000000..1ba14bdd2 --- /dev/null +++ b/packages/realtime-api/src/BaseNamespace.test.ts @@ -0,0 +1,249 @@ +import { EventEmitter } from '@signalwire/core' +import { BaseNamespace } from './BaseNamespace' + +describe('BaseNamespace', () => { + // Using 'any' data type to bypass TypeScript checks for private or protected members. + let baseNamespace: any + let swClientMock: any + const listenOptions = { + topics: ['topic1', 'topic2'], + onEvent1: jest.fn(), + onEvent2: jest.fn(), + } + const eventMap: Record = { + onEvent1: 'event1', + onEvent2: 'event2', + } + + beforeEach(() => { + swClientMock = { + client: { + execute: jest.fn(), + }, + } + baseNamespace = new BaseNamespace({ swClient: swClientMock }) + }) + + afterEach(() => { + jest.restoreAllMocks() + }) + + describe('constructor', () => { + it('should initialize the necessary properties', () => { + expect(baseNamespace._sw).toBe(swClientMock) + expect(baseNamespace._client).toBe(swClientMock.client) + expect(baseNamespace._eventMap).toEqual({}) + expect(baseNamespace._namespaceEmitter).toBeInstanceOf(EventEmitter) + expect(baseNamespace._listenerMap).toBeInstanceOf(Map) + expect(baseNamespace._listenerMap.size).toBe(0) + }) + }) + + describe('addTopics', () => { + it('should call execute to add topics with the correct parameters', async () => { + const executeMock = jest.spyOn(swClientMock.client, 'execute') + + await baseNamespace.addTopics(listenOptions.topics) + + expect(executeMock).toHaveBeenCalledWith({ + method: 'signalwire.receive', + params: { + contexts: listenOptions.topics, + }, + }) + }) + }) + + describe('removeTopics', () => { + it('should call execute to remove topics with the correct parameters', async () => { + const executeMock = jest.spyOn(swClientMock.client, 'execute') + + await baseNamespace.removeTopics(listenOptions.topics) + + expect(executeMock).toHaveBeenCalledWith({ + method: 'signalwire.unreceive', + params: { + contexts: listenOptions.topics, + }, + }) + }) + }) + + describe('listen', () => { + it('should throw an error if topics is not an array with at least one topic', async () => { + const thrownMessage = + 'Invalid options: topics should be an array with at least one topic!' + + await expect(baseNamespace.listen({ topics: [] })).rejects.toThrow( + thrownMessage + ) + await expect(baseNamespace.listen({ topics: 'topic' })).rejects.toThrow( + thrownMessage + ) + }) + + it('should call the subscribe method with listen options', async () => { + const subscribeMock = jest.spyOn(baseNamespace, 'subscribe') + + await baseNamespace.listen(listenOptions) + + expect(subscribeMock).toHaveBeenCalledWith(listenOptions) + }) + + it('should resolve with a function to unsubscribe', async () => { + const unsubscribeMock = jest.fn().mockResolvedValue(undefined) + jest.spyOn(baseNamespace, 'subscribe').mockResolvedValue(unsubscribeMock) + + const unsub = await baseNamespace.listen(listenOptions) + expect(typeof unsub).toBe('function') + + await unsub() + expect(unsubscribeMock).toHaveBeenCalled() + }) + }) + + describe('subscribe', () => { + let emitterOnMock: jest.Mock + let emitterOffMock: jest.Mock + let addTopicsMock: jest.Mock + let removeTopicsMock: jest.Mock + + beforeEach(() => { + // Mock this._eventMap + baseNamespace._eventMap = eventMap + + // Mock emitter.on method + emitterOnMock = jest.fn() + baseNamespace.emitter.on = emitterOnMock + + // Mock emitter.off method + emitterOffMock = jest.fn() + baseNamespace.emitter.off = emitterOffMock + + // Mock addTopics method + addTopicsMock = jest.fn() + baseNamespace.addTopics = addTopicsMock + + // Mock removeTopics method + removeTopicsMock = jest.fn() + baseNamespace.removeTopics = removeTopicsMock + }) + + it('should attach listeners, add topics, and return an unsubscribe function', async () => { + const { topics, ...listeners } = listenOptions + const unsub = await baseNamespace.subscribe(listenOptions) + + // Check if the listeners are attached + const listenerKeys = Object.keys(listeners) as Array< + keyof typeof listeners + > + topics.forEach((topic) => { + listenerKeys.forEach((key) => { + const expectedEventName = `${topic}.${eventMap[key]}` + expect(emitterOnMock).toHaveBeenCalledWith( + expectedEventName, + listeners[key] + ) + }) + }) + + // Check if topics are added + expect(baseNamespace.addTopics).toHaveBeenCalledWith(topics) + + // Check if the listener is added to the listener map + expect(baseNamespace._listenerMap.size).toBe(1) + const [[_, value]] = baseNamespace._listenerMap.entries() + expect(value.topics).toEqual(new Set(topics)) + expect(value.listeners).toEqual(listeners) + + // Check if the returned unsubscribe function is valid + expect(unsub).toBeInstanceOf(Function) + await expect(unsub()).resolves.toBeUndefined() + + // Check if the topics are removed, listeners are detached, and entry is removed from the listener map + expect(baseNamespace.removeTopics).toHaveBeenCalledWith(topics) + topics.forEach((topic) => { + listenerKeys.forEach((key) => { + const expectedEventName = `${topic}.${eventMap[key]}` + expect(emitterOffMock).toHaveBeenCalledWith( + expectedEventName, + listeners[key] + ) + }) + }) + expect(baseNamespace._listenerMap.size).toBe(0) + }) + }) + + describe('hasOtherListeners', () => { + it('should return true if other listeners exist for the given topic', () => { + const uuid = 'uuid1' + const otherUUID = 'uuid2' + + baseNamespace._listenerMap.set(uuid, { + topics: new Set(['topic1', 'topic2']), + listeners: {}, + unsub: jest.fn(), + }) + baseNamespace._listenerMap.set(otherUUID, { + topics: new Set(['topic2']), + listeners: {}, + unsub: jest.fn(), + }) + + const result = baseNamespace.hasOtherListeners(uuid, 'topic2') + + expect(result).toBe(true) + }) + + it('should return false if no other listeners exist for the given topic', () => { + const uuid = 'uuid1' + const otherUUID = 'uuid2' + + baseNamespace._listenerMap.set(uuid, { + topics: new Set(['topic1', 'topic2']), + listeners: {}, + unsub: jest.fn(), + }) + baseNamespace._listenerMap.set(otherUUID, { + topics: new Set(['topic2']), + listeners: {}, + unsub: jest.fn(), + }) + + const result = baseNamespace.hasOtherListeners(uuid, 'topic1') + + expect(result).toBe(false) + }) + }) + + describe('unsubscribeAll', () => { + it('should call unsubscribe for each listener and clear the listener map', async () => { + const listener1 = { unsub: jest.fn() } + const listener2 = { unsub: jest.fn() } + baseNamespace._listenerMap.set('uuid1', listener1) + baseNamespace._listenerMap.set('uuid2', listener2) + + expect(baseNamespace._listenerMap.size).toBe(2) + + await baseNamespace.unsubscribeAll() + + expect(listener1.unsub).toHaveBeenCalledTimes(1) + expect(listener2.unsub).toHaveBeenCalledTimes(1) + expect(baseNamespace._listenerMap.size).toBe(0) + }) + }) + + describe('removeFromListenerMap', () => { + it('should remove the listener with the given UUID from the listener map', () => { + const idToRemove = 'uuid1' + baseNamespace._listenerMap.set('uuid1', {}) + baseNamespace._listenerMap.set('uuid2', {}) + + baseNamespace.removeFromListenerMap(idToRemove) + + expect(baseNamespace._listenerMap.size).toBe(1) + expect(baseNamespace._listenerMap.has(idToRemove)).toBe(false) + }) + }) +}) diff --git a/packages/realtime-api/src/BaseNamespace.ts b/packages/realtime-api/src/BaseNamespace.ts index 51c912046..ece79f50f 100644 --- a/packages/realtime-api/src/BaseNamespace.ts +++ b/packages/realtime-api/src/BaseNamespace.ts @@ -7,7 +7,7 @@ export interface ListenOptions { topics: string[] } -type ListenersKeys = keyof Omit +export type ListenersKeys = keyof Omit type ListenerMap = Map< string, @@ -21,20 +21,20 @@ type ListenerMap = Map< export class BaseNamespace { protected _client: Client protected _sw: SWClient - protected _eventMap: Record + protected _eventMap: Record = {} private _namespaceEmitter = new EventEmitter() - private _listenerMap: ListenerMap = new Map() + protected _listenerMap: ListenerMap = new Map() constructor(options: { swClient: SWClient }) { this._sw = options.swClient this._client = options.swClient.client } - get emitter() { + protected get emitter() { return this._namespaceEmitter } - private addTopics(topics: string[]) { + protected addTopics(topics: string[]) { const executeParams: ExecuteParams = { method: 'signalwire.receive', params: { @@ -44,7 +44,7 @@ export class BaseNamespace { return this._client.execute(executeParams) } - private removeTopics(topics: string[]) { + protected removeTopics(topics: string[]) { const executeParams: ExecuteParams = { method: 'signalwire.unreceive', params: { @@ -58,7 +58,7 @@ export class BaseNamespace { return new Promise<() => Promise>(async (resolve, reject) => { try { const { topics } = listenOptions - if (topics?.length < 1) { + if (!Array.isArray(topics) || topics?.length < 1) { throw new Error( 'Invalid options: topics should be an array with at least one topic!' ) @@ -90,10 +90,10 @@ export class BaseNamespace { await this.removeTopics(topicsToRemove) } - // Remove listeners + // Detach listeners this._detachListeners(topics, listeners) - // Remove task from the task listener array + // Remove topics from the listener map this.removeFromListenerMap(_uuid) resolve() @@ -112,7 +112,7 @@ export class BaseNamespace { return unsub } - private _attachListeners(topics: string[], listeners: Omit) { + protected _attachListeners(topics: string[], listeners: Omit) { const listenerKeys = Object.keys(listeners) as Array topics.forEach((topic) => { listenerKeys.forEach((key) => { @@ -124,7 +124,7 @@ export class BaseNamespace { }) } - private _detachListeners(topics: string[], listeners: Omit) { + protected _detachListeners(topics: string[], listeners: Omit) { const listenerKeys = Object.keys(listeners) as Array topics.forEach((topic) => { listenerKeys.forEach((key) => { @@ -136,10 +136,9 @@ export class BaseNamespace { }) } - private hasOtherListeners(uuid: string, topic: string) { + protected hasOtherListeners(uuid: string, topic: string) { for (const [key, listener] of this._listenerMap) { - if (key === uuid) continue - if (listener.topics.has(topic)) return true + if (key !== uuid && listener.topics.has(topic)) return true } return false } @@ -151,7 +150,7 @@ export class BaseNamespace { this._listenerMap.clear() } - private removeFromListenerMap(id: string) { + protected removeFromListenerMap(id: string) { return this._listenerMap.delete(id) } } diff --git a/packages/realtime-api/src/SWClient.test.ts b/packages/realtime-api/src/SWClient.test.ts new file mode 100644 index 000000000..a8270cf65 --- /dev/null +++ b/packages/realtime-api/src/SWClient.test.ts @@ -0,0 +1,66 @@ +import { SWClient } from './SWClient' +import { createClient } from './client/createClient' +import { clientConnect } from './client/clientConnect' +import { Task } from './task/Task' +import { PubSub } from './pubSub/PubSub' +import { Chat } from './chat/Chat' + +jest.mock('./client/createClient') +jest.mock('./client/clientConnect') + +describe('SWClient', () => { + let swClient: SWClient + let clientMock: any + const userOptions = { + host: 'example.com', + project: 'example.project', + token: 'example.token', + } + + beforeEach(() => { + clientMock = { + disconnect: jest.fn(), + runWorker: jest.fn(), + } + ;(createClient as any).mockReturnValue(clientMock) + + swClient = new SWClient(userOptions) + }) + + afterEach(() => { + jest.clearAllMocks() + }) + + it('should create SWClient instance with the provided options', () => { + expect(swClient.userOptions).toEqual(userOptions) + expect(createClient).toHaveBeenCalledWith(userOptions) + }) + + it('should connect the client', async () => { + await swClient.connect() + expect(clientConnect).toHaveBeenCalledWith(clientMock) + }) + + it('should disconnect the client', () => { + swClient.disconnect() + expect(clientMock.disconnect).toHaveBeenCalled() + }) + + it('should create and return a Task instance', () => { + const task = swClient.task + expect(task).toBeInstanceOf(Task) + expect(swClient.task).toBe(task) // Ensure the same instance is returned on subsequent calls + }) + + it('should create and return a PubSub instance', () => { + const pubSub = swClient.pubSub + expect(pubSub).toBeInstanceOf(PubSub) + expect(swClient.pubSub).toBe(pubSub) + }) + + it('should create and return a Chat instance', () => { + const chat = swClient.chat + expect(chat).toBeInstanceOf(Chat) + expect(swClient.chat).toBe(chat) + }) +}) diff --git a/packages/realtime-api/src/SWClient.ts b/packages/realtime-api/src/SWClient.ts index a6bc27d72..9f0fb3c0f 100644 --- a/packages/realtime-api/src/SWClient.ts +++ b/packages/realtime-api/src/SWClient.ts @@ -2,6 +2,8 @@ import { createClient } from './client/createClient' import type { Client } from './client/Client' import { clientConnect } from './client/clientConnect' import { Task } from './task/Task' +import { PubSub } from './pubSub/PubSub' +import { Chat } from './chat/Chat' export interface SWClientOptions { host?: string @@ -15,6 +17,8 @@ export interface SWClientOptions { export class SWClient { private _task: Task + private _pubSub: PubSub + private _chat: Chat public userOptions: SWClientOptions public client: Client @@ -38,4 +42,18 @@ export class SWClient { } return this._task } + + get pubSub() { + if (!this._pubSub) { + this._pubSub = new PubSub(this) + } + return this._pubSub + } + + get chat() { + if (!this._chat) { + this._chat = new Chat(this) + } + return this._chat + } } diff --git a/packages/realtime-api/src/SignalWire.ts b/packages/realtime-api/src/SignalWire.ts index 245773b6b..ca3aeeecd 100644 --- a/packages/realtime-api/src/SignalWire.ts +++ b/packages/realtime-api/src/SignalWire.ts @@ -12,3 +12,8 @@ export const SignalWire = (options: SWClientOptions): Promise => { } }) } + +export type { SWClient } from './SWClient' +export type { Chat } from './chat/Chat' +export type { PubSub } from './pubSub/PubSub' +export type { Task } from './task/Task' diff --git a/packages/realtime-api/src/chat/BaseChat.test.ts b/packages/realtime-api/src/chat/BaseChat.test.ts new file mode 100644 index 000000000..78383e622 --- /dev/null +++ b/packages/realtime-api/src/chat/BaseChat.test.ts @@ -0,0 +1,110 @@ +import { BaseChat } from './BaseChat' + +describe('BaseChat', () => { + // Using 'any' data type to bypass TypeScript checks for private or protected members. + let swClientMock: any + let baseChat: any + const listenOptions = { + channels: ['channel1', 'channel2'], + onEvent1: jest.fn(), + onEvent2: jest.fn(), + } + const eventMap: Record = { + onEvent1: 'event1', + onEvent2: 'event2', + } + + beforeEach(() => { + swClientMock = { + client: { + execute: jest.fn(), + }, + } + baseChat = new BaseChat(swClientMock) + + // Mock this._eventMap + baseChat._eventMap = eventMap + }) + + describe('listen', () => { + it('should throw an error if channels is not an array with at least one topic', async () => { + const thrownMessage = + 'Invalid options: channels should be an array with at least one channel!' + + await expect(baseChat.listen({ channels: [] })).rejects.toThrow( + thrownMessage + ) + await expect(baseChat.listen({ channels: 'topic' })).rejects.toThrow( + thrownMessage + ) + }) + + it('should call the subscribe method with listen options', async () => { + const subscribeMock = jest.spyOn(baseChat, 'subscribe') + + await baseChat.listen(listenOptions) + expect(subscribeMock).toHaveBeenCalledWith(listenOptions) + }) + + it('should resolve with a function to unsubscribe', async () => { + const unsubscribeMock = jest.fn().mockResolvedValue(undefined) + jest.spyOn(baseChat, 'subscribe').mockResolvedValue(unsubscribeMock) + + const unsub = await baseChat.listen(listenOptions) + expect(typeof unsub).toBe('function') + + await unsub() + expect(unsubscribeMock).toHaveBeenCalled() + }) + }) + + describe('subscribe', () => { + const { channels, ...listeners } = listenOptions + + it('should add channels and attach listeners', async () => { + const addChannelsMock = jest + .spyOn(baseChat, 'addChannels') + .mockResolvedValueOnce(null) + const attachListenersMock = jest.spyOn(baseChat, '_attachListeners') + + await expect(baseChat.subscribe(listenOptions)).resolves.toBeInstanceOf( + Function + ) + expect(addChannelsMock).toHaveBeenCalledWith(channels, [ + 'event1', + 'event2', + ]) + expect(attachListenersMock).toHaveBeenCalledWith(channels, listeners) + }) + + it('should remove channels and detach listeners when unsubscribed', async () => { + const removeChannelsMock = jest + .spyOn(baseChat, 'removeChannels') + .mockResolvedValueOnce(null) + const detachListenersMock = jest.spyOn(baseChat, '_detachListeners') + + const unsub = await baseChat.subscribe({ channels, ...listeners }) + expect(unsub).toBeInstanceOf(Function) + + await expect(unsub()).resolves.toBeUndefined() + expect(removeChannelsMock).toHaveBeenCalledWith(channels) + expect(detachListenersMock).toHaveBeenCalledWith(channels, listeners) + }) + }) + + describe('publish', () => { + const params = { channel: 'channel1', message: 'Hello from jest!' } + + it('should publish a chat message', async () => { + const executeMock = jest + .spyOn(baseChat._client, 'execute') + .mockResolvedValueOnce(undefined) + + await expect(baseChat.publish(params)).resolves.toBeUndefined() + expect(executeMock).toHaveBeenCalledWith({ + method: 'chat.publish', + params, + }) + }) + }) +}) diff --git a/packages/realtime-api/src/chat/BaseChat.ts b/packages/realtime-api/src/chat/BaseChat.ts new file mode 100644 index 000000000..fb5010505 --- /dev/null +++ b/packages/realtime-api/src/chat/BaseChat.ts @@ -0,0 +1,139 @@ +import { ExecuteParams, PubSubPublishParams, uuid } from '@signalwire/core' +import { BaseNamespace } from '../BaseNamespace' +import { SWClient } from '../SWClient' + +export interface BaseChatListenOptions { + channels: string[] +} + +export type BaseChatListenerKeys = keyof Omit + +export class BaseChat< + T extends BaseChatListenOptions +> extends BaseNamespace { + constructor(options: SWClient) { + super({ swClient: options }) + } + + public listen(listenOptions: T) { + return new Promise<() => Promise>(async (resolve, reject) => { + try { + const { channels } = listenOptions + if (!Array.isArray(channels) || channels?.length < 1) { + throw new Error( + 'Invalid options: channels should be an array with at least one channel!' + ) + } + const unsub = await this.subscribe(listenOptions) + resolve(unsub) + } catch (error) { + reject(error) + } + }) + } + + protected async subscribe(listenOptions: T) { + const { channels, ...listeners } = listenOptions + + const _uuid = uuid() + + // Attach listeners + this._attachListeners(channels, listeners) + + const listenerKeys = Object.keys(listeners) as Array + const events: string[] = [] + listenerKeys.forEach((key) => { + if (this._eventMap[key]) events.push(this._eventMap[key]) + }) + await this.addChannels(channels, events) + + const unsub = () => { + return new Promise(async (resolve, reject) => { + try { + // Remove the channels + const channelsToRemove = channels.filter( + (channel) => !this.hasOtherListeners(_uuid, channel) + ) + if (channelsToRemove.length > 0) { + await this.removeChannels(channelsToRemove) + } + + // Detach listeners + this._detachListeners(channels, listeners) + + // Remove channels from the listener map + this.removeFromListenerMap(_uuid) + + resolve() + } catch (error) { + reject(error) + } + }) + } + + this._listenerMap.set(_uuid, { + topics: new Set([...channels]), + listeners, + unsub, + }) + + return unsub + } + + private addChannels(channels: string[], events: string[]) { + return new Promise(async (resolve, reject) => { + try { + const execParams: ExecuteParams = { + method: 'chat.subscribe', + params: { + channels: channels.map((channel) => ({ + name: channel, + })), + events, + }, + } + + // @TODO: Do not subscribe if the user params are the same + + await this._client.execute(execParams) + resolve(undefined) + } catch (error) { + reject(error) + } + }) + } + + private removeChannels(channels: string[]) { + return new Promise(async (resolve, reject) => { + try { + const execParams: ExecuteParams = { + method: 'chat.unsubscribe', + params: { + channels: channels.map((channel) => ({ + name: channel, + })), + }, + } + + await this._client.execute(execParams) + resolve(undefined) + } catch (error) { + reject(error) + } + }) + } + + public publish(params: PubSubPublishParams) { + return new Promise((resolve, reject) => { + try { + const publish = this._client.execute({ + method: 'chat.publish', + params, + }) + resolve(publish) + } catch (error) { + reject(error) + } + }) + } +} diff --git a/packages/realtime-api/src/chat/Chat.test.ts b/packages/realtime-api/src/chat/Chat.test.ts new file mode 100644 index 000000000..f5cfc43fd --- /dev/null +++ b/packages/realtime-api/src/chat/Chat.test.ts @@ -0,0 +1,39 @@ +import { EventEmitter } from '@signalwire/core' +import { Chat } from './Chat' +import { createClient } from '../client/createClient' + +describe('Chat', () => { + let chat: Chat + const userOptions = { + host: 'example.com', + project: 'example.project', + token: 'example.token', + } + const swClientMock = { + userOptions, + client: createClient(userOptions), + } + + beforeEach(() => { + //@ts-expect-error + chat = new Chat(swClientMock) + }) + + afterEach(() => { + jest.clearAllMocks() + }) + + it('should have an event emitter', () => { + expect(chat['emitter']).toBeInstanceOf(EventEmitter) + }) + + it('should declare the correct event map', () => { + const expectedEventMap = { + onMessageReceived: 'chat.message', + onMemberJoined: 'chat.member.joined', + onMemberUpdated: 'chat.member.updated', + onMemberLeft: 'chat.member.left', + } + expect(chat['_eventMap']).toEqual(expectedEventMap) + }) +}) diff --git a/packages/realtime-api/src/chat/Chat.ts b/packages/realtime-api/src/chat/Chat.ts index 7820017aa..07d562dea 100644 --- a/packages/realtime-api/src/chat/Chat.ts +++ b/packages/realtime-api/src/chat/Chat.ts @@ -1,3 +1,50 @@ +import { + ChatMember, + ChatMessage, + EventEmitter, + ChatEvents, + Chat as ChatCore, +} from '@signalwire/core' +import { BaseChat, BaseChatListenOptions } from './BaseChat' +import { chatWorker } from './workers' +import { SWClient } from '../SWClient' + +interface ChatListenOptions extends BaseChatListenOptions { + onMessageReceived?: (message: ChatMessage) => unknown + onMemberJoined?: (member: ChatMember) => unknown + onMemberUpdated?: (member: ChatMember) => unknown + onMemberLeft?: (member: ChatMember) => unknown +} + +type ChatListenersKeys = keyof Omit + +export class Chat extends ChatCore.applyCommonMethods( + BaseChat +) { + private _chatEmitter = new EventEmitter() + protected _eventMap: Record = { + onMessageReceived: 'chat.message', + onMemberJoined: 'chat.member.joined', + onMemberUpdated: 'chat.member.updated', + onMemberLeft: 'chat.member.left', + } + + constructor(options: SWClient) { + super(options) + + this._client.runWorker('chatWorker', { + worker: chatWorker, + initialState: { + chatEmitter: this._chatEmitter, + }, + }) + } + + protected get emitter() { + return this._chatEmitter + } +} + export { ChatMember, ChatMessage } from '@signalwire/core' export type { ChatAction, @@ -34,5 +81,3 @@ export type { PubSubEventAction, PubSubPublishParams, } from '@signalwire/core' -export { ChatClientApiEvents, Client } from './ChatClient' -export type { ChatClientOptions } from './ChatClient' diff --git a/packages/realtime-api/src/chat/ChatClient.test.ts b/packages/realtime-api/src/chat/ChatClient.test.ts deleted file mode 100644 index 142221897..000000000 --- a/packages/realtime-api/src/chat/ChatClient.test.ts +++ /dev/null @@ -1,115 +0,0 @@ -import WS from 'jest-websocket-mock' -import { Client } from './ChatClient' - -jest.mock('uuid', () => { - return { - v4: jest.fn(() => 'mocked-uuid'), - } -}) - -describe('ChatClient', () => { - describe('Client', () => { - const host = 'ws://localhost:1234' - const token = '' - let server: WS - const authError = { - code: -32002, - message: - 'Authentication service failed with status ProtocolError, 401 Unauthorized: {}', - } - - beforeEach(async () => { - server = new WS(host, { jsonProtocol: true }) - server.on('connection', (socket: any) => { - socket.on('message', (data: any) => { - const parsedData = JSON.parse(data) - - if ( - parsedData.method === 'signalwire.connect' && - parsedData.params.authentication.token === '' - ) { - return socket.send( - JSON.stringify({ - jsonrpc: '2.0', - id: parsedData.id, - error: authError, - }) - ) - } - - socket.send( - JSON.stringify({ - jsonrpc: '2.0', - id: parsedData.id, - result: {}, - }) - ) - }) - }) - }) - - afterEach(() => { - WS.clean() - }) - - describe('Automatic connect', () => { - it('should automatically connect the underlying client', (done) => { - const chat = new Client({ - // @ts-expect-error - host, - project: 'some-project', - token, - }) - - chat.once('member.joined', () => {}) - - // @ts-expect-error - chat.session.on('session.connected', () => { - expect(server).toHaveReceivedMessages([ - { - jsonrpc: '2.0', - id: 'mocked-uuid', - method: 'signalwire.connect', - params: { - version: { major: 3, minor: 0, revision: 0 }, - authentication: { project: 'some-project', token: '' }, - }, - }, - ]) - - chat._session.disconnect() - - done() - }) - }) - }) - - it('should show an error if client.connect failed to connect', async () => { - const logger = { - error: jest.fn(), - info: jest.fn(), - trace: jest.fn(), - debug: jest.fn(), - warn: jest.fn(), - } - const chat = new Client({ - // @ts-expect-error - host, - project: 'some-project', - token: '', - logger: logger as any, - }) - - try { - await chat.subscribe('some-channel') - } catch (error) { - expect(error).toStrictEqual(new Error('Unauthorized')) - expect(logger.error).toHaveBeenNthCalledWith(1, 'Auth Error', { - code: -32002, - message: - 'Authentication service failed with status ProtocolError, 401 Unauthorized: {}', - }) - } - }) - }) -}) diff --git a/packages/realtime-api/src/chat/ChatClient.ts b/packages/realtime-api/src/chat/ChatClient.ts deleted file mode 100644 index ea8c2d705..000000000 --- a/packages/realtime-api/src/chat/ChatClient.ts +++ /dev/null @@ -1,114 +0,0 @@ -import { - ChatContract, - ConsumerContract, - UserOptions, - Chat as ChatNamespace, -} from '@signalwire/core' -import { clientConnect, setupClient, RealtimeClient } from '../client/index' -import type { RealTimeChatApiEventsHandlerMapping } from '../types/chat' - -export interface ChatClientApiEvents - extends ChatNamespace.BaseChatApiEvents {} - -export interface ClientFullState extends ChatClient {} -interface ChatClient - extends Omit, - Omit, 'subscribe'> { - new (opts: ChatClientOptions): this - - /** @internal */ - _session: RealtimeClient -} -export interface ChatClientOptions - extends Omit { - token?: string -} - -type ClientMethods = Exclude -const INTERCEPTED_METHODS: ClientMethods[] = [ - 'subscribe', - 'publish', - 'getMessages', - 'getMembers', - 'getMemberState', - 'setMemberState', -] -const UNSUPPORTED_METHODS = ['getAllowedChannels', 'updateToken'] - -/** - * You can use instances of this class to control the chat and subscribe to its - * events. Please see {@link ChatClientApiEvents} for the full list of events - * you can subscribe to. - * - * @param options - {@link ChatClientOptions} - * - * @returns - {@link ChatClient} - * - * @example - * - * ```javascript - * const chatClient = new Chat.Client({ - * project: '', - * token: '' - * }) - * - * await chatClient.subscribe([ 'mychannel1', 'mychannel2' ]) - * - * chatClient.on('message', (message) => { - * console.log("Received", message.content, - * "on", message.channel, - * "at", message.publishedAt) - * }) - * - * await chatClient.publish({ - * channel: 'mychannel1', - * content: 'hello world' - * }) - * ``` - */ -const ChatClient = function (options?: ChatClientOptions) { - const { client, store } = setupClient(options) - const chat = ChatNamespace.createBaseChatObject({ - store, - }) - - const createInterceptor = (prop: K) => { - return async (...params: Parameters) => { - await clientConnect(client) - - // @ts-expect-error - return chat[prop](...params) - } - } - - const interceptors = { - _session: client, - disconnect: () => client.disconnect(), - } as const - - return new Proxy(chat, { - get(target: ChatClient, prop: keyof ChatClient, receiver: any) { - if (prop in interceptors) { - // @ts-expect-error - return interceptors[prop] - } - - // FIXME: types and _session check - if (prop !== '_session' && INTERCEPTED_METHODS.includes(prop)) { - return createInterceptor(prop) - } else if (UNSUPPORTED_METHODS.includes(prop)) { - return undefined - } - - // Always connect the underlying client if the user call a function on the Proxy - if (typeof target[prop] === 'function') { - clientConnect(client) - } - - return Reflect.get(target, prop, receiver) - }, - }) - // For consistency with other constructors we'll make TS force the use of `new` -} as unknown as { new (options?: ChatClientOptions): ChatClient } - -export { ChatClient as Client } diff --git a/packages/realtime-api/src/chat/workers/chatWorker.ts b/packages/realtime-api/src/chat/workers/chatWorker.ts new file mode 100644 index 000000000..1c6117e1a --- /dev/null +++ b/packages/realtime-api/src/chat/workers/chatWorker.ts @@ -0,0 +1,65 @@ +import { SagaIterator } from '@redux-saga/core' +import { Chat } from '../Chat' +import { + sagaEffects, + SDKWorker, + getLogger, + ChatAction, + toExternalJSON, + ChatMessage, + ChatMember, + SDKActions, +} from '@signalwire/core' +import { prefixEvent } from '../../utils/internals' + +export const chatWorker: SDKWorker = function* (options): SagaIterator { + getLogger().trace('chatWorker started') + const { + channels: { swEventChannel }, + initialState: { chatEmitter }, + } = options + + function* worker(action: ChatAction) { + const { type, payload } = action + + switch (type) { + case 'chat.channel.message': { + const { channel, message } = payload + const externalJSON = toExternalJSON({ + ...message, + channel, + }) + const chatMessage = new ChatMessage(externalJSON) + + chatEmitter.emit(prefixEvent(channel, 'chat.message'), chatMessage) + break + } + case 'chat.member.joined': + case 'chat.member.updated': + case 'chat.member.left': { + const { member, channel } = payload + const externalJSON = toExternalJSON(member) + const chatMember = new ChatMember(externalJSON) + + chatEmitter.emit(prefixEvent(channel, type), chatMember) + break + } + default: + getLogger().warn(`Unknown chat event: "${type}"`, payload) + break + } + } + + const isChatEvent = (action: SDKActions) => action.type.startsWith('chat.') + + while (true) { + const action: ChatAction = yield sagaEffects.take( + swEventChannel, + isChatEvent + ) + + yield sagaEffects.fork(worker, action) + } + + getLogger().trace('chatWorker ended') +} diff --git a/packages/realtime-api/src/chat/workers/index.ts b/packages/realtime-api/src/chat/workers/index.ts new file mode 100644 index 000000000..c872556cb --- /dev/null +++ b/packages/realtime-api/src/chat/workers/index.ts @@ -0,0 +1 @@ +export * from './chatWorker' diff --git a/packages/realtime-api/src/index.ts b/packages/realtime-api/src/index.ts index 0048a5fe9..740b34b76 100644 --- a/packages/realtime-api/src/index.ts +++ b/packages/realtime-api/src/index.ts @@ -50,50 +50,6 @@ */ export * as Video from './video/Video' -/** - * Access the Chat API Consumer. You can instantiate a {@link Chat.Client} to - * subscribe to Chat events. Please check {@link Chat.ChatClientApiEvents} - * for the full list of events that a {@link Chat.Client} can subscribe to. - * - * @example - * - * The following example logs the messages sent to the "welcome" channel. - * - * ```javascript - * const chatClient = new Chat.Client({ - * project: '', - * token: '' - * }) - * - * chatClient.on('message', m => console.log(m)) - * - * await chatClient.subscribe("welcome") - * ``` - */ -export * as Chat from './chat/Chat' - -/** - * Access the PubSub API Consumer. You can instantiate a {@link PubSub.Client} to - * subscribe to PubSub events. Please check {@link PubSub.PubSubClientApiEvents} - * for the full list of events that a {@link PubSub.Client} can subscribe to. - * - * @example - * - * The following example logs the messages sent to the "welcome" channel. - * - * ```javascript - * const pubSubClient = new PubSub.Client({ - * project: '', - * token: '' - * }) - * - * pubSubClient.on('message', m => console.log(m)) - * - * await pubSubClient.subscribe("welcome") - * ``` - */ -export * as PubSub from './pubSub/PubSub' - /** @ignore */ export * from './configure' @@ -162,4 +118,4 @@ export * as Messaging from './messaging/Messaging' */ export * as Voice from './voice/Voice' -export { SignalWire } from './SignalWire' +export * from './SignalWire' diff --git a/packages/realtime-api/src/pubSub/PubSub.test.ts b/packages/realtime-api/src/pubSub/PubSub.test.ts new file mode 100644 index 000000000..1cb8a2a6e --- /dev/null +++ b/packages/realtime-api/src/pubSub/PubSub.test.ts @@ -0,0 +1,36 @@ +import { EventEmitter } from '@signalwire/core' +import { PubSub } from './PubSub' +import { createClient } from '../client/createClient' + +describe('PubSub', () => { + let pubSub: PubSub + const userOptions = { + host: 'example.com', + project: 'example.project', + token: 'example.token', + } + const swClientMock = { + userOptions, + client: createClient(userOptions), + } + + beforeEach(() => { + //@ts-expect-error + pubSub = new PubSub(swClientMock) + }) + + afterEach(() => { + jest.clearAllMocks() + }) + + it('should have an event emitter', () => { + expect(pubSub['emitter']).toBeInstanceOf(EventEmitter) + }) + + it('should declare the correct event map', () => { + const expectedEventMap = { + onMessageReceived: 'chat.message', + } + expect(pubSub['_eventMap']).toEqual(expectedEventMap) + }) +}) diff --git a/packages/realtime-api/src/pubSub/PubSub.ts b/packages/realtime-api/src/pubSub/PubSub.ts index 5b6b2a7fb..e418f47d7 100644 --- a/packages/realtime-api/src/pubSub/PubSub.ts +++ b/packages/realtime-api/src/pubSub/PubSub.ts @@ -1,4 +1,42 @@ -export { PubSubMessage } from '@signalwire/core' -export { Client } from './PubSubClient' +import { + EventEmitter, + PubSubMessageEventName, + PubSubNamespace, + PubSubMessage, +} from '@signalwire/core' +import { SWClient } from '../SWClient' +import { pubSubWorker } from './workers' +import { BaseChat, BaseChatListenOptions } from '../chat/BaseChat' + +interface PubSubListenOptions extends BaseChatListenOptions { + onMessageReceived?: (message: PubSubMessage) => unknown +} + +type PubSubListenersKeys = keyof Omit + +export class PubSub extends BaseChat { + private _pubSubEmitter = new EventEmitter() + protected _eventMap: Record< + PubSubListenersKeys, + `${PubSubNamespace}.${PubSubMessageEventName}` + > = { + onMessageReceived: 'chat.message', + } + + constructor(options: SWClient) { + super(options) + + this._client.runWorker('pubSubWorker', { + worker: pubSubWorker, + initialState: { + pubSubEmitter: this._pubSubEmitter, + }, + }) + } + + protected get emitter() { + return this._pubSubEmitter + } +} + export type { PubSubMessageContract } from '@signalwire/core' -export type { PubSubClientApiEvents } from './PubSubClient' diff --git a/packages/realtime-api/src/pubSub/PubSubClient.ts b/packages/realtime-api/src/pubSub/PubSubClient.ts deleted file mode 100644 index ee3177c42..000000000 --- a/packages/realtime-api/src/pubSub/PubSubClient.ts +++ /dev/null @@ -1,96 +0,0 @@ -import { - ConsumerContract, - UserOptions, - PubSub as PubSubNamespace, - PubSubContract, -} from '@signalwire/core' -import { clientConnect, setupClient, RealtimeClient } from '../client/index' -import type { RealTimePubSubApiEventsHandlerMapping } from '../types/pubSub' - -export interface PubSubClientApiEvents - extends PubSubNamespace.BasePubSubApiEvents {} - -export interface ClientFullState extends PubSubClient {} -interface PubSubClient - extends Omit, - Omit< - ConsumerContract, - 'subscribe' - > { - new (opts: PubSubClientOptions): this - - /** @internal */ - _session: RealtimeClient -} - -interface PubSubClientOptions - extends Omit { - token?: string -} - -type ClientMethods = Exclude -const INTERCEPTED_METHODS: ClientMethods[] = ['subscribe', 'publish'] -const UNSUPPORTED_METHODS = ['getAllowedChannels', 'updateToken'] - -/** - * Creates a new PubSub client. - * - * @param options - {@link PubSubClientOptions} - * - * @example - * - * ```js - * import { PubSub } from '@signalwire/realtime-api' - * - * const pubSubClient = new PubSub.Client({ - * project: '', - * token: '' - * }) - * ``` - */ -const PubSubClient = function (options?: PubSubClientOptions) { - const { client, store } = setupClient(options) - const pubSub = PubSubNamespace.createBasePubSubObject({ - store, - }) - - const createInterceptor = (prop: K) => { - return async (...params: Parameters) => { - await clientConnect(client) - - // @ts-expect-error - return pubSub[prop](...params) - } - } - - const interceptors = { - _session: client, - disconnect: () => client.disconnect(), - } as const - - return new Proxy(pubSub, { - get(target: PubSubClient, prop: keyof PubSubClient, receiver: any) { - if (prop in interceptors) { - // @ts-expect-error - return interceptors[prop] - } - - // FIXME: types and _session check - if (prop !== '_session' && INTERCEPTED_METHODS.includes(prop)) { - return createInterceptor(prop) - } else if (UNSUPPORTED_METHODS.includes(prop)) { - return undefined - } - - // Always connect the underlying client if the user call a function on the Proxy - if (typeof target[prop] === 'function') { - clientConnect(client) - } - - return Reflect.get(target, prop, receiver) - }, - }) - // For consistency with other constructors we'll make TS force the use of `new` -} as unknown as { new (options?: PubSubClientOptions): PubSubClient } - -export { PubSubClient as Client } diff --git a/packages/realtime-api/src/pubSub/workers/index.ts b/packages/realtime-api/src/pubSub/workers/index.ts new file mode 100644 index 000000000..439bd7018 --- /dev/null +++ b/packages/realtime-api/src/pubSub/workers/index.ts @@ -0,0 +1 @@ +export * from './pubSubWorker' diff --git a/packages/realtime-api/src/pubSub/workers/pubSubWorker.ts b/packages/realtime-api/src/pubSub/workers/pubSubWorker.ts new file mode 100644 index 000000000..fbf0772e5 --- /dev/null +++ b/packages/realtime-api/src/pubSub/workers/pubSubWorker.ts @@ -0,0 +1,66 @@ +import { SagaIterator } from '@redux-saga/core' +import { PubSub } from '../PubSub' +import { + sagaEffects, + PubSubEventAction, + SDKWorker, + getLogger, + PubSubMessage, + toExternalJSON, +} from '@signalwire/core' +import { prefixEvent } from '../../utils/internals' + +export const pubSubWorker: SDKWorker = function* ( + options +): SagaIterator { + getLogger().trace('pubSubWorker started') + const { + channels: { swEventChannel }, + initialState: { pubSubEmitter }, + } = options + + function* worker(action: PubSubEventAction) { + const { type, payload } = action + + switch (type) { + case 'chat.channel.message': { + const { + channel, + /** + * Since we're using the same event as `Chat` + * the payload comes with a `member` prop. To + * avoid confusion (since `PubSub` doesn't + * have members) we'll remove it from the + * payload sent to the end user. + */ + // @ts-expect-error + message: { member, ...restMessage }, + } = payload + const externalJSON = toExternalJSON({ + ...restMessage, + channel, + }) + const pubSubMessage = new PubSubMessage(externalJSON) + + pubSubEmitter.emit(prefixEvent(channel, 'chat.message'), pubSubMessage) + break + } + default: + getLogger().warn(`Unknown pubsub event: "${type}"`, payload) + break + } + } + + const isPubSubEvent = (action: any) => action.type.startsWith('chat.') + + while (true) { + const action: PubSubEventAction = yield sagaEffects.take( + swEventChannel, + isPubSubEvent + ) + + yield sagaEffects.fork(worker, action) + } + + getLogger().trace('pubSubWorker ended') +} diff --git a/packages/realtime-api/src/task/Task.test.ts b/packages/realtime-api/src/task/Task.test.ts new file mode 100644 index 000000000..b22f550e1 --- /dev/null +++ b/packages/realtime-api/src/task/Task.test.ts @@ -0,0 +1,78 @@ +import { request } from 'node:https' +import { EventEmitter } from '@signalwire/core' +import { Task, PATH } from './Task' +import { createClient } from '../client/createClient' + +jest.mock('node:https', () => { + return { + request: jest.fn().mockImplementation((_, callback) => { + callback({ statusCode: 204 }) + }), + } +}) + +describe('Task', () => { + let task: Task + const userOptions = { + host: 'example.com', + project: 'example.project', + token: 'example.token', + } + const swClientMock = { + userOptions, + client: createClient(userOptions), + } + const topic = 'jest-topic' + const message = { data: 'Hello from jest!' } + + beforeEach(() => { + // @ts-expect-error + task = new Task(swClientMock) + }) + + afterEach(() => { + jest.clearAllMocks() + }) + + it('should have an event emitter', () => { + expect(task['emitter']).toBeInstanceOf(EventEmitter) + }) + + it('should declare the correct event map', () => { + const expectedEventMap = { + onTaskReceived: 'task.received', + } + expect(task['_eventMap']).toEqual(expectedEventMap) + }) + + it('should throw an error when sending a task with invalid options', async () => { + // Create a new instance of Task with invalid options + const invalidTask = new Task({ + // @ts-expect-error + userOptions: {}, + client: createClient(userOptions), + }) + + await expect(async () => { + await invalidTask.send({ topic, message }) + }).rejects.toThrowError('Invalid options: project and token are required!') + }) + + it('should send a task', async () => { + await task.send({ topic, message }) + + expect(request).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'POST', + path: PATH, + host: userOptions.host, + headers: expect.objectContaining({ + 'Content-Type': 'application/json', + 'Content-Length': expect.any(Number), + Authorization: expect.stringContaining('Basic'), + }), + }), + expect.any(Function) + ) + }) +}) diff --git a/packages/realtime-api/src/task/Task.ts b/packages/realtime-api/src/task/Task.ts index cd734d2cb..fe06d09ea 100644 --- a/packages/realtime-api/src/task/Task.ts +++ b/packages/realtime-api/src/task/Task.ts @@ -8,7 +8,7 @@ import { SWClient } from '../SWClient' import { taskWorker } from './workers' import { ListenOptions, BaseNamespace } from '../BaseNamespace' -const PATH = '/api/relay/rest/tasks' +export const PATH = '/api/relay/rest/tasks' const HOST = 'relay.signalwire.com' interface TaskListenOptions extends ListenOptions { @@ -34,7 +34,7 @@ export class Task extends BaseNamespace { }) } - get emitter() { + protected get emitter() { return this._taskEmitter }