From 50a83ec851791c4ec687657ffb3c455599e1e8d7 Mon Sep 17 00:00:00 2001 From: Sanjay Raveendran Date: Fri, 13 Dec 2024 16:00:33 -0800 Subject: [PATCH] Clear storage cache to workaround race conditions --- apps/hubble/src/storage/engine/index.ts | 10 ++++++++++ apps/hubble/src/storage/stores/storageCache.ts | 8 +++++++- apps/hubble/src/storage/stores/storeEventHandler.ts | 9 +++++++++ 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index b8727378ab..b80446cdcc 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -540,6 +540,7 @@ class Engine extends TypedEmitter { } async pruneMessages(fid: number): HubAsyncResult { + await this.clearStorageCacheForFid(fid); const logPruneResult = (result: HubResult, store: string): number => { return result.match( (ids) => { @@ -968,6 +969,7 @@ class Engine extends TypedEmitter { return err(validatedFid.error); } + await this.clearStorageCacheForFid(fid); const slot = await this.eventHandler.getCurrentStorageSlotForFid(fid); if (slot.isErr()) { @@ -1004,6 +1006,14 @@ class Engine extends TypedEmitter { }); } + async clearStorageCacheForFid(fid: number): HubAsyncResult { + const limits = getStoreLimits([]); + for (const limit of limits) { + await this.eventHandler.clearCachedMessageCount(fid, limit.storeType); + } + return ok(undefined); + } + async getUserNameProof(name: Uint8Array, retries = 1): HubAsyncResult { const nameString = bytesToUtf8String(name); if (nameString.isErr()) { diff --git a/apps/hubble/src/storage/stores/storageCache.ts b/apps/hubble/src/storage/stores/storageCache.ts index 8d29a8f3e1..94c4c2adf7 100644 --- a/apps/hubble/src/storage/stores/storageCache.ts +++ b/apps/hubble/src/storage/stores/storageCache.ts @@ -17,7 +17,7 @@ import { } from "@farcaster/hub-nodejs"; import { err, ok } from "neverthrow"; import RocksDB from "../db/rocksdb.js"; -import { FID_BYTES, OnChainEventPostfix, RootPrefix, UserMessagePostfix } from "../db/types.js"; +import { FID_BYTES, OnChainEventPostfix, RootPrefix, UserMessagePostfix, UserPostfix } from "../db/types.js"; import { logger } from "../../utils/logger.js"; import { makeFidKey, makeMessagePrimaryKey, makeTsHash, typeToSetPostfix } from "../db/message.js"; import { bytesCompare, getFarcasterTime, HubAsyncResult } from "@farcaster/core"; @@ -151,6 +151,12 @@ export class StorageCache { } } + async clearMessageCount(fid: number, set: UserMessagePostfix): Promise { + this._counts.delete(makeKey(fid, set)); + this._earliestTsHashes.delete(makeKey(fid, set)); + await this.getMessageCount(fid, set, true); + } + async getCurrentStorageSlotForFid(fid: number): HubAsyncResult { let slot = this._activeStorageSlots.get(fid); diff --git a/apps/hubble/src/storage/stores/storeEventHandler.ts b/apps/hubble/src/storage/stores/storeEventHandler.ts index 5fbaa0915c..386501aeef 100644 --- a/apps/hubble/src/storage/stores/storeEventHandler.ts +++ b/apps/hubble/src/storage/stores/storeEventHandler.ts @@ -227,6 +227,15 @@ class StoreEventHandler extends TypedEmitter { return await this._storageCache.getMessageCount(fid, set, forceFetch); } + async clearCachedMessageCount(fid: number, store: StoreType): HubAsyncResult { + const set = STORE_TO_SET[store]; + if (!set) { + return err(new HubError("bad_request.invalid_param", `invalid store type ${store}`)); + } + await this._storageCache.clearMessageCount(fid, set); + return ok(undefined); + } + async getMaxMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult { const slot = await this.getCurrentStorageSlotForFid(fid);