Skip to content
This repository has been archived by the owner on Oct 18, 2024. It is now read-only.

Commit

Permalink
feat: observer and handle ioc connection states
Browse files Browse the repository at this point in the history
  • Loading branch information
carlossantos74 committed Jun 23, 2024
1 parent 30db5e8 commit e0f09d1
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 46 deletions.
8 changes: 8 additions & 0 deletions __mocks__/io.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ import { jest } from '@jest/globals';
import { Room } from '@superviz/socket-client';

export const MOCK_IO = {
ClientState: {
CONNECTED: 'CONNECTED',
CONNECTING: 'CONNECTING',
DISCONNECTED: 'DISCONNECTED',
CONNECTION_ERROR: 'CONNECTION_ERROR',
RECONNECTING: 'RECONNECTING',
RECONNECT_ERROR: 'RECONNECT_ERROR',
},
PresenceEvents: {
JOINED_ROOM: 'presence.joined-room',
LEAVE: 'presence.leave',
Expand Down
38 changes: 18 additions & 20 deletions src/core/launcher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ApiService from '../../services/api';
import config from '../../services/config';
import { EventBus } from '../../services/event-bus';
import { IOC } from '../../services/io';
import { IOCState } from '../../services/io/types';
import LimitsService from '../../services/limits';
import { Presence3DManager } from '../../services/presence-3d-manager';
import { SlotService } from '../../services/slot';
Expand All @@ -29,7 +30,7 @@ export class Launcher extends Observable implements DefaultLauncher {
private activeComponentsInstances: Partial<BaseComponent>[] = [];

private ioc: IOC;
private LauncherRealtimeRoom: Socket.Room;
private room: Socket.Room;
private eventBus: EventBus = new EventBus();
private timestamp: number = 0;

Expand All @@ -49,7 +50,7 @@ export class Launcher extends Observable implements DefaultLauncher {

group.publish(participantGroup);
this.ioc = new IOC(localParticipant.value);
this.LauncherRealtimeRoom = this.ioc.createRoom('launcher');
this.room = this.ioc.createRoom('launcher');

// internal events without realtime
this.eventBus = new EventBus();
Expand Down Expand Up @@ -169,9 +170,9 @@ export class Launcher extends Observable implements DefaultLauncher {
this.eventBus.destroy();
this.eventBus = undefined;

this.LauncherRealtimeRoom?.presence.off(Socket.PresenceEvents.JOINED_ROOM);
this.LauncherRealtimeRoom?.presence.off(Socket.PresenceEvents.LEAVE);
this.LauncherRealtimeRoom?.presence.off(Socket.PresenceEvents.UPDATE);
this.room?.presence.off(Socket.PresenceEvents.JOINED_ROOM);
this.room?.presence.off(Socket.PresenceEvents.LEAVE);
this.room?.presence.off(Socket.PresenceEvents.UPDATE);
this.ioc?.destroy();

this.isDestroyed = true;
Expand Down Expand Up @@ -224,8 +225,6 @@ export class Launcher extends Observable implements DefaultLauncher {
return true;
};

/** Ably Listeners */

private onAuthentication = (isAuthenticated: boolean): void => {
if (isAuthenticated) return;

Expand Down Expand Up @@ -305,7 +304,12 @@ export class Launcher extends Observable implements DefaultLauncher {
const { participants, localParticipant } = useStore(StoreType.GLOBAL);
// retrieve the current participants in the room

this.LauncherRealtimeRoom.presence.get((presences) => {
this.ioc.stateSubject.subscribe((state) => {
if (state === IOCState.AUTH_ERROR) {
this.onAuthentication(false);
}
});
this.room.presence.get((presences) => {
const participantsMap: Record<string, Participant> = {};

presences.forEach((presence) => {
Expand All @@ -325,20 +329,14 @@ export class Launcher extends Observable implements DefaultLauncher {
participants.publish(participantsMap);
});

this.LauncherRealtimeRoom.presence.on<Participant>(
this.room.presence.on<Participant>(
Socket.PresenceEvents.JOINED_ROOM,
this.onParticipantJoinedIOC,
);

this.LauncherRealtimeRoom.presence.on<Participant>(
Socket.PresenceEvents.LEAVE,
this.onParticipantLeaveIOC,
);
this.room.presence.on<Participant>(Socket.PresenceEvents.LEAVE, this.onParticipantLeaveIOC);

this.LauncherRealtimeRoom.presence.on<Participant>(
Socket.PresenceEvents.UPDATE,
this.onParticipantUpdatedIOC,
);
this.room.presence.on<Participant>(Socket.PresenceEvents.UPDATE, this.onParticipantUpdatedIOC);

const { hasJoinedRoom } = useStore(StoreType.GLOBAL);
hasJoinedRoom.publish(true);
Expand All @@ -357,12 +355,12 @@ export class Launcher extends Observable implements DefaultLauncher {
if (presence.id !== localParticipant.value.id) return;

// Assign a slot to the participant
const slot = new SlotService(this.LauncherRealtimeRoom);
const slot = new SlotService(this.room);
await slot.assignSlot();

this.timestamp = presence.timestamp;

this.LauncherRealtimeRoom.presence.update(localParticipant.value);
this.room.presence.update(localParticipant.value);

this.logger.log('launcher service @ onParticipantJoined - local participant joined');
this.onParticipantJoined(presence);
Expand Down Expand Up @@ -418,7 +416,7 @@ export class Launcher extends Observable implements DefaultLauncher {
} as Participant);

this.timestamp = presence.timestamp;
this.LauncherRealtimeRoom.presence.update(localParticipant.value);
this.room.presence.update(localParticipant.value);

this.publish(ParticipantEvent.LOCAL_UPDATED, presence.data);

Expand Down
45 changes: 41 additions & 4 deletions src/services/io/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { MOCK_IO } from '../../../__mocks__/io.mock';
import * as Socket from '@superviz/socket-client';

import { MOCK_LOCAL_PARTICIPANT } from '../../../__mocks__/participants.mock';

import { IOCState } from './types';

import { IOC } from '.';

describe('io', () => {
Expand Down Expand Up @@ -28,8 +31,42 @@ describe('io', () => {
expect(room).toHaveProperty('emit');
});

test('should publish state changes', () => {
const next = jest.fn();
instance?.onStateChange(next);
test('should force reconnect', () => {
const spy = jest.spyOn(instance as any, 'forceReconnect');
const callback = jest.fn();

instance?.stateSubject.subscribe(callback);
instance?.['handleConnectionState']({
state: Socket.ClientState.DISCONNECTED,
reason: '',
});

expect(spy).toHaveBeenCalled();
expect(spy).toHaveBeenCalledTimes(1);
expect(callback).toHaveBeenCalledWith(IOCState.DISCONNECTED);
});

test('should not force reconnect if reason is Unauthorized connection', () => {
const spy = jest.spyOn(instance as any, 'forceReconnect');
const callback = jest.fn();

instance?.stateSubject.subscribe(callback);
instance?.['handleConnectionState']({
state: Socket.ClientState.DISCONNECTED,
reason: 'Unauthorized connection',
});

expect(callback).toHaveBeenCalledWith(IOCState.AUTH_ERROR);
expect(spy).not.toHaveBeenCalled();
});

test('should not force reconnect if state is not DISCONNECTED or RECONNECT_ERROR', () => {
const spy = jest.spyOn(instance as any, 'forceReconnect');

instance?.['handleConnectionState']({
state: Socket.ClientState.CONNECTED,
});

expect(spy).not.toHaveBeenCalled();
});
});
86 changes: 64 additions & 22 deletions src/services/io/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,16 @@ import { Subject } from 'rxjs';
import { Participant } from '../../common/types/participant.types';
import config from '../config/index';

import { IOCState } from './types';

export class IOC {
public state: Socket.ConnectionState;
public client: Socket.Realtime;

private stateSubject: Subject<Socket.ConnectionState> = new Subject();
public stateSubject: Subject<IOCState> = new Subject();

constructor(participant: Participant) {
let environment = config.get<string>('environment') as 'dev' | 'prod';
environment = ['dev', 'prod'].includes(environment) ? environment : 'dev';

this.client = new Socket.Realtime(config.get<string>('apiKey'), environment, {
id: participant.id,
name: participant.name,
});

this.subscribeToDefaultEvents();
constructor(private participant: Participant) {
this.createClient();
}

/**
Expand All @@ -33,25 +27,73 @@ export class IOC {
}

/**
* @function onStateChange
* @description Subscribe to the socket connection state changes
* @param next {Function}
* @function subscribeToDefaultEvents
* @description subscribe to the default socket events
* @returns {void}
*/
private subscribeToDefaultEvents(): void {
this.client.connection.on(this.handleConnectionState);
}

private handleConnectionState(state: Socket.ConnectionState): void {
const needsToReconnectStates = [
Socket.ClientState.DISCONNECTED,
Socket.ClientState.RECONNECT_ERROR,
];

if (
needsToReconnectStates.includes(state.state) &&
state.reason !== 'Unauthorized connection'
) {
this.forceReconnect();
}

if (state.reason === 'Unauthorized connection') {
console.error(
'[Superviz] Unauthorized connection. Please check your API key and if your domain is white listed.',
);

this.state = {
state: Socket.ClientState.DISCONNECTED,
reason: 'Unauthorized connection',
};

this.stateSubject.next(IOCState.AUTH_ERROR);

return;
}

this.state = state;
this.stateSubject.next(state.state as unknown as IOCState);
}

/**
* @function forceReconnect
* @description force the socket to reconnect
* @returns {void}
*/
public onStateChange(next: (state: Socket.ConnectionState) => void): void {
this.stateSubject.subscribe(next);
private forceReconnect(): void {
this.client?.destroy();
this.client = null;

this.createClient();
}

/**
* @function subscribeToDefaultEvents
* @description subscribe to the default socket events
* @function createClient
* @description create a new socket client
* @returns {void}
*/
private subscribeToDefaultEvents(): void {
this.client.connection.on((state) => {
this.state = state;
this.stateSubject.next(state);
public createClient(): void {
let environment = config.get<string>('environment') as 'dev' | 'prod';
environment = ['dev', 'prod'].includes(environment) ? environment : 'dev';

this.client = new Socket.Realtime(config.get<string>('apiKey'), environment, {
id: this.participant.id,
name: this.participant.name,
});

this.subscribeToDefaultEvents();
}

/**
Expand Down
9 changes: 9 additions & 0 deletions src/services/io/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export enum IOCState {
CONNECTED = 'CONNECTED',
CONNECTING = 'CONNECTING',
DISCONNECTED = 'DISCONNECTED',
CONNECTION_ERROR = 'CONNECTION_ERROR',
RECONNECTING = 'RECONNECTING',
RECONNECT_ERROR = 'RECONNECT_ERROR',
AUTH_ERROR = 'AUTH_ERROR',
}

0 comments on commit e0f09d1

Please sign in to comment.