Skip to content

Commit

Permalink
Merge pull request #15 from streamich/test-kv
Browse files Browse the repository at this point in the history
Advanced testing
  • Loading branch information
streamich authored Jun 11, 2024
2 parents 913c592 + 3e171ad commit 2045eaa
Show file tree
Hide file tree
Showing 9 changed files with 373 additions and 15 deletions.
47 changes: 47 additions & 0 deletions src/__tests__/kv/KvBlobStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type {StandaloneClient} from '../../standalone';

export type KvBlobStoreClient = Pick<StandaloneClient, 'cmd'>;

export class KvBlobStore {
constructor(
private readonly pfx: Uint8Array,
private readonly redis: KvBlobStoreClient,
) {}

public async create(key: Uint8Array, value: Uint8Array): Promise<void> {
const redisKey = Buffer.concat([this.pfx, key]);
const result = await this.redis.cmd(['SET', redisKey, value, 'NX']);
if (result !== 'OK') throw new Error('KEY_EXISTS');
}

public async update(key: Uint8Array, value: Uint8Array): Promise<void> {
const redisKey = Buffer.concat([this.pfx, key]);
const result = await this.redis.cmd(['SET', redisKey, value, 'XX']);
if (result !== 'OK') throw new Error('CANNOT_UPDATE');
}

public async get(key: Uint8Array): Promise<Uint8Array> {
const redisKey = Buffer.concat([this.pfx, key]);
const result = await this.redis.cmd(['GET', redisKey]);
if (!(result instanceof Uint8Array)) throw new Error('NOT_FOUND');
return result;
}

public async remove(key: Uint8Array): Promise<boolean> {
const redisKey = Buffer.concat([this.pfx, key]);
const result = await this.redis.cmd(['DEL', redisKey]);
return !!result;
}

public async exists(key: Uint8Array): Promise<boolean> {
const redisKey = Buffer.concat([this.pfx, key]);
const result = await this.redis.cmd(['EXISTS', redisKey]);
return result === 1;
}

public async length(key: Uint8Array): Promise<number> {
const redisKey = Buffer.concat([this.pfx, key]);
const result = await this.redis.cmd(['STRLEN', redisKey], {utf8Res: true});
return Number(result);
}
}
186 changes: 186 additions & 0 deletions src/__tests__/kv/runKvBlobStoreTests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import {KvBlobStore} from './KvBlobStore';
import {blob, utf8} from '../../util/buf';
import {of} from 'thingies';

let cnt = 0;
const getKey = () => utf8`key-${cnt++}:${Math.random()}${Date.now()}`;

