Skip to content

Commit

Permalink
feat: Repair sync trie when events and fnames are already present (#1424
Browse files Browse the repository at this point in the history
)

* feat: Include events and fnames when rebuilding the sync trie

* Repair sync trie when events and proofs are already present

* Add changeset
  • Loading branch information
sanjayprabhu authored Sep 25, 2023
1 parent 6a526b7 commit f0ad204
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 19 deletions.
5 changes: 5 additions & 0 deletions .changeset/green-ducks-sip.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

feat: Repair sync trie when events and fnames are already present
64 changes: 58 additions & 6 deletions apps/hubble/src/network/sync/merkleTrie.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { Result, ResultAsync } from "neverthrow";
import { Worker } from "worker_threads";
import { HubError, Message } from "@farcaster/hub-nodejs";
import { HubError, Message, OnChainEvent, UserNameProof } from "@farcaster/hub-nodejs";
import { SyncId } from "./syncId.js";
import { TrieNode, TrieSnapshot } from "./trieNode.js";
import RocksDB from "../../storage/db/rocksdb.js";
import { FID_BYTES, HASH_LENGTH, RootPrefix, UserMessagePostfixMax } from "../../storage/db/types.js";
import {
FID_BYTES,
HASH_LENGTH,
OnChainEventPostfix,
RootPrefix,
UserMessagePostfixMax,
} from "../../storage/db/types.js";
import { logger } from "../../utils/logger.js";
import { getStatusdInitialization } from "../../utils/statsd.js";

Expand Down Expand Up @@ -205,7 +211,7 @@ class MerkleTrie {
return this.callMethod("initialize");
}

