Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deli read client fixes #9925

Merged
merged 3 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 75 additions & 24 deletions server/routerlicious/packages/lambdas/src/deli/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ import {
SignalOperationType,
ITicketedMessage,
IExtendClientControlMessageContents,
ITimedClient,
ISequencedSignalClient,
IClientManager,
} from "@fluidframework/server-services-core";
import {
CommonProperties,
Expand Down Expand Up @@ -220,13 +221,14 @@ export class DeliLambda extends TypedEventEmitter<IDeliLambdaEvents> implements
private readonly documentId: string,
readonly lastCheckpoint: IDeliState,
checkpointManager: IDeliCheckpointManager,
private readonly clientManager: IClientManager | undefined,
private readonly deltasProducer: IProducer,
private readonly signalsProducer: IProducer | undefined,
private readonly rawDeltasProducer: IProducer,
private readonly serviceConfiguration: IServiceConfiguration,
private sessionMetric: Lumber<LumberEventName.SessionResult> | undefined,
private sessionStartMetric: Lumber<LumberEventName.StartSessionResult> | undefined,
private readonly readClients: Map<string, ITimedClient> = new Map()) {
private readonly readClients: Map<string, ISequencedSignalClient> = new Map()) {
super();

// Instantiate existing clients
Expand Down Expand Up @@ -651,30 +653,40 @@ export class DeliLambda extends TypedEventEmitter<IDeliLambdaEvents> implements
`Op not allowed`);
}

let isReadClient = false;