export const runKvBlobStoreTests = (kv: KvBlobStore) => {
test('can set and get a key', async () => {
const key = getKey();
await kv.create(key, utf8`hello`);
const value = await kv.get(key);
expect(value).toEqual(utf8`hello`);
});

describe('create', () => {
test('creates a value', async () => {
const key = getKey();
await kv.create(key, blob(1, 2, 3));
const result = await kv.get(key);
expect(result).toEqual(blob(1, 2, 3));
});

test('cannot rewrite key', async () => {
const key = getKey();
await kv.create(key, blob(1, 2, 3));
let error;
try {
await kv.create(key, blob(1, 2, 3));
} catch (err) {
error = err;
}
expect(error).toBeInstanceOf(Error);
expect(await kv.get(key)).toEqual(blob(1, 2, 3));
const value = await kv.get(key);
expect(value).toEqual(blob(1, 2, 3));
});
});

describe('update', () => {
test('throws when updating non-existing key', async () => {
const key = getKey();
const [, error] = await of(kv.update(key, Buffer.from([1, 2, 3])));
expect(error).toBeInstanceOf(Error);
});

test('can update existing key', async () => {
const key = getKey();
await kv.create(key, utf8`foo`);
await kv.update(key, utf8`bar`);
const res = await kv.get(key);
expect(res).toEqual(utf8`bar`);
});
});

describe('get', () => {
test('returns value', async () => {
const key = getKey();
await kv.create(key, utf8`foo`);
expect(await kv.get(key)).toEqual(utf8`foo`);
});

test('throws PublicErrorNotFound for non-existing key', async () => {
const key = getKey();
let error;
try {
await kv.get(key);
} catch (err) {
error = err;
}
expect(error).toBeInstanceOf(Error);
});

test('can set and get nulls', async () => {
await kv.remove(blob(0));
const key = blob(0);
await kv.create(key, blob(0));
expect(await kv.get(key)).toEqual(blob(0));
await kv.remove(blob(0));
});
});

describe('remove', () => {
test('can delete key', async () => {
const key = getKey();
await kv.create(key, utf8`foo`);
expect(await kv.get(key)).toEqual(utf8`foo`);
await kv.remove(key);
const [, error] = await of(kv.get(key));
expect(error).toBeInstanceOf(Error);
});

test('does not throw on missing key', async () => {
const key = getKey();
await kv.remove(key);
});

test('can set new key (remove and set)', async () => {
const key = getKey();
await kv.create(key, utf8`foo`);
await kv.remove(key);
await kv.create(key, utf8`bar`);
expect(await kv.get(key)).toEqual(utf8`bar`);
});

test('returns true if item was deleted', async () => {
const key = getKey();
await kv.create(key, Buffer.from('foo'));
expect(await kv.remove(key)).toBe(true);
expect(await kv.remove(key)).toBe(false);
});

test('returns false when deleting non-existing key', async () => {
const key = getKey();
expect(await kv.remove(key)).toBe(false);
});
});

describe('exists', () => {
test('returns false if key does not exist', async () => {
const key = getKey();
expect(await kv.exists(key)).toBe(false);
});

test('returns true if key exists', async () => {
const key = getKey();
await kv.create(key, Buffer.from('foo'));
expect(await kv.exists(key)).toBe(true);
});

test('returns false if key was deleted', async () => {
const key = getKey();
await kv.create(key, Buffer.from('foo'));
await kv.remove(key);
expect(await kv.exists(key)).toBe(false);
});

test('returns true if key was re-created', async () => {
const key = getKey();
await kv.create(key, Buffer.from('foo'));
await kv.remove(key);
await kv.create(key, Buffer.from('bar'));
expect(await kv.exists(key)).toBe(true);
});
});

describe('length', () => {
test('returns size of value in bytes', async () => {
const key = getKey();
await kv.create(key, Buffer.from('test'));
const res = await kv.length(key);
expect(res).toBe(4);
});

test('the length of emoji', async () => {
const key = getKey();
await kv.create(key, Buffer.from('🤷‍♂️'));
const res = await kv.length(key);
expect(res).toBe(13);
const value = await kv.get(key);
expect(value).toEqual(utf8`🤷‍♂️`);
});

test('returns size of value in bytes after update', async () => {
const key = getKey();
await kv.create(key, Buffer.from('test'));
await kv.update(key, Buffer.from('agaga'));
const res = await kv.length(key);
expect(res).toBe(5);
});

test('returns 0 for a non-existing key', async () => {
const key = getKey();
const res = await kv.length(key);
expect(res).toBe(0);
});

test('returns 0 for deleted key', async () => {
const key = getKey();
await kv.create(key, Buffer.from('test'));
await kv.remove(key);
const res = await kv.length(key);
expect(res).toBe(0);
});
});
};
11 changes: 7 additions & 4 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@ export class RedisCluster implements Printable {
}

public stop(): void {
this.stopped = true;
this._routerReadyUnsub?.();
clearTimeout(this.initialTableBuildTimer);
this.initialTableBuildTimer = undefined;
this.initialTableBuildAttempt = 0;
clearTimeout(this.rebuildTimer);
this.rebuildTimer = undefined;
this.isRebuildingRouteTable = false;
this.routeTableRebuildRetry = 0;
this.stopped = true;
this.clients.forEach((client) => client.stop());
}

Expand Down Expand Up @@ -129,14 +132,14 @@ export class RedisCluster implements Printable {
}

private _routerReady = false;

