Skip to content

Commit

Permalink
(nonce) fix increment race condition (#289)
Browse files Browse the repository at this point in the history
  • Loading branch information
joewagner authored Jun 13, 2024
1 parent a00fb13 commit e5f3d8a
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 37 deletions.
2 changes: 1 addition & 1 deletion packages/nonce/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tableland/nonce",
"version": "1.0.1",
"version": "1.0.2",
"description": "",
"repository": "https://github.com/tablelandnetwork/studio/packages/nonce",
"publishConfig": {
Expand Down
78 changes: 46 additions & 32 deletions packages/nonce/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class NonceManager extends AbstractSigner<Provider> {
}

async getNonce(blockTag?: BlockTag): Promise<number> {
debugLogger("getNonce");
debugLogger("getNonce", process.pid);
if (blockTag === "pending") {
await this._acquireLock();
const address = await this.signer.getAddress();
Expand All @@ -73,57 +73,36 @@ export class NonceManager extends AbstractSigner<Provider> {
const nonce =
currentCount + (typeof deltaCount === "number" ? deltaCount : 0);

// Need to make sure we increment before returning the nonce value.
// Otherwise there is a race condition between other processes getting a
// nonce after the lock release and this call to increment.
await this.increment();
const release = this._releaseLock.bind(this);
setImmediate(function () {
release().catch(function (err: any) {
console.log("_releaseLock error:", err);
});
});

debugLogger("getNonce: nonce (pending):", nonce);
debugLogger("getNonce: nonce (pending):", nonce, process.pid);
return nonce;
}

const nonce = await super.getNonce(blockTag);
debugLogger("getNonce: nonce (not pending):", nonce);
debugLogger("getNonce: nonce (not pending):", nonce, process.pid);
return nonce;
}

async reset(): Promise<void> {
debugLogger("reset");
await this._acquireLock();
await this._resetDelta();
await this._releaseLock();
}

async increment(count?: number): Promise<number> {
debugLogger("increment");
return await this.memStore.incrby(
`delta:${await this.getAddress()}`,
count == null ? 1 : count,
);
}

async signMessage(message: string | Uint8Array): Promise<string> {
return await this.signer.signMessage(message);
}

async signTransaction(transaction: TransactionRequest): Promise<string> {
debugLogger("signTransaction");
return await this.signer.signTransaction(transaction);
}

async sendTransaction(
transaction: TransactionRequest,
): Promise<TransactionResponse> {
debugLogger("sendTransaction");
debugLogger("sendTransaction: (start)", process.pid);
if (transaction.nonce == null) {
transaction = { ...transaction };
transaction.nonce = await this.getNonce("pending");
await this.increment();
} else {
await this.reset();
await this.memStore.incr(`delta:${await this.getAddress()}`);
await this.increment();
}

transaction = await this.signer.populateTransaction(transaction);
Expand All @@ -132,13 +111,42 @@ export class NonceManager extends AbstractSigner<Provider> {
this.provider
.getTransactionReceipt(tx.hash)
.then(async () => {
debugLogger("sendTransaction: (provider response)", process.pid);
await this._resetDelta();
})
.catch((err) => console.log("Error resetting delta:", err));

debugLogger("sendTransaction: (end)", process.pid);

return tx;
}

async reset(): Promise<void> {
debugLogger("reset", process.pid);
await this._acquireLock();
await this._resetDelta();
await this._releaseLock();
}

async increment(count?: number): Promise<number> {
debugLogger("increment (start)", process.pid);
const res = await this.memStore.incrby(
`delta:${await this.getAddress()}`,
count == null ? 1 : count,
);
debugLogger("increment (end)", process.pid);
return res;
}

async signMessage(message: string | Uint8Array): Promise<string> {
return await this.signer.signMessage(message);
}

async signTransaction(transaction: TransactionRequest): Promise<string> {
debugLogger("signTransaction", process.pid);
return await this.signer.signTransaction(transaction);
}

async signTypedData(
domain: TypedDataDomain,
types: Record<string, TypedDataField[]>,
Expand Down Expand Up @@ -199,6 +207,8 @@ export class NonceManager extends AbstractSigner<Provider> {
// returns null or "OK"
const res = await acquire();

// The _setLock method will only return a value if the lock didn't
// perviously exist. This is because the nx option is used.
if (res === null) {
this._lock = undefined;
return resolve(await doTry());
Expand All @@ -217,7 +227,7 @@ export class NonceManager extends AbstractSigner<Provider> {
async _releaseLock() {
if (!this._lock) return;

debugLogger("_releaseLock", process.pid);
debugLogger("_releaseLock (start)", process.pid);

// we are using a Lua script to atomically make sure we only delete the
// lock if this process created it. This covers the case where the lock ttl
Expand All @@ -233,11 +243,13 @@ export class NonceManager extends AbstractSigner<Provider> {
[this._lock],
);
this._lock = undefined;

debugLogger("_releaseLock (end)", process.pid);
}

// NOTE: The delta key expires in 30 seconds. This should be fine for Nova.
async _resetDelta() {
debugLogger("_resetDelta", process.pid);
debugLogger("_resetDelta (start)", process.pid);

await this.memStore.eval(
`if redis.call("get",KEYS[1]) ~= ARGV[1] then
Expand All @@ -248,5 +260,7 @@ export class NonceManager extends AbstractSigner<Provider> {
[`delta:${await this.getAddress()}`],
[0],
);

debugLogger("_resetDelta (end)", process.pid);
}
}
39 changes: 35 additions & 4 deletions packages/nonce/test/main.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,41 @@ const redis = new Redis({
token: process.env.KV_REST_API_TOKEN,
});

const GLOBAL_TEST_RUNNING = "GLOBAL_TEST_RUNNING";

const ensureSingularTest = async function () {
const checkRunning: any = async function (resolve: any, reject: any) {
// eslint-disable-next-line promise/param-names
await new Promise(function (waitResolve) {
setTimeout(() => waitResolve(undefined), 1000);
});

const running = await redis.get(GLOBAL_TEST_RUNNING);
console.log("running", running);
if (running) {
// if running wait one second and check again
await checkRunning(resolve, reject);
return;
}

// if not running mark as running and return
await redis.set(GLOBAL_TEST_RUNNING, "true", { px: 30000 });
resolve();
};

return await new Promise(checkRunning);
};

describe("NonceManager", function () {
this.timeout(30000 * TEST_TIMEOUT_FACTOR);

beforeEach(async function () {
await redis.del(`delta:${account2Public}`);
await ensureSingularTest();
});

afterEach(async function () {
await redis.del(GLOBAL_TEST_RUNNING);
});

after(async function () {
Expand Down Expand Up @@ -158,10 +188,11 @@ describe("NonceManager", function () {
});
};

test("sending transactions from two processes WITHOUT nonce manager fails", async function () {
// We can't guarantee that sending 4 transactions from the same wallet will
// result in a nonce failure, but it's very likely. This means that this
// test might fail occasionally.
// We can't guarantee that sending 4 transactions from the same wallet will
// result in a nonce failure. This means that this test might fail
// occasionally, because of that it's being skipped here.
// Un-skip it to test locally
test.skip("sending transactions from two processes WITHOUT nonce manager fails", async function () {
const results = await Promise.all([
parallelFork(true),
parallelFork(true),
Expand Down

1 comment on commit e5f3d8a

@vercel
Copy link

@vercel vercel bot commented on e5f3d8a Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.