Skip to content

Commit

Permalink
Allow sending signals to specific clients (#17729)
Browse files Browse the repository at this point in the history
  • Loading branch information
GaryWilber authored Oct 26, 2023
1 parent 0ae21f6 commit b59e8a3
Show file tree
Hide file tree
Showing 23 changed files with 87 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ export interface IContainerContext {
// @deprecated (undocumented)
readonly submitFn: (type: MessageType, contents: any, batch: boolean, appData?: any) => number;
// (undocumented)
readonly submitSignalFn: (contents: any) => void;
readonly submitSignalFn: (contents: any, targetClientId?: string) => void;
// (undocumented)
readonly submitSummaryFn: (summaryOp: ISummaryContent, referenceSequenceNumber?: number) => number;
// (undocumented)
Expand Down Expand Up @@ -260,7 +260,7 @@ export interface IDeltaManager<T, U> extends IEventProvider<IDeltaManagerEvents>
// (undocumented)
readonly readOnlyInfo: ReadOnlyInfo;
readonly serviceConfiguration: IClientConfiguration | undefined;
submitSignal(content: any): void;
submitSignal(content: any, targetClientId?: string): void;
readonly version: string;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/common/container-definitions/src/deltas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export interface IDeltaManager<T, U> extends IEventProvider<IDeltaManagerEvents>
*/
// TODO: use `unknown` instead (API breaking)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
submitSignal(content: any): void;
submitSignal(content: any, targetClientId?: string): void;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/common/container-definitions/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ export interface IContainerContext {
) => number;
// TODO: use `unknown` instead (API breaking)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
readonly submitSignalFn: (contents: any) => void;
readonly submitSignalFn: (contents: any, targetClientId?: string) => void;
readonly disposeFn?: (error?: ICriticalContainerError) => void;
readonly closeFn: (error?: ICriticalContainerError) => void;
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export interface IDocumentDeltaConnection extends IDisposable, IEventProvider<ID
relayServiceAgent?: string;
serviceConfiguration: IClientConfiguration;
submit(messages: IDocumentMessage[]): void;
submitSignal(message: any): void;
submitSignal(content: any, targetClientId?: string): void;
version: string;
}

Expand Down
4 changes: 2 additions & 2 deletions packages/common/driver-definitions/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ export interface IDocumentDeltaConnection
submit(messages: IDocumentMessage[]): void;

/**
* Submit a new signal to the server
* Submits a new signal to the server
*/
// TODO: Use something other than `any`.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
submitSignal(message: any): void;
submitSignal(content: any, targetClientId?: string): void;
}

export enum LoaderCachingPolicy {
Expand Down
2 changes: 1 addition & 1 deletion packages/drivers/driver-base/api-report/driver-base.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export class DocumentDeltaConnection extends EventEmitterWithErrorHandling<IDocu
// (undocumented)
protected readonly socket: Socket;
submit(messages: IDocumentMessage[]): void;
submitSignal(message: IDocumentMessage): void;
submitSignal(content: IDocumentMessage, targetClientId?: string): void;
get version(): string;
}

