Skip to content

Commit

Permalink
Add leader tests (#4785)
Browse files Browse the repository at this point in the history
Fix #4297
- the LocalDocumentDeltaConnection.close need to disconnect the client.
- OpProcessController need to keep track of disconnecting client and expecting Leave messages.

Other changes:
- Don't lint for the build task that is run before we launch the debugging session.
  • Loading branch information
curtisman authored Jan 12, 2021
1 parent af422ff commit 4e06335
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@
"skipFiles": [
"<node_internals>/**/*.js"
],
"preLaunchTask": "fluid-build $cwd"
"preLaunchTask": "fluid-build $cwd --nolint"
}

]
Expand Down
3 changes: 2 additions & 1 deletion .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
]
},
{
"label": "fluid-build $cwd",
"label": "fluid-build $cwd --nolint",
"type": "process",
"command": "node",
"args": [
"${workspaceRoot}/node_modules/@fluidframework/build-tools/dist/fluidBuild/fluidBuild.js",
"--root",
"${workspaceRoot}",
"--vscode",
"--nolint",
"${fileDirname}"
],
"group": "build",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ export class DocumentDeltaConnection

protected closeCore(socketProtocolError: boolean, err: DriverError) {
if (this.closed) {
// We see cases where socket is closed while we have two "disconect" listeners - one from DeltaManager,
// We see cases where socket is closed while we have two "disconnect" listeners - one from DeltaManager,
// one - early handler that should have been removed on establishing connection. This causes asserts in
// OdspDocumentDeltaConnection.disconnect() due to not expectting two calls.
this.logger.sendErrorEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export class LocalDocumentDeltaConnection
}

public close() {
// Do nothing
this.disconnectClient("client close");
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/runtime/agent-scheduler/src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class AgentScheduler extends EventEmitter implements IAgentScheduler {
if (this.isActive() && currentClient === this.clientId) {
this.onNewTaskAssigned(key);
} else {
await this.onTaskReasigned(key, currentClient);
await this.onTaskReassigned(key, currentClient);
}
});

Expand Down Expand Up @@ -274,7 +274,7 @@ class AgentScheduler extends EventEmitter implements IAgentScheduler {
}
}

private async onTaskReasigned(key: string, currentClient: string | null) {
private async onTaskReassigned(key: string, currentClient: string | null) {
if (this.runningTasks.has(key)) {
this.runningTasks.delete(key);
this.emit("released", key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,6 @@ describe(`Dehydrate Rehydrate Container Test`, () => {
assert.strictEqual(dataStore2FromRC.runtime.id, dataStore2.runtime.id, "DataStore2 id should match");
});

// eslint-disable-next-line max-len
it("Container rehydration with not bounded data store handle stored in root of bound dataStore. The not bounded" +
"data store also stores handle not bounded dds",
async () => {
Expand Down
183 changes: 183 additions & 0 deletions packages/test/end-to-end-tests/src/test/leader.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*!
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { strict as assert } from "assert";
import { Container } from "@fluidframework/container-loader";
import { requestFluidObject } from "@fluidframework/runtime-utils";
import { ITestFluidObject } from "@fluidframework/test-utils";
import {
generateLocalNonCompatTest,
ITestObjectProvider,
} from "./compatUtils";

async function ensureConnected(container: Container) {
if (!container.connected) {
await new Promise((resolve, rejected) => container.on("connected", resolve));
}
}

const tests = (args: ITestObjectProvider) => {
let container1: Container;
let dataObject1: ITestFluidObject;
beforeEach(async () => {
container1 = await args.makeTestContainer() as Container;
dataObject1 = await requestFluidObject<ITestFluidObject>(container1, "default");
await ensureConnected(container1);
});

it("Create and load", async () => {
// after detach create, we are in view only mode
assert(!container1.deltaManager.active);

// shouldn't be a leader in view only mode
assert(!dataObject1.context.leader);

const container2 = await args.loadTestContainer() as Container;
await ensureConnected(container2);
await args.opProcessingController.process();
const dataObject2 = await requestFluidObject<ITestFluidObject>(container2, "default");

// Currently, we load a container in write mode from the start. See issue #3304.
// Once that is fix, this needs to change
assert(container2.deltaManager.active);
assert(dataObject2.context.leader);
});

interface ListenerConfig { dataObject: ITestFluidObject, name: string, leader: boolean, notleader: boolean }
const setupListener = (config: ListenerConfig) => {
config.dataObject.runtime.on("leader", () => {
assert(config.leader, `leader event not expected in ${config.name}`);
config.leader = false;
});

config.dataObject.runtime.on("notleader", () => {
assert(config.notleader, `notleader event not expected in ${config.name}`);
config.notleader = false;
});
};

const checkExpected = (config: ListenerConfig) => {
assert(!config.leader, `Missing leader event on ${config.name}`);
assert(!config.notleader, `Missing notleader event on ${config.name}`);
};

it("View to write mode", async () => {
const config = { dataObject: dataObject1, name: "dataObject1", leader: true, notleader: false };
setupListener(config);

// write something to get out of view only mode and take leadership
dataObject1.root.set("blah", "blah");
await args.opProcessingController.process();

checkExpected(config);
assert(dataObject1.context.leader);
});

it("force read only", async () => {
// write something to get out of view only mode and take leadership
dataObject1.root.set("blah", "blah");
await args.opProcessingController.process();

const config = { dataObject: dataObject1, name: "dataObject1", leader: false, notleader: true };
setupListener(config);

container1.forceReadonly(true);
await args.opProcessingController.process();

checkExpected(config);
assert(!dataObject1.context.leader);
});

it("Events on close", async () => {
// write something to get out of view only mode and take leadership
dataObject1.root.set("blah", "blah");

const container2 = await args.loadTestContainer() as Container;
const dataObject2 = await requestFluidObject<ITestFluidObject>(container2, "default");

// Currently, we load a container in write mode from the start. See issue #3304.
// Once that is fix, this needs to change
await ensureConnected(container2);
await args.opProcessingController.process();

assert(dataObject1.context.leader);
assert(!dataObject2.context.leader);

const config1 = { dataObject: dataObject1, name: "dataObject1", leader: false, notleader: true };
const config2 = { dataObject: dataObject2, name: "dataObject2", leader: true, notleader: false };
setupListener(config1);
setupListener(config2);

container1.close();

await args.opProcessingController.process();

checkExpected(config1);
checkExpected(config2);
assert(!dataObject1.context.leader);
assert(dataObject2.context.leader);
});

it("Concurrent update", async () => {
// write something to get out of view only mode and take leadership
dataObject1.root.set("blah", "blah");
await args.opProcessingController.process();
assert(dataObject1.context.leader);

const container2 = await args.loadTestContainer() as Container;
const dataObject2 = await requestFluidObject<ITestFluidObject>(container2, "default");

const container3 = await args.loadTestContainer() as Container;
const dataObject3 = await requestFluidObject<ITestFluidObject>(container3, "default");

// Currently, we load a container in write mode from the start. See issue #3304.
// Once that is fix, this needs to change
await Promise.all([ensureConnected(container2), ensureConnected(container3)]);
await args.opProcessingController.process();

assert(dataObject1.context.leader);
assert(!dataObject2.context.leader);
assert(!dataObject3.context.leader);

await args.opProcessingController.pauseProcessing();

const config2 = { dataObject: dataObject2, name: "dataObject2", leader: false, notleader: false };
const config3 = { dataObject: dataObject3, name: "dataObject3", leader: false, notleader: false };
setupListener(config2);
setupListener(config3);

container1.close();

// Process all the leave message
await args.opProcessingController.processIncoming();

// No one should be a leader yet
assert(!dataObject1.context.leader);
assert(!dataObject2.context.leader);
assert(!dataObject3.context.leader);

config2.leader = true;
config3.leader = true;

await args.opProcessingController.process();
assert((dataObject2.context.leader || dataObject3.context.leader) &&
(!dataObject2.context.leader || !dataObject3.context.leader),
"only one container should be the leader");

if (dataObject2.context.leader) {
assert(config3.leader);
config3.leader = false;
} else if (dataObject3.context.leader) {
assert(config2.leader);
config2.leader = false;
}
checkExpected(config2);
checkExpected(config3);
});
};

describe("Leader", () => {
generateLocalNonCompatTest(tests, { tinylicious: true });
});
43 changes: 38 additions & 5 deletions packages/test/test-utils/src/opProcessingController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ class DeltaManagerToggle {
*/
class DeltaManagerMonitor extends DeltaManagerToggle {
private pendingCount: number = 0;
private clientId: string | undefined;
public clientId: string | undefined;
public readMode = true;
private firstClientSequenceNumber: number = -1;
private lastOutbound: IDocumentMessage | undefined;
private readonly pendingLeaveClientIds = new Set<string>();
private readonly lastInboundPerClient = new Map<string, ISequencedDocumentMessage>();
private pendingWriteConnection = false;

Expand Down Expand Up @@ -153,11 +155,12 @@ class DeltaManagerMonitor extends DeltaManagerToggle {

public hasPendingWork() {
return !this.deltaManager.disposed
&& (this.pendingWriteConnection || this.pendingCount !== 0);
&& (this.pendingWriteConnection || this.pendingCount !== 0 || this.pendingLeaveClientIds.size !== 0);
}

private connect(clientId: string) {
this.clientId = clientId;
this.readMode = !this.deltaManager.active;
this.trace("CON");
}
private inbound(message: ISequencedDocumentMessage) {
Expand All @@ -168,6 +171,7 @@ class DeltaManagerMonitor extends DeltaManagerToggle {
const systemLeaveMessage = message as ISequencedDocumentSystemMessage;
const clientId = JSON.parse(systemLeaveMessage.data) as string;
this.lastInboundPerClient.delete(clientId);
this.pendingLeaveClientIds.delete(clientId);
}

if (this.clientId === undefined) {
Expand All @@ -189,9 +193,10 @@ class DeltaManagerMonitor extends DeltaManagerToggle {

// Need to filter system messages
switch (message.type) {
// These are generated by the server, don't count
case MessageType.ClientJoin:
case MessageType.ClientLeave:
assert(false, "join and leave message shouldn't have clientId");
// These are generated by the server, don't count
case MessageType.NoOp:
case MessageType.NoClient:
this.trace("SEQ", message.type);
Expand Down Expand Up @@ -222,7 +227,7 @@ class DeltaManagerMonitor extends DeltaManagerToggle {
// to bump min seq
if (message.type !== MessageType.NoOp) {
this.pendingCount++;
this.lastOutbound = message;
this.lastOutbound = message;
}
this.trace("OUT", message.type);
}
Expand All @@ -232,6 +237,11 @@ class DeltaManagerMonitor extends DeltaManagerToggle {
debug(`DeltaConnectionMonitor: ${action.padEnd(3)}: ${this.clientId} `
+ `pending:${this.pendingCount} seq:${this.latestSequenceNumber} ${op ?? ""}`);
}

public onClientDisconnect(clientId: string) {
// Keep track of a list of clientIds that we expect leave message from
this.pendingLeaveClientIds.add(clientId);
}
}
/**
* @deprecated OpProcessingController has been improved to not need server information and work against other servers.
Expand Down Expand Up @@ -277,7 +287,30 @@ export class OpProcessingController {
*/
public addDeltaManagers(...deltaManagers: DeltaManager[]) {
deltaManagers.forEach((deltaManager) => {
this.deltaManagerMonitors.set(deltaManager, new DeltaManagerMonitor(deltaManager));
const monitorSetup = (monitor1: DeltaManagerMonitor, monitor2: DeltaManagerMonitor) => {
if (monitor1.clientId !== undefined && monitor1.deltaManager.active) {
const clientId = monitor1.clientId;
monitor1.deltaManager.once("disconnect", () => {
monitor2.onClientDisconnect(clientId);
});
}
monitor1.deltaManager.on("connect", (details) => {
if (monitor1.deltaManager.active) {
monitor1.deltaManager.once("disconnect", () => {
monitor2.onClientDisconnect(details.clientId);
});
}
});
};

// Wire up event listener so we can keep track of leave message that we expects
const newMonitor = new DeltaManagerMonitor(deltaManager);
for (const monitor of this.deltaManagerMonitors.values()) {
monitorSetup(newMonitor, monitor);
monitorSetup(monitor, newMonitor);
}

this.deltaManagerMonitors.set(deltaManager, newMonitor);
});
}

Expand Down

0 comments on commit 4e06335

Please sign in to comment.