Skip to content

Commit

Permalink
Enabled graceful Redis shutdown in r11 (microsoft#23212)
Browse files Browse the repository at this point in the history
## Description

This PR adds support for graceful shutdown of Redis client connections.
Prior to this fix, all pod terminations would result in a lot of Redis
client errors. Shutting down connections gracefully prevents this.

## Breaking Changes

Adds a new resource to the `runnerFactory` of `Alfred`, `Nexus` and
`Riddler`. This change could be breaking for FRS and will be fixed by
consuming the latest OSS package into the repo. Added changesets for
these.
  • Loading branch information
dhr-verma authored Dec 2, 2024
1 parent ff159ee commit 807f880
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 7 deletions.
7 changes: 7 additions & 0 deletions server/routerlicious/.changeset/dirty-keys-follow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@fluidframework/server-services-shared": minor
---

Created a utility function for Redis connection handling

Exported a new function that helps shut down Redis connections using the `quit()` command.
7 changes: 7 additions & 0 deletions server/routerlicious/.changeset/new-eagles-drive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@fluidframework/server-lambdas": minor
---

Added a new event - `dispose` - which is triggered when `.dispose()` is called

This event is triggered when disposing factory resources. It can be used to trigger other graceful shutdown methods.
7 changes: 7 additions & 0 deletions server/routerlicious/.changeset/silly-papayas-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@fluidframework/server-routerlicious-base": major
---

Added support for Redis graceful shutdown

This PR adds a way to ensure that Redis connections are gracefully shut down when disposing service factory resources. There is a new required param, `redisClientConnectionManagers`, in the Nexus, Alfred, and Riddler RunnerFactories. This is scoped to r11s-base.
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ export class DeliLambdaFactory
}

public async dispose(): Promise<void> {
// Emit this event to close the broadcasterLambda and publisher
this.emit("dispose");
const mongoClosedP = this.operationsDbMongoManager.close();
const forwardProducerClosedP = this.forwardProducer.close();
const signalProducerClosedP = this.signalProducer?.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
} from "./services";
import { IAlfredResourcesCustomizations } from ".";
import { IReadinessCheck } from "@fluidframework/server-services-core";
import { StartupCheck } from "@fluidframework/server-services-shared";
import { closeRedisClientConnections, StartupCheck } from "@fluidframework/server-services-shared";