Expand Down
14 changes: 10 additions & 4 deletions packages/drivers/driver-base/src/documentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
IDocumentDeltaConnection,
IDocumentDeltaConnectionEvents,
} from "@fluidframework/driver-definitions";
import { createGenericNetworkError } from "@fluidframework/driver-utils";
import { UsageError, createGenericNetworkError } from "@fluidframework/driver-utils";
import {
ConnectionMode,
IClientConfiguration,
Expand Down Expand Up @@ -331,11 +331,17 @@ export class DocumentDeltaConnection
/**
* Submits a new signal to the server
*
* @param message - signal to submit
* @param content - Content of the signal.
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
public submitSignal(message: IDocumentMessage): void {
public submitSignal(content: IDocumentMessage, targetClientId?: string): void {
this.checkNotDisposed();
this.emitMessages("submitSignal", [[message]]);

if (targetClientId && this.details.supportedFeatures?.submit_signals_v2 !== true) {
throw new UsageError("Sending signals to specific client ids is not supported.");
}

this.emitMessages("submitSignal", [[content]]);
}

/**
Expand Down
20 changes: 16 additions & 4 deletions packages/drivers/odsp-driver/src/odspDocumentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
IConnect,
IDocumentMessage,
INack,
ISentSignalMessage,
ISequencedDocumentMessage,
ISignalMessage,
} from "@fluidframework/protocol-definitions";
Expand All @@ -34,6 +35,7 @@ import { pkgVersion } from "./packageVersion";
const protocolVersions = ["^0.4.0", "^0.3.0", "^0.2.0", "^0.1.0"];
const feature_get_ops = "api_get_ops";
const feature_flush_ops = "api_flush_ops";
const feature_submit_signals_v2 = "submit_signals_v2";

export interface FlushResult {
lastPersistedSequenceNumber?: number;
Expand Down Expand Up @@ -296,8 +298,11 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
relayUserAgent: [client.details.environment, ` driverVersion:${pkgVersion}`].join(";"),
};

connectMessage.supportedFeatures = {
[feature_submit_signals_v2]: true,
};

// Reference to this client supporting get_ops flow.
connectMessage.supportedFeatures = {};
if (mc.config.getBoolean("Fluid.Driver.Odsp.GetOpsEnabled") !== false) {
connectMessage.supportedFeatures[feature_get_ops] = true;
}
Expand Down Expand Up @@ -736,10 +741,17 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection {
/**
* Submits a new signal to the server
*
* @param message - signal to submit
* @param content - Content of the signal.
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
public submitSignal(message: IDocumentMessage): void {
this.emitMessages("submitSignal", [[message]]);
public submitSignal(content: IDocumentMessage, targetClientId?: string): void {
const signal: ISentSignalMessage = {
content,
targetClientId,
};

// back-compat: the typing for this method and emitMessages is incorrect, will be fixed in a future PR
this.emitMessages("submitSignal", [signal] as any);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/loader/container-loader/src/connectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1059,9 +1059,9 @@ export class ConnectionManager implements IConnectionManager {
};
}

public submitSignal(content: any) {
public submitSignal(content: any, targetClientId?: string) {
if (this.connection !== undefined) {
this.connection.submitSignal(content);
this.connection.submitSignal(content, targetClientId);
} else {
this.logger.sendErrorEvent({ eventName: "submitSignalDisconnected" });
}
Expand Down
6 changes: 3 additions & 3 deletions packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2345,8 +2345,8 @@ export class Container
this.emit("op", message);
}

private submitSignal(message: any) {
this._deltaManager.submitSignal(JSON.stringify(message));
private submitSignal(content: any, targetClientId?: string) {
this._deltaManager.submitSignal(JSON.stringify(content), targetClientId);
}

private processSignal(message: ISignalMessage) {
Expand Down Expand Up @@ -2440,7 +2440,7 @@ export class Container
this.submitSummaryMessage(summaryOp, referenceSequenceNumber),
(batch: IBatchMessage[], referenceSequenceNumber?: number) =>
this.submitBatch(batch, referenceSequenceNumber),
(message) => this.submitSignal(message),
(content, targetClientId) => this.submitSignal(content, targetClientId),
(error?: ICriticalContainerError) => this.dispose(error),
(error?: ICriticalContainerError) => this.close(error),
this.updateDirtyContainerState,
Expand Down
2 changes: 1 addition & 1 deletion packages/loader/container-loader/src/containerContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class ContainerContext implements IContainerContext {
batch: IBatchMessage[],
referenceSequenceNumber?: number,
) => number,
public readonly submitSignalFn: (contents: any) => void,
public readonly submitSignalFn: (content: any, targetClientId?: string) => void,
public readonly disposeFn: (error?: ICriticalContainerError) => void,
public readonly closeFn: (error?: ICriticalContainerError) => void,
public readonly updateDirtyContainerState: (dirty: boolean) => void,
Expand Down
2 changes: 1 addition & 1 deletion packages/loader/container-loader/src/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export interface IConnectionManager {
* Submits signal to relay service.
* Called only when active connection is present.
*/
submitSignal(content: any): void;
submitSignal(content: any, targetClientId?: string): void;

/**
* Submits messages to relay service.
Expand Down
4 changes: 2 additions & 2 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
return message.clientSequenceNumber;
}

public submitSignal(content: any) {
return this.connectionManager.submitSignal(content);
public submitSignal(content: any, targetClientId?: string) {
return this.connectionManager.submitSignal(content, targetClientId);
}

public flush() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,8 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents
submitDataStoreAliasOp(contents: any, localOpMetadata: unknown): void;
// (undocumented)
submitDataStoreOp(id: string, contents: any, localOpMetadata?: unknown): void;
// (undocumented)
submitDataStoreSignal(address: string, type: string, content: any): void;
submitSignal(type: string, content: any): void;
submitDataStoreSignal(address: string, type: string, content: any, targetClientId?: string): void;
submitSignal(type: string, content: any, targetClientId?: string): void;
submitSummary(options: ISubmitSummaryOptions): Promise<SubmitSummaryResult>;
summarize(options: {
fullTree?: boolean;
Expand Down
22 changes: 17 additions & 5 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ export class ContainerRuntime
summaryOp: ISummaryContent,
referenceSequenceNumber?: number,
) => number;
private readonly submitSignalFn: (contents: any) => void;
private readonly submitSignalFn: (content: any, targetClientId?: string) => void;
public readonly disposeFn: (error?: ICriticalContainerError) => void;
public readonly closeFn: (error?: ICriticalContainerError) => void;

Expand Down Expand Up @@ -2664,16 +2664,28 @@ export class ContainerRuntime
* Submits the signal to be sent to other clients.
* @param type - Type of the signal.
* @param content - Content of the signal.
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
public submitSignal(type: string, content: any) {
public submitSignal(type: string, content: any, targetClientId?: string) {
this.verifyNotClosed();
const envelope = this.createNewSignalEnvelope(undefined /* address */, type, content);
return this.submitSignalFn(envelope);
return this.submitSignalFn(envelope, targetClientId);
}

