Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve auto-subscribe logic #596

Merged
merged 10 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/lovely-crabs-impress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@signalwire/core': patch
'@signalwire/realtime-api': patch
---

Improve auto-subscribe logic in Video client and PubSub
6 changes: 6 additions & 0 deletions .changeset/popular-readers-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@sw-internal/e2e-js': patch
'@sw-internal/playground-js': patch
---

Add PubSub examples and e2e tests
75 changes: 75 additions & 0 deletions internal/e2e-js/tests/pubSub.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { test, expect } from '@playwright/test'
import { createTestServer, createTestCRTToken } from '../utils'

test.describe('PubSub', () => {
let server: any = null

test.beforeAll(async () => {
server = await createTestServer()
await server.start()
})

test.afterAll(async () => {
await server.close()
})

test('should subscribe to a PubSub channel and publish a message', async ({
page,
}) => {
await page.goto(server.url)

page.on('console', (log) => {
console.log(log)
})

const channel = 'js-e2e'
const messageContent = Date.now().toString()

const crt = await createTestCRTToken({
ttl: 30,
member_id: 'chat-e2e',
state: {},
channels: {
[channel]: {
read: true,
write: true,
},
},
})
const chatMessage: any = await page.evaluate(
(options) => {
return new Promise(async (resolve) => {
try {
// @ts-expect-error
const PubSub = window._SWJS.PubSub
const pubSubClient = new PubSub.Client({
host: options.RELAY_HOST,
token: options.API_TOKEN,
})
// .subscribe should be after .on but i left here for test.
await pubSubClient.subscribe([options.channel])
pubSubClient.on('message', (message: any) => {
resolve(message)
})

await pubSubClient.publish({
channel: options.channel,
content: options.messageContent,
})
} catch (error) {
console.log('PubSub Error', error)
}
})
},
{
RELAY_HOST: process.env.RELAY_HOST,
API_TOKEN: crt,
channel,
messageContent,
}
)

expect(chatMessage.content).toBe(messageContent)
expect(chatMessage.channel).toBe(channel)
})
})
24 changes: 24 additions & 0 deletions internal/e2e-js/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,27 @@ export const createTestVRTToken = async (body: CreateTestVRTOptions) => {
const data = await response.json()
return data.token
}

interface CreateTestCRTOptions {
ttl: number
member_id: string
state: Record<string, any>
channels: Record<string, { read?: boolean; write?: boolean }>
}