// Handle client join/leave messages.
if (!message.clientId) {
if (message.operation.type === MessageType.ClientLeave) {
isReadClient = this.readClients.has(dataContent);
if (!isReadClient && !this.clientSeqManager.removeClient(dataContent)) {
const readClient = this.readClients.get(dataContent);
if (readClient) {
this.readClients.delete(dataContent);
return this.createSignalMessage(message, this.sequenceNumber, dataContent);
} else if (!this.clientSeqManager.removeClient(dataContent)) {
// Return if the client has already been removed due to a prior leave message.
return;
}
} else if (message.operation.type === MessageType.ClientJoin) {
const clientJoinMessage = dataContent as IClientJoin;

isReadClient = clientJoinMessage.detail.mode === "read";
if (isReadClient) {
if (clientJoinMessage.detail.mode === "read") {
if (this.readClients.has(clientJoinMessage.clientId)) {
// Return if the client has already been added due to a prior join message.
return;
}

this.readClients.set(clientJoinMessage.clientId, {
...clientJoinMessage.detail,
lastKeepAlive: Date.now(),
});
// create the signal message
const signalMessage = this.createSignalMessage(message, this.sequenceNumber, dataContent);

// store the read client in-memory, including the signal sequence numbers
const readClient: ISequencedSignalClient = {
client: clientJoinMessage.detail,
referenceSequenceNumber: (signalMessage.message.operation as any).referenceSequenceNumber,
clientConnectionNumber: (signalMessage.message.operation as any).clientConnectionNumber,
exp: Date.now() + this.serviceConfiguration.deli.clientTimeout,
};

this.readClients.set(clientJoinMessage.clientId, readClient);

return signalMessage;
} else {
const isNewClient = this.clientSeqManager.upsertClient(
clientJoinMessage.clientId,
Expand Down Expand Up @@ -738,11 +750,6 @@ export class DeliLambda extends TypedEventEmitter<IDeliLambdaEvents> implements

let sequenceNumber = this.sequenceNumber;

if (isReadClient) {
// create the signal message
return this.createSignalMessage(message, sequenceNumber, dataContent);
}

// Get the current sequence number and increment it if appropriate.
// We don't increment sequence number for noops sent by client since they will
// be consolidated and sent later as raw message.
Expand Down Expand Up @@ -894,10 +901,52 @@ export class DeliLambda extends TypedEventEmitter<IDeliLambdaEvents> implements
case ControlMessageType.ExtendClient: {
const controlContents = controlMessage.contents as IExtendClientControlMessageContents;

const readClient = this.readClients.get(controlContents.clientId);
if (readClient) {
// extend the clients livelihood
readClient.lastKeepAlive = Date.now();
const clientsToExtend: Map<string, ISequencedSignalClient> = new Map();

const clientIds = controlContents.clientIds ??
(controlContents.clientId ? [controlContents.clientId] : []);
for (const clientId of clientIds) {
const client = this.readClients.get(clientId);
if (client) {
clientsToExtend.set(clientId, client);
}
}

if (clientsToExtend.size > 0) {
if (this.clientManager) {
this.clientManager.extendSequencedClients(
this.tenantId,
this.documentId,
clientsToExtend,
this.serviceConfiguration.deli.clientTimeout)
.catch((error) => {
const errorMsg = "Could not extend clients";
this.context.log?.error(
`${errorMsg}: ${JSON.stringify(error)}`,
{
messageMetaData: {
documentId: this.documentId,
tenantId: this.tenantId,
},
});
Lumberjack.error(
errorMsg,
getLumberBaseProperties(this.documentId, this.tenantId), error);
});
} else {
const errorMsg = "Could not extend clients. Missing client manager";
this.context.log?.error(
`${errorMsg}`,
{
messageMetaData: {
documentId: this.documentId,
tenantId: this.tenantId,
},
});
Lumberjack.error(
errorMsg,
getLumberBaseProperties(this.documentId, this.tenantId));
}
}

break;
Expand Down Expand Up @@ -1065,14 +1114,16 @@ export class DeliLambda extends TypedEventEmitter<IDeliLambdaEvents> implements
}

/**
* Check if there are any old/idle read clients.
* Check if there are any expired read clients.
* The read client will expire if alfred has not sent
* an ExtendClient control message within the time for 'clientTimeout'.
* Craft and send a leave message for each one found.
*/
private checkIdleReadClients() {
const currentTime = Date.now();

for (const [clientId, { lastKeepAlive }] of this.readClients) {
if ((currentTime - lastKeepAlive) > this.serviceConfiguration.deli.clientTimeout) {
for (const [clientId, { exp }] of this.readClients) {
if (exp < currentTime) {
const leaveMessage = this.createLeaveMessage(clientId);
void this.sendToRawDeltas(leaveMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { inspect } from "util";
import { toUtf8 } from "@fluidframework/common-utils";
import { ICreateCommitParams, ICreateTreeEntry } from "@fluidframework/gitresources";
import {
IClientManager,
ICollection,
IContext,
IDeliState,
Expand Down Expand Up @@ -55,6 +56,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
private readonly operationsDbMongoManager: MongoManager,
private readonly collection: ICollection<IDocument>,
private readonly tenantManager: ITenantManager,
private readonly clientManager: IClientManager | undefined,
private readonly forwardProducer: IProducer,
private readonly signalProducer: IProducer | undefined,
private readonly reverseProducer: IProducer,
Expand Down Expand Up @@ -160,6 +162,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
documentId,
newCheckpoint,
checkpointManager,
this.clientManager,
// The producer as well it shouldn't take. Maybe it just gives an output stream?
this.forwardProducer,
this.signalProducer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ describe("Routerlicious", () => {
mongoManager,
testCollection,
testTenantManager,
undefined,
testForwardProducer,
undefined,
testReverseProducer,
Expand All @@ -120,6 +121,7 @@ describe("Routerlicious", () => {
mongoManager,
testCollection,
testTenantManager,
undefined,
testForwardProducer,
testSignalProducer,
testReverseProducer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ export class LocalOrderer implements IOrderer {
this.documentId,
lastCheckpoint,
checkpointManager,
undefined,
this.deltasKafka,
undefined,
this.rawDeltasKafka,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export async function deliCreate(config: Provider): Promise<core.IPartitionLambd
operationsDbMongoManager,
collection,
tenantManager,
undefined,
combinedProducer,
undefined,
reverseProducer,
Expand Down
37 changes: 33 additions & 4 deletions server/routerlicious/packages/services-core/src/clientManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,30 @@

import { IClient, ISignalClient, ISignalMessage } from "@fluidframework/protocol-definitions";

export interface ITimedClient extends IClient {
/**
* Represents a client that has some sequence numbers attached
*/
export interface ISequencedSignalClient {
/**
* The client object
*/
client: IClient;

/**
* Counts the number of signals sent by the client
*/
clientConnectionNumber: number;

/**
* Timestamp for the last time deli heard about this client
* Sequence number that indicates when the signal was created in relation to the delta stream
*/
lastKeepAlive: number;
referenceSequenceNumber: number;

/**
* The time when the client will expire.
* The client will expire if Date.now() exceeds this value.
*/
exp: number;
}

/**
Expand Down Expand Up @@ -40,5 +59,15 @@ export interface IClientManager {
* Returns all clients currently connected including a keep alive time.
* Should be used with delis read only client functionality.
*/
getTimedClients?(tenantId: string, documentId: string): Promise<Map<string, ITimedClient>>;
getSequencedClients(tenantId: string, documentId: string): Promise<Map<string, ISequencedSignalClient>>;

/**
* Called when the expiration time of clients should be extended.
* @param clientTimeout Amount of time in milliseconds to add to the clients expiration time.
*/
extendSequencedClients(
tenantId: string,
documentId: string,
clients: Map<string, ISequencedSignalClient>,
clientTimeout: number): Promise<void>;
}
3 changes: 2 additions & 1 deletion server/routerlicious/packages/services-core/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,5 +250,6 @@ export interface ILambdaStartControlMessageContents {
}

export interface IExtendClientControlMessageContents {
clientId: string;
clientId?: string;
clientIds?: string[];
}
24 changes: 23 additions & 1 deletion server/routerlicious/packages/services/src/redisClientManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import { IClient, ISignalClient } from "@fluidframework/protocol-definitions";
import { IClientManager } from "@fluidframework/server-services-core";
import { IClientManager, ISequencedSignalClient } from "@fluidframework/server-services-core";
import { executeRedisMultiWithHmsetExpire, IRedisParameters } from "@fluidframework/server-services-utils";
import { Redis } from "ioredis";
import * as winston from "winston";
Expand Down Expand Up @@ -62,6 +62,28 @@ export class ClientManager implements IClientManager {
return clients;
}

/**
* Returns all clients currently connected including a keep alive time.
* Should be used with delis read only client functionality.
*/
public async getSequencedClients(
tenantId: string,
documentId: string): Promise<Map<string, ISequencedSignalClient>> {
throw new Error("Not implemented");
}

/**
* Called when the expiration time of clients should be extended.
* @param clientTimeout Amount of time in milliseconds to add to the clients expiration time.
*/
public async extendSequencedClients(
tenantId: string,
documentId: string,
clients: Map<string, ISequencedSignalClient>,
clientTimeout: number): Promise<void> {
throw new Error("Not implemented");
}

private getKey(tenantId: string, documentId: string): string {
return `${this.prefix}:${tenantId}:${documentId}`;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import { IClient, ISignalClient } from "@fluidframework/protocol-definitions";
import { IClientManager } from "@fluidframework/server-services-core";
import { IClientManager, ISequencedSignalClient } from "@fluidframework/server-services-core";

export class TestClientManager implements IClientManager {
private readonly clients: Map<string, Map<string, Map<string, IClient>>> = new Map();
Expand All @@ -19,11 +19,13 @@ export class TestClientManager implements IClientManager {

this.clients.get(tenantId).get(documentId).set(clientId, details);
}

public async removeClient(tenantId: string, documentId: string, clientId: string): Promise<void> {
if (this.clients.has(tenantId) && this.clients.get(tenantId).has(documentId)) {
this.clients.get(tenantId).get(documentId).delete(clientId);
}
}

public async getClients(tenantId: string, documentId: string): Promise<ISignalClient[]> {
const signalClients: ISignalClient[] = [];
if (this.clients.has(tenantId) && this.clients.get(tenantId).has(documentId)) {
Expand All @@ -36,4 +38,26 @@ export class TestClientManager implements IClientManager {
}
return signalClients;
}

/**
* Returns all clients currently connected including a keep alive time.
* Should be used with delis read only client functionality.
*/
public async getSequencedClients(
tenantId: string,
documentId: string): Promise<Map<string, ISequencedSignalClient>> {
throw new Error("Not implemented");
}

/**
* Called when the expiration time of clients should be extended.
* @param clientTimeout Amount of time in milliseconds to add to the clients expiration time.
*/
public async extendSequencedClients(
tenantId: string,
documentId: string,
clients: Map<string, ISequencedSignalClient>,
clientTimeout: number): Promise<void> {
throw new Error("Not implemented");
}
}