Skip to content

Commit

Permalink
Clear storage cache to workaround race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
sanjayprabhu committed Dec 14, 2024
1 parent 7b75406 commit 50a83ec
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
10 changes: 10 additions & 0 deletions apps/hubble/src/storage/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ class Engine extends TypedEmitter<EngineEvents> {
}

async pruneMessages(fid: number): HubAsyncResult<number> {
await this.clearStorageCacheForFid(fid);
const logPruneResult = (result: HubResult<number[]>, store: string): number => {
return result.match(
(ids) => {
Expand Down Expand Up @@ -968,6 +969,7 @@ class Engine extends TypedEmitter<EngineEvents> {
return err(validatedFid.error);
}

await this.clearStorageCacheForFid(fid);
const slot = await this.eventHandler.getCurrentStorageSlotForFid(fid);

if (slot.isErr()) {
Expand Down Expand Up @@ -1004,6 +1006,14 @@ class Engine extends TypedEmitter<EngineEvents> {
});
}

async clearStorageCacheForFid(fid: number): HubAsyncResult<void> {
const limits = getStoreLimits([]);
for (const limit of limits) {
await this.eventHandler.clearCachedMessageCount(fid, limit.storeType);
}
return ok(undefined);
}

async getUserNameProof(name: Uint8Array, retries = 1): HubAsyncResult<UserNameProof> {
const nameString = bytesToUtf8String(name);
if (nameString.isErr()) {
Expand Down
8 changes: 7 additions & 1 deletion apps/hubble/src/storage/stores/storageCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
} from "@farcaster/hub-nodejs";
import { err, ok } from "neverthrow";
import RocksDB from "../db/rocksdb.js";
import { FID_BYTES, OnChainEventPostfix, RootPrefix, UserMessagePostfix } from "../db/types.js";
import { FID_BYTES, OnChainEventPostfix, RootPrefix, UserMessagePostfix, UserPostfix } from "../db/types.js";
import { logger } from "../../utils/logger.js";
import { makeFidKey, makeMessagePrimaryKey, makeTsHash, typeToSetPostfix } from "../db/message.js";
import { bytesCompare, getFarcasterTime, HubAsyncResult } from "@farcaster/core";
Expand Down Expand Up @@ -151,6 +151,12 @@ export class StorageCache {
}
}

async clearMessageCount(fid: number, set: UserMessagePostfix): Promise<void> {
this._counts.delete(makeKey(fid, set));
this._earliestTsHashes.delete(makeKey(fid, set));
await this.getMessageCount(fid, set, true);
}

async getCurrentStorageSlotForFid(fid: number): HubAsyncResult<StorageSlot> {
let slot = this._activeStorageSlots.get(fid);

Expand Down
9 changes: 9 additions & 0 deletions apps/hubble/src/storage/stores/storeEventHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,15 @@ class StoreEventHandler extends TypedEmitter<StoreEvents> {
return await this._storageCache.getMessageCount(fid, set, forceFetch);
}

async clearCachedMessageCount(fid: number, store: StoreType): HubAsyncResult<void> {
const set = STORE_TO_SET[store];
if (!set) {
return err(new HubError("bad_request.invalid_param", `invalid store type ${store}`));
}
await this._storageCache.clearMessageCount(fid, set);
return ok(undefined);
}

async getMaxMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult<number> {
const slot = await this.getCurrentStorageSlotForFid(fid);

Expand Down

0 comments on commit 50a83ec

Please sign in to comment.