Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speedup startup by caching rooms with active connections #892

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/893.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve startup time on redis configurations by caching the last known active rooms.
22 changes: 19 additions & 3 deletions src/Bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -684,12 +684,20 @@
const queue = new PQueue({
concurrency: 2,
});

// Set up already joined rooms
await queue.addAll(this.botUsersManager.joinedRooms.map((roomId) => async () => {
let allActiveJoinedRooms = await this.storage.getAllRoomsWithActiveConnections();
if (!allActiveJoinedRooms.length) {
allActiveJoinedRooms = this.botUsersManager.joinedRooms;
}
log.info(`Found ${allActiveJoinedRooms.length} active rooms`);

const loadRoom = async (roomId: string) => {
log.debug("Fetching state for " + roomId);

try {
await connManager.createConnectionsForRoomId(roomId, false);
this.storage.addRoomHasActiveConnections(roomId);
} catch (ex) {
log.error(`Unable to create connection for ${roomId}`, ex);
return;
Expand Down Expand Up @@ -735,13 +743,16 @@
}
}
const adminRoom = await this.setUpAdminRoom(botUser.intent, roomId, accountData, notifContent || NotifFilter.getDefaultContent());
this.storage.addRoomHasActiveConnections(roomId);
// Call this on startup to set the state
await this.onAdminRoomSettingsChanged(adminRoom, accountData, { admin_user: accountData.admin_user });
log.debug(`Room ${roomId} is connected to: ${adminRoom.toString()}`);
} catch (ex) {
log.error(`Failed to set up admin room ${roomId}:`, ex);
}
}));
}

await queue.addAll(allActiveJoinedRooms.map(roomId => () => loadRoom(roomId)));

// Handle spaces
for (const discussion of connManager.getAllConnectionsOfType(GitHubDiscussionSpace)) {
Expand Down Expand Up @@ -774,8 +785,9 @@
if (this.config.metrics?.enabled) {
this.listener.bindResource('metrics', Metrics.expressRouter);
}
// This will load all the *active* connections
await queue.onIdle();
log.info(`All connections loaded`);
log.info(`All active connections loaded`);

// Load feeds after connections, to limit the chances of us double
// posting to rooms if a previous hookshot instance is being replaced.
Expand All @@ -794,6 +806,9 @@
await this.as.begin();
log.info(`Bridge is now ready. Found ${this.connectionManager.size} connections`);
this.ready = true;
const inactiveRooms = this.botUsersManager.joinedRooms.filter(rId => !allActiveJoinedRooms.includes(rId));
log.info(`Checking ${inactiveRooms.length} rooms with previously inactive state in the background`);
await queue.addAll(inactiveRooms.map(rId => () => loadRoom(rId)));
}

private async handleHookshotEvent<EventType, ConnType extends IConnection>(msg: MessageQueueMessageOut<EventType>, connection: ConnType, handler: (c: ConnType, data: EventType) => Promise<unknown>|unknown) {
Expand Down Expand Up @@ -920,7 +935,7 @@
}
log.info(`Got message roomId=${roomId} type=${event.type} from=${event.sender}`);
log.debug("Content:", JSON.stringify(event));
let processedReply: any;

Check warning on line 938 in src/Bridge.ts

View workflow job for this annotation

GitHub Actions / lint-node

