Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncLocalStorage driver #39

Merged
merged 16 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,660 changes: 2,592 additions & 1,068 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@
},
"dependencies": {
"@types/cls-hooked": "^4.3.3",
"cls-hooked": "^4.2.2"
"cls-hooked": "^4.2.2",
"semver": "^7.5.1"
},
"devDependencies": {
"@nestjs/common": "^9.0.7",
"@nestjs/core": "^9.0.7",
"@nestjs/testing": "^9.0.7",
"@nestjs/typeorm": "^9.0.0",
"@types/jest": "^28.1.6",
"@types/semver": "^7.5.0",
"@typescript-eslint/eslint-plugin": "^5.31.0",
"@typescript-eslint/parser": "^5.31.0",
"eslint": "^8.20.0",
Expand Down
29 changes: 20 additions & 9 deletions src/common/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { createNamespace, getNamespace, Namespace } from 'cls-hooked';
import { DataSource, EntityManager, Repository } from 'typeorm';
import { EventEmitter } from 'events';

import {
NAMESPACE_NAME,
TYPEORM_DATA_SOURCE_NAME,
TYPEORM_DATA_SOURCE_NAME_PREFIX,
TYPEORM_ENTITY_MANAGER_NAME,
TYPEORM_HOOK_NAME,
} from './constants';
import { EventEmitter } from 'events';
import { StorageDriver as StorageDriverEnum } from '../enums/storage-driver';
import { TypeOrmUpdatedPatchError } from '../errors/typeorm-updated-patch';
import { StorageDriver } from '../storage/driver/interface';
import { isDataSource } from '../utils';
import { storage } from '../storage';

export type DataSourceName = string | 'default';

Expand All @@ -23,6 +25,13 @@ interface TypeormTransactionalOptions {
* You can set this options to `0` or `Infinity` to indicate an unlimited number of listeners.
*/
maxHookHandlers: number;

/**
* Controls storage driver used for providing persistency during the async request timespan.
* You can force any of the available drivers with this option.
* By default, the modern AsyncLocalStorage will be preferred, if it is supported by your runtime.
*/
storageDriver: StorageDriverEnum;
}

/**
Expand Down Expand Up @@ -60,19 +69,20 @@ const dataSources = new Map<DataSourceName, DataSource>();
const data: TypeormTransactionalData = {
options: {
maxHookHandlers: 10,
storageDriver: StorageDriverEnum.CLS_HOOKED,
},
};

export const getTransactionalContext = () => getNamespace(NAMESPACE_NAME);
export const getTransactionalContext = () => storage.get();

export const getEntityManagerByDataSourceName = (context: Namespace, name: DataSourceName) => {
export const getEntityManagerByDataSourceName = (context: StorageDriver, name: DataSourceName) => {
if (!dataSources.has(name)) return null;

return (context.get(TYPEORM_DATA_SOURCE_NAME_PREFIX + name) as EntityManager) || null;
};

export const setEntityManagerByDataSourceName = (
context: Namespace,
context: StorageDriver,
name: DataSourceName,
entityManager: EntityManager | null,
) => {
Expand Down Expand Up @@ -192,7 +202,8 @@ export const initializeTransactionalContext = (options?: Partial<TypeormTransact

patchManager(Repository.prototype);

return createNamespace(NAMESPACE_NAME) || getNamespace(NAMESPACE_NAME);
const { storageDriver } = getTransactionalOptions();
return storage.create(storageDriver);
};

export const addTransactionalDataSource = (input: DataSource | AddTransactionalDataSourceInput) => {
Expand Down Expand Up @@ -221,8 +232,8 @@ export const getDataSourceByName = (name: DataSourceName) => dataSources.get(nam

export const deleteDataSourceByName = (name: DataSourceName) => dataSources.delete(name);

export const getHookInContext = (context: Namespace | undefined) =>
export const getHookInContext = (context: StorageDriver | undefined) =>
context?.get(TYPEORM_HOOK_NAME) as EventEmitter | null;

export const setHookInContext = (context: Namespace, emitter: EventEmitter | null) =>
export const setHookInContext = (context: StorageDriver, emitter: EventEmitter | null) =>
context.set(TYPEORM_HOOK_NAME, emitter);
20 changes: 20 additions & 0 deletions src/enums/storage-driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Enumeration that represents storage engines to use with {@link initializeTransactionalContext}
*/
export enum StorageDriver {
/**
* Uses AsyncLocalStorage when node >= 16 and cls-hooked otherwise
*/
AUTO = 'AUTO',

/**
* Supports legacy node versions
* Uses AcyncWrap for node < 8.2.1 and async_hooks otherwise
*/
CLS_HOOKED = 'CLS_HOOKED',

/**
* Uses AsyncLocalStorage which is available sice node 16
*/
ASYNC_LOCAL_STORAGE = 'ASYNC_LOCAL_STORAGE',
}
18 changes: 7 additions & 11 deletions src/hooks/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { EventEmitter } from 'events';
import { Namespace } from 'cls-hooked';

import {
getHookInContext,
getTransactionalContext,
getTransactionalOptions,
setHookInContext,
} from '../common';
import { StorageDriver } from '../storage/driver/interface';

export const getTransactionalContextHook = () => {
const context = getTransactionalContext();
Expand Down Expand Up @@ -43,22 +43,18 @@ export const runAndTriggerHooks = async (hook: EventEmitter, cb: () => unknown)
}
};

