Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation for resolving data corruption issue for distributed data ordering service #5484

Merged
merged 35 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d09e0c0
Implementation for resolving data corruption for distributed data ord…
jatgarg Mar 11, 2021
bb0e995
merge conflict
jatgarg Mar 15, 2021
6afa05a
Add in loader options
jatgarg Mar 15, 2021
0499496
Add in loader options
jatgarg Mar 15, 2021
15f32db
Expose proper prop
jatgarg Mar 15, 2021
002b907
Expose proper prop
jatgarg Mar 15, 2021
e75b5cd
Fix event
jatgarg Mar 15, 2021
5e3623f
Change promise resolve
jatgarg Mar 15, 2021
d9e9675
set to undefined in case of connected only
jatgarg Mar 18, 2021
a9bd5e5
set to undefined in case of connected only
jatgarg Mar 18, 2021
25d3984
Pr sugg
jatgarg Mar 18, 2021
f03afd3
fix
jatgarg Mar 18, 2021
af8697b
fix
jatgarg Mar 18, 2021
717819d
merge conflict
jatgarg Mar 18, 2021
c018824
fix order
jatgarg Mar 18, 2021
9a6e7b8
set isdirty and add comments
jatgarg Mar 18, 2021
e9b2a47
add space
jatgarg Mar 18, 2021
82fa0c4
fix event
jatgarg Mar 18, 2021
e4232c1
fix event
jatgarg Mar 18, 2021
1e478c8
fix event
jatgarg Mar 18, 2021
68f8db5
fix dirty
jatgarg Mar 18, 2021
571998b
fix dirty
jatgarg Mar 18, 2021
f6dac03
Add basic test
jatgarg Mar 19, 2021
91d8439
Add test and remove promise
jatgarg Mar 19, 2021
3633fc1
add more test
jatgarg Mar 19, 2021
3461bcb
add more test
jatgarg Mar 19, 2021
97f46da
add more test
jatgarg Mar 19, 2021
71297b3
Pr sugg
jatgarg Mar 19, 2021
12ae2d5
Pr sugg
jatgarg Mar 19, 2021
1c85d89
Pr sugg
jatgarg Mar 19, 2021
1420cc5
Pr sugg
jatgarg Mar 19, 2021
93480ee
Make noop on non connected state
jatgarg Mar 24, 2021
ff6d151
pr sugg
jatgarg Mar 24, 2021
17de42e
add some telemetry events
jatgarg Mar 25, 2021
3c76d63
add some telemetry events
jatgarg Mar 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/loader/container-definitions/src/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ export type ILoaderOptions = {
* Set min op frequency with which noops would be sent in case of active connection which is not sending any op.
*/
noopCountFrequency?: number;

/**
* Max time(in ms) container will wait for a leave message of a disconnected client.
*/
maxClientLeaveWaitTime?: number,
};

/**
Expand Down
84 changes: 77 additions & 7 deletions packages/loader/container-loader/src/connectionStateHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
* Licensed under the MIT License.
*/

import { EventEmitter } from "events";
import { IEvent, ITelemetryLogger } from "@fluidframework/common-definitions";
import { IConnectionDetails } from "@fluidframework/container-definitions";
import { ProtocolOpHandler, Quorum } from "@fluidframework/protocol-base";
import { ConnectionMode, ISequencedClient } from "@fluidframework/protocol-definitions";
import { EventEmitterWithErrorHandling } from "@fluidframework/telemetry-utils";
import { ProtocolOpHandler } from "@fluidframework/protocol-base";
import { ConnectionMode, IClient, ISequencedClient } from "@fluidframework/protocol-definitions";
import { EventEmitterWithErrorHandling, PerformanceEvent } from "@fluidframework/telemetry-utils";
import { assert, Timer } from "@fluidframework/common-utils";
import { connectEventName, ConnectionState } from "./container";