public async rebuild(): Promise<void> {
public async rebuild(eventsEnabled = false): Promise<void> {
// First, delete the root node
const dbStatus = await ResultAsync.fromPromise(
this._db.del(TrieNode.makePrimaryKey(new Uint8Array())),
Expand All @@ -218,12 +224,12 @@ class MerkleTrie {
// Brand new empty root node
await this.callMethod("clear");

// Rebuild the trie by iterating over all the messages in the db
const prefix = Buffer.from([RootPrefix.User]);
// Rebuild the trie by iterating over all the messages, on chain events and fnames in the db
let count = 0;

// Messages
await this._db.forEachIteratorByPrefix(
prefix,
Buffer.from([RootPrefix.User]),
async (key, value) => {
const postfix = (key as Buffer).readUint8(1 + FID_BYTES);
if (postfix < UserMessagePostfixMax) {
Expand All @@ -243,6 +249,52 @@ class MerkleTrie {
{},
1 * 60 * 60 * 1000,
);
log.info({ count }, "Rebuilt messages trie");
if (!eventsEnabled) {
return;
}
// On chain events
await this._db.forEachIteratorByPrefix(
Buffer.from([RootPrefix.OnChainEvent]),
async (key, value) => {
const postfix = (key as Buffer).readUint8(1);
if (postfix === OnChainEventPostfix.OnChainEvents) {
const event = Result.fromThrowable(
() => OnChainEvent.decode(new Uint8Array(value as Buffer)),
(e) => e as HubError,
)();
if (event.isOk()) {
await this.insert(SyncId.fromOnChainEvent(event.value));
count += 1;
if (count % 10_000 === 0) {
log.info({ count }, "Rebuilding Merkle Trie (events)");
}
}
}
},
{},
1 * 60 * 60 * 1000,
);
log.info({ count }, "Rebuilt events trie");
await this._db.forEachIteratorByPrefix(
Buffer.from([RootPrefix.FNameUserNameProof]),
async (key, value) => {
const proof = Result.fromThrowable(
() => UserNameProof.decode(new Uint8Array(value as Buffer)),
(e) => e as HubError,
)();
if (proof.isOk()) {
await this.insert(SyncId.fromFName(proof.value));
count += 1;
if (count % 10_000 === 0) {
log.info({ count }, "Rebuilding Merkle Trie (proofs)");
}
}
},
{},
1 * 60 * 60 * 1000,
);
log.info({ count }, "Rebuilt fnmames trie");
}

public async insert(id: SyncId): Promise<boolean> {
Expand Down
12 changes: 7 additions & 5 deletions apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,23 +473,25 @@ describe("Multi peer sync engine", () => {
await engine1.mergeOnChainEvent(custodyEvent);
await engine1.mergeOnChainEvent(signerEvent);
await engine1.mergeOnChainEvent(storageEvent);
await engine1.mergeUserNameProof(fname);

// We add it to the engine2 synctrie as normal...
await syncEngine2.enableEventsSync();
await engine2.mergeOnChainEvent(custodyEvent);
await engine2.mergeOnChainEvent(signerEvent);
await engine2.mergeOnChainEvent(storageEvent);

const initialEngine2Count = await syncEngine2.trie.items();
await engine2.mergeUserNameProof(fname);

await engine1.mergeMessage(castAdd);
await engine2.mergeMessage(castAdd);

// ...but we'll corrupt the sync trie by pretending that the castAdd message is missing
// ...but we'll corrupt the sync trie by pretending that the castAdd message, an onchain event and an fname are missing
await syncEngine2.trie.deleteBySyncId(SyncId.fromMessage(castAdd));
await syncEngine2.trie.deleteBySyncId(SyncId.fromOnChainEvent(storageEvent));
await syncEngine2.trie.deleteBySyncId(SyncId.fromFName(fname));

// syncengine2 should be empty
expect(await syncEngine2.trie.items()).toEqual(initialEngine2Count);
// syncengine2 should only have 2 onchain events
expect(await syncEngine2.trie.items()).toEqual(2);

// Attempt to sync engine2 <-- engine1.
// It will appear to engine2 that the message is missing, so it will request it from engine1.
Expand Down
59 changes: 58 additions & 1 deletion apps/hubble/src/network/sync/syncEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import { MockHub } from "../../test/mocks.js";
import { jest } from "@jest/globals";
import { publicClient } from "../../test/utils.js";
import { IdRegisterOnChainEvent } from "@farcaster/core";
import { sync } from "rimraf";

const TEST_TIMEOUT_SHORT = 60 * 1000;
const SLEEPWHILE_TIMEOUT = 10 * 1000;
Expand Down Expand Up @@ -575,6 +574,28 @@ describe("SyncEngine", () => {
expect(await syncEngine.trie.exists(SyncId.fromFName(userNameProof))).toBeFalsy();
expect(await syncEngine.trie.exists(SyncId.fromFName(supercedingUserNameProof))).toBeTruthy();
});

test("adds sync ids to the trie when not present and egnine rejects as duplicate", async () => {
await engine.mergeOnChainEvent(custodyEvent);
await engine.mergeUserNameProof(userNameProof);

await syncEngine.trie.deleteBySyncId(SyncId.fromOnChainEvent(custodyEvent));
await syncEngine.trie.deleteBySyncId(SyncId.fromFName(userNameProof));

expect(await syncEngine.trie.exists(SyncId.fromFName(userNameProof))).toBeFalsy();
expect(await syncEngine.trie.exists(SyncId.fromOnChainEvent(custodyEvent))).toBeFalsy();

const eventResult = await engine.mergeOnChainEvent(custodyEvent);
const fnameResult = await engine.mergeUserNameProof(userNameProof);

expect(eventResult._unsafeUnwrapErr().errCode).toEqual("bad_request.duplicate");
expect(fnameResult._unsafeUnwrapErr().errCode).toEqual("bad_request.duplicate");

await sleep(10);

expect(await syncEngine.trie.exists(SyncId.fromFName(userNameProof))).toBeTruthy();
expect(await syncEngine.trie.exists(SyncId.fromOnChainEvent(custodyEvent))).toBeTruthy();
});
});
});

Expand All @@ -591,4 +612,40 @@ describe("SyncEngine", () => {
expect(syncEngine.shouldCompactDb).toBeFalsy();
});
});

describe("rebuildSyncTrie", () => {
test("reconstructs the trie from the db", async () => {
await engine.mergeOnChainEvent(custodyEvent);
await engine.mergeOnChainEvent(signerEvent);
await engine.mergeOnChainEvent(storageEvent);
const usernameProof = Factories.UserNameProof.build();
await engine.mergeUserNameProof(usernameProof);
await engine.mergeMessage(castAdd);

// Manually remove cast add to have an emtpy trie
await syncEngine.trie.deleteBySyncId(SyncId.fromMessage(castAdd));
expect(await syncEngine.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy();
expect(await syncEngine.trie.items()).toEqual(0);

await syncEngine.rebuildSyncTrie();

// No events or fnames by default
expect(await syncEngine.trie.items()).toEqual(1);
expect(await syncEngine.trie.exists(SyncId.fromMessage(castAdd))).toBeTruthy();

// Remove cast add again
await syncEngine.trie.deleteBySyncId(SyncId.fromMessage(castAdd));

// Includes events and proofs if enabled
await syncEngine.enableEventsSync();
await syncEngine.rebuildSyncTrie();

expect(await syncEngine.trie.items()).toEqual(5);
expect(await syncEngine.trie.exists(SyncId.fromMessage(castAdd))).toBeTruthy();
expect(await syncEngine.trie.exists(SyncId.fromOnChainEvent(signerEvent))).toBeTruthy();
expect(await syncEngine.trie.exists(SyncId.fromOnChainEvent(custodyEvent))).toBeTruthy();
expect(await syncEngine.trie.exists(SyncId.fromOnChainEvent(storageEvent))).toBeTruthy();
expect(await syncEngine.trie.exists(SyncId.fromFName(usernameProof))).toBeTruthy();
});
});
});
18 changes: 17 additions & 1 deletion apps/hubble/src/network/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,22 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}
}
});
this._hub.engine.on("duplicateUserNameProofEvent", async (event: UserNameProof) => {
if (this._syncEvents && event.type === UserNameType.USERNAME_TYPE_FNAME) {
const syncId = SyncId.fromFName(event);
if (!(await this.trie.exists(syncId))) {
await this._trie.insert(syncId);
}
}
});
this._hub.engine.on("duplicateOnChainEvent", async (event: OnChainEvent) => {
if (this._syncEvents) {
const syncId = SyncId.fromOnChainEvent(event);
if (!(await this.trie.exists(syncId))) {
await this._trie.insert(syncId);
}
}
});
}