export const createEventEmitterInNewContext = (context: Namespace) => {
export const createEventEmitterInNewContext = (context: StorageDriver) => {
const options = getTransactionalOptions();

return context.runAndReturn(() => {
const emitter = new EventEmitter();
emitter.setMaxListeners(options.maxHookHandlers);

context.bindEmitter(emitter);
return emitter;
});
const emitter = new EventEmitter();
emitter.setMaxListeners(options.maxHookHandlers);
return emitter;
};

export const runInNewHookContext = async (context: Namespace, cb: () => unknown) => {
export const runInNewHookContext = async (context: StorageDriver, cb: () => unknown) => {
const hook = createEventEmitterInNewContext(context);

return await context.runAndReturn(() => {
return await context.run(() => {
setHookInContext(context, hook);

return runAndTriggerHooks(hook, cb);
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export {
runOnTransactionComplete,
} from './hooks';
export { Transactional } from './decorators/transactional';
export { StorageDriver } from './enums/storage-driver';
export { Propagation } from './enums/propagation';
export { IsolationLevel } from './enums/isolation-level';
export { runInTransaction } from './transactions/run-in-transaction';
Expand Down
83 changes: 83 additions & 0 deletions src/storage/driver/async-local-storage/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { AsyncLocalStorage } from 'node:async_hooks';

import { StorageKey, StorageValue, StorageDriver, Storage } from '../interface';

class Store {
// Ref: https://github.com/Jeff-Lewis/cls-hooked/blob/master/context.js#L24
// AsyncLocalStorage behaves differently from cls-hooked, as it expects
// store to be passed on each `.run` call. cls-hooked manages the storage
// and it's layers for the user. This class replicates cls-hooked behavior.

private layers: (Storage | undefined)[] = [];
private storage: Storage | undefined;

get active() {
return !!this.storage;
}

public get<T>(key: StorageKey): T {
return this.storage?.get(key) as T;
}

public set(key: StorageKey, value: StorageValue): void {
this.storage?.set(key, value);
}

public enter() {
// Ref: https://github.com/Jeff-Lewis/cls-hooked/blob/master/context.js#L184-L195

const newStorage = new Map(this.storage);
this.layers.push(this.storage);
this.storage = newStorage;
return newStorage;
}

public exit(storage: Storage) {
// Ref: https://github.com/Jeff-Lewis/cls-hooked/blob/master/context.js#L197-L225

if (this.storage === storage) {
this.storage = this.layers.pop() ?? new Map();
return;
}

const index = this.layers.lastIndexOf(storage);

if (index >= 0) {
this.layers.splice(index, 1);
}
}
}

export class AsyncLocalStorageDriver implements StorageDriver {
private context: AsyncLocalStorage<Store>;

constructor() {
this.context = new AsyncLocalStorage();
}

get active() {
return this.store.active;
}

private get store() {
return this.context.getStore() || new Store();
}

public get<T>(key: StorageKey): T {
return this.store?.get(key);
}

public set(key: StorageKey, value: StorageValue): void {
this.store?.set(key, value);
}

public async run<T>(cb: () => Promise<T>): Promise<T> {
const storage = this.store.enter();

try {
return await this.context.run(this.store, cb);
} finally {
this.store.exit(storage);
}
}
}
1 change: 1 addition & 0 deletions src/storage/driver/cls-hooked/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const NAMESPACE_TOKEN = '@transactional/namespace';
32 changes: 32 additions & 0 deletions src/storage/driver/cls-hooked/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Namespace, createNamespace, getNamespace } from 'cls-hooked';

import { StorageKey, StorageValue, StorageDriver } from '../interface';
import { NAMESPACE_TOKEN } from './constants';

export class ClsHookedDriver implements StorageDriver {
private context: Namespace;

constructor() {
this.context = getNamespace(NAMESPACE_TOKEN) ?? createNamespace(NAMESPACE_TOKEN);
}

get active() {
return !!this.context.active;
}

private get store() {
return this.context.active;
}

public get<T>(key: StorageKey): T {
return this.context.get(key) as T;
}

public set(key: StorageKey, value: StorageValue): void {
this.context.set(key, value);
}

public run<T>(cb: () => T): T {
return this.context.runAndReturn(cb);
}
}
10 changes: 10 additions & 0 deletions src/storage/driver/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export type StorageKey = string;
export type StorageValue = unknown;
export type Storage = Map<StorageKey, StorageValue>;

export interface StorageDriver {
active: boolean;
get(key: StorageKey): StorageValue;
set(key: StorageKey, value: StorageValue): void;
run<T>(cb: () => Promise<T>): Promise<T>;
}
3 changes: 3 additions & 0 deletions src/storage/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { Storage } from './storage';

export const storage = new Storage();
56 changes: 56 additions & 0 deletions src/storage/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { gte } from 'semver';

import { StorageDriver as StorageDriverEnum } from '../enums/storage-driver';
import { AsyncLocalStorageDriver } from './driver/async-local-storage';
import type { StorageDriver } from './driver/interface';
import { ClsHookedDriver } from './driver/cls-hooked';

interface StorageDriverConstructor {
new (): StorageDriver;
}

export class Storage {
private driver: StorageDriver;

public create(storageDriverEnum?: StorageDriverEnum) {
if (this.driver) {
// We probably should not allow calling this function when driver is already defined
return this.driver;
}

const DriverConstructor = this.getDriverConstructor(storageDriverEnum);
this.driver = new DriverConstructor();

return this.driver;
}

public get() {
if (!this.driver) {
throw new Error(
'No storage driver defined in your app ... please call initializeTransactionalContext() before application start.',
);
}

return this.driver;
}

private getDriverConstructor(storageDriverEnum?: StorageDriverEnum): StorageDriverConstructor {
switch (storageDriverEnum) {
case StorageDriverEnum.ASYNC_LOCAL_STORAGE:
return AsyncLocalStorageDriver;
case StorageDriverEnum.CLS_HOOKED:
return ClsHookedDriver;
case StorageDriverEnum.AUTO:
default:
return this.getBestSupportedDriverConstructor();
}
}

private getBestSupportedDriverConstructor(): StorageDriverConstructor {
if (process && gte(process.versions.node, '16.0.0')) {
return AsyncLocalStorageDriver;
}

return ClsHookedDriver;
}
}
2 changes: 1 addition & 1 deletion src/transactions/wrap-in-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export const wrapInTransaction = <Fn extends (this: any, ...args: any[]) => Retu
}
};

return context.runAndReturn(async () => {
return context.run(async () => {
const currentTransaction = getEntityManagerByDataSourceName(context, connectionName);
switch (propagation) {
case Propagation.MANDATORY:
Expand Down
7 changes: 6 additions & 1 deletion tests/custom-repos.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { PostRepository } from './repositories/post.repository';
import { extendPostRepository } from './repositories/extend-post-repository';

import {
StorageDriver,
addTransactionalDataSource,
initializeTransactionalContext,
runInTransaction,
Expand Down Expand Up @@ -33,7 +34,11 @@ describe('Custom repositories tests', () => {
synchronize: true,
});

initializeTransactionalContext();
const storageDriver = process.env.TEST_STORAGE_DRIVER && process.env.TEST_STORAGE_DRIVER in StorageDriver
? StorageDriver[process.env.TEST_STORAGE_DRIVER as keyof typeof StorageDriver]
: StorageDriver.CLS_HOOKED;

initializeTransactionalContext({ storageDriver });

addTransactionalDataSource(dataSource);

Expand Down
Loading