private _routerReadyUnsub?: () => void;
public async whenRouterReady(): Promise<void> {
if (this._routerReady) return;
if (!this.router.isEmpty()) return;
return new Promise((resolve) => {
const unsubscribe = this.onRouter.listen(() => {
this._routerReadyUnsub = this.onRouter.listen(() => {
this._routerReady = true;
unsubscribe();
this._routerReadyUnsub?.();
resolve();
});
});
Expand Down
29 changes: 29 additions & 0 deletions src/cluster/__tests__/kv.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {utf8} from '../../util/buf';
import {runKvBlobStoreTests} from '../../__tests__/kv/runKvBlobStoreTests';
import {tick} from 'thingies';
import {RedisCluster} from '../RedisCluster';
import {KvBlobStore} from '../../__tests__/kv/KvBlobStore';
import {ScriptRegistry} from '../../ScriptRegistry';

const setup = () => {
const scripts = new ScriptRegistry();
const client = new RedisCluster({
seeds: [{host: '127.0.0.1', port: 7000}],
});
const kv = new KvBlobStore(utf8`kv:`, client);
client.start();
return {client, scripts, kv};
};

if (process.env.TEST_LOCAL_CLUSTER) {
const {kv, client} = setup();

runKvBlobStoreTests(kv);

afterAll(async () => {
await client.stop();
await tick(50);
});
} else {
test.todo('To enable cluster tests, set TEST_LOCAL_CLUSTER=1 in your environment variables.');
}
29 changes: 29 additions & 0 deletions src/cluster/__tests__/refs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {RedisCluster} from '../RedisCluster';
import {tick} from 'thingies/es2020/tick';
import {ScriptRegistry} from '../../ScriptRegistry';

const setup = () => {
const scripts = new ScriptRegistry();
const client = new RedisCluster({
seeds: [{host: '127.0.0.1', port: 7000}],
});
return {client, scripts};
};

if (process.env.TEST_LOCAL_CLUSTER) {
const {client} = setup();

test('closes Node refs on .stop()', async () => {
client.start();
await tick(100);
const res = await client.cmd(['PING']);
expect(res).toBe('PONG');
});

afterAll(async () => {
await client.stop();
await tick(50);
});
} else {
test.todo('To enable cluster tests, set TEST_LOCAL_CLUSTER=1 in your environment variables.');
}
13 changes: 11 additions & 2 deletions src/standalone/StandaloneClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,19 @@ export class StandaloneClient {
public readonly psubs = new AvlMap<Uint8Array, FanOut<[channel: Uint8Array, message: Uint8Array]>>(cmpUint8Array);
public readonly ssubs = new AvlMap<Uint8Array, FanOut<Uint8Array>>(cmpUint8Array);

private _onDataUnsub?: () => void;
private _onReadyUnsub?: () => void;

constructor(opts: RedisClientOpts) {
this.scripts = opts.scripts ?? new ScriptRegistry();
const socket = (this.socket = opts.socket);
this.encoder = opts.encoder ?? new RespEncoder();
const decoder = (this.decoder = opts.decoder ?? new RespStreamingDecoder());
socket.onData.listen((data) => {
this._onDataUnsub = socket.onData.listen((data) => {
decoder.push(data);
this.scheduleRead();
});
socket.onReady.listen(() => {
this._onReadyUnsub = socket.onReady.listen(() => {
this.hello(3, opts.pwd, opts.user, true)
.then(() => {
this.__whenReady.resolve();
Expand Down Expand Up @@ -201,6 +204,12 @@ export class StandaloneClient {
}

public stop() {
this._onDataUnsub?.();
this._onReadyUnsub?.();
clearImmediate(this.encodingTimer);
this.encodingTimer = undefined;
clearImmediate(this.decodingTimer);
this.decodingTimer = undefined;
this.socket.stop();
}

Expand Down
33 changes: 33 additions & 0 deletions src/standalone/__tests__/kv.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import * as net from 'net';
import * as config from '../../__tests__/config';
import {ReconnectingSocket} from '../../util/ReconnectingSocket';
import {StandaloneClient} from '../StandaloneClient';
import {ScriptRegistry} from '../../ScriptRegistry';
import {KvBlobStore} from '../../__tests__/kv/KvBlobStore';
import {utf8} from '../../util/buf';
import {runKvBlobStoreTests} from '../../__tests__/kv/runKvBlobStoreTests';
import {tick} from 'thingies';

const setup = () => {
const host = config.standalone.host;
const port = config.standalone.port;
const scripts = new ScriptRegistry();
const client = new StandaloneClient({
scripts,
socket: new ReconnectingSocket({
createSocket: () => net.connect({host, port}),
}),
});
const kv = new KvBlobStore(utf8`kv:`, client);
client.start();
return {client, scripts, kv};
};

const {kv, client} = setup();

runKvBlobStoreTests(kv);

afterAll(async () => {
await client.stop();
await tick(50);
});
Loading

0 comments on commit 2045eaa

Please sign in to comment.