export const createTestCRTToken = async (body: CreateTestCRTOptions) => {
const authCreds = `${process.env.RELAY_PROJECT}:${process.env.RELAY_TOKEN}`
const response = await fetch(
`https://${process.env.API_HOST}/api/chat/tokens`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Basic ${Buffer.from(authCreds).toString('base64')}`,
},
body: JSON.stringify(body),
}
)
const data = await response.json()
return data.token
}
20 changes: 20 additions & 0 deletions internal/playground-js/src/pubSub/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<!DOCTYPE html>
<html>
<head>
<title>Signalwire Chat Demo</title>
<meta charset="utf-8" />
<meta
name="viewport"
content="width=device-width, initial-scale=1, shrink-to-fit=no"
/>
<link
rel="shortcut icon"
href="https://signalwire.com/assets/images/favicon.ico"
/>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-gray-200">
<h1>PubSub Test</h1>
<script type="module" src="./index.js"></script>
</body>
</html>
27 changes: 27 additions & 0 deletions internal/playground-js/src/pubSub/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Chat as PubSub } from '@signalwire/js'

window.connect = async ({ channels, host, token }) => {
const pubSubClient = new PubSub.Client({
host,
token,
})

// .subscribe should be after .on but i left here for test.
await pubSubClient.subscribe(channels)

pubSubClient.on('message', (message) => {
console.log(
'Received',
message.content,
'on',
message.channel,
'at',
message.publishedAt
)
})

await pubSubClient.publish({
channel: channels[0],
content: 'Hello World',
})
}
30 changes: 17 additions & 13 deletions packages/core/src/BaseConsumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
import { BaseConsumer, connect, EventEmitter } from '.'
import type { SDKStore } from './redux'
import { configureJestStore } from './testUtils'
import { BaseConsumer, connect } from '.'
import { configureFullStack } from './testUtils'

describe('BaseConsumer', () => {
describe('subscribe', () => {
let store: SDKStore
let instance: any
let fullStack: ReturnType<typeof configureFullStack>

beforeEach(() => {
store = configureJestStore()
fullStack = configureFullStack()

instance = connect({
store,
store: fullStack.store,
componentListeners: {
errors: 'onError',
responses: 'onSuccess',
},
Component: BaseConsumer,
})({
emitter: new EventEmitter(),
emitter: fullStack.emitter,
})
instance.execute = jest.fn()
instance._attachListeners(instance.__uuid)
})

it('should be idempotent', () => {
afterEach(() => {
fullStack.destroy()
})

it('should be idempotent', async () => {
instance.on('something-1', () => {})
instance.on('something-2', () => {})
instance.on('something-2', () => {})

instance.subscribe()
instance.subscribe()
instance.subscribe()
instance.subscribe()
instance.subscribe()
await instance.subscribe()
await instance.subscribe()
await instance.subscribe()
await instance.subscribe()
await instance.subscribe()
expect(instance.execute).toHaveBeenCalledTimes(1)
})
})
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/BaseConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export class BaseConsumer<
}

async subscribe() {
await this._waitUntilSessionAuthorized()

const subscriptions = this.getSubscriptions()

if (subscriptions.length === 0) {
Expand Down
13 changes: 13 additions & 0 deletions packages/core/src/pubSub/BasePubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,20 @@ export class BasePubSubConsumer<
}
}

private _checkMissingSubscriptions() {
const subscriptions = this.getSubscriptions()
if (subscriptions.length === 0) {
this.logger.info(
'Subscribe was called before any listeners were attached. Move `.subscribe()` right after your event listeners to suppress this message.'
)
// @ts-ignore
this.once('message', () => {})
}
}

async subscribe(channels?: PubSubChannel) {
this._checkMissingSubscriptions()

const params = this._getSubscribeParams({ channels })

this._setSubscribeParams(params)
Expand Down
39 changes: 39 additions & 0 deletions packages/core/src/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { PubSubChannel, SwEventChannel } from './redux/interfaces'
import { BaseSession } from './BaseSession'
import { RPCConnectResult, InternalSDKLogger } from './utils/interfaces'
import { EventEmitter } from './utils/EventEmitter'
import { actions } from '.'

const PROJECT_ID = '8f0a119a-cda7-4497-a47d-c81493b824d4'
const TOKEN = '<VRT>'
Expand Down Expand Up @@ -40,6 +41,44 @@ export const configureJestStore = (
}) as SDKStore
}

/**
* Helper method to configure a Store with a rootSaga
* and a mocked Session object.
* This allow to write integration tests.
*
* @returns { store, session, emitter, destroy }
*/
export const configureFullStack = () => {
const session = {
dispatch: console.log,
connect: jest.fn(),
disconnect: jest.fn(),
execute: jest.fn(),
}
const emitter = new EventEmitter()
const store = configureStore({
userOptions: {
project: PROJECT_ID,
token: TOKEN,
devTools: false,
emitter,
},
SessionConstructor: jest.fn().mockImplementation(() => {
return session
}),
})

store.dispatch(actions.initAction())
store.dispatch(actions.authSuccessAction())

return {
store,
session,
emitter,
destroy: () => store.dispatch(actions.destroyAction()),
}
}

export const wait = (ms: number) => {
return new Promise((resolve) => {
setTimeout(resolve, ms)
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ export const validateEventsToSubscribe = <T = string>(events: T[]): T[] => {
if (
CLIENT_SIDE_EVENT_NAMES.includes(event) ||
isSyntheticEvent(event) ||
isLocalEvent(event) ||
isSessionEvent(event)
) {
return null
Expand Down
42 changes: 32 additions & 10 deletions packages/realtime-api/src/chat/ChatClient.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import WS from 'jest-websocket-mock'
import { Client } from './ChatClient'

jest.mock('uuid', () => {
return {
v4: jest.fn(() => 'mocked-uuid'),
}
})

describe('ChatClient', () => {
describe('Client', () => {
const host = 'ws://localhost:1234'
Expand All @@ -13,7 +19,7 @@ describe('ChatClient', () => {
}

beforeEach(async () => {
server = new WS(host)
server = new WS(host, { jsonProtocol: true })
server.on('connection', (socket: any) => {
socket.on('message', (data: any) => {
const parsedData = JSON.parse(data)
Expand Down Expand Up @@ -58,6 +64,18 @@ describe('ChatClient', () => {
chat.once('member.joined', () => {})

chat._session.on('session.connected', () => {
expect(server).toHaveReceivedMessages([
{
jsonrpc: '2.0',
id: 'mocked-uuid',
method: 'signalwire.connect',
params: {
version: { major: 3, minor: 0, revision: 0 },
authentication: { project: 'some-project', token: '<jwt>' },
},
},
])

chat._session.disconnect()

done()
Expand All @@ -68,6 +86,7 @@ describe('ChatClient', () => {
it('should show an error if client.connect failed to connect', async () => {
const logger = {
error: jest.fn(),
info: jest.fn(),
trace: jest.fn(),
debug: jest.fn(),
warn: jest.fn(),
Expand All @@ -80,15 +99,18 @@ describe('ChatClient', () => {
logger: logger as any,
})

await chat.subscribe('some-channel')

expect(logger.error).toHaveBeenNthCalledWith(1, 'Auth Error', {
code: -32002,
message:
'Authentication service failed with status ProtocolError, 401 Unauthorized: {}',
})

chat._session.disconnect()
try {
await chat.subscribe('some-channel')
} catch (error) {
expect(error).toStrictEqual(new Error('Unauthorized'))
expect(logger.error).toHaveBeenNthCalledWith(1, 'Auth Error', {
code: -32002,
message:
'Authentication service failed with status ProtocolError, 401 Unauthorized: {}',
})
} finally {
chat._session.disconnect()
}
})
})
})
Loading