export interface IConnectionStateHandler {
Expand All @@ -17,6 +17,9 @@ export interface IConnectionStateHandler {
(value: ConnectionState, oldState: ConnectionState, reason?: string | undefined) => void,
propagateConnectionState: () => void,
isContainerLoaded: () => boolean,
client: () => IClient,
shouldClientJoinWrite: () => boolean,
maxClientLeaveWaitTime: number | undefined,
}

export interface ILocalSequencedClient extends ISequencedClient {
Expand All @@ -37,6 +40,12 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne
private _connectionState = ConnectionState.Disconnected;
private _pendingClientId: string | undefined;
private _clientId: string | undefined;
private prevClientLeftTimer: Timer;
// This is client id of client for which we have received the addMember event but we are waiting on some previous
// client to leave before moving to Connected state.
private waitingClientId: string | undefined;
private waitEvent: PerformanceEvent | undefined;
private _clientSentOps: boolean = false;

public get connectionState(): ConnectionState {
return this._connectionState;
Expand All @@ -59,21 +68,62 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne
private readonly logger: ITelemetryLogger,
) {
super();
this.prevClientLeftTimer = new Timer(
// Default is 90 sec for which we are going to wait for its own "leave" message.
this.handler.maxClientLeaveWaitTime ?? 90000,
() => {
this.clientLeaveWaitEnded(false);
},
);
}

public receivedAddMemberEvent(clientId: string, quorum: Quorum) {
// This is true when this client submitted any ops.
public clientSentOps() {
this._clientSentOps = true;
}

public receivedAddMemberEvent(clientId: string) {
// This is the only one that requires the pending client ID
if (clientId === this.pendingClientId) {
// Wait for previous client to leave the quorum before firing "connected" event.
if (this.prevClientLeftTimer.hasTimer) {
this.waitEvent = PerformanceEvent.start(this.logger, {
eventName: "WaitBeforeClientLeave",
waitOnClientId: this._clientId,
hadOutstandingOps: this.handler.shouldClientJoinWrite(),
});
this.waitingClientId = clientId;
} else {
this.setConnectionState(ConnectionState.Connected);
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

private clientLeaveWaitEnded(leaveReceived: boolean) {
// Move to connected state only if there was a client waiting for client leave. It may happen that
// during wait, the waiting client again got Disconnected/Connecting and then we don't want to
// move to connected state here for a non waiting client.
if (this.waitingClientId !== undefined) {
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
assert(this.waitingClientId === this.pendingClientId, "Pending Client Id should match the waiting client");
vladsud marked this conversation as resolved.
Show resolved Hide resolved
assert(this.waitEvent !== undefined, "Wait event should be set");
this.waitEvent.end({ leaveReceived });
this.setConnectionState(ConnectionState.Connected);
}
}

public receivedRemoveMemberEvent(clientId: string) {
// If the client which has left was us, then finish the timer.
if (this.clientId === clientId) {
this.prevClientLeftTimer?.clear();
this.clientLeaveWaitEnded(true);
}
}

public receivedDisconnectEvent(reason: string) {
this.setConnectionState(ConnectionState.Disconnected, reason);
}

public receivedConnectEvent(
emitter: EventEmitter,
connectionMode: ConnectionMode,
details: IConnectionDetails,
opsBehind?: number,
Expand Down Expand Up @@ -118,8 +168,10 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne

const oldState = this._connectionState;
this._connectionState = value;

// Set it to undefined as now there is no client waiting to get connected.
this.waitingClientId = undefined;
if (value === ConnectionState.Connected) {
assert(oldState === ConnectionState.Connecting, "Should only transition from Connecting state");
// Mark our old client should have left in the quorum if it's still there
if (this._clientId !== undefined) {
const client: ILocalSequencedClient | undefined =
Expand All @@ -129,9 +181,27 @@ export class ConnectionStateHandler extends EventEmitterWithErrorHandling<IConne
}
}
this._clientId = this.pendingClientId;
// Set _clientSentOps to false as this is a fresh connection.
this._clientSentOps = false;
} else if (value === ConnectionState.Disconnected) {
// Important as we process our own joinSession message through delta request
this._pendingClientId = undefined;
// Only wait for "leave" message if we have some outstanding ops and the client was write client as
// server would not accept ops from read client. Also check if the timer is not already running as we
// could receive "Disconnected" event multiple times without getting connected and in that case we
// don't want to reset the timer as we still want to wait on original client which started this timer.
// We also check the dirty state of this connection as we only want to wait for the client leave of the
// client which created the ops. This helps with situation where a client disconnects immediately after
// getting connected without sending any ops(from previous client). In this case, we would join as write
// because there would be a diff between client seq number and clientSeqNumberObserved but then we don't
// want to wait for newly disconnected client to leave as it has not sent any ops yet.
if (this.handler.shouldClientJoinWrite()
&& this.handler.client().mode === "write"
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
&& this.prevClientLeftTimer.hasTimer === false
&& this._clientSentOps
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
) {
this.prevClientLeftTimer.restart();
}
}

if (this.handler.isContainerLoaded()) {
Expand Down
17 changes: 14 additions & 3 deletions packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
public get options(): ILoaderOptions { return this.loader.services.options; }
private get scope() { return this.loader.services.scope;}
private get codeLoader() { return this.loader.services.codeLoader;}

constructor(
private readonly loader: Loader,
config: IContainerConfig,
Expand Down Expand Up @@ -599,6 +600,9 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
this.logConnectionStateChangeTelemetry(value, oldState, reason),
propagateConnectionState: () => this.propagateConnectionState(),
isContainerLoaded: () => this.loaded,
client: () => this.client,
shouldClientJoinWrite: () => this._deltaManager.shouldJoinWrite(),
maxClientLeaveWaitTime: this.loader.services.options.maxClientLeaveWaitTime,
},
this.logger,
);
Expand Down Expand Up @@ -1396,10 +1400,14 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i

// Track membership changes and update connection state accordingly
protocol.quorum.on("addMember", (clientId, details) => {
this.connectionStateHandler.receivedAddMemberEvent(clientId, this.protocolHandler.quorum);
this.connectionStateHandler.receivedAddMemberEvent(clientId);
});

protocol.quorum.on("removeMember", (clientId) => {
this.connectionStateHandler.receivedRemoveMemberEvent(clientId);
});

protocol.quorum.on("addProposal",(proposal: IPendingProposal) => {
protocol.quorum.on("addProposal", (proposal: IPendingProposal) => {
if (proposal.key === "code" || proposal.key === "code2") {
this.emit("codeDetailsProposed", proposal.value, proposal);
}
Expand Down Expand Up @@ -1515,7 +1523,6 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i

deltaManager.on(connectEventName, (details: IConnectionDetails, opsBehind?: number) => {
this.connectionStateHandler.receivedConnectEvent(
this,
this._deltaManager.connectionMode,
details,
opsBehind,
Expand All @@ -1529,6 +1536,10 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
}
});

deltaManager.once("submitOp", (message: IDocumentMessage) => {
this.connectionStateHandler.clientSentOps();
});

deltaManager.on("disconnect", (reason: string) => {
this.manualReconnectInProgress = false;
this.connectionStateHandler.receivedDisconnectEvent(reason);
Expand Down
6 changes: 5 additions & 1 deletion packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ export class DeltaManager
return this._reconnectMode;
}

public shouldJoinWrite(): boolean {
return this.clientSequenceNumber !== this.clientSequenceNumberObserved;
}

public async connectToStorage(): Promise<IDocumentStorageService> {
if (this.storageService !== undefined) {
return this.storageService;
Expand Down Expand Up @@ -580,7 +584,7 @@ export class DeltaManager
// firing of "connected" event from Container and switch of current clientId (as tracked
// by all DDSes). This will make it impossible to figure out if ops actually made it through,
// so DDSes will immediately resubmit all pending ops, and some of them will be duplicates, corrupting document
if (this.clientSequenceNumberObserved !== this.clientSequenceNumber) {
if (this.shouldJoinWrite()) {
requestedMode = "write";
}

Expand Down
Loading