public submitDataStoreSignal(address: string, type: string, content: any) {
/**
* Submits the signal to be sent to other clients.
* @param type - Type of the signal.
* @param content - Content of the signal.
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
public submitDataStoreSignal(
address: string,
type: string,
content: any,
targetClientId?: string,
) {
const envelope = this.createNewSignalEnvelope(address, type, content);
return this.submitSignalFn(envelope);
return this.submitSignalFn(envelope, targetClientId);
}

public setAttachState(attachState: AttachState.Attaching | AttachState.Attached): void {
Expand Down
10 changes: 8 additions & 2 deletions packages/runtime/container-runtime/src/dataStoreContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -721,11 +721,17 @@ export abstract class FluidDataStoreContext
}
}

public submitSignal(type: string, content: any) {
/**
* Submits the signal to be sent to other clients.
* @param type - Type of the signal.
* @param content - Content of the signal.
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
public submitSignal(type: string, content: any, targetClientId?: string) {
this.verifyNotClosed("submitSignal");

assert(!!this.channel, 0x147 /* "Channel must exist on submitting signal" */);
return this._containerRuntime.submitDataStoreSignal(this.id, type, content);
return this._containerRuntime.submitDataStoreSignal(this.id, type, content, targetClientId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ export class DeltaManagerProxyBase
super.dispose();
}

public submitSignal(content: any): void {
return this.deltaManager.submitSignal(content);
public submitSignal(content: any, targetClientId?: string): void {
return this.deltaManager.submitSignal(content, targetClientId);
}

public flush(): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
request(request: IRequest): Promise<IResponse>;
// (undocumented)
readonly rootRoutingContext: IFluidHandleContext;
submitSignal(type: string, content: any): void;
submitSignal(type: string, content: any, targetClientId?: string): void;
uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise<IFluidHandle<ArrayBufferLike>>;
waitAttached(): Promise<void>;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ export interface IFluidDataStoreRuntime
* Submits the signal to be sent to other clients.
* @param type - Type of the signal.
* @param content - Content of the signal.
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
submitSignal(type: string, content: any): void;
submitSignal(type: string, content: any, targetClientId?: string): void;

/**
* Returns the current quorum.
Expand Down
3 changes: 1 addition & 2 deletions packages/runtime/datastore/api-report/datastore.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter<IFluidDataStoreRunt
setConnectionState(connected: boolean, clientId?: string): void;
// (undocumented)
submitMessage(type: DataStoreMessageType, content: any, localOpMetadata: unknown): void;
// (undocumented)
submitSignal(type: string, content: any): void;
submitSignal(type: string, content: any, targetClientId?: string): void;
summarize(fullTree?: boolean, trackState?: boolean, telemetryContext?: ITelemetryContext): Promise<ISummaryTreeWithStats>;
updateUsedRoutes(usedRoutes: string[]): void;
// (undocumented)
Expand Down
10 changes: 8 additions & 2 deletions packages/runtime/datastore/src/dataStoreRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -860,9 +860,15 @@ export class FluidDataStoreRuntime
this.submit(type, content, localOpMetadata);
}

public submitSignal(type: string, content: any) {
/**
* Submits the signal to be sent to other clients.
* @param type - Type of the signal.
* @param content - Content of the signal.
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
public submitSignal(type: string, content: any, targetClientId?: string) {
this.verifyNotClosed();
return this.dataStoreContext.submitSignal(type, content);
return this.dataStoreContext.submitSignal(type, content, targetClientId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ export interface IFluidDataStoreContext extends IEventProvider<IFluidDataStoreCo
// (undocumented)
readonly storage: IDocumentStorageService;
submitMessage(type: string, content: any, localOpMetadata: unknown): void;
submitSignal(type: string, content: any): void;
submitSignal(type: string, content: any, targetClientId?: string): void;
// (undocumented)
uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise<IFluidHandle<ArrayBufferLike>>;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/runtime/runtime-definitions/src/dataStoreContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,9 @@ export interface IFluidDataStoreContext
* Submits the signal to be sent to other clients.
* @param type - Type of the signal.
* @param content - Content of the signal.
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
submitSignal(type: string, content: any): void;
submitSignal(type: string, content: any, targetClientId?: string): void;

/**
* Called to make the data store locally visible in the container. This happens automatically for root data stores
Expand Down

0 comments on commit b59e8a3

Please sign in to comment.