/**
* @internal
Expand All @@ -46,6 +46,7 @@ export class AlfredResources implements core.IResources {
public documentRepository: core.IDocumentRepository,
public documentDeleteService: IDocumentDeleteService,
public startupCheck: IReadinessCheck,
public redisClientConnectionManagers: utils.IRedisClientConnectionManager[],
public tokenRevocationManager?: core.ITokenRevocationManager,
public revokedTokenChecker?: core.IRevokedTokenChecker,
public serviceMessageResourceManager?: core.IServiceMessageResourceManager,
Expand Down Expand Up @@ -73,11 +74,15 @@ export class AlfredResources implements core.IResources {
const serviceMessageManagerP = this.serviceMessageResourceManager
? this.serviceMessageResourceManager.close()
: Promise.resolve();
const redisClientConnectionManagersP = closeRedisClientConnections(
this.redisClientConnectionManagers,
);
await Promise.all([
producerClosedP,
mongoClosedP,
tokenRevocationManagerP,
serviceMessageManagerP,
redisClientConnectionManagersP,
]);
}
}
Expand Down Expand Up @@ -105,6 +110,8 @@ export class AlfredResourcesFactory implements core.IResourcesFactory<AlfredReso
);
const eventHubConnString: string = config.get("kafka:lib:eventHubConnString");
const oauthBearerConfig = config.get("kafka:lib:oauthBearerConfig");
// List of Redis client connection managers that need to be closed on dispose
const redisClientConnectionManagers: utils.IRedisClientConnectionManager[] = [];

const producer = services.createProducer(
kafkaLibrary,
Expand Down Expand Up @@ -146,6 +153,7 @@ export class AlfredResourcesFactory implements core.IResourcesFactory<AlfredReso
redisConfig2.slotsRefreshTimeout,
retryDelays,
);
redisClientConnectionManagers.push(redisClientConnectionManagerForJwtCache);
const redisJwtCache = new services.RedisCache(redisClientConnectionManagerForJwtCache);

// Database connection for global db if enabled
Expand Down Expand Up @@ -230,6 +238,7 @@ export class AlfredResourcesFactory implements core.IResourcesFactory<AlfredReso
redisConfigForThrottling.slotsRefreshTimeout,
retryDelays,
);
redisClientConnectionManagers.push(redisClientConnectionManagerForThrottling);

const redisThrottleAndUsageStorageManager =
new services.RedisThrottleAndUsageStorageManager(
Expand Down Expand Up @@ -400,6 +409,7 @@ export class AlfredResourcesFactory implements core.IResourcesFactory<AlfredReso
documentRepository,
documentDeleteService,
startupCheck,
redisClientConnectionManagers,
tokenRevocationManager,
revokedTokenChecker,
serviceMessageResourceManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { StorageNameAllocator } from "./services";
import { INexusResourcesCustomizations } from "./customizations";
import { OrdererManager, type IOrdererManagerOptions } from "./ordererManager";
import { IReadinessCheck } from "@fluidframework/server-services-core";
import { StartupCheck } from "@fluidframework/server-services-shared";
import { closeRedisClientConnections, StartupCheck } from "@fluidframework/server-services-shared";

class NodeWebSocketServer implements core.IWebSocketServer {
private readonly webSocketServer: ws.Server;
Expand Down Expand Up @@ -68,6 +68,7 @@ export class NexusResources implements core.IResources {
public documentsCollectionName: string,
public metricClientConfig: any,
public startupCheck: IReadinessCheck,
public redisClientConnectionManagers: utils.IRedisClientConnectionManager[],
public throttleAndUsageStorageManager?: core.IThrottleAndUsageStorageManager,
public verifyMaxMessageSize?: boolean,
public redisCache?: core.ICache,
Expand All @@ -89,7 +90,15 @@ export class NexusResources implements core.IResources {
const serviceMessageManagerP = this.serviceMessageResourceManager
? this.serviceMessageResourceManager.close()
: Promise.resolve();
await Promise.all([mongoClosedP, tokenRevocationManagerP, serviceMessageManagerP]);
const redisClientConnectionManagersP = closeRedisClientConnections(
this.redisClientConnectionManagers,
);
await Promise.all([
mongoClosedP,
tokenRevocationManagerP,
serviceMessageManagerP,
redisClientConnectionManagersP,
]);
}
}

Expand Down Expand Up @@ -117,6 +126,8 @@ export class NexusResourcesFactory implements core.IResourcesFactory<NexusResour
);
const eventHubConnString: string = config.get("kafka:lib:eventHubConnString");
const oauthBearerConfig = config.get("kafka:lib:oauthBearerConfig");
// List of Redis client connection managers that need to be closed on dispose
const redisClientConnectionManagers: utils.IRedisClientConnectionManager[] = [];

const producer = services.createProducer(
kafkaLibrary,
Expand Down Expand Up @@ -160,6 +171,7 @@ export class NexusResourcesFactory implements core.IResourcesFactory<NexusResour
redisConfig2.slotsRefreshTimeout,
retryDelays,
);
redisClientConnectionManagers.push(redisClientConnectionManager);

const clientManager = new services.ClientManager(
redisClientConnectionManager,
Expand Down Expand Up @@ -215,6 +227,7 @@ export class NexusResourcesFactory implements core.IResourcesFactory<NexusResour
redisConfig2.enableClustering,
redisConfig2.slotsRefreshTimeout,
);
redisClientConnectionManagers.push(redisClientConnectionManagerForJwtCache);
const redisJwtCache = new services.RedisCache(redisClientConnectionManagerForJwtCache);

// Database connection for global db if enabled
Expand Down Expand Up @@ -305,6 +318,7 @@ export class NexusResourcesFactory implements core.IResourcesFactory<NexusResour
redisConfigForThrottling.slotsRefreshTimeout,
retryDelays,
);
redisClientConnectionManagers.push(redisClientConnectionManagerForThrottling);

const redisThrottleAndUsageStorageManager =
new services.RedisThrottleAndUsageStorageManager(
Expand Down Expand Up @@ -426,6 +440,7 @@ export class NexusResourcesFactory implements core.IResourcesFactory<NexusResour
redisConfig.enableClustering,
redisConfig.slotsRefreshTimeout,
);
redisClientConnectionManagers.push(redisClientConnectionManagerForLogging);

redisCache = new services.RedisCache(redisClientConnectionManagerForLogging);
}
Expand Down Expand Up @@ -500,6 +515,9 @@ export class NexusResourcesFactory implements core.IResourcesFactory<NexusResour

const webSocketLibrary = config.get("nexus:webSocketLib");

// Do not add the pub/sub connection manager to the list of managers to close
// as these are gracefully closed by the web server factory
// server/routerlicious/packages/services-shared/src/socketIoServer.ts Line 330
const redisClientConnectionManagerForPub =
customizations?.redisClientConnectionManagerForPub
? customizations.redisClientConnectionManagerForPub
Expand Down Expand Up @@ -565,6 +583,7 @@ export class NexusResourcesFactory implements core.IResourcesFactory<NexusResour
documentsCollectionName,
metricClientConfig,
startupCheck,
redisClientConnectionManagers,
redisThrottleAndUsageStorageManager,
verifyMaxMessageSize,
redisCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { RiddlerRunner } from "./runner";
import { ITenantDocument } from "./tenantManager";
import { IRiddlerResourcesCustomizations } from "./customizations";
import { ITenantRepository, MongoTenantRepository } from "./mongoTenantRepository";
import { StartupCheck } from "@fluidframework/server-services-shared";
import { closeRedisClientConnections, StartupCheck } from "@fluidframework/server-services-shared";

/**
* @internal
Expand All @@ -48,6 +48,7 @@ export class RiddlerResources implements IResources {
public readonly riddlerStorageRequestMetricIntervalMs: number,
public readonly tenantKeyGenerator: utils.ITenantKeyGenerator,
public readonly startupCheck: IReadinessCheck,
public readonly redisClientConnectionManagers: utils.IRedisClientConnectionManager[],
public readonly cache?: RedisCache,
public readonly readinessCheck?: IReadinessCheck,
) {
Expand All @@ -62,7 +63,11 @@ export class RiddlerResources implements IResources {
}

public async dispose(): Promise<void> {
await this.mongoManager.close();
const mongoManagerCloseP = this.mongoManager.close();
const redisClientConnectionManagersCloseP = closeRedisClientConnections(
this.redisClientConnectionManagers,
);
await Promise.all([mongoManagerCloseP, redisClientConnectionManagersCloseP]);
}
}

Expand All @@ -77,6 +82,8 @@ export class RiddlerResourcesFactory implements IResourcesFactory<RiddlerResourc
// Cache connection
const redisConfig = config.get("redisForTenantCache");
let cache: RedisCache | undefined;
// List of Redis client connection managers that need to be closed on dispose
const redisClientConnectionManagers: utils.IRedisClientConnectionManager[] = [];
if (redisConfig) {
const redisParams = {
expireAfterSeconds: redisConfig.keyExpireAfterSeconds as number | undefined,
Expand All @@ -100,6 +107,7 @@ export class RiddlerResourcesFactory implements IResourcesFactory<RiddlerResourc
redisConfig.slotsRefreshTimeout,
retryDelays,
);
redisClientConnectionManagers.push(redisClientConnectionManagerForTenantCache);
cache = new RedisCache(redisClientConnectionManagerForTenantCache, redisParams);
}
// Database connection
Expand Down Expand Up @@ -187,6 +195,7 @@ export class RiddlerResourcesFactory implements IResourcesFactory<RiddlerResourc
riddlerStorageRequestMetricIntervalMs,
tenantKeyGenerator,
startupCheck,
redisClientConnectionManagers,
cache,
customizations?.readinessCheck,
);
Expand Down
12 changes: 11 additions & 1 deletion server/routerlicious/packages/routerlicious/src/deli/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ export async function deliCreate(
redisConfig.enableClustering,
redisConfig.slotsRefreshTimeout,
);
// The SocketIoRedisPublisher handles graceful shutdown of its Redis connection
const publisher = new services.SocketIoRedisPublisher(redisClientConnectionManager);
publisher.on("error", (err) => {
winston.error("Error with Redis Publisher:", err);
Expand Down Expand Up @@ -204,7 +205,7 @@ export async function deliCreate(
localCheckpointEnabled,
);

return new DeliLambdaFactory(
const deliLambdaFactory = new DeliLambdaFactory(
operationsDbManager,
documentRepository,
checkpointService,
Expand All @@ -216,6 +217,15 @@ export async function deliCreate(
serviceConfiguration,
customizations?.clusterDrainingChecker,
);

deliLambdaFactory.on("dispose", () => {
broadcasterLambda.close();
publisher.close().catch((error) => {
Lumberjack.error("Error closing publisher", undefined, error);
});
});

return deliLambdaFactory;
}

export async function create(
Expand Down
2 changes: 1 addition & 1 deletion server/routerlicious/packages/services-shared/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export {
} from "./redisSocketIoAdapter";
export { decodeHeader, RestLessServer } from "./restLessServer";
export { run, runService } from "./runner";
export { runnerHttpServerStop } from "./runnerUtils";
export { runnerHttpServerStop, closeRedisClientConnections } from "./runnerUtils";
export { SocketIoAdapterCreator } from "./socketIoServer";
export { DocumentStorage } from "./storage";
export {
Expand Down
37 changes: 37 additions & 0 deletions server/routerlicious/packages/services-shared/src/runnerUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,43 @@ import { IWebServer } from "@fluidframework/server-services-core";
import { Lumber, Lumberjack } from "@fluidframework/server-services-telemetry";
import { promiseTimeout } from "@fluidframework/server-services-client";
import { Deferred } from "@fluidframework/common-utils";
import type { IRedisClientConnectionManager } from "@fluidframework/server-services-utils";

/**
 * Asks the Redis client behind every supplied connection manager to `quit()`,
 * then waits for all of those quit attempts to finish.
 *
 * Problems are logged through Lumberjack instead of being rethrown:
 * - a rejection whose message is "Connection is closed." is treated as an
 *   already-closed client and logged at info level;
 * - any other rejection is logged as an error;
 * - a synchronous failure while obtaining the client or starting the quit is
 *   logged as an error and that manager is skipped.
 *
 * @param redisClientConnectionManagers - Connection managers whose Redis clients should be quit.
 * @internal
 */
export async function closeRedisClientConnections(
    redisClientConnectionManagers: IRedisClientConnectionManager[],
): Promise<void> {
    const pendingQuits: Promise<void | string>[] = [];

    for (const manager of redisClientConnectionManagers) {
        try {
            const quitP = manager
                .getRedisClient()
                .quit()
                .then(() => {
                    Lumberjack.info("Redis client quit");
                    return "OK";
                })
                .catch((error) => {
                    if (error?.message !== "Connection is closed.") {
                        Lumberjack.error("Failed to quit redis client", undefined, error);
                        return undefined;
                    }
                    // Ignore the error caused by disconnecting since
                    // we're disconnecting...
                    // https://github.com/redis/ioredis/blob/v4/lib/cluster/index.ts#L415
                    Lumberjack.info("Redis client already closed");
                    return "OK";
                });
            pendingQuits.push(quitP);
        } catch (error) {
            // Reached when getRedisClient() or quit() throws synchronously.
            Lumberjack.error("Failed to add redis client to promise list", undefined, error);
        }
    }

    await Promise.all(pendingQuits);
}

/**
* @internal
Expand Down

0 comments on commit 807f880

Please sign in to comment.