public get syncTrieQSize(): number {
Expand Down Expand Up @@ -318,7 +334,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
/** Rebuild the entire Sync Trie */
public async rebuildSyncTrie() {
log.info("Rebuilding sync trie...");
await this._trie.rebuild();
await this._trie.rebuild(this._syncEvents);
log.info("Rebuilding sync trie complete");
}

Expand Down
21 changes: 19 additions & 2 deletions apps/hubble/src/storage/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,18 @@ import OnChainEventStore from "../stores/onChainEventStore.js";
import { isRateLimitedByKey, consumeRateLimitByKey, getRateLimiterForTotalMessages } from "../../utils/rateLimits.js";
import { nativeValidationMethods } from "../../rustfunctions.js";
import { RateLimiterAbstract } from "rate-limiter-flexible";
import { TypedEmitter } from "tiny-typed-emitter";

const log = logger.child({
component: "Engine",
});

class Engine {
export type EngineEvents = {
duplicateUserNameProofEvent: (usernameProof: UserNameProof) => void;
duplicateOnChainEvent: (onChainEvent: OnChainEvent) => void;
};

class Engine extends TypedEmitter<EngineEvents> {
public eventHandler: StoreEventHandler;

private _db: RocksDB;
Expand All @@ -93,6 +99,7 @@ class Engine {
private _totalPruneSize: number;

constructor(db: RocksDB, network: FarcasterNetwork, eventHandler?: StoreEventHandler, publicClient?: PublicClient) {
super();
this._db = db;
this._network = network;
this._publicClient = publicClient;
Expand Down Expand Up @@ -266,12 +273,22 @@ class Engine {
this._onchainEventsStore.mergeOnChainEvent(event),
(e) => e as HubError,
);
if (mergeResult.isErr() && mergeResult.error.errCode === "bad_request.duplicate") {
this.emit("duplicateOnChainEvent", event);
}
return mergeResult;
}

async mergeUserNameProof(usernameProof: UserNameProof): HubAsyncResult<number> {
// TODO: Validate signature here instead of the fname event provider
return ResultAsync.fromPromise(this._userDataStore.mergeUserNameProof(usernameProof), (e) => e as HubError);
const mergeResult = await ResultAsync.fromPromise(
this._userDataStore.mergeUserNameProof(usernameProof),
(e) => e as HubError,
);
if (mergeResult.isErr() && mergeResult.error.errCode === "bad_request.duplicate") {
this.emit("duplicateUserNameProofEvent", usernameProof);
}
return mergeResult;
}

async revokeMessagesBySigner(fid: number, signer: Uint8Array): HubAsyncResult<void> {
Expand Down
8 changes: 8 additions & 0 deletions apps/hubble/src/storage/stores/userDataStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ describe("mergeUserNameProof", () => {
await expect(set.getUserNameProofByFid(proof.fid)).resolves.toEqual(proof);
});

test("does not merge duplicates", async () => {
const proof = await Factories.UserNameProof.build();
await set.mergeUserNameProof(proof);
await expect(set.getUserNameProof(proof.name)).resolves.toEqual(proof);

await expect(set.mergeUserNameProof(proof)).rejects.toThrow("already exists");
});

test("replaces existing proof with proof of greater timestamp", async () => {
const existingProof = await Factories.UserNameProof.build();
await set.mergeUserNameProof(existingProof);
Expand Down
4 changes: 3 additions & 1 deletion apps/hubble/src/storage/stores/userDataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ class UserDataStore extends Store<UserDataAddMessage, never> {

async mergeUserNameProof(usernameProof: UserNameProof): Promise<number> {
const existingProof = await ResultAsync.fromPromise(this.getUserNameProof(usernameProof.name), () => undefined);
if (existingProof.isOk() && usernameProofCompare(existingProof.value, usernameProof) >= 0) {
if (existingProof.isOk() && usernameProofCompare(existingProof.value, usernameProof) === 0) {
throw new HubError("bad_request.duplicate", "proof already exists");
} else if (existingProof.isOk() && usernameProofCompare(existingProof.value, usernameProof) > 0) {
throw new HubError("bad_request.conflict", "event conflicts with a more recent UserNameProof");
}

Expand Down
8 changes: 5 additions & 3 deletions apps/hubble/src/storage/stores/utils.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { UserNameProof, HubError, bytesIncrement } from "@farcaster/hub-nodejs";
import { UserNameProof, bytesIncrement } from "@farcaster/hub-nodejs";
import { bytesCompare } from "@farcaster/core";

export const usernameProofCompare = (a: UserNameProof, b: UserNameProof): number => {
// Compare timestamps
// Compare timestamps (assumes name and proof type have already been checked by the caller)
if (a.timestamp < b.timestamp) {
return -1;
} else if (a.timestamp > b.timestamp) {
return 1;
}

throw new HubError("bad_request.validation_failure", "proofs have the same timestamp");
// If timestamps match, order by signature bytes so we can deterministically choose the same proof everywhere
return bytesCompare(a.signature, b.signature);
};

export const makeEndPrefix = (prefix: Buffer): Buffer | undefined => {
Expand Down

0 comments on commit f0ad204

Please sign in to comment.