Skip to content
This repository has been archived by the owner on Oct 18, 2024. It is now read-only.

Commit

Permalink
feat: perform slot assignment on old io too
Browse files Browse the repository at this point in the history
  • Loading branch information
carlossantos74 committed Mar 11, 2024
1 parent b0d9d09 commit c629e2c
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 101 deletions.
2 changes: 1 addition & 1 deletion src/core/launcher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ export class Launcher extends Observable implements DefaultLauncher {
private onParticipantJoinedIOC = (presence: Socket.PresenceEvent<Participant>): void => {
if (presence.id === this.participant.value.id) {
// Assign a slot to the participant
const _ = new SlotService(this.LaucherRealtimeRoom, this.participant.value);
SlotService.register(this.LaucherRealtimeRoom, this.realtime, this.participant.value);
this.LaucherRealtimeRoom.presence.update<Participant>(this.participant.value);
}

Expand Down
94 changes: 0 additions & 94 deletions src/services/realtime/ably/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -830,97 +830,6 @@ export default class AblyRealtimeService extends RealtimeService implements Ably
}
}

/**
* @function findSlotIndex
* @description Finds an available slot index for the participant and confirms it.
* @returns {void}
*/
private findSlotIndex = async (): Promise<void> => {
const slot = Math.floor(Math.random() * 16);

const hasAnyOneUsingMySlot = await new Promise((resolve) => {
this.supervizChannel.presence.get((error, presences) => {
if (error) {
resolve(true);
return;
}

presences.forEach((presence) => {
if (presence.clientId === this.myParticipant.clientId) return;

if (presence.data.slotIndex === slot) resolve(true);
});

resolve(false);
});
});

if (hasAnyOneUsingMySlot) {
this.logger.log(
'slot already taken by someone else, trying again',
this.myParticipant.clientId,
);
this.findSlotIndex();
return;
}

this.updateMyProperties({ slotIndex: slot });
};

/**
* @function validateSlots
* @description Validates the slot index of all participants and resolves conflicts.
* @returns {void}
*/
private async validateSlots(): Promise<void> {
const slots = [];
await new Promise((resolve) => {
this.supervizChannel.presence.get((_, presences) => {
presences.forEach((presence) => {
const hasValidSlot =
presence.data.slotIndex !== undefined && presence.data.slotIndex !== null;

if (hasValidSlot) {
slots.push({
slotIndex: presence.data.slotIndex,
clientId: presence.clientId,
timestamp: presence.timestamp,
});
}
});
resolve(true);
});
});

const duplicatesMap: Record<
string,
{
slotIndex: number;
clientId: string;
timestamp: number;
}[]
> = {};

slots.forEach((a) => {
if (!duplicatesMap[a.slotIndex]) {
duplicatesMap[a.slotIndex] = [];
}

duplicatesMap[a.slotIndex].push(a);
});

Object.values(duplicatesMap).forEach((arr) => {
const ordered = arr.sort((a, b) => a.timestamp - b.timestamp);
ordered.shift();

ordered.forEach((slot) => {
if (slot.clientId !== this.myParticipant.clientId) return;

this.findSlotIndex();
});
});
}

/**
* @function onStateChange
* @description Translates connection state and channel state into realtime state
Expand Down Expand Up @@ -1011,8 +920,6 @@ export default class AblyRealtimeService extends RealtimeService implements Ably
this.localRoomProperties = await this.fetchRoomProperties();
this.myParticipant = myPresence;

if (this.enableSync) this.findSlotIndex();

if (!this.localRoomProperties) {
this.initializeRoomProperties();
} else {
Expand All @@ -1037,7 +944,6 @@ export default class AblyRealtimeService extends RealtimeService implements Ably
this.updateParticipants();
this.participantJoinedObserver.publish(presence);
this.updateMyProperties(); // send a sync
this.validateSlots();
};

/**
Expand Down
8 changes: 4 additions & 4 deletions src/services/slot/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ describe('slot service', () => {
id: '123',
} as any;

const instance = new SlotService(room, participant);
const instance = new SlotService(room, { updateMyProperties: jest.fn() } as any, participant);
await instance['assignSlot']();

expect(instance['slotIndex']).toBeDefined();
Expand Down Expand Up @@ -47,7 +47,7 @@ describe('slot service', () => {
id: '123',
} as any;

const instance = new SlotService(room, participant);
const instance = new SlotService(room, { updateMyProperties: jest.fn() } as any, participant);
await instance['assignSlot']();

expect(instance['slotIndex']).toBeDefined();
Expand Down Expand Up @@ -80,7 +80,7 @@ describe('slot service', () => {
id: '123',
} as any;

const instance = new SlotService(room, participant);
const instance = new SlotService(room, { updateMyProperties: jest.fn() } as any, participant);
await instance['assignSlot']();

expect(instance['slotIndex']).toBeUndefined();
Expand Down Expand Up @@ -114,7 +114,7 @@ describe('slot service', () => {
id: '123',
} as any;

const instance = new SlotService(room, participant);
const instance = new SlotService(room, { updateMyProperties: jest.fn() } as any, participant);
await instance['assignSlot']();

expect(instance['slotIndex']).toBeDefined();
Expand Down
34 changes: 32 additions & 2 deletions src/services/slot/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,37 @@ import {
MeetingColorsHex,
} from '../../common/types/meeting-colors.types';
import { Participant } from '../../common/types/participant.types';
import { AblyRealtimeService } from '../realtime';

export class SlotService {
private room: Socket.Room;
private participant: Participant;
private slotIndex: number;
private realtime: AblyRealtimeService;
private static instance: SlotService;

constructor(room: Socket.Room, participant: Participant) {
// @NOTE - reciving old realtime service instance until we migrate to new IO
constructor(room: Socket.Room, realtime: AblyRealtimeService, participant: Participant) {
this.room = room;
this.participant = participant;
this.realtime = realtime;

this.assignSlot();
this.room.presence.on(Socket.PresenceEvents.UPDATE, this.onPresenceUpdate);
}

public static register(
room: Socket.Room,
realtime: AblyRealtimeService,
participant: Participant,
) {
if (!SlotService.instance) {
SlotService.instance = new SlotService(room, realtime, participant);
}

return SlotService.instance;
}

/**
* @function assignSlot
* @description Assigns a slot to the participant
Expand All @@ -46,7 +63,7 @@ export class SlotService {
resolve(false);
});
})
.then((isUsing) => {
.then(async (isUsing) => {
if (isUsing) {
this.assignSlot();
return;
Expand All @@ -69,6 +86,19 @@ export class SlotService {
this.room.presence.update({
slot: slotData,
});

// @NOTE - this is a temporary fix for the issue where the slot is not being updated in the presence
// @TODO - remove this once we remove the colors from the old io
if (!this.realtime.isJoinedRoom) {
await new Promise((resolve) => {
setTimeout(resolve, 1500);
});
}

this.realtime.updateMyProperties({
slotIndex: slot,
slot: slotData,
});
})
.catch((error) => {
this.room.presence.update({
Expand Down

0 comments on commit c629e2c

Please sign in to comment.