diff --git a/packages/loader/container-loader/src/container.ts b/packages/loader/container-loader/src/container.ts index 02645b07a478..bc4bc2f5e816 100644 --- a/packages/loader/container-loader/src/container.ts +++ b/packages/loader/container-loader/src/container.ts @@ -11,6 +11,7 @@ import { ITelemetryBaseLogger, ITelemetryLogger, } from "@fluidframework/common-definitions"; +import { performance } from "@fluidframework/common-utils"; import { IFluidObject, IRequest, IResponse, IFluidRouter } from "@fluidframework/core-interfaces"; import { IAudience, @@ -29,14 +30,7 @@ import { IThrottlingWarning, AttachState, } from "@fluidframework/container-definitions"; -import { performance } from "@fluidframework/common-utils"; -import { - ChildLogger, - EventEmitterWithErrorHandling, - PerformanceEvent, - raiseConnectedEvent, - TelemetryLogger, -} from "@fluidframework/telemetry-utils"; +import { CreateContainerError, GenericError } from "@fluidframework/container-utils"; import { IDocumentService, IDocumentStorageService, @@ -56,7 +50,6 @@ import { combineAppAndProtocolSummary, readAndParseFromBlobs, } from "@fluidframework/driver-utils"; -import { CreateContainerError } from "@fluidframework/container-utils"; import { isSystemMessage, ProtocolOpHandler, @@ -85,6 +78,13 @@ import { TreeEntry, ISummaryTree, } from "@fluidframework/protocol-definitions"; +import { + ChildLogger, + EventEmitterWithErrorHandling, + PerformanceEvent, + raiseConnectedEvent, + TelemetryLogger, +} from "@fluidframework/telemetry-utils"; import { Audience } from "./audience"; import { ContainerContext } from "./containerContext"; import { debug } from "./debug"; @@ -98,6 +98,10 @@ import { parseUrl, convertProtocolAndAppSummaryToSnapshotTree } from "./utils"; const PackageNotFactoryError = "Code package does not implement IRuntimeFactory"; +interface ILocalSequencedClient extends ISequencedClient { + shouldHaveLeft?: boolean; +} + export interface IContainerConfig { resolvedUrl?: IResolvedUrl; canReconnect?: boolean; @@ -1391,6 +1395,14 @@ export class Container extends EventEmitterWithErrorHandling i this._connectionState = value; if (value === ConnectionState.Connected) { + // Mark our old client should have left in the quorum if it's still there + if (this._clientId !== undefined) { + const client: ILocalSequencedClient | undefined = + this._protocolHandler?.quorum.getMember(this._clientId); + if (client !== undefined) { + client.shouldHaveLeft = true; + } + } this._clientId = this.pendingClientId; this._deltaManager.updateQuorumJoin(); } else if (value === ConnectionState.Disconnected) { @@ -1451,6 +1463,31 @@ export class Container extends EventEmitterWithErrorHandling i } private processRemoteMessage(message: ISequencedDocumentMessage): IProcessMessageResult { + // Check and report if we're getting messages from a clientId that we previously + // flagged as shouldHaveLeft, or from a client that's not in the quorum but should be + if (message.clientId != null) { + let errorMsg: string | undefined; + const client: ILocalSequencedClient | undefined = + this._protocolHandler?.quorum.getMember(message.clientId); + if (client === undefined && message.type !== MessageType.ClientJoin) { + errorMsg = "messageClientIdMissingFromQuorum"; + } else if (client?.shouldHaveLeft === true) { + errorMsg = "messageClientIdShouldHaveLeft"; + } + if (errorMsg !== undefined) { + const error = new GenericError( + errorMsg, + { + clientId: this._clientId, + messageClientId: message.clientId, + sequenceNumber: message.sequenceNumber, + clientSequenceNumber: message.clientSequenceNumber, + }, + ); + this.close(CreateContainerError(error)); + } + } + const local = this._clientId === message.clientId; // Forward non system messages to the loaded runtime for processing diff --git a/packages/loader/container-loader/src/deltaManager.ts b/packages/loader/container-loader/src/deltaManager.ts index 346b9e5dc14a..a7f812c8e20b 100644 --- a/packages/loader/container-loader/src/deltaManager.ts +++ b/packages/loader/container-loader/src/deltaManager.ts @@ -891,6 +891,8 @@ export class DeltaManager * @param connection - The newly established connection */ private setupNewSuccessfulConnection(connection: DeltaConnection, requestedMode: ConnectionMode) { + // Old connection should have been cleaned up before establishing a new one + assert(this.connection === undefined, "old connection exists on new connection setup"); this.connection = connection; // Does information in scopes & mode matches? diff --git a/packages/loader/container-utils/src/error.ts b/packages/loader/container-utils/src/error.ts index 887f8891354b..caab34bbee4f 100644 --- a/packages/loader/container-utils/src/error.ts +++ b/packages/loader/container-utils/src/error.ts @@ -17,7 +17,7 @@ function messageFromError(error: any) { /** * Generic error */ -class GenericError extends CustomErrorWithProps implements IGenericError { +export class GenericError extends CustomErrorWithProps implements IGenericError { readonly errorType = ContainerErrorType.genericError; constructor(