Skip to content

Commit 58a62f2

Browse files
committed
Added DBTransaction.getForUpdate to address write skew, and added test demonstrating race condition thrashing
1 parent b8a6083 commit 58a62f2

File tree

2 files changed

+130
-0
lines changed

2 files changed

+130
-0
lines changed

src/DBTransaction.ts

+34
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,40 @@ class DBTransaction {
148148
return this._db.deserializeDecrypt<T>(data, raw as any);
149149
}
150150

151+
/**
152+
* Use this for to address write skews
153+
*/
154+
public async getForUpdate<T>(
155+
keyPath: KeyPath | string | Buffer,
156+
raw?: false,
157+
): Promise<T | undefined>;
158+
public async getForUpdate(
159+
keyPath: KeyPath | string | Buffer,
160+
raw: true,
161+
): Promise<Buffer | undefined>;
162+
@ready(new errors.ErrorDBTransactionDestroyed())
163+
public async getForUpdate<T>(
164+
keyPath: KeyPath | string | Buffer,
165+
raw: boolean = false,
166+
): Promise<T | Buffer | undefined> {
167+
keyPath = utils.toKeyPath(keyPath);
168+
keyPath = ['data', ...keyPath];
169+
let data: Buffer;
170+
try {
171+
const key = utils.keyPathToKey(keyPath);
172+
data = await rocksdbP.transactionGetForUpdate(this._transaction, key, {
173+
valueEncoding: 'buffer',
174+
snapshot: this.setupSnapshot(),
175+
});
176+
} catch (e) {
177+
if (e.code === 'NOT_FOUND') {
178+
return undefined;
179+
}
180+
throw e;
181+
}
182+
return this._db.deserializeDecrypt<T>(data, raw as any);
183+
}
184+
151185
public async put(
152186
keyPath: KeyPath | string | Buffer,
153187
value: any,

tests/DBTransaction.test.ts

+96
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import path from 'path';
44
import fs from 'fs';
55
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
66
import { withF } from '@matrixai/resources';
7+
import { Lock } from '@matrixai/async-locks';
78
import DB from '@/DB';
89
import DBTransaction from '@/DBTransaction';
910
import * as errors from '@/errors';
@@ -258,6 +259,101 @@ describe(DBTransaction.name, () => {
258259
});
259260
expect(await db.get('hello')).toBeUndefined();
260261
});
262+
test('getForUpdate addresses write-skew by promoting gets into same-value puts', async () => {
263+
// Snapshot isolation allows write skew anomalies to occur
264+
// A write skew means that 2 transactions concurrently read from overlapping keys
265+
// then make disjoint updates to the keys, that breaks a consistency constraint on those keys
266+
// For example:
267+
// T1 reads from k1, k2, writes to k1
268+
// T2 reads from k1, k2, writes to k2
269+
// Where k1 + k2 >= 0
270+
await db.put('balance1', '100');
271+
await db.put('balance2', '100');
272+
const t1 = withF([db.transaction()], async ([tran]) => {
273+
let balance1 = parseInt((await tran.getForUpdate('balance1'))!);
274+
const balance2 = parseInt((await tran.getForUpdate('balance2'))!);
275+
balance1 -= 100;
276+
expect(balance1 + balance2).toBeGreaterThanOrEqual(0);
277+
await tran.put('balance1', balance1.toString());
278+
});
279+
const t2 = withF([db.transaction()], async ([tran]) => {
280+
const balance1 = parseInt((await tran.getForUpdate('balance1'))!);
281+
let balance2 = parseInt((await tran.getForUpdate('balance2'))!);
282+
balance2 -= 100;
283+
expect(balance1 + balance2).toBeGreaterThanOrEqual(0);
284+
await tran.put('balance2', balance2.toString());
285+
});
286+
// By using getForUpdate, we promote the read to a write, where it writes the same value
287+
// this causes a write-write conflict
288+
const results = await Promise.allSettled([t1, t2]);
289+
// One will succeed, one will fail
290+
expect(results.some((result) => result.status === 'fulfilled')).toBe(true);
291+
expect(
292+
results.some((result) => {
293+
return (
294+
result.status === 'rejected' &&
295+
result.reason instanceof errors.ErrorDBTransactionConflict
296+
);
297+
}),
298+
).toBe(true);
299+
});
300+
test('PCC locking to prevent thrashing for racing counters', async () => {
301+
await db.put('counter', '0');
302+
let t1 = withF([db.transaction()], async ([tran]) => {
303+
// Can also use `getForUpdate`, but a conflict exists even for `get`
304+
let counter = parseInt((await tran.get('counter'))!);
305+
counter++;
306+
await tran.put('counter', counter.toString());
307+
});
308+
let t2 = withF([db.transaction()], async ([tran]) => {
309+
// Can also use `getForUpdate`, but a conflict exists even for `get`
310+
let counter = parseInt((await tran.get('counter'))!);
311+
counter++;
312+
await tran.put('counter', counter.toString());
313+
});
314+
let results = await Promise.allSettled([t1, t2]);
315+
expect(results.some((result) => result.status === 'fulfilled')).toBe(true);
316+
expect(
317+
results.some((result) => {
318+
return (
319+
result.status === 'rejected' &&
320+
result.reason instanceof errors.ErrorDBTransactionConflict
321+
);
322+
}),
323+
).toBe(true);
324+
expect(await db.get('counter')).toBe('1');
325+
// In OCC, concurrent requests to update an atomic counter would result
326+
// in race thrashing where only 1 request succeeds, and all other requests
327+
// keep failing. The only way to prevent this thrashing is to use PCC locking
328+
await db.put('counter', '0');
329+
const l = new Lock();
330+
t1 = l.withF(async () => {
331+
await withF([db.transaction()], async ([tran]) => {
332+
// Can also use `get`, no difference here
333+
let counter = parseInt((await tran.getForUpdate('counter'))!);
334+
counter++;
335+
await tran.put('counter', counter.toString());
336+
});
337+
});
338+
t2 = l.withF(async () => {
339+
await withF([db.transaction()], async ([tran]) => {
340+
// Can also use `get`, no difference here
341+
let counter = parseInt((await tran.getForUpdate('counter'))!);
342+
counter++;
343+
await tran.put('counter', counter.toString());
344+
});
345+
});
346+
results = await Promise.allSettled([t1, t2]);
347+
expect(results.every((result) => result.status === 'fulfilled'));
348+
expect(await db.get('counter')).toBe('2');
349+
// The PCC locks must be done outside of transaction creation
350+
// This is because the PCC locks enforce mutual exclusion between commit operations
351+
// If the locks were done inside the transaction, it's possible for the commit operations
352+
// to be delayed after all mutually exclusive callbacks are executed
353+
// resulting in a DBTransactionConflict
354+
// When this library gains native locking, it must deal with this problem
355+
// by only releasing the locks when the transaction is committed or rollbacked
356+
});
261357
test('iterator get after delete consistency', async () => {
262358
await db.put('hello', 'world');
263359
let results: Array<[KeyPath, Buffer]> = [];

0 commit comments

Comments
 (0)