Unexpected any. Specify a different type
let processedReplyMetadata: IRichReplyMetadata|undefined = undefined;
try {
processedReply = await this.replyProcessor.processEvent(event, this.as.botClient, EventKind.RoomEvent);
Expand Down Expand Up @@ -1384,6 +1399,7 @@
const adminRoom = new AdminRoom(
roomId, accountData, notifContent, intent, this.tokenStore, this.config, this.connectionManager,
);
this.storage.addRoomHasActiveConnections(roomId);

adminRoom.on("settings.changed", this.onAdminRoomSettingsChanged.bind(this));
adminRoom.on("open.project", async (project: ProjectsGetResponseData) => {
Expand Down
11 changes: 10 additions & 1 deletion src/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
}
this.connections.push(connection);
this.emit('new-connection', connection);
this.storage.addRoomHasActiveConnections(connection.roomId);
}
Metrics.connections.set(this.connections.length);
// Already exists, noop.
Expand Down Expand Up @@ -173,7 +174,7 @@
* @param rollbackBadState
* @returns
*/
public async createConnectionForState(roomId: string, state: StateEvent<any>, rollbackBadState: boolean): Promise<IConnection|undefined> {

Check warning on line 177 in src/ConnectionManager.ts

View workflow job for this annotation

GitHub Actions / lint-node

Unexpected any. Specify a different type
// Empty object == redacted
if (state.content.disabled === true || Object.keys(state.content).length === 0) {
log.debug(`${roomId} has disabled state for ${state.type}`);
Expand Down Expand Up @@ -394,6 +395,9 @@
this.connections.splice(connectionIndex, 1);
Metrics.connections.set(this.connections.length);
this.emit('connection-removed', connection);
if (this.getAllConnectionsForRoom(roomId).length === 0) {
this.storage.removeRoomHasActiveConnections(roomId);
}
}

/**
Expand All @@ -403,8 +407,13 @@
*/
public async removeConnectionsForRoom(roomId: string) {
log.info(`Removing all connections from ${roomId}`);
this.connections = this.connections.filter((c) => c.roomId !== roomId);
const removedConnections = this.connections.filter((c) => c.roomId === roomId);
this.connections = this.connections.filter((c) => !removedConnections.includes(c));
removedConnections.forEach(c => {
this.emit('connection-removed', c);
})
Metrics.connections.set(this.connections.length);
this.storage.removeRoomHasActiveConnections(roomId);
}

public registerProvisioningConnection(connType: {getProvisionerDetails: (botUserId: string) => GetConnectionTypeResponseItem}) {
Expand Down
10 changes: 10 additions & 0 deletions src/Stores/MemoryStorageProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,14 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
public async setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise<void> {
this.gitlabDiscussionThreads.set(connectionId, value);
}
public addRoomHasActiveConnections(): void {
// no-op: only used for startup speedups
}
public removeRoomHasActiveConnections(): void {
// no-op: only used for startup speedups
}
public async getAllRoomsWithActiveConnections(): Promise<string[]> {
// no-op: only used for startup speedups
return [];
}
}
22 changes: 17 additions & 5 deletions src/Stores/RedisStorageProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,15 @@ const GH_ISSUES_REVIEW_DATA_KEY = "gh.issues.review_data";
const FIGMA_EVENT_COMMENT_ID = "figma.comment_event_id";
const STORED_FILES_KEY = "storedfiles.";
const GL_DISCUSSIONTHREADS_KEY = "gl.discussion-threads";
const ACTIVE_ROOMS = "cache.active_rooms";
const STORED_FILES_EXPIRE_AFTER = 24 * 60 * 60; // 24 hours
const COMPLETED_TRANSACTIONS_EXPIRE_AFTER = 24 * 60 * 60; // 24 hours
const ISSUES_EXPIRE_AFTER = 7 * 24 * 60 * 60; // 7 days
const ISSUES_LAST_COMMENT_EXPIRE_AFTER = 14 * 24 * 60 * 60; // 7 days


const WIDGET_TOKENS = "widgets.tokens.";
const WIDGET_USER_TOKENS = "widgets.user-tokens.";

const FEED_GUIDS = "feeds.guids.";



const log = new Logger("RedisASProvider");

export class RedisStorageContextualProvider implements IStorageProvider {
Expand Down Expand Up @@ -229,4 +225,20 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
public async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null;
}

public addRoomHasActiveConnections(roomId: string): void {
this.redis.sadd(ACTIVE_ROOMS, roomId).catch((ex) => {
log.warn(`Failed to add ${roomId} to active rooms`, ex);
});
}

public removeRoomHasActiveConnections(roomId: string): void {
this.redis.srem(ACTIVE_ROOMS, roomId).catch((ex) => {
log.warn(`Failed to remove ${roomId} from active rooms`, ex);
});
}

public getAllRoomsWithActiveConnections(): Promise<string[]> {
return this.redis.smembers(ACTIVE_ROOMS);
}
}
3 changes: 3 additions & 0 deletions src/Stores/StorageProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto
storeFeedGuids(url: string, ...guid: string[]): Promise<void>;
hasSeenFeed(url: string, ...guid: string[]): Promise<boolean>;
hasSeenFeedGuid(url: string, guid: string): Promise<boolean>;
addRoomHasActiveConnections(roomId: string): void;
removeRoomHasActiveConnections(roomId: string): void;
getAllRoomsWithActiveConnections(): Promise<string[]>;
}
Loading