Skip to content

Commit

Permalink
Implementation for resolving data corruption issue for distributed da…
Browse files Browse the repository at this point in the history
…ta ordering service (#5484)
  • Loading branch information
jatgarg authored Mar 25, 2021
1 parent 0b4889b commit 369384c
Show file tree
Hide file tree
Showing 5 changed files with 568 additions and 11 deletions.
5 changes: 5 additions & 0 deletions packages/loader/container-definitions/src/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ export type ILoaderOptions = {
* Set min op frequency with which noops would be sent in case of active connection which is not sending any op.
*/
noopCountFrequency?: number;

/**
* Max time(in ms) container will wait for a leave message of a disconnected client.
*/
maxClientLeaveWaitTime?: number,
};

/**
Expand Down
108 changes: 101 additions & 7 deletions packages/loader/container-loader/src/connectionStateHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
* Licensed under the MIT License.
*/

import { EventEmitter } from "events";
import { IEvent, ITelemetryLogger } from "@fluidframework/common-definitions";
import { IConnectionDetails } from "@fluidframework/container-definitions";
import { ProtocolOpHandler, Quorum } from "@fluidframework/protocol-base";
import { ProtocolOpHandler } from "@fluidframework/protocol-base";
import { ConnectionMode, ISequencedClient } from "@fluidframework/protocol-definitions";
import { EventEmitterWithErrorHandling } from "@fluidframework/telemetry-utils";
import { EventEmitterWithErrorHandling, PerformanceEvent } from "@fluidframework/telemetry-utils";
import { assert, Timer } from "@fluidframework/common-utils";
import { connectEventName, ConnectionState } from "./container";

export interface IConnectionStateHandler {
Expand All @@ -17,6 +17,8 @@ export interface IConnectionStateHandler {
(value: ConnectionState, oldState: ConnectionState, reason?: string | undefined) => void,
propagateConnectionState: () => void,
isContainerLoaded: () => boolean,
shouldClientJoinWrite: () => boolean,
maxClientLeaveWaitTime: number | undefined,
}

export interface ILocalSequencedClient extends ISequencedClient {
Expand All @@ -37,6 +39,13 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne
private _connectionState = ConnectionState.Disconnected;
private _pendingClientId: string | undefined;
private _clientId: string | undefined;
private readonly prevClientLeftTimer: Timer;
// True if we received the leave. False if timed out. Undefined when
// starting the timer.
private leaveReceivedResult: boolean | undefined;
private waitEvent: PerformanceEvent | undefined;
private _clientSentOps: boolean = false;
private clientConnectionMode: ConnectionMode | undefined;

public get connectionState(): ConnectionState {
return this._connectionState;
Expand All @@ -59,12 +68,70 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne
private readonly logger: ITelemetryLogger,
) {
super();
this.prevClientLeftTimer = new Timer(
// Default is 90 sec for which we are going to wait for its own "leave" message.
this.handler.maxClientLeaveWaitTime ?? 90000,
() => {
this.leaveReceivedResult = false;
this.applyForConnectedState("timeout");
},
);
}

public receivedAddMemberEvent(clientId: string, quorum: Quorum) {
// This is true when this client submitted any ops.
public clientSentOps(connectionMode: ConnectionMode) {
assert(this._connectionState === ConnectionState.Connected, "Ops could only be sent when connected");
this._clientSentOps = true;
this.clientConnectionMode = connectionMode;
}

public receivedAddMemberEvent(clientId: string) {
// This is the only one that requires the pending client ID
if (clientId === this.pendingClientId) {
// Start the event in case we are waiting for leave or timeout.
if (this.prevClientLeftTimer.hasTimer) {
this.waitEvent = PerformanceEvent.start(this.logger, {
eventName: "WaitBeforeClientLeave",
waitOnClientId: this._clientId,
hadOutstandingOps: this.handler.shouldClientJoinWrite(),
});
}
this.applyForConnectedState("addMemberEvent");
}
}

private applyForConnectedState(source: "removeMemberEvent" | "addMemberEvent" | "timeout") {
const protocolHandler = this.handler.protocolHandler();
// Move to connected state only if we are in Connecting state, we have seen our join op
// and there is no timer running which means we are not waiting for previous client to leave
// or timeout has occured while doing so.
if (this.pendingClientId !== this.clientId
&& this.pendingClientId !== undefined
&& protocolHandler !== undefined && protocolHandler.quorum.getMember(this.pendingClientId) !== undefined
&& !this.prevClientLeftTimer.hasTimer
) {
this.waitEvent?.end({ leaveReceived: this.leaveReceivedResult, source });
this.setConnectionState(ConnectionState.Connected);
} else {
// Adding this event temporarily so that we can get help debugging if something goes wrong.
this.logger.sendTelemetryEvent({
eventName: "connectedStateRejected",
source,
pendingClientId: this.pendingClientId,
clientId: this.clientId,
hasTimer: this.prevClientLeftTimer.hasTimer,
inQuorum: protocolHandler !== undefined && this.pendingClientId !== undefined
&& protocolHandler.quorum.getMember(this.pendingClientId) !== undefined,
});
}
}

public receivedRemoveMemberEvent(clientId: string) {
// If the client which has left was us, then finish the timer.
if (this.clientId === clientId) {
this.prevClientLeftTimer.clear();
this.leaveReceivedResult = true;
this.applyForConnectedState("removeMemberEvent");
}
}

Expand All @@ -73,7 +140,6 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne
}

public receivedConnectEvent(
emitter: EventEmitter,
connectionMode: ConnectionMode,
details: IConnectionDetails,
opsBehind?: number,
Expand All @@ -100,7 +166,7 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne
// Given async processes, it's possible that we have already processed our own join message before
// connection was fully established.
// Note that we might be still initializing quorum - connection is established proactively on load!
if ((protocolHandler !== undefined && protocolHandler.quorum.has(details.clientId))
if ((protocolHandler !== undefined && protocolHandler.quorum.getMember(details.clientId) !== undefined)
|| connectionMode === "read"
) {
this.setConnectionState(ConnectionState.Connected);
Expand All @@ -118,8 +184,8 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne

const oldState = this._connectionState;
this._connectionState = value;

if (value === ConnectionState.Connected) {
assert(oldState === ConnectionState.Connecting, "Should only transition from Connecting state");
// Mark our old client should have left in the quorum if it's still there
if (this._clientId !== undefined) {
const client: ILocalSequencedClient | undefined =
Expand All @@ -129,9 +195,37 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne
}
}
this._clientId = this.pendingClientId;
// Set _clientSentOps to false as this is a fresh connection.
this._clientSentOps = false;
} else if (value === ConnectionState.Disconnected) {
// Important as we process our own joinSession message through delta request
this._pendingClientId = undefined;
// Only wait for "leave" message if we have some outstanding ops and the client was write client as
// server would not accept ops from read client. Also check if the timer is not already running as we
// could receive "Disconnected" event multiple times without getting connected and in that case we
// don't want to reset the timer as we still want to wait on original client which started this timer.
// We also check the dirty state of this connection as we only want to wait for the client leave of the
// client which created the ops. This helps with situation where a client disconnects immediately after
// getting connected without sending any ops(from previous client). In this case, we would join as write
// because there would be a diff between client seq number and clientSeqNumberObserved but then we don't
// want to wait for newly disconnected client to leave as it has not sent any ops yet.
if (this.handler.shouldClientJoinWrite()
&& this.clientConnectionMode === "write"
&& this.prevClientLeftTimer.hasTimer === false
&& this._clientSentOps
) {
this.leaveReceivedResult = undefined;
this.prevClientLeftTimer.restart();
} else {
// Adding this event temporarily so that we can get help debugging if something goes wrong.
this.logger.sendTelemetryEvent({
eventName: "noWaitOnDisconnected",
clientConnectionMode: this.clientConnectionMode,
hasTimer: this.prevClientLeftTimer.hasTimer,
clientSentOps: this._clientSentOps,
shouldClientJoinWrite: this.handler.shouldClientJoinWrite(),
});
}
}

if (this.handler.isContainerLoaded()) {
Expand Down
16 changes: 13 additions & 3 deletions packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
public get options(): ILoaderOptions { return this.loader.services.options; }
private get scope() { return this.loader.services.scope;}
private get codeLoader() { return this.loader.services.codeLoader;}

constructor(
private readonly loader: Loader,
config: IContainerConfig,
Expand Down Expand Up @@ -617,6 +618,8 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
this.logConnectionStateChangeTelemetry(value, oldState, reason),
propagateConnectionState: () => this.propagateConnectionState(),
isContainerLoaded: () => this.loaded,
shouldClientJoinWrite: () => this._deltaManager.shouldJoinWrite(),
maxClientLeaveWaitTime: this.loader.services.options.maxClientLeaveWaitTime,
},
this.logger,
);
Expand Down Expand Up @@ -1415,10 +1418,14 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i

// Track membership changes and update connection state accordingly
protocol.quorum.on("addMember", (clientId, details) => {
this.connectionStateHandler.receivedAddMemberEvent(clientId, this.protocolHandler.quorum);
this.connectionStateHandler.receivedAddMemberEvent(clientId);
});

protocol.quorum.on("removeMember", (clientId) => {
this.connectionStateHandler.receivedRemoveMemberEvent(clientId);
});

protocol.quorum.on("addProposal",(proposal: IPendingProposal) => {
protocol.quorum.on("addProposal", (proposal: IPendingProposal) => {
if (proposal.key === "code" || proposal.key === "code2") {
this.emit("codeDetailsProposed", proposal.value, proposal);
}
Expand Down Expand Up @@ -1534,7 +1541,6 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i

deltaManager.on(connectEventName, (details: IConnectionDetails, opsBehind?: number) => {
this.connectionStateHandler.receivedConnectEvent(
this,
this._deltaManager.connectionMode,
details,
opsBehind,
Expand All @@ -1548,6 +1554,10 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
}
});

deltaManager.once("submitOp", (message: IDocumentMessage) => {
this.connectionStateHandler.clientSentOps(this._deltaManager.connectionMode);
});

deltaManager.on("disconnect", (reason: string) => {
this.manualReconnectInProgress = false;
this.connectionStateHandler.receivedDisconnectEvent(reason);
Expand Down
6 changes: 5 additions & 1 deletion packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ export class DeltaManager
return this._reconnectMode;
}

public shouldJoinWrite(): boolean {
return this.clientSequenceNumber !== this.clientSequenceNumberObserved;
}

public async connectToStorage(): Promise<IDocumentStorageService> {
if (this.storageService !== undefined) {
return this.storageService;
Expand Down Expand Up @@ -585,7 +589,7 @@ export class DeltaManager
// firing of "connected" event from Container and switch of current clientId (as tracked
// by all DDSes). This will make it impossible to figure out if ops actually made it through,
// so DDSes will immediately resubmit all pending ops, and some of them will be duplicates, corrupting document
if (this.clientSequenceNumberObserved !== this.clientSequenceNumber) {
if (this.shouldJoinWrite()) {
requestedMode = "write";
}

Expand Down
Loading

0 comments on commit 369384c

Please sign in to comment.