Skip to content

Commit

Permalink
Target signals to specific clients (#795)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanbliss authored Aug 30, 2024
1 parent d40306f commit 2db81b7
Show file tree
Hide file tree
Showing 16 changed files with 1,734 additions and 1,323 deletions.
2,572 changes: 1,295 additions & 1,277 deletions package-lock.json

Large diffs are not rendered by default.

24 changes: 16 additions & 8 deletions packages/live-share-media/src/LiveMediaSessionCoordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,10 @@ export class LiveMediaSessionCoordinator extends TypedEventEmitter<ILiveMediaSes
* @hidden
* Called by MediaSession to trigger the sending of a position update.
*/
public sendPositionUpdate(state: IMediaPlayerState): void {
public sendPositionUpdate(
state: IMediaPlayerState,
targetClientId?: string
): void {
LiveDataObjectNotInitializedError.assert(
"LiveMediaSessionCoordinator:sendPositionUpdate",
"sendPositionUpdate",
Expand All @@ -631,12 +634,15 @@ export class LiveMediaSessionCoordinator extends TypedEventEmitter<ILiveMediaSes
if (this.canSendPositionUpdates) {
// Send position update event
const evt = this._groupState!.createPositionUpdateEvent(state);
this._positionUpdateEvent?.sendEvent(evt).catch((err) => {
this._logger.sendErrorEvent(
TelemetryEvents.SessionCoordinator.PositionUpdateEventError,
err
);
});
this._positionUpdateEvent
?.sendEvent(evt, targetClientId)
.catch((err) => {
this._logger.sendErrorEvent(
TelemetryEvents.SessionCoordinator
.PositionUpdateEventError,
err
);
});
} else if (this.isSuspended) {
// send a local only position update event that was not sent as a signal, and use to handle local
// position update for clients that have canSendPositionUpdates==false, but are suspending
Expand Down Expand Up @@ -748,7 +754,9 @@ export class LiveMediaSessionCoordinator extends TypedEventEmitter<ILiveMediaSes
// Immediately send a position update
try {
const state = this._getPlayerState();
this.sendPositionUpdate(state);
// Send only to the user that joined the session using the targetClientId prop.
// This ensures that only the user that connected receives this one-time position update, minimizing costs.
this.sendPositionUpdate(state, clientId);
} catch (err: any) {
// if player is not setup yet, local client might have also just joined and can't send its position.
const playerNotSetup =
Expand Down
4 changes: 2 additions & 2 deletions packages/live-share-react/src/live-hooks/useLiveEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export function useLiveEvent<TEvent = any>(
* User facing: callback to send event through `LiveEvent`
*/
const sendEvent: SendLiveEventAction<TEvent> = React.useCallback(
async (event: TEvent) => {
async (event: TEvent, targetClientId?: string) => {
if (!container) {
throw new ActionContainerNotJoinedError(
"liveEvent",
Expand All @@ -123,7 +123,7 @@ export function useLiveEvent<TEvent = any>(
"sendEvent"
);
}
return await liveEvent.send(event);
return await liveEvent.send(event, targetClientId);
},
[container, liveEvent]
);
Expand Down
5 changes: 3 additions & 2 deletions packages/live-share-react/src/types/ActionTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ export type SetLiveStateAction<TState = undefined> = (

/**
* Callback for SendLiveEventAction<TEvent>.
* (event: TEvent) => Promise<void>
* (event: TEvent, targetClientId?: string) => Promise<void>
*/
export type SendLiveEventAction<TEvent> = (
event: TEvent
event: TEvent,
targetClientId?: string
) => Promise<ILiveEvent<TEvent>>;

/**
Expand Down
8 changes: 6 additions & 2 deletions packages/live-share/src/LiveEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ export class LiveEventClass<TEvent = any> extends LiveDataObject<{
* The event will be queued for delivery if the client isn't currently connected.
*
* @param evt Event to send. If omitted, an event will still be sent but it won't include any custom event data.
* @param targetClientId Optional. When specified, the signal is only sent to the provided client id.
*
* @returns A promise with the full event object that was sent, including the timestamp of when the event was sent and the clientId if known.
* The clientId will be `undefined` if the client is disconnected at time of delivery.
Expand Down Expand Up @@ -204,7 +205,10 @@ export class LiveEventClass<TEvent = any> extends LiveDataObject<{
});
```
*/
public async send(evt: TEvent): Promise<ILiveEvent<TEvent>> {
public async send(
evt: TEvent,
targetClientId?: string
): Promise<ILiveEvent<TEvent>> {
LiveDataObjectNotInitializedError.assert(
"LiveEvent:send",
"send",
Expand All @@ -216,7 +220,7 @@ export class LiveEventClass<TEvent = any> extends LiveDataObject<{
"`this._eventTarget` is undefined, implying there was an error during initialization that should not occur."
);

return await this._eventTarget.sendEvent(evt);
return await this._eventTarget.sendEvent(evt, targetClientId);
}
}

Expand Down
13 changes: 12 additions & 1 deletion packages/live-share/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ export interface IEvent {
* Name of the event.
*/
name: string;
/**
* Optional. When specified, the signal is only sent to the provided client id.
*/
targetClientId?: string;
}

/**
Expand Down Expand Up @@ -307,6 +311,7 @@ export interface ILiveShareJoinResults {
* just pass `this.context.containerRuntime` to any class that takes an `IContainerRuntimeSignaler`.
*/
export interface IContainerRuntimeSignaler {
clientId?: string;
on(
event: "signal",
listener: (message: IInboundSignalMessage, local: boolean) => void
Expand All @@ -315,7 +320,13 @@ export interface IContainerRuntimeSignaler {
event: "signal",
listener: (message: IInboundSignalMessage, local: boolean) => void
): this;
submitSignal(type: string, content: any): void;
/**
* Submits the signal to be sent to other clients.
* @param type - Type of the signal.
* @param content - Content of the signal. Should be a JSON serializable object or primitive.
* @param targetClientId Optional. When specified, the signal is only sent to the provided client id.
*/
submitSignal(type: string, content: any, targetClientId?: string): void;
}

/**
Expand Down
24 changes: 18 additions & 6 deletions packages/live-share/src/internals/ContainerSynchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,20 @@ export class ContainerSynchronizer {
/**
* On send background updates handler
*
* @param targetClientId Optional. When specified, the signal is only sent to the provided client id.
*
* @returns void promise once the events were sent (unless skipped)
*/
public async onSendBackgroundUpdates(): Promise<void> {
public async onSendBackgroundUpdates(
targetClientId?: string
): Promise<void> {
if (!this._liveRuntime.canSendBackgroundUpdates) return;
await this.sendGroupEvent(
this._connectedKeys.filter((key) =>
this._ddsBackgroundUpdateEnabled.has(key)
),
ObjectSynchronizerEvents.update
ObjectSynchronizerEvents.update,
targetClientId
).catch((err) => console.error(err));
}

Expand Down Expand Up @@ -199,11 +204,13 @@ export class ContainerSynchronizer {
* Send a batch of events
* @param updates updates to send
* @param evtType type of event
* @param targetClientId Optional. When specified, the signal is only sent to the provided client id.
* @returns event where data is then StateSyncEventContent containing the batched events that were sent.
*/
public async sendEventUpdates(
updates: StateSyncEventContent,
evtType: string
evtType: string,
targetClientId?: string
): Promise<ILiveEvent<StateSyncEventContent> | undefined> {
const updateKeys = Object.keys(updates);
// Send event if we have any updates to broadcast
Expand All @@ -218,7 +225,11 @@ export class ContainerSynchronizer {
: this._liveRuntime.getTimestamp(),
name: evtType,
};
this._containerRuntime.submitSignal(evtType, content);
this._containerRuntime.submitSignal(
evtType,
content,
targetClientId
);
return content;
}
}
Expand Down Expand Up @@ -264,7 +275,8 @@ export class ContainerSynchronizer {

private async sendGroupEvent(
keys: string[],
evtType: string
evtType: string,
targetClientId?: string
): Promise<{
sent: string[];
skipped: string[];
Expand Down Expand Up @@ -314,7 +326,7 @@ export class ContainerSynchronizer {
const updateKeys = Object.keys(updates);
// Send event if we have any updates to broadcast
// - `send` is only set if at least one component returns an update.
await this.sendEventUpdates(updates, evtType);
await this.sendEventUpdates(updates, evtType, targetClientId);
return {
sent: updateKeys,
skipped: skipKeys,
Expand Down
37 changes: 32 additions & 5 deletions packages/live-share/src/internals/LiveEventScope.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,17 @@ export interface IRuntimeSignaler {
event: "signal",
listener: (message: IInboundSignalMessage, local: boolean) => void
): this;
submitSignal(type: string, content: any): void;
/**
* Submits the signal to be sent to other clients.
* @param type Type of the signal.
* @param content Content of the signal. Should be a JSON serializable object or primitive.
* @param targetClientId Optional. When specified, the signal is only sent to the provided client id.
*/
submitSignal: (
type: string,
content: unknown,
targetClientId?: string
) => void;
}

/**
Expand Down Expand Up @@ -120,6 +130,19 @@ export class LiveEventScope extends TypedEventEmitter<IErrorEvent> {

if (isILiveEvent(message.content)) {
const content = message.content;
// While the Fluid odsp-driver currently supports targeted signals, it isn't guaranteed in other drivers.
// As of Fluid v2.2.0, azure-client and tinylicious do not support it currently.
// For consistency, we return early when the local client is not the targeted one.
if (message.targetClientId && content.targetClientId) {
// If we know the true targetClientId, we are in a supported driver so we override it
content.targetClientId = message.targetClientId;
}
if (
!local &&
content.targetClientId &&
content.targetClientId !== this._runtime.clientId
)
return;
content.clientId = clientId;
this.emitToListeners(clientId, content, local);
}
Expand Down Expand Up @@ -182,18 +205,20 @@ export class LiveEventScope extends TypedEventEmitter<IErrorEvent> {
* @template TEvent Type of event to send.
* @param eventName Name of the event to send.
* @param evt Optional. Partial event object to send. The `ILiveEvent.name`,
* @param targetClientId Optional. When specified, the signal is only sent to the provided client id.
* `ILiveEvent.timestamp`, and `ILiveEvent.clientId`
* fields will be automatically populated prior to sending.
* @returns The full event, including `ILiveEvent.name`,
* `ILiveEvent.timestamp`, and `ILiveEvent.clientId` fields if known.
*/
public async sendEvent<TEvent>(
eventName: string,
evt: TEvent
evt: TEvent,
targetClientId?: string
): Promise<ILiveEvent<TEvent>> {
const event = await this.createEvent(eventName, evt);
const event = await this.createEvent(eventName, evt, targetClientId);
// Send event
this._runtime.submitSignal(eventName, event);
this._runtime.submitSignal(eventName, event, targetClientId);
return event;
}

Expand All @@ -219,7 +244,8 @@ export class LiveEventScope extends TypedEventEmitter<IErrorEvent> {

private async createEvent<TEvent>(
eventName: string,
evt: TEvent
evt: TEvent,
targetClientId?: string
): Promise<ILiveEvent<TEvent>> {
const clientId = await this.waitUntilConnected();
const isAllowed = await this._liveRuntime.verifyRolesAllowed(
Expand All @@ -236,6 +262,7 @@ export class LiveEventScope extends TypedEventEmitter<IErrorEvent> {
// Clone passed in event and fill out required props.
const clone: ILiveEvent<TEvent> = {
clientId,
targetClientId,
name: eventName,
timestamp: this._liveRuntime.getTimestamp(),
data: evt,
Expand Down
12 changes: 10 additions & 2 deletions packages/live-share/src/internals/LiveEventSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,21 @@ export class LiveEventSource<TEvent> {
/**
* Broadcasts an event to any listening `LiveEventTarget` instances.
* @param evt Optional. Partial event object to send. The `ILiveEvent.name`,
* @param targetClientId Optional. When specified, the signal is only sent to the provided client id.
* `ILiveEvent.timestamp`, and `ILiveEvent.clientId`
* fields will be automatically populated prior to sending.
* @returns The full event, including `ILiveEvent.name`,
* `ILiveEvent.timestamp`, and `ILiveEvent.clientId` fields if known.
*/
public async sendEvent(evt: TEvent): Promise<ILiveEvent<TEvent>> {
return await this._scope.sendEvent<TEvent>(this._eventName, evt);
public async sendEvent(
evt: TEvent,
targetClientId?: string
): Promise<ILiveEvent<TEvent>> {
return await this._scope.sendEvent<TEvent>(
this._eventName,
evt,
targetClientId
);
}

/**
Expand Down
17 changes: 16 additions & 1 deletion packages/live-share/src/internals/LiveObjectManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,16 @@ export class LiveObjectManager extends TypedEventEmitter<IContainerLiveObjectSto
typeof message.content.data !== "object"
)
return;
// While the Fluid odsp-driver currently supports targeted signals, it isn't guaranteed in other drivers.
// As of Fluid v2.2.0, azure-client and tinylicious do not support it currently.
// For consistency, we return early when the local client is not the targeted one.
if (
// If we have message.targetClientId, our fluid driver supports targeting and thus will always be from the right client.
!message.targetClientId &&
message.content.targetClientId &&
message.content.targetClientId !== this._containerRuntime.clientId
)
return;
this.dispatchUpdates(
ObjectSynchronizerEvents.update,
message.clientId,
Expand All @@ -260,7 +270,12 @@ export class LiveObjectManager extends TypedEventEmitter<IContainerLiveObjectSto
);
// If the non-local user is connecting for the first time
if (message.type === ObjectSynchronizerEvents.connect) {
this._synchronizer?.onSendBackgroundUpdates();
// Sent with a targetClientId so that only the user connecting receives the signal.
// This reduces the cost & server burden of connect messages, particularly in larger session sizes.
// This perf/COGS benefit only applies if the Fluid driver / service supports targeting (e.g., ODSP).
// Otherwise, it will still send the signal to all clients, but only dispatch the update to the targeted client.
// If/when the driver later supports targeting, the benefit will get picked up automatically with no update to this code.
this._synchronizer?.onSendBackgroundUpdates(message.clientId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ export class LiveObjectSynchronizer<TState> {
/**
* Sends a one-time event through the synchronizer
* @param data the date for the event to send
* @param targetClientId Optional. When specified, the signal is only sent to the provided client id.
* @returns the event that was sent
*/
public sendEvent<TState = any>(data: TState): Promise<ILiveEvent<TState>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ export class MockLiveShareRuntime extends LiveShareRuntime {
return this._containerRuntime;
return undefined;
}
connectToOtherRuntime(otherLiveRuntime: MockLiveShareRuntime) {
connectToOtherRuntime(...otherLiveRuntimes: MockLiveShareRuntime[]) {
MockContainerRuntimeSignaler.connectContainers([
this.getLocalMockContainer()!,
otherLiveRuntime.getLocalMockContainer()!,
...otherLiveRuntimes.map(
(runtime) => runtime.getLocalMockContainer()!
),
]);
}
}
Loading

0 comments on commit 2db81b7

Please sign in to comment.