-
Notifications
You must be signed in to change notification settings - Fork 250
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(mutants): Prevent memory leak when transpiling mutants #1376
Changes from 2 commits
da18585
5fd66d6
cfb1606
1420195
8c9178c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,6 @@ | |
}, | ||
"devDependencies": { | ||
"surrial": "~0.1.1", | ||
"typed-inject": "^0.2.0" | ||
"typed-inject": "^0.2.1" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
import * as os from 'os'; | ||
import { Observable, range } from 'rxjs'; | ||
import { flatMap } from 'rxjs/operators'; | ||
import { range, Subject } from 'rxjs'; | ||
import { flatMap, tap, filter } from 'rxjs/operators'; | ||
import { File, StrykerOptions } from 'stryker-api/core'; | ||
import { TestFramework } from 'stryker-api/test_framework'; | ||
import Sandbox from './Sandbox'; | ||
|
@@ -9,23 +9,94 @@ import { tokens, commonTokens } from 'stryker-api/plugin'; | |
import { coreTokens } from './di'; | ||
import { InitialTestRunResult } from './process/InitialTestExecutor'; | ||
import { Logger } from 'stryker-api/logging'; | ||
import TranspiledMutant from './TranspiledMutant'; | ||
import { Task } from './utils/Task'; | ||
import { RunResult, RunStatus } from 'stryker-api/test_runner'; | ||
|
||
const MAX_CONCURRENT_INITIALIZING_SANDBOXES = 2; | ||
|
||
export class SandboxPool { | ||
|
||
private readonly sandboxes: Promise<Sandbox>[] = []; | ||
private readonly allSandboxes: Promise<Sandbox>[] = []; | ||
private readonly overheadTimeMS: number; | ||
private readonly workerPool: Sandbox[] = []; | ||
private readonly backlog: { mutant: TranspiledMutant, task: Task<RunResult> }[] = []; | ||
private isDisposed = false; | ||
private sandboxesStarted = false; | ||
private persistentError: any = null; | ||
|
||
public static inject = tokens(commonTokens.logger, commonTokens.options, coreTokens.testFramework, coreTokens.initialRunResult, coreTokens.loggingContext); | ||
public static inject = tokens( | ||
commonTokens.logger, | ||
commonTokens.options, | ||
coreTokens.testFramework, | ||
coreTokens.initialRunResult, | ||
coreTokens.transpiledFiles, | ||
coreTokens.loggingContext); | ||
constructor( | ||
private readonly log: Logger, | ||
private readonly options: StrykerOptions, | ||
private readonly testFramework: TestFramework | null, | ||
initialRunResult: InitialTestRunResult, | ||
private readonly initialFiles: ReadonlyArray<File>, | ||
private readonly loggingContext: LoggingClientContext) { | ||
this.overheadTimeMS = initialRunResult.overheadTimeMS; | ||
this.overheadTimeMS = initialRunResult.overheadTimeMS; | ||
} | ||
|
||
public run(mutant: TranspiledMutant): Promise<RunResult> { | ||
this.startSandboxes(); | ||
const task = new Task<RunResult>(); | ||
this.backlog.push({ mutant, task }); | ||
this.doWork(); | ||
return task.promise; | ||
} | ||
|
||
private readonly doWork = (): void => { | ||
const backlogItem = this.backlog.shift(); | ||
if (backlogItem) { | ||
if (this.isDisposed) { | ||
backlogItem.task.resolve({ status: RunStatus.Timeout, tests: [] }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So if the entire pool is disposed you resolve the last backlog item as a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All remaining backlog items are timedout. We need another way of dealing with this, the problem is I don't know exactly how. It is non-trivial functionality. Note: this shouldn't happen. We only dispose the sandbox pool once all results are in. We'll be able to remove this code once we make typed-inject responsible to dispose our stuff. |
||
} else if (this.persistentError) { | ||
backlogItem.task.reject(this.persistentError); | ||
} else { | ||
const sandbox = this.workerPool.pop(); | ||
if (sandbox) { | ||
sandbox.runMutant(backlogItem.mutant) | ||
.then(backlogItem.task.resolve, backlogItem.task.reject) | ||
.then(() => this.releaseSandbox(sandbox)); | ||
} else { | ||
// False alarm | ||
this.backlog.push(backlogItem); | ||
} | ||
} | ||
} | ||
} | ||
|
||
private startSandboxes(): void { | ||
if (!this.sandboxesStarted) { | ||
this.sandboxesStarted = true; | ||
const concurrency = this.determineConcurrency(); | ||
|
||
range(0, concurrency).pipe( | ||
flatMap(n => this.registerSandbox(Sandbox.create(this.options, n, this.initialFiles, this.testFramework, this.overheadTimeMS, this.loggingContext)), | ||
MAX_CONCURRENT_INITIALIZING_SANDBOXES) | ||
).subscribe({ | ||
error: error => { | ||
this.persistentError = error; | ||
this.doWork(); | ||
}, | ||
next: this.releaseSandbox | ||
}); | ||
} | ||
} | ||
|
||
public streamSandboxes(initialFiles: ReadonlyArray<File>): Observable<Sandbox> { | ||
private readonly releaseSandbox = (sandbox: Sandbox) => { | ||
this.workerPool.push(sandbox); | ||
// Let's see if there is work to be done in the next tick | ||
// (scheduling it next tick instead of immediately will prevent a stack overflow) | ||
process.nextTick(this.doWork); | ||
} | ||
|
||
private determineConcurrency() { | ||
let numConcurrentRunners = os.cpus().length; | ||
if (this.options.transpilers.length) { | ||
// If transpilers are configured, one core is reserved for the compiler (for now) | ||
|
@@ -40,18 +111,21 @@ export class SandboxPool { | |
numConcurrentRunners = 1; | ||
} | ||
this.log.info(`Creating ${numConcurrentRunners} test runners (based on ${numConcurrentRunnersSource})`); | ||
|
||
const sandboxes = range(0, numConcurrentRunners) | ||
.pipe(flatMap(n => this.registerSandbox(Sandbox.create(this.options, n, initialFiles, this.testFramework, this.overheadTimeMS, this.loggingContext)))); | ||
return sandboxes; | ||
return numConcurrentRunners; | ||
} | ||
|
||
private registerSandbox(promisedSandbox: Promise<Sandbox>): Promise<Sandbox> { | ||
this.sandboxes.push(promisedSandbox); | ||
private readonly registerSandbox = async (promisedSandbox: Promise<Sandbox>): Promise<Sandbox> => { | ||
if (this.isDisposed) { | ||
await promisedSandbox.then(sandbox => sandbox.dispose()); | ||
} else { | ||
this.allSandboxes.push(promisedSandbox); | ||
} | ||
return promisedSandbox; | ||
} | ||
|
||
public disposeAll() { | ||
return Promise.all(this.sandboxes.map(promisedSandbox => promisedSandbox.then(sandbox => sandbox.dispose()))); | ||
public async disposeAll() { | ||
this.isDisposed = true; | ||
const sandboxes = await Promise.all(this.allSandboxes); | ||
return Promise.all(sandboxes.map(sandbox => sandbox.dispose())); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we no longer have to make a slice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's no longer needed. We no longer mutate the original (it is a
ReadonlyArray
)