diff --git a/packages/core/src/concurrent/pool.ts b/packages/core/src/concurrent/pool.ts index 468ac0c294..4b0d7077bd 100644 --- a/packages/core/src/concurrent/pool.ts +++ b/packages/core/src/concurrent/pool.ts @@ -1,11 +1,10 @@ -import { lastValueFrom, Observable, Subject, merge, zip } from 'rxjs'; -import { mergeMap, filter, shareReplay, tap } from 'rxjs/operators'; +import { TestRunner } from '@stryker-mutator/api/test-runner'; import { notEmpty } from '@stryker-mutator/util'; +import { BehaviorSubject, filter, ignoreElements, lastValueFrom, mergeMap, Observable, ReplaySubject, Subject, takeUntil, tap, zip } from 'rxjs'; import { Disposable, tokens } from 'typed-inject'; -import { TestRunner } from '@stryker-mutator/api/test-runner'; -import { coreTokens } from '../di/index.js'; import { CheckerFacade } from '../checker/index.js'; +import { coreTokens } from '../di/index.js'; const MAX_CONCURRENT_INIT = 2; @@ -28,6 +27,38 @@ export function createCheckerPool(factory: () => CheckerFacade, concurrencyToken return new Pool(factory, concurrencyToken$); } +/** + * Represents a work item: an input with a task and with a `result$` observable where the result (exactly one) will be streamed to. + */ +class WorkItem { + private readonly resultSubject = new Subject(); + public readonly result$ = this.resultSubject.asObservable(); + + /** + * @param input The input to the ask + * @param task The task, where a resource and input is presented + */ + constructor(private readonly input: TIn, private readonly task: (resource: TResource, input: TIn) => Promise | TOut) {} + + public async execute(resource: TResource) { + try { + const output = await this.task(resource, this.input); + this.resultSubject.next(output); + this.resultSubject.complete(); + } catch (err) { + this.resultSubject.error(err); + } + } + + public reject(error: unknown) { + this.resultSubject.error(error); + } + + public complete() { + this.resultSubject.complete(); + } +} + /** * Represents a pool of resources. Use `schedule` to schedule work to be executed on the resources. * The pool will automatically recycle the resources, but will make sure only one task is executed @@ -35,26 +66,69 @@ export function createCheckerPool(factory: () => CheckerFacade, concurrencyToken * Also takes care of the initialing of the resources (with `init()`) */ export class Pool implements Disposable { + // The init subject. Using an RxJS subject instead of a promise, so errors are silently ignored when nobody is listening + private readonly initSubject = new ReplaySubject(); + + // The disposedSubject emits true when it is disposed, and false when not disposed yet + private readonly disposedSubject = new BehaviorSubject(false); + + // The dispose$ only emits one `true` value when disposed (never emits `false`). Useful for `takeUntil` + private readonly dispose$ = this.disposedSubject.pipe(filter((isDisposed) => isDisposed)); + private readonly createdResources: TResource[] = []; - private readonly resource$: Observable; + // The queued work items. This is a replay subject, so scheduled work items can easily be rejected after it was picked up + private readonly todoSubject = new ReplaySubject>(); constructor(factory: () => TResource, concurrencyToken$: Observable) { - this.resource$ = concurrencyToken$.pipe( - mergeMap(async () => { - if (this.isDisposed) { - return null; - } else { + // Stream resources that are ready to pick up work + const resourcesSubject = new Subject(); + + // Stream ongoing work. + zip(resourcesSubject, this.todoSubject) + .pipe( + mergeMap(async ([resource, workItem]) => { + await workItem.execute(resource); + resourcesSubject.next(resource); // recycle resource so it can pick up more work + }), + ignoreElements(), + takeUntil(this.dispose$) + ) + .subscribe({ + error: (error) => { + this.todoSubject.subscribe((workItem) => workItem.reject(error)); + }, + }); + + // Create resources + concurrencyToken$ + .pipe( + takeUntil(this.dispose$), + mergeMap(async () => { + if (this.disposedSubject.value) { + // Don't create new resources when disposed + return; + } const resource = factory(); this.createdResources.push(resource); await resource.init?.(); return resource; - } - }, MAX_CONCURRENT_INIT), - filter(notEmpty), - // We use share replay here. This way the dry run can use a test runner that is later reused during mutation testing - // https://www.learnrxjs.io/learn-rxjs/operators/multicasting/sharereplay - shareReplay() - ); + }, MAX_CONCURRENT_INIT), + filter(notEmpty), + tap({ + complete: () => { + // Signal init complete + this.initSubject.next(); + this.initSubject.complete(); + }, + error: (err) => { + this.initSubject.error(err); + }, + }) + ) + .subscribe({ + next: (resource) => resourcesSubject.next(resource), + error: (err) => resourcesSubject.error(err), + }); } /** @@ -62,7 +136,7 @@ export class Pool implements Disposable { * This is optional, resources will get initialized either way. */ public async init(): Promise { - await lastValueFrom(this.resource$); + await lastValueFrom(this.initSubject); } /** @@ -71,27 +145,24 @@ export class Pool implements Disposable { * @param task The task to execute on each resource */ public schedule(input$: Observable, task: (resource: TResource, input: TIn) => Promise | TOut): Observable { - const recycleBin = new Subject(); - const resource$ = merge(recycleBin, this.resource$); - - return zip(resource$, input$).pipe( - mergeMap(async ([resource, input]) => { - const output = await task(resource, input); - // Recycles a resource so its re-emitted from the `resource$` observable. - recycleBin.next(resource); - return output; - }), - tap({ complete: () => recycleBin.complete() }) + return input$.pipe( + mergeMap((input) => { + const workItem = new WorkItem(input, task); + this.todoSubject.next(workItem); + return workItem.result$; + }) ); } - private isDisposed = false; - /** * Dispose the pool */ public async dispose(): Promise { - this.isDisposed = true; - await Promise.all(this.createdResources.map((resource) => resource.dispose?.())); + if (!this.disposedSubject.value) { + this.disposedSubject.next(true); + this.todoSubject.subscribe((workItem) => workItem.complete()); + this.todoSubject.complete(); + await Promise.all(this.createdResources.map((resource) => resource.dispose?.())); + } } } diff --git a/packages/core/test/unit/concurrent/pool.spec.ts b/packages/core/test/unit/concurrent/pool.spec.ts index 9cd02c10a4..ffe3d9f6a5 100644 --- a/packages/core/test/unit/concurrent/pool.spec.ts +++ b/packages/core/test/unit/concurrent/pool.spec.ts @@ -1,9 +1,8 @@ import { expect } from 'chai'; -import { toArray } from 'rxjs/operators'; import sinon from 'sinon'; import { factory, tick } from '@stryker-mutator/test-helpers'; import { Task, ExpirableTask } from '@stryker-mutator/util'; -import { lastValueFrom, range, ReplaySubject } from 'rxjs'; +import { toArray, mergeWith, lastValueFrom, range, ReplaySubject } from 'rxjs'; import { Pool, Resource } from '../../../src/concurrent/index.js'; @@ -12,20 +11,20 @@ describe(Pool.name, () => { let worker2: sinon.SinonStubbedInstance>; let genericWorkerForAllSubsequentCreates: sinon.SinonStubbedInstance>; let createWorkerStub: sinon.SinonStub; - let concurrencyToken$: ReplaySubject; + let concurrencyTokenSubject: ReplaySubject; let sut: Pool>; beforeEach(() => { - concurrencyToken$ = new ReplaySubject(); - worker1 = factory.testRunner(); - worker2 = factory.testRunner(); + concurrencyTokenSubject = new ReplaySubject(); + worker1 = factory.testRunner(1); + worker2 = factory.testRunner(2); genericWorkerForAllSubsequentCreates = factory.testRunner(); createWorkerStub = sinon.stub(); }); - afterEach(() => { - concurrencyToken$.complete(); - sut.dispose(); + afterEach(async () => { + concurrencyTokenSubject.complete(); + await sut.dispose(); }); function arrangeWorkers() { @@ -33,11 +32,11 @@ describe(Pool.name, () => { } function createSut() { - return new Pool>(createWorkerStub, concurrencyToken$); + return new Pool>(createWorkerStub, concurrencyTokenSubject); } function setConcurrency(n: number) { - Array.from({ length: n }).forEach((_, i) => concurrencyToken$.next(i)); + range(0, n).subscribe(concurrencyTokenSubject.next.bind(concurrencyTokenSubject)); } describe('schedule', () => { @@ -73,7 +72,7 @@ describe(Pool.name, () => { arrangeWorkers(); setConcurrency(8); sut = createSut(); - const result = await captureWorkers(sut, 8); + const result = await captureWorkers(8); expect(result).lengthOf(8); expect(result).deep.eq([ worker1, @@ -92,7 +91,7 @@ describe(Pool.name, () => { const expectedError = new Error('foo error'); createWorkerStub.throws(expectedError); sut = createSut(); - await expect(captureWorkers(sut, 1)).rejectedWith(expectedError); + await expect(captureWorkers(1)).rejectedWith(expectedError); }); it('should share workers across subscribers (for sharing between dry runner and mutation test runner)', async () => { @@ -141,6 +140,47 @@ describe(Pool.name, () => { ]); await onGoingWork; }); + + it('should allow for parallel schedules, without interference (#3473)', async () => { + // Arrange + arrangeWorkers(); + setConcurrency(2); + sut = createSut(); + let nrOfParallelTasks = 0; + let maxNrOfParallelTasks = 0; + const countParallelTasks = async () => { + nrOfParallelTasks++; + maxNrOfParallelTasks = Math.max(nrOfParallelTasks, maxNrOfParallelTasks); + await tick(); + nrOfParallelTasks--; + }; + + // Act + await lastValueFrom(sut.schedule(range(0, 3), countParallelTasks).pipe(mergeWith(sut.schedule(range(3, 3), countParallelTasks)))); + await sut.dispose(); + + // Assert + expect(maxNrOfParallelTasks).eq(2); + }); + + it('should reject when an error occurs', async () => { + // Arrange + arrangeWorkers(); + setConcurrency(2); + sut = createSut(); + + // Act + const expectedError = new Error('Expected error'); + await expect( + lastValueFrom( + sut.schedule(range(0, 3), (_, n) => { + if (n === 1) { + throw expectedError; + } + }) + ) + ).rejectedWith(expectedError); + }); }); describe('init', () => { @@ -156,7 +196,7 @@ describe(Pool.name, () => { // Act const timeoutResult = await ExpirableTask.timeout(sut.init(), 20); initWorker2Task.resolve(); - concurrencyToken$.complete(); + concurrencyTokenSubject.complete(); await sut.init(); // Assert @@ -170,12 +210,12 @@ describe(Pool.name, () => { arrangeWorkers(); setConcurrency(1); sut = createSut(); - concurrencyToken$.complete(); + concurrencyTokenSubject.complete(); // Act await sut.init(); await sut.init(); - const allWorkers = await captureWorkers(sut, 1); + const allWorkers = await captureWorkers(1); // Assert expect(createWorkerStub).calledOnce; @@ -192,7 +232,7 @@ describe(Pool.name, () => { // Act const timeoutResult = await ExpirableTask.timeout(sut.init(), 20); - concurrencyToken$.complete(); + concurrencyTokenSubject.complete(); // Assert expect(timeoutResult).eq(ExpirableTask.TimeoutExpired); @@ -211,22 +251,39 @@ describe(Pool.name, () => { // Act & Assert await expect(sut.init()).rejectedWith(expectedError); - concurrencyToken$.complete(); + concurrencyTokenSubject.complete(); }); }); describe('dispose', () => { - it('should have disposed all testRunners', async () => { + it('should have disposed all workers', async () => { setConcurrency(8); arrangeWorkers(); sut = createSut(); - await captureWorkers(sut, 9); + await captureWorkers(9); await sut.dispose(); expect(worker1.dispose).called; expect(worker2.dispose).called; expect(genericWorkerForAllSubsequentCreates.dispose).called; }); + it('should dispose workers only once', async () => { + // Arrange + setConcurrency(2); + arrangeWorkers(); + concurrencyTokenSubject.complete(); + sut = createSut(); + await sut.init(); + + // Act + await sut.dispose(); + await sut.dispose(); + + // Assert + expect(worker1.dispose).calledOnce; + expect(worker2.dispose).calledOnce; + }); + it('should not do anything if no workers were created', async () => { sut = createSut(); await sut.dispose(); @@ -249,7 +306,9 @@ describe(Pool.name, () => { task.resolve(); await sut.dispose(); task2.resolve(); - concurrencyToken$.complete(); + concurrencyTokenSubject.complete(); + + // Assert await resultPromise; expect(worker1.dispose).called; expect(worker2.dispose).called; @@ -266,23 +325,23 @@ describe(Pool.name, () => { sut = createSut(); // Act - const actualTestRunnersPromise = lastValueFrom(sut.schedule(range(0, 3), (worker) => worker).pipe(toArray())); + const actualWorkers = lastValueFrom(sut.schedule(range(0, 3), (worker) => worker).pipe(toArray())); const disposePromise = sut.dispose(); task.resolve(); task2.resolve(); await disposePromise; - concurrencyToken$.complete(); - await actualTestRunnersPromise; + concurrencyTokenSubject.complete(); + await actualWorkers; // Assert expect(createWorkerStub).calledTwice; }); }); - async function captureWorkers(suite: Pool>, inputCount: number) { + async function captureWorkers(inputCount: number) { // Eagerly get all test runners const createAllPromise = lastValueFrom( - suite + sut .schedule(range(0, inputCount), async (worker) => { await tick(); return worker; @@ -292,11 +351,11 @@ describe(Pool.name, () => { // But don't await yet, until after dispose. // Allow processes to be created - await tick(inputCount); + await tick(inputCount + 1); // Dispose completes the internal recycle bin subject, which in turn will complete. - await suite.dispose(); - concurrencyToken$.complete(); + await sut.dispose(); + concurrencyTokenSubject.complete(); return createAllPromise; } }); diff --git a/packages/test-helpers/src/factory.ts b/packages/test-helpers/src/factory.ts index b467ea369a..2ec5eb1464 100644 --- a/packages/test-helpers/src/factory.ts +++ b/packages/test-helpers/src/factory.ts @@ -194,8 +194,9 @@ export function logger(): sinon.SinonStubbedInstance { }; } -export function testRunner(): sinon.SinonStubbedInstance> { +export function testRunner(index = 0): sinon.SinonStubbedInstance & { index: number }> { return { + index, capabilities: sinon.stub(), init: sinon.stub(), dryRun: sinon.stub(),