From f0b4c0e8476e3920e772c96ab879b4b35279f87a Mon Sep 17 00:00:00 2001 From: vmarchaud Date: Sat, 23 May 2020 15:31:50 +0200 Subject: [PATCH 1/3] fix(asynchooks-scope): fix context loss using .with() #1101 --- .../src/AsyncHooksContextManager.ts | 116 +++++----- .../test/AsyncHooksContextManager.test.ts | 213 ++++-------------- 2 files changed, 90 insertions(+), 239 deletions(-) diff --git a/packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts b/packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts index f970ab397d..790acec6f0 100644 --- a/packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts +++ b/packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts @@ -29,19 +29,6 @@ type PatchedEventEmitter = { __ot_listeners?: { [name: string]: WeakMap, Func> }; } & EventEmitter; -class Reference { - constructor(private _value: T) {} - - set(value: T) { - this._value = value; - return this; - } - - get() { - return this._value; - } -} - const ADD_LISTENER_METHODS = [ 'addListener' as 'addListener', 'on' as 'on', @@ -52,72 +39,39 @@ const ADD_LISTENER_METHODS = [ export class AsyncHooksContextManager implements ContextManager { private _asyncHook: asyncHooks.AsyncHook; - private _contextRefs: Map | undefined> = new Map(); + private _contexts: Map = new Map(); + private _stack: Array = []; constructor() { this._asyncHook = asyncHooks.createHook({ init: this._init.bind(this), + before: this._before.bind(this), + after: this._after.bind(this), destroy: this._destroy.bind(this), promiseResolve: this._destroy.bind(this), }); } active(): Context { - const ref = this._contextRefs.get(asyncHooks.executionAsyncId()); - return ref === undefined ? Context.ROOT_CONTEXT : ref.get(); + return this._stack[0] ?? Context.ROOT_CONTEXT; } with ReturnType>( context: Context, fn: T ): ReturnType { - const uid = asyncHooks.executionAsyncId(); - let ref = this._contextRefs.get(uid); - let oldContext: Context | undefined = undefined; - if (ref === undefined) { - ref = new Reference(context); - this._contextRefs.set(uid, ref); - } else { - oldContext = ref.get(); - ref.set(context); - } + this._enterContext(context); try { - return fn(); - } finally { - if (oldContext === undefined) { - this._destroy(uid); - } else { - ref.set(oldContext); - } + const result = fn(); + this._exitContext(); + return result; + } catch (err) { + this._exitContext(); + throw err; } } - async withAsync, U extends (...args: unknown[]) => T>( - context: Context, - fn: U - ): Promise { - const uid = asyncHooks.executionAsyncId(); - let ref = this._contextRefs.get(uid); - let oldContext: Context | undefined = undefined; - if (ref === undefined) { - ref = new Reference(context); - this._contextRefs.set(uid, ref); - } else { - oldContext = ref.get(); - ref.set(context); - } - try { - return await fn(); - } finally { - if (oldContext === undefined) { - this._destroy(uid); - } else { - ref.set(oldContext); - } - } - } - - bind(target: T, context: Context): T { + bind(target: T, context?: Context): T { // if no specific context to propagate is given, we use the current one if (context === undefined) { context = this.active(); @@ -137,7 +91,8 @@ export class AsyncHooksContextManager implements ContextManager { disable(): this { this._asyncHook.disable(); - this._contextRefs.clear(); + this._contexts.clear(); + this._stack = []; return this; } @@ -156,6 +111,7 @@ export class AsyncHooksContextManager implements ContextManager { * It isn't possible to tell Typescript that contextWrapper is the same as T * so we forced to cast as any here. */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any return contextWrapper as any; } @@ -271,9 +227,9 @@ export class AsyncHooksContextManager implements ContextManager { * @param uid id of the async context */ private _init(uid: number) { - const ref = this._contextRefs.get(asyncHooks.executionAsyncId()); - if (ref !== undefined) { - this._contextRefs.set(uid, ref); + const context = this._stack[0]; + if (context !== undefined) { + this._contexts.set(uid, context); } } @@ -283,6 +239,38 @@ export class AsyncHooksContextManager implements ContextManager { * @param uid uid of the async context */ private _destroy(uid: number) { - this._contextRefs.delete(uid); + this._contexts.delete(uid); + } + + /** + * Before hook is called just beforing executing a async context. + * @param uid uid of the async context + */ + private _before(uid: number) { + const context = this._contexts.get(uid); + if (context !== undefined) { + this._enterContext(context); + } + } + + /** + * After hook is called just after completing the execution of a async context. + */ + private _after() { + this._exitContext(); + } + + /** + * Set the given context as active + */ + private _enterContext(context: Context) { + this._stack.unshift(context); + } + + /** + * Remove the context at the root of the stack + */ + private _exitContext() { + this._stack.shift(); } } diff --git a/packages/opentelemetry-context-async-hooks/test/AsyncHooksContextManager.test.ts b/packages/opentelemetry-context-async-hooks/test/AsyncHooksContextManager.test.ts index 1c2429694e..49269b798b 100644 --- a/packages/opentelemetry-context-async-hooks/test/AsyncHooksContextManager.test.ts +++ b/packages/opentelemetry-context-async-hooks/test/AsyncHooksContextManager.test.ts @@ -102,172 +102,71 @@ describe('AsyncHooksContextManager', () => { return done(); }); }); - }); - - describe('.withAsync()', () => { - it('should run the callback', async () => { - let done = false; - await contextManager.withAsync(Context.ROOT_CONTEXT, async () => { - done = true; - }); - - assert.ok(done); - }); - - it('should run the callback with active scope', async () => { - const test = Context.ROOT_CONTEXT.setValue(key1, 1); - await contextManager.withAsync(test, async () => { - assert.strictEqual(contextManager.active(), test, 'should have scope'); - }); - }); - - it('should run the callback (when disabled)', async () => { - contextManager.disable(); - let done = false; - await contextManager.withAsync(Context.ROOT_CONTEXT, async () => { - done = true; - }); - - assert.ok(done); - }); - it('should rethrow errors', async () => { - contextManager.disable(); - let done = false; - const err = new Error(); - - try { - await contextManager.withAsync(Context.ROOT_CONTEXT, async () => { - throw err; - }); - } catch (e) { - assert.ok(e === err); - done = true; - } - - assert.ok(done); - }); - - it('should finally restore an old scope', async () => { - const scope1 = '1' as any; - const scope2 = '2' as any; - let done = false; - - await contextManager.withAsync(scope1, async () => { - assert.strictEqual(contextManager.active(), scope1); - await contextManager.withAsync(scope2, async () => { - assert.strictEqual(contextManager.active(), scope2); - done = true; + it('should finally restore an old context', done => { + const ctx1 = Context.ROOT_CONTEXT.setValue(key1, 'ctx1'); + contextManager.with(ctx1, () => { + assert.strictEqual(contextManager.active(), ctx1); + setTimeout(() => { + assert.strictEqual(contextManager.active(), ctx1); + return done(); }); - assert.strictEqual(contextManager.active(), scope1); }); - - assert.ok(done); }); - }); - describe('.withAsync/with()', () => { - it('with() inside withAsync() should correctly restore context', async () => { + it('async function called from nested "with" sync function should return nested context', done => { const scope1 = '1' as any; const scope2 = '2' as any; - let done = false; - await contextManager.withAsync(scope1, async () => { + const asyncFuncCalledDownstreamFromSync = async () => { + await (async () => {})(); + assert.strictEqual(contextManager.active(), scope2); + return done(); + }; + + contextManager.with(scope1, () => { assert.strictEqual(contextManager.active(), scope1); - contextManager.with(scope2, () => { - assert.strictEqual(contextManager.active(), scope2); - done = true; - }); + contextManager.with(scope2, () => asyncFuncCalledDownstreamFromSync()); assert.strictEqual(contextManager.active(), scope1); }); - - assert.ok(done); + assert.strictEqual(contextManager.active(), Context.ROOT_CONTEXT); }); - it('withAsync() inside with() should correctly restore conxtext', done => { + it('should not loose the context', done => { const scope1 = '1' as any; - const scope2 = '2' as any; contextManager.with(scope1, async () => { assert.strictEqual(contextManager.active(), scope1); - await contextManager.withAsync(scope2, async () => { - assert.strictEqual(contextManager.active(), scope2); - }); + await new Promise(resolve => setTimeout(resolve, 100)); assert.strictEqual(contextManager.active(), scope1); return done(); }); assert.strictEqual(contextManager.active(), Context.ROOT_CONTEXT); }); - it('not awaited withAsync() inside with() should not restore context', done => { + it('should correctly restore context using async/await', async () => { const scope1 = '1' as any; const scope2 = '2' as any; - let _done = false; + const scope3 = '3' as any; + const scope4 = '4' as any; - contextManager.with(scope1, () => { + await contextManager.with(scope1, async () => { assert.strictEqual(contextManager.active(), scope1); - contextManager - .withAsync(scope2, async () => { - assert.strictEqual(contextManager.active(), scope2); - }) - .then(() => { - assert.strictEqual(contextManager.active(), scope1); - _done = true; - }); - // in this case the current scope is 2 since we - // didnt waited the withAsync call - assert.strictEqual(contextManager.active(), scope2); - setTimeout(() => { - assert.strictEqual(contextManager.active(), scope1); - assert(_done); - return done(); - }, 100); - }); - assert.strictEqual(contextManager.active(), Context.ROOT_CONTEXT); - }); - - it('withAsync() inside a setTimeout inside a with() should correctly restore context', done => { - const scope1 = '1' as any; - const scope2 = '2' as any; - - contextManager.with(scope1, () => { - assert.strictEqual(contextManager.active(), scope1); - setTimeout(() => { - assert.strictEqual(contextManager.active(), scope1); - contextManager - .withAsync(scope2, async () => { - assert.strictEqual(contextManager.active(), scope2); - }) - .then(() => { - assert.strictEqual(contextManager.active(), scope1); - return done(); + await contextManager.with(scope2, async () => { + assert.strictEqual(contextManager.active(), scope2); + await contextManager.with(scope3, async () => { + assert.strictEqual(contextManager.active(), scope3); + await contextManager.with(scope4, async () => { + assert.strictEqual(contextManager.active(), scope4); }); - }, 5); + assert.strictEqual(contextManager.active(), scope3); + }); + assert.strictEqual(contextManager.active(), scope2); + }); assert.strictEqual(contextManager.active(), scope1); }); assert.strictEqual(contextManager.active(), Context.ROOT_CONTEXT); }); - - it('with() inside a setTimeout inside withAsync() should correctly restore context', done => { - const scope1 = '1' as any; - const scope2 = '2' as any; - - contextManager - .withAsync(scope1, async () => { - assert.strictEqual(contextManager.active(), scope1); - setTimeout(() => { - assert.strictEqual(contextManager.active(), scope1); - contextManager.with(scope2, () => { - assert.strictEqual(contextManager.active(), scope2); - return done(); - }); - }, 5); - assert.strictEqual(contextManager.active(), scope1); - }) - .then(() => { - assert.strictEqual(contextManager.active(), scope1); - }); - }); }); describe('.bind(function)', () => { @@ -320,31 +219,15 @@ describe('AsyncHooksContextManager', () => { fn(); }); - it('should fail to return current context (when disabled + async op)', done => { - contextManager.disable(); - const context = Context.ROOT_CONTEXT.setValue(key1, 1); - const fn = contextManager.bind(() => { - setTimeout(() => { - assert.strictEqual( - contextManager.active(), - Context.ROOT_CONTEXT, - 'should have no context' - ); - return done(); - }, 100); - }, context); - fn(); - }); - - it('should return current context (when re-enabled + async op)', done => { - contextManager.enable(); + it('should fail to return current context with async op', done => { const context = Context.ROOT_CONTEXT.setValue(key1, 1); const fn = contextManager.bind(() => { + assert.strictEqual(contextManager.active(), context); setTimeout(() => { assert.strictEqual( contextManager.active(), context, - 'should have context' + 'should have no context' ); return done(); }, 100); @@ -363,7 +246,6 @@ describe('AsyncHooksContextManager', () => { const ee = new EventEmitter(); contextManager.disable(); assert.deepStrictEqual(contextManager.bind(ee, Context.ROOT_CONTEXT), ee); - contextManager.enable(); }); it('should return current context and removeListener (when enabled)', done => { @@ -409,7 +291,6 @@ describe('AsyncHooksContextManager', () => { assert.deepStrictEqual(contextManager.active(), context); patchedEe.removeListener('test', handler); assert.strictEqual(patchedEe.listeners('test').length, 0); - contextManager.enable(); return done(); }; patchedEe.on('test', handler); @@ -417,30 +298,12 @@ describe('AsyncHooksContextManager', () => { patchedEe.emit('test'); }); - it('should not return current context (when disabled + async op)', done => { - contextManager.disable(); - const ee = new EventEmitter(); - const context = Context.ROOT_CONTEXT.setValue(key1, 1); - const patchedEe = contextManager.bind(ee, context); - const handler = () => { - setImmediate(() => { - assert.deepStrictEqual(contextManager.active(), Context.ROOT_CONTEXT); - patchedEe.removeAllListeners('test'); - assert.strictEqual(patchedEe.listeners('test').length, 0); - return done(); - }); - }; - patchedEe.on('test', handler); - assert.strictEqual(patchedEe.listeners('test').length, 1); - patchedEe.emit('test'); - }); - - it('should return current context (when enabled + async op)', done => { - contextManager.enable(); + it('should not return current context with async op', done => { const ee = new EventEmitter(); const context = Context.ROOT_CONTEXT.setValue(key1, 1); const patchedEe = contextManager.bind(ee, context); const handler = () => { + assert.deepStrictEqual(contextManager.active(), context); setImmediate(() => { assert.deepStrictEqual(contextManager.active(), context); patchedEe.removeAllListeners('test'); From fa3b76a74a6a17a9704c0443393a3390c39c611d Mon Sep 17 00:00:00 2001 From: vmarchaud Date: Thu, 28 May 2020 14:43:24 +0200 Subject: [PATCH 2/3] chore: address PR comments --- .../src/AsyncHooksContextManager.ts | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts b/packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts index 790acec6f0..adeef82c25 100644 --- a/packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts +++ b/packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts @@ -53,7 +53,7 @@ export class AsyncHooksContextManager implements ContextManager { } active(): Context { - return this._stack[0] ?? Context.ROOT_CONTEXT; + return this._stack[this._stack.length - 1] ?? Context.ROOT_CONTEXT; } with ReturnType>( @@ -62,12 +62,9 @@ export class AsyncHooksContextManager implements ContextManager { ): ReturnType { this._enterContext(context); try { - const result = fn(); + return fn(); + } finally { this._exitContext(); - return result; - } catch (err) { - this._exitContext(); - throw err; } } @@ -227,7 +224,7 @@ export class AsyncHooksContextManager implements ContextManager { * @param uid id of the async context */ private _init(uid: number) { - const context = this._stack[0]; + const context = this._stack[this._stack.length - 1]; if (context !== undefined) { this._contexts.set(uid, context); } @@ -264,13 +261,13 @@ export class AsyncHooksContextManager implements ContextManager { * Set the given context as active */ private _enterContext(context: Context) { - this._stack.unshift(context); + this._stack.push(context); } /** * Remove the context at the root of the stack */ private _exitContext() { - this._stack.shift(); + this._stack.pop(); } } From b2ae575fd76771f3c3eb4c4ad2630e477e8beb66 Mon Sep 17 00:00:00 2001 From: vmarchaud Date: Thu, 28 May 2020 15:10:24 +0200 Subject: [PATCH 3/3] test: add case for multiple concurrent operations --- .../test/AsyncHooksContextManager.test.ts | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/packages/opentelemetry-context-async-hooks/test/AsyncHooksContextManager.test.ts b/packages/opentelemetry-context-async-hooks/test/AsyncHooksContextManager.test.ts index 49269b798b..15f403d622 100644 --- a/packages/opentelemetry-context-async-hooks/test/AsyncHooksContextManager.test.ts +++ b/packages/opentelemetry-context-async-hooks/test/AsyncHooksContextManager.test.ts @@ -167,6 +167,40 @@ describe('AsyncHooksContextManager', () => { }); assert.strictEqual(contextManager.active(), Context.ROOT_CONTEXT); }); + + it('should works with multiple concurrent operations', done => { + const scope1 = '1' as any; + const scope2 = '2' as any; + const scope3 = '3' as any; + const scope4 = '4' as any; + let scope4Called = false; + + contextManager.with(scope1, async () => { + assert.strictEqual(contextManager.active(), scope1); + setTimeout(async () => { + await contextManager.with(scope3, async () => { + assert.strictEqual(contextManager.active(), scope3); + }); + assert.strictEqual(contextManager.active(), scope1); + assert.strictEqual(scope4Called, true); + return done(); + }, 100); + assert.strictEqual(contextManager.active(), scope1); + }); + assert.strictEqual(contextManager.active(), Context.ROOT_CONTEXT); + contextManager.with(scope2, async () => { + assert.strictEqual(contextManager.active(), scope2); + setTimeout(() => { + contextManager.with(scope4, async () => { + assert.strictEqual(contextManager.active(), scope4); + scope4Called = true; + }); + assert.strictEqual(contextManager.active(), scope2); + }, 20); + assert.strictEqual(contextManager.active(), scope2); + }); + assert.strictEqual(contextManager.active(), Context.ROOT_CONTEXT); + }); }); describe('.bind(function)', () => {