Skip to content

Commit

Permalink
PubSub namespace (#533)
Browse files Browse the repository at this point in the history
* wip for the pubSub namespace

* expose pubSub in core

* rename contract, update chat contract

* add pubSub client in realtime-api

* remove unused

* create PubSubPagingCursor type

* update usage of cursor

* wip for adding pubSub to js

* fix typing

* add publish to chat

* cleanup BasePubSub

* mark as override for clarity

* add message transform, create base pubSub message and extend chat's message from it, fix client singleton for pubSub

* expose PubSub from reatltime-api

* update chat message to extend from pubsub message

* add worker for handling chat.channel.message

* fix exports

* add e2e for pubSub

* add playground for pubSub

* add test for pubSub client in js

* added guard to avoid handling the same event twice, move chat to new api for workers

* remove member from payload

* remove logger

* remove initial payload from chat

* remove loggers from e2e test

* prettier

* fix comment

* fix naming

* add comment

* fix naming and imports

* remove timeout

* fix transform type for chat

* fix transform type for pubSub

* improve comment for chat.channel.message

* add comment explaining ChannelMessageEventName

* rename type

* remove unused

* add changelog
  • Loading branch information
framini authored May 16, 2022
1 parent 12c6458 commit b6d5bb3
Show file tree
Hide file tree
Showing 31 changed files with 1,478 additions and 210 deletions.
7 changes: 7 additions & 0 deletions .changeset/silent-eagles-compare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@signalwire/core': minor
'@signalwire/js': minor
'@signalwire/realtime-api': minor
---

Introduce PubSub namespace
197 changes: 197 additions & 0 deletions internal/e2e-realtime-api/src/pubSub.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/**
* The goal here is to run PubSub from `realtime-api` and
* `js` SDKs and make sure they both receive the proper
* responses and events. The `handler` method grab a CRT and
* connects a JS PubSubClient and a RealtimeAPI PubSubClient
* and the consume all the methods asserting both SDKs
* receive the proper events.
*/
import { timeoutPromise } from '@signalwire/core'
import { PubSub as RealtimeAPIPubSub } from '@signalwire/realtime-api'
import { PubSub as JSPubSub } from '@signalwire/js'
import { WebSocket } from 'ws'
import { createTestRunner, createCRT } from './utils'

// @ts-ignore
global.WebSocket = WebSocket

const promiseTimeout = 4_000
const promiseException = 4 // error code to identify the Promise timeout
// TODO: pass as argument
const channel = 'rw'

const params = {
memberId: 'e2e-uuid-here',
channels: {
rw: {
read: true,
write: true,
},
r: {
read: true,
},
w: {
write: true,
},
},
}

type PubSubClient = RealtimeAPIPubSub.Client | JSPubSub.Client
const testPubSubClientSubscribe = (
firstClient: PubSubClient,
secondClient: PubSubClient
) => {
const promise = new Promise<number>(async (resolve, reject) => {
console.log('Running subscribe..')

firstClient.once('message', () => {})
secondClient.once('message', () => {})
try {
await Promise.all([
firstClient.subscribe(channel),
secondClient.subscribe(channel),
])
resolve(0)
} catch (e) {
reject(4)
}
})

return timeoutPromise(promise, promiseTimeout, promiseException)
}

const testPubSubClientPublish = (
firstClient: PubSubClient,
secondClient: PubSubClient
) => {
const promise = new Promise<number>(async (resolve) => {
console.log('Running publish..')
let events = 0
const resolveIfDone = () => {
if (events === 2) {
resolve(0)
}
}

const now = Date.now()
firstClient.once('message', (message) => {
console.log('jsPubSub message', message)
if (message.meta.now === now) {
events += 1
resolveIfDone()
}
})
secondClient.once('message', (message) => {
console.log('rtPubSub message', message)
if (message.meta.now === now) {
events += 1
resolveIfDone()
}
})

await Promise.all([
firstClient.subscribe(channel),
secondClient.subscribe(channel),
])

await firstClient.publish({
content: 'Hello There',
channel,
meta: {
now,
foo: 'bar',
},
})
})

return timeoutPromise(promise, promiseTimeout, promiseException)
}

const testPubSubClientUnsubscribe = (
firstClient: PubSubClient,
secondClient: PubSubClient
) => {
const promise = new Promise<number>(async (resolve, reject) => {
console.log('Running unsubscribe..')

try {
await Promise.all([
firstClient.subscribe(channel),
secondClient.subscribe(channel),
])

await firstClient.unsubscribe(channel)

await secondClient.unsubscribe(channel)

resolve(0)
} catch (e) {
reject(4)
}
})

return timeoutPromise(promise, promiseTimeout, promiseException)
}

const handler = async () => {
// Create JS PubSub Client
const CRT = await createCRT(params)
const jsPubSub = new JSPubSub.Client({
host: process.env.RELAY_HOST,
// @ts-expect-error
token: CRT.token,
})

console.log('Created jsPubSub')

// Create RT-API PubSub Client
const rtPubSub = new RealtimeAPIPubSub.Client({
// @ts-expect-error
host: process.env.RELAY_HOST,
project: process.env.RELAY_PROJECT as string,
token: process.env.RELAY_TOKEN as string,
})

console.log('Created rtPubSub')

// Test Subscribe
const subscribeResultCode = await testPubSubClientSubscribe(
jsPubSub,
rtPubSub
)
if (subscribeResultCode !== 0) {
return subscribeResultCode
}

// Test Publish
const jsPubSubPublishCode = await testPubSubClientPublish(jsPubSub, rtPubSub)
if (jsPubSubPublishCode !== 0) {
return jsPubSubPublishCode
}
const rtPubSubPublishCode = await testPubSubClientPublish(rtPubSub, jsPubSub)
if (rtPubSubPublishCode !== 0) {
return rtPubSubPublishCode
}

// Test Unsubscribe
const unsubscribeResultCode = await testPubSubClientUnsubscribe(
jsPubSub,
rtPubSub
)
if (unsubscribeResultCode !== 0) {
return unsubscribeResultCode
}

return 0
}

async function main() {
const runner = createTestRunner({
name: 'PubSub E2E',
testHandler: handler,
})

await runner.run()
}

main()
44 changes: 44 additions & 0 deletions internal/playground-realtime-api/src/pubSub/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { PubSub } from '@signalwire/realtime-api'

async function run() {
try {
const pubSub = new PubSub.Client({
// @ts-expect-error
host: process.env.HOST || 'relay.swire.io',
project: process.env.PROJECT as string,
token: process.env.TOKEN as string,
logLevel: 'trace',
debug: {
logWsTraffic: true,
},
})

const channel = 'channel-name-here'

pubSub.on('message', (message) => {
console.log('message', message)
})

await pubSub.subscribe([channel])

const pubRes = await pubSub.publish({
content: 'Hello There',
channel: channel,
meta: {
fooId: 'randomValue',
},
})

console.log('Publish Result --->', pubRes)

const unsubscribeRes = await pubSub.unsubscribe(channel)

console.log('Unsubscribe Result --->', unsubscribeRes)

console.log('Client Running..')
} catch (error) {
console.log('<Error>', error)
}
}

run()
Loading

0 comments on commit b6d5bb3

Please sign in to comment.