Skip to content

Commit

Permalink
feat: Add ActiveSigners cache (#1657)
Browse files Browse the repository at this point in the history
* feat: Add ActiveSigners cache

* test

* cleanup
  • Loading branch information
adityapk00 authored Feb 5, 2024
1 parent dc36db0 commit 6ec1b4d
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/fifty-carrots-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

feat: Add a LRU cache to the active signers
4 changes: 4 additions & 0 deletions apps/hubble/src/rpc/test/signerService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ beforeAll(async () => {
client = getInsecureHubRpcClient(`127.0.0.1:${port}`);
});

beforeEach(async () => {
engine.clearCache();
});

afterAll(async () => {
client.close();
await syncEngine.stop();
Expand Down
9 changes: 9 additions & 0 deletions apps/hubble/src/storage/engine/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ beforeAll(async () => {
);
});

beforeEach(async () => {
engine.clearCache();
});

afterAll(async () => {
await engine.stop();
await db.close();
});

describe("mergeOnChainEvent", () => {
test("succeeds", async () => {
await expect(engine.mergeOnChainEvent(custodyEvent)).resolves.toBeInstanceOf(Ok);
Expand Down
4 changes: 4 additions & 0 deletions apps/hubble/src/storage/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ class Engine extends TypedEmitter<EngineEvents> {
return this._db;
}

clearCache() {
this._onchainEventsStore.clearActiveSignerCache();
}

async mergeMessages(messages: Message[]): Promise<Array<HubResult<number>>> {
return Promise.all(messages.map((message) => this.mergeMessage(message)));
}
Expand Down
46 changes: 46 additions & 0 deletions apps/hubble/src/storage/stores/onChainEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ import { bytesCompare } from "@farcaster/core";

const SUPPORTED_SIGNER_SCHEMES = [1];

const LRU_CACHE_SIZE = 1000;

type LRUCacheItem = {
_event: SignerOnChainEvent;
_lastUsed: number;
};

/**
* OnChainStore persists On Chain Event messages in RocksDB using a grow only CRDT set
* to guarantee eventual consistency.
Expand All @@ -47,6 +54,9 @@ class OnChainEventStore {
protected _db: RocksDB;
protected _eventHandler: StoreEventHandler;

// Store the last 1000 active signers in memory to avoid hitting the database
protected _activeSignerCache = new Map<string, LRUCacheItem>();

constructor(db: RocksDB, eventHandler: StoreEventHandler) {
this._db = db;
this._eventHandler = eventHandler;
Expand All @@ -70,13 +80,44 @@ class OnChainEventStore {
return getManyOnChainEvents(this._db, keys);
}

getActiveSignerCacheKey = (fid: number, signer: Uint8Array): string => {
return `${fid}:${Buffer.from(signer).toString("hex")}`;
};

clearActiveSignerCache() {
this._activeSignerCache.clear();
}

async getActiveSigner(fid: number, signer: Uint8Array): Promise<SignerOnChainEvent> {
// See if we have this in the cache
const cacheKey = this.getActiveSignerCacheKey(fid, signer);
const cacheItem = this._activeSignerCache.get(cacheKey);
if (cacheItem) {
cacheItem._lastUsed = Date.now();
return cacheItem._event;
}

// Otherwise, look it up in the database
const signerEventPrimaryKey = await this._db.get(makeSignerOnChainEventBySignerKey(fid, signer));
const event = await getOnChainEventByKey<SignerOnChainEvent>(this._db, signerEventPrimaryKey);
if (
event.signerEventBody.eventType === SignerEventType.ADD &&
SUPPORTED_SIGNER_SCHEMES.includes(event.signerEventBody.keyType)
) {
// Add to the cache
if (this._activeSignerCache.size >= LRU_CACHE_SIZE) {
// Evict the least 10% recently used
const sortedCache = [...this._activeSignerCache.entries()].sort((a, b) => a[1]._lastUsed - b[1]._lastUsed);
const evictCount = Math.ceil(LRU_CACHE_SIZE * 0.1);
for (let i = 0; i < evictCount; i++) {
const key = sortedCache[i]?.[0] || "";
this._activeSignerCache.delete(key);
}

logger.info(`Evicted ${evictCount} items from the active signer cache`);
}
this._activeSignerCache.set(cacheKey, { _event: event, _lastUsed: Date.now() });

return event;
} else {
throw new HubError("not_found", "no such active signer");
Expand Down Expand Up @@ -183,6 +224,11 @@ class OnChainEventStore {
}
}

if (event.signerEventBody.eventType === SignerEventType.REMOVE) {
// Remove the signer from the cache
this._activeSignerCache.delete(this.getActiveSignerCacheKey(event.fid, event.signerEventBody.key));
}

if (event.signerEventBody.eventType === SignerEventType.ADMIN_RESET) {
const signerEvents = await this.getOnChainEvents<SignerOnChainEvent>(
OnChainEventType.EVENT_TYPE_SIGNER,
Expand Down
5 changes: 4 additions & 1 deletion apps/hubble/src/storage/stores/onchainEventStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,12 @@ describe("OnChainEventStore", () => {
}),
});

// Add the signer, make sure we can fetch it
await set.mergeOnChainEvent(signer);
await set.mergeOnChainEvent(signerRemoved);
await expect(set.getActiveSigner(signer.fid, signer.signerEventBody.key)).resolves.toEqual(signer);

// Remove the signer, make sure we can't fetch it
await set.mergeOnChainEvent(signerRemoved);
await expect(set.getActiveSigner(signer.fid, signer.signerEventBody.key)).rejects.toThrow("active signer");
});

Expand Down

0 comments on commit 6ec1b4d

Please sign in to comment.