From 4e063355724aacc1c7cdc099602687762308fa64 Mon Sep 17 00:00:00 2001 From: Curtis Man Date: Tue, 12 Jan 2021 09:31:48 -0800 Subject: [PATCH] Add leader tests (#4785) Fix #4297 - the LocalDocumentDeltaConnection.close need to disconnect the client. - OpProcessController need to keep track of disconnecting client and expecting Leave messages. Other changes: - Don't lint for the build task that is run before we launch the debugging session. --- .vscode/launch.json | 2 +- .vscode/tasks.json | 3 +- .../src/documentDeltaConnection.ts | 2 +- .../src/localDocumentDeltaConnection.ts | 2 +- .../runtime/agent-scheduler/src/scheduler.ts | 4 +- .../test/deRehydrateContainerTests.spec.ts | 1 - .../end-to-end-tests/src/test/leader.spec.ts | 183 ++++++++++++++++++ .../test-utils/src/opProcessingController.ts | 43 +++- 8 files changed, 228 insertions(+), 12 deletions(-) create mode 100644 packages/test/end-to-end-tests/src/test/leader.spec.ts diff --git a/.vscode/launch.json b/.vscode/launch.json index c7d56bda5c07..0c13a32551ab 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -243,7 +243,7 @@ "skipFiles": [ "/**/*.js" ], - "preLaunchTask": "fluid-build $cwd" + "preLaunchTask": "fluid-build $cwd --nolint" } ] diff --git a/.vscode/tasks.json b/.vscode/tasks.json index d768e6abba06..b1dde704d216 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -23,7 +23,7 @@ ] }, { - "label": "fluid-build $cwd", + "label": "fluid-build $cwd --nolint", "type": "process", "command": "node", "args": [ @@ -31,6 +31,7 @@ "--root", "${workspaceRoot}", "--vscode", + "--nolint", "${fileDirname}" ], "group": "build", diff --git a/packages/drivers/driver-base/src/documentDeltaConnection.ts b/packages/drivers/driver-base/src/documentDeltaConnection.ts index b020255e4346..5da30280678f 100644 --- a/packages/drivers/driver-base/src/documentDeltaConnection.ts +++ b/packages/drivers/driver-base/src/documentDeltaConnection.ts @@ -332,7 +332,7 @@ export class DocumentDeltaConnection protected closeCore(socketProtocolError: boolean, err: DriverError) { if (this.closed) { - // We see cases where socket is closed while we have two "disconect" listeners - one from DeltaManager, + // We see cases where socket is closed while we have two "disconnect" listeners - one from DeltaManager, // one - early handler that should have been removed on establishing connection. This causes asserts in // OdspDocumentDeltaConnection.disconnect() due to not expectting two calls. this.logger.sendErrorEvent( diff --git a/packages/drivers/local-driver/src/localDocumentDeltaConnection.ts b/packages/drivers/local-driver/src/localDocumentDeltaConnection.ts index e1b0bd0d712b..74bb984e2464 100644 --- a/packages/drivers/local-driver/src/localDocumentDeltaConnection.ts +++ b/packages/drivers/local-driver/src/localDocumentDeltaConnection.ts @@ -189,7 +189,7 @@ export class LocalDocumentDeltaConnection } public close() { - // Do nothing + this.disconnectClient("client close"); } /** diff --git a/packages/runtime/agent-scheduler/src/scheduler.ts b/packages/runtime/agent-scheduler/src/scheduler.ts index 080145e0c388..4082b0d92329 100644 --- a/packages/runtime/agent-scheduler/src/scheduler.ts +++ b/packages/runtime/agent-scheduler/src/scheduler.ts @@ -230,7 +230,7 @@ class AgentScheduler extends EventEmitter implements IAgentScheduler { if (this.isActive() && currentClient === this.clientId) { this.onNewTaskAssigned(key); } else { - await this.onTaskReasigned(key, currentClient); + await this.onTaskReassigned(key, currentClient); } }); @@ -274,7 +274,7 @@ class AgentScheduler extends EventEmitter implements IAgentScheduler { } } - private async onTaskReasigned(key: string, currentClient: string | null) { + private async onTaskReassigned(key: string, currentClient: string | null) { if (this.runningTasks.has(key)) { this.runningTasks.delete(key); this.emit("released", key); diff --git a/packages/test/end-to-end-tests/src/test/deRehydrateContainerTests.spec.ts b/packages/test/end-to-end-tests/src/test/deRehydrateContainerTests.spec.ts index 142467c7c2a9..757eaf3b7b52 100644 --- a/packages/test/end-to-end-tests/src/test/deRehydrateContainerTests.spec.ts +++ b/packages/test/end-to-end-tests/src/test/deRehydrateContainerTests.spec.ts @@ -498,7 +498,6 @@ describe(`Dehydrate Rehydrate Container Test`, () => { assert.strictEqual(dataStore2FromRC.runtime.id, dataStore2.runtime.id, "DataStore2 id should match"); }); - // eslint-disable-next-line max-len it("Container rehydration with not bounded data store handle stored in root of bound dataStore. The not bounded" + "data store also stores handle not bounded dds", async () => { diff --git a/packages/test/end-to-end-tests/src/test/leader.spec.ts b/packages/test/end-to-end-tests/src/test/leader.spec.ts new file mode 100644 index 000000000000..5a35d5aee409 --- /dev/null +++ b/packages/test/end-to-end-tests/src/test/leader.spec.ts @@ -0,0 +1,183 @@ +/*! + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; +import { Container } from "@fluidframework/container-loader"; +import { requestFluidObject } from "@fluidframework/runtime-utils"; +import { ITestFluidObject } from "@fluidframework/test-utils"; +import { + generateLocalNonCompatTest, + ITestObjectProvider, +} from "./compatUtils"; + +async function ensureConnected(container: Container) { + if (!container.connected) { + await new Promise((resolve, rejected) => container.on("connected", resolve)); + } +} + +const tests = (args: ITestObjectProvider) => { + let container1: Container; + let dataObject1: ITestFluidObject; + beforeEach(async () => { + container1 = await args.makeTestContainer() as Container; + dataObject1 = await requestFluidObject(container1, "default"); + await ensureConnected(container1); + }); + + it("Create and load", async () => { + // after detach create, we are in view only mode + assert(!container1.deltaManager.active); + + // shouldn't be a leader in view only mode + assert(!dataObject1.context.leader); + + const container2 = await args.loadTestContainer() as Container; + await ensureConnected(container2); + await args.opProcessingController.process(); + const dataObject2 = await requestFluidObject(container2, "default"); + + // Currently, we load a container in write mode from the start. See issue #3304. + // Once that is fix, this needs to change + assert(container2.deltaManager.active); + assert(dataObject2.context.leader); + }); + + interface ListenerConfig { dataObject: ITestFluidObject, name: string, leader: boolean, notleader: boolean } + const setupListener = (config: ListenerConfig) => { + config.dataObject.runtime.on("leader", () => { + assert(config.leader, `leader event not expected in ${config.name}`); + config.leader = false; + }); + + config.dataObject.runtime.on("notleader", () => { + assert(config.notleader, `notleader event not expected in ${config.name}`); + config.notleader = false; + }); + }; + + const checkExpected = (config: ListenerConfig) => { + assert(!config.leader, `Missing leader event on ${config.name}`); + assert(!config.notleader, `Missing notleader event on ${config.name}`); + }; + + it("View to write mode", async () => { + const config = { dataObject: dataObject1, name: "dataObject1", leader: true, notleader: false }; + setupListener(config); + + // write something to get out of view only mode and take leadership + dataObject1.root.set("blah", "blah"); + await args.opProcessingController.process(); + + checkExpected(config); + assert(dataObject1.context.leader); + }); + + it("force read only", async () => { + // write something to get out of view only mode and take leadership + dataObject1.root.set("blah", "blah"); + await args.opProcessingController.process(); + + const config = { dataObject: dataObject1, name: "dataObject1", leader: false, notleader: true }; + setupListener(config); + + container1.forceReadonly(true); + await args.opProcessingController.process(); + + checkExpected(config); + assert(!dataObject1.context.leader); + }); + + it("Events on close", async () => { + // write something to get out of view only mode and take leadership + dataObject1.root.set("blah", "blah"); + + const container2 = await args.loadTestContainer() as Container; + const dataObject2 = await requestFluidObject(container2, "default"); + + // Currently, we load a container in write mode from the start. See issue #3304. + // Once that is fix, this needs to change + await ensureConnected(container2); + await args.opProcessingController.process(); + + assert(dataObject1.context.leader); + assert(!dataObject2.context.leader); + + const config1 = { dataObject: dataObject1, name: "dataObject1", leader: false, notleader: true }; + const config2 = { dataObject: dataObject2, name: "dataObject2", leader: true, notleader: false }; + setupListener(config1); + setupListener(config2); + + container1.close(); + + await args.opProcessingController.process(); + + checkExpected(config1); + checkExpected(config2); + assert(!dataObject1.context.leader); + assert(dataObject2.context.leader); + }); + + it("Concurrent update", async () => { + // write something to get out of view only mode and take leadership + dataObject1.root.set("blah", "blah"); + await args.opProcessingController.process(); + assert(dataObject1.context.leader); + + const container2 = await args.loadTestContainer() as Container; + const dataObject2 = await requestFluidObject(container2, "default"); + + const container3 = await args.loadTestContainer() as Container; + const dataObject3 = await requestFluidObject(container3, "default"); + + // Currently, we load a container in write mode from the start. See issue #3304. + // Once that is fix, this needs to change + await Promise.all([ensureConnected(container2), ensureConnected(container3)]); + await args.opProcessingController.process(); + + assert(dataObject1.context.leader); + assert(!dataObject2.context.leader); + assert(!dataObject3.context.leader); + + await args.opProcessingController.pauseProcessing(); + + const config2 = { dataObject: dataObject2, name: "dataObject2", leader: false, notleader: false }; + const config3 = { dataObject: dataObject3, name: "dataObject3", leader: false, notleader: false }; + setupListener(config2); + setupListener(config3); + + container1.close(); + + // Process all the leave message + await args.opProcessingController.processIncoming(); + + // No one should be a leader yet + assert(!dataObject1.context.leader); + assert(!dataObject2.context.leader); + assert(!dataObject3.context.leader); + + config2.leader = true; + config3.leader = true; + + await args.opProcessingController.process(); + assert((dataObject2.context.leader || dataObject3.context.leader) && + (!dataObject2.context.leader || !dataObject3.context.leader), + "only one container should be the leader"); + + if (dataObject2.context.leader) { + assert(config3.leader); + config3.leader = false; + } else if (dataObject3.context.leader) { + assert(config2.leader); + config2.leader = false; + } + checkExpected(config2); + checkExpected(config3); + }); +}; + +describe("Leader", () => { + generateLocalNonCompatTest(tests, { tinylicious: true }); +}); diff --git a/packages/test/test-utils/src/opProcessingController.ts b/packages/test/test-utils/src/opProcessingController.ts index fb028401d0d6..fec5a4a5d5d6 100644 --- a/packages/test/test-utils/src/opProcessingController.ts +++ b/packages/test/test-utils/src/opProcessingController.ts @@ -78,9 +78,11 @@ class DeltaManagerToggle { */ class DeltaManagerMonitor extends DeltaManagerToggle { private pendingCount: number = 0; - private clientId: string | undefined; + public clientId: string | undefined; + public readMode = true; private firstClientSequenceNumber: number = -1; private lastOutbound: IDocumentMessage | undefined; + private readonly pendingLeaveClientIds = new Set(); private readonly lastInboundPerClient = new Map(); private pendingWriteConnection = false; @@ -153,11 +155,12 @@ class DeltaManagerMonitor extends DeltaManagerToggle { public hasPendingWork() { return !this.deltaManager.disposed - && (this.pendingWriteConnection || this.pendingCount !== 0); + && (this.pendingWriteConnection || this.pendingCount !== 0 || this.pendingLeaveClientIds.size !== 0); } private connect(clientId: string) { this.clientId = clientId; + this.readMode = !this.deltaManager.active; this.trace("CON"); } private inbound(message: ISequencedDocumentMessage) { @@ -168,6 +171,7 @@ class DeltaManagerMonitor extends DeltaManagerToggle { const systemLeaveMessage = message as ISequencedDocumentSystemMessage; const clientId = JSON.parse(systemLeaveMessage.data) as string; this.lastInboundPerClient.delete(clientId); + this.pendingLeaveClientIds.delete(clientId); } if (this.clientId === undefined) { @@ -189,9 +193,10 @@ class DeltaManagerMonitor extends DeltaManagerToggle { // Need to filter system messages switch (message.type) { - // These are generated by the server, don't count case MessageType.ClientJoin: case MessageType.ClientLeave: + assert(false, "join and leave message shouldn't have clientId"); + // These are generated by the server, don't count case MessageType.NoOp: case MessageType.NoClient: this.trace("SEQ", message.type); @@ -222,7 +227,7 @@ class DeltaManagerMonitor extends DeltaManagerToggle { // to bump min seq if (message.type !== MessageType.NoOp) { this.pendingCount++; - this.lastOutbound = message; + this.lastOutbound = message; } this.trace("OUT", message.type); } @@ -232,6 +237,11 @@ class DeltaManagerMonitor extends DeltaManagerToggle { debug(`DeltaConnectionMonitor: ${action.padEnd(3)}: ${this.clientId} ` + `pending:${this.pendingCount} seq:${this.latestSequenceNumber} ${op ?? ""}`); } + + public onClientDisconnect(clientId: string) { + // Keep track of a list of clientIds that we expect leave message from + this.pendingLeaveClientIds.add(clientId); + } } /** * @deprecated OpProcessingController has been improved to not need server information and work against other servers. @@ -277,7 +287,30 @@ export class OpProcessingController { */ public addDeltaManagers(...deltaManagers: DeltaManager[]) { deltaManagers.forEach((deltaManager) => { - this.deltaManagerMonitors.set(deltaManager, new DeltaManagerMonitor(deltaManager)); + const monitorSetup = (monitor1: DeltaManagerMonitor, monitor2: DeltaManagerMonitor) => { + if (monitor1.clientId !== undefined && monitor1.deltaManager.active) { + const clientId = monitor1.clientId; + monitor1.deltaManager.once("disconnect", () => { + monitor2.onClientDisconnect(clientId); + }); + } + monitor1.deltaManager.on("connect", (details) => { + if (monitor1.deltaManager.active) { + monitor1.deltaManager.once("disconnect", () => { + monitor2.onClientDisconnect(details.clientId); + }); + } + }); + }; + + // Wire up event listener so we can keep track of leave message that we expects + const newMonitor = new DeltaManagerMonitor(deltaManager); + for (const monitor of this.deltaManagerMonitors.values()) { + monitorSetup(newMonitor, monitor); + monitorSetup(monitor, newMonitor); + } + + this.deltaManagerMonitors.set(deltaManager, newMonitor); }); }