diff --git a/packages/nonce/package.json b/packages/nonce/package.json index 0a48bee1..78aad5bc 100644 --- a/packages/nonce/package.json +++ b/packages/nonce/package.json @@ -1,6 +1,6 @@ { "name": "@tableland/nonce", - "version": "1.0.1", + "version": "1.0.2", "description": "", "repository": "https://github.com/tablelandnetwork/studio/packages/nonce", "publishConfig": { diff --git a/packages/nonce/src/main.ts b/packages/nonce/src/main.ts index f49442d2..d1f3e765 100644 --- a/packages/nonce/src/main.ts +++ b/packages/nonce/src/main.ts @@ -60,7 +60,7 @@ export class NonceManager extends AbstractSigner { } async getNonce(blockTag?: BlockTag): Promise { - debugLogger("getNonce"); + debugLogger("getNonce", process.pid); if (blockTag === "pending") { await this._acquireLock(); const address = await this.signer.getAddress(); @@ -73,6 +73,10 @@ export class NonceManager extends AbstractSigner { const nonce = currentCount + (typeof deltaCount === "number" ? deltaCount : 0); + // Need to make sure we increment before returning the nonce value. + // Otherwise there is a race condition between other processes getting a + // nonce after the lock release and this call to increment. + await this.increment(); const release = this._releaseLock.bind(this); setImmediate(function () { release().catch(function (err: any) { @@ -80,50 +84,25 @@ export class NonceManager extends AbstractSigner { }); }); - debugLogger("getNonce: nonce (pending):", nonce); + debugLogger("getNonce: nonce (pending):", nonce, process.pid); return nonce; } const nonce = await super.getNonce(blockTag); - debugLogger("getNonce: nonce (not pending):", nonce); + debugLogger("getNonce: nonce (not pending):", nonce, process.pid); return nonce; } - async reset(): Promise { - debugLogger("reset"); - await this._acquireLock(); - await this._resetDelta(); - await this._releaseLock(); - } - - async increment(count?: number): Promise { - debugLogger("increment"); - return await this.memStore.incrby( - `delta:${await this.getAddress()}`, - count == null ? 1 : count, - ); - } - - async signMessage(message: string | Uint8Array): Promise { - return await this.signer.signMessage(message); - } - - async signTransaction(transaction: TransactionRequest): Promise { - debugLogger("signTransaction"); - return await this.signer.signTransaction(transaction); - } - async sendTransaction( transaction: TransactionRequest, ): Promise { - debugLogger("sendTransaction"); + debugLogger("sendTransaction: (start)", process.pid); if (transaction.nonce == null) { transaction = { ...transaction }; transaction.nonce = await this.getNonce("pending"); - await this.increment(); } else { await this.reset(); - await this.memStore.incr(`delta:${await this.getAddress()}`); + await this.increment(); } transaction = await this.signer.populateTransaction(transaction); @@ -132,13 +111,42 @@ export class NonceManager extends AbstractSigner { this.provider .getTransactionReceipt(tx.hash) .then(async () => { + debugLogger("sendTransaction: (provider response)", process.pid); await this._resetDelta(); }) .catch((err) => console.log("Error resetting delta:", err)); + debugLogger("sendTransaction: (end)", process.pid); + return tx; } + async reset(): Promise { + debugLogger("reset", process.pid); + await this._acquireLock(); + await this._resetDelta(); + await this._releaseLock(); + } + + async increment(count?: number): Promise { + debugLogger("increment (start)", process.pid); + const res = await this.memStore.incrby( + `delta:${await this.getAddress()}`, + count == null ? 1 : count, + ); + debugLogger("increment (end)", process.pid); + return res; + } + + async signMessage(message: string | Uint8Array): Promise { + return await this.signer.signMessage(message); + } + + async signTransaction(transaction: TransactionRequest): Promise { + debugLogger("signTransaction", process.pid); + return await this.signer.signTransaction(transaction); + } + async signTypedData( domain: TypedDataDomain, types: Record, @@ -199,6 +207,8 @@ export class NonceManager extends AbstractSigner { // returns null or "OK" const res = await acquire(); + // The _setLock method will only return a value if the lock didn't + // perviously exist. This is because the nx option is used. if (res === null) { this._lock = undefined; return resolve(await doTry()); @@ -217,7 +227,7 @@ export class NonceManager extends AbstractSigner { async _releaseLock() { if (!this._lock) return; - debugLogger("_releaseLock", process.pid); + debugLogger("_releaseLock (start)", process.pid); // we are using a Lua script to atomically make sure we only delete the // lock if this process created it. This covers the case where the lock ttl @@ -233,11 +243,13 @@ export class NonceManager extends AbstractSigner { [this._lock], ); this._lock = undefined; + + debugLogger("_releaseLock (end)", process.pid); } // NOTE: The delta key expires in 30 seconds. This should be fine for Nova. async _resetDelta() { - debugLogger("_resetDelta", process.pid); + debugLogger("_resetDelta (start)", process.pid); await this.memStore.eval( `if redis.call("get",KEYS[1]) ~= ARGV[1] then @@ -248,5 +260,7 @@ export class NonceManager extends AbstractSigner { [`delta:${await this.getAddress()}`], [0], ); + + debugLogger("_resetDelta (end)", process.pid); } } diff --git a/packages/nonce/test/main.test.ts b/packages/nonce/test/main.test.ts index 334e7d74..92ee9ac8 100644 --- a/packages/nonce/test/main.test.ts +++ b/packages/nonce/test/main.test.ts @@ -43,11 +43,41 @@ const redis = new Redis({ token: process.env.KV_REST_API_TOKEN, }); +const GLOBAL_TEST_RUNNING = "GLOBAL_TEST_RUNNING"; + +const ensureSingularTest = async function () { + const checkRunning: any = async function (resolve: any, reject: any) { + // eslint-disable-next-line promise/param-names + await new Promise(function (waitResolve) { + setTimeout(() => waitResolve(undefined), 1000); + }); + + const running = await redis.get(GLOBAL_TEST_RUNNING); + console.log("running", running); + if (running) { + // if running wait one second and check again + await checkRunning(resolve, reject); + return; + } + + // if not running mark as running and return + await redis.set(GLOBAL_TEST_RUNNING, "true", { px: 30000 }); + resolve(); + }; + + return await new Promise(checkRunning); +}; + describe("NonceManager", function () { this.timeout(30000 * TEST_TIMEOUT_FACTOR); beforeEach(async function () { await redis.del(`delta:${account2Public}`); + await ensureSingularTest(); + }); + + afterEach(async function () { + await redis.del(GLOBAL_TEST_RUNNING); }); after(async function () { @@ -158,10 +188,11 @@ describe("NonceManager", function () { }); }; - test("sending transactions from two processes WITHOUT nonce manager fails", async function () { - // We can't guarantee that sending 4 transactions from the same wallet will - // result in a nonce failure, but it's very likely. This means that this - // test might fail occasionally. + // We can't guarantee that sending 4 transactions from the same wallet will + // result in a nonce failure. This means that this test might fail + // occasionally, because of that it's being skipped here. + // Un-skip it to test locally + test.skip("sending transactions from two processes WITHOUT nonce manager fails", async function () { const results = await Promise.all([ parallelFork(true), parallelFork(true),