Skip to content

Commit

Permalink
Add expired client identifier field and checks to improve old clientI…
Browse files Browse the repository at this point in the history
…d check perf (#3704) (#3801)

#3675
Change the old/expired clientId check to use the quorum instead of by keeping a list of old clientIds. This works by adding a shouldHaveLeft field to the ISequencedClient in the quorum, where a reconnecting client will set its old clientId shouldHaveLeft to true (if it still exists in the client). Then when we process a message, instead of checking against a list of old clientIds, we check if the clientId exists in the quorum (messages should only be coming from clients in the quorum) and if the clientId has been marked should have left. An inactive clientId in this situation would suggest a message from an old connection of the same client (which shouldn't happen).
  • Loading branch information
heliocliu authored Oct 1, 2020
1 parent d33fa99 commit 5baf68a
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 10 deletions.
55 changes: 46 additions & 9 deletions packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
ITelemetryBaseLogger,
ITelemetryLogger,
} from "@fluidframework/common-definitions";
import { performance } from "@fluidframework/common-utils";
import { IFluidObject, IRequest, IResponse, IFluidRouter } from "@fluidframework/core-interfaces";
import {
IAudience,
Expand All @@ -29,14 +30,7 @@ import {
IThrottlingWarning,
AttachState,
} from "@fluidframework/container-definitions";
import { performance } from "@fluidframework/common-utils";
import {
ChildLogger,
EventEmitterWithErrorHandling,
PerformanceEvent,
raiseConnectedEvent,
TelemetryLogger,
} from "@fluidframework/telemetry-utils";
import { CreateContainerError, GenericError } from "@fluidframework/container-utils";
import {
IDocumentService,
IDocumentStorageService,
Expand All @@ -56,7 +50,6 @@ import {
combineAppAndProtocolSummary,
readAndParseFromBlobs,
} from "@fluidframework/driver-utils";
import { CreateContainerError } from "@fluidframework/container-utils";
import {
isSystemMessage,
ProtocolOpHandler,
Expand Down Expand Up @@ -85,6 +78,13 @@ import {
TreeEntry,
ISummaryTree,
} from "@fluidframework/protocol-definitions";
import {
ChildLogger,
EventEmitterWithErrorHandling,
PerformanceEvent,
raiseConnectedEvent,
TelemetryLogger,
} from "@fluidframework/telemetry-utils";
import { Audience } from "./audience";
import { ContainerContext } from "./containerContext";
import { debug } from "./debug";
Expand All @@ -98,6 +98,10 @@ import { parseUrl, convertProtocolAndAppSummaryToSnapshotTree } from "./utils";

const PackageNotFactoryError = "Code package does not implement IRuntimeFactory";

interface ILocalSequencedClient extends ISequencedClient {
shouldHaveLeft?: boolean;
}

export interface IContainerConfig {
resolvedUrl?: IResolvedUrl;
canReconnect?: boolean;
Expand Down Expand Up @@ -1391,6 +1395,14 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
this._connectionState = value;

if (value === ConnectionState.Connected) {
// Mark our old client should have left in the quorum if it's still there
if (this._clientId !== undefined) {
const client: ILocalSequencedClient | undefined =
this._protocolHandler?.quorum.getMember(this._clientId);
if (client !== undefined) {
client.shouldHaveLeft = true;
}
}
this._clientId = this.pendingClientId;
this._deltaManager.updateQuorumJoin();
} else if (value === ConnectionState.Disconnected) {
Expand Down Expand Up @@ -1451,6 +1463,31 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
}

private processRemoteMessage(message: ISequencedDocumentMessage): IProcessMessageResult {
// Check and report if we're getting messages from a clientId that we previously
// flagged as shouldHaveLeft, or from a client that's not in the quorum but should be
if (message.clientId != null) {
let errorMsg: string | undefined;
const client: ILocalSequencedClient | undefined =
this._protocolHandler?.quorum.getMember(message.clientId);
if (client === undefined && message.type !== MessageType.ClientJoin) {
errorMsg = "messageClientIdMissingFromQuorum";
} else if (client?.shouldHaveLeft === true) {
errorMsg = "messageClientIdShouldHaveLeft";
}
if (errorMsg !== undefined) {
const error = new GenericError(
errorMsg,
{
clientId: this._clientId,
messageClientId: message.clientId,
sequenceNumber: message.sequenceNumber,
clientSequenceNumber: message.clientSequenceNumber,
},
);
this.close(CreateContainerError(error));
}
}

const local = this._clientId === message.clientId;

// Forward non system messages to the loaded runtime for processing
Expand Down
2 changes: 2 additions & 0 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,8 @@ export class DeltaManager
* @param connection - The newly established connection
*/
private setupNewSuccessfulConnection(connection: DeltaConnection, requestedMode: ConnectionMode) {
// Old connection should have been cleaned up before establishing a new one
assert(this.connection === undefined, "old connection exists on new connection setup");
this.connection = connection;

// Does information in scopes & mode matches?
Expand Down
2 changes: 1 addition & 1 deletion packages/loader/container-utils/src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function messageFromError(error: any) {
/**
* Generic error
*/
class GenericError extends CustomErrorWithProps implements IGenericError {
export class GenericError extends CustomErrorWithProps implements IGenericError {
readonly errorType = ContainerErrorType.genericError;

constructor(
Expand Down

0 comments on commit 5baf68a

Please sign in to comment.