Resolve 'collection modified' exception when logging results #2549
Merged
Commits (8)
5f23a47  Introduce slim semaphore to enable per-file-hash results caching. (michaelcfanning)
3af60e8  Latest iteration of concurrency fix for 'collection modified' exception. (michaelcfanning)
46d62b5  Improvements to concurrency change. (michaelcfanning)
b69f396  Skip Coyote testing for now. (michaelcfanning)
096af40  Add release note and incidental PR clean-up. (michaelcfanning)
1f897cb  Final PR clean-up. (michaelcfanning)
739b18e  Merge remote-tracking branch 'origin' into current-threading-poc (michaelcfanning)
e3fa657  Null hash data when not present. (michaelcfanning)
```diff
@@ -28,17 +28,16 @@ public abstract class MultithreadedAnalyzeCommandBase<TContext, TOptions> : Plug
         internal bool _captureConsoleOutput;
 
         internal ConsoleLogger _consoleLogger;
-        internal ConcurrentDictionary<string, IAnalysisLogger> _analysisLoggerCache;
 
         private Run _run;
         private Tool _tool;
         private bool _computeHashes;
         internal TContext _rootContext;
         private int _fileContextsCount;
-        private Channel<int> _hashChannel;
+        private Channel<int> readyToHashChannel;
         private OptionallyEmittedData _dataToInsert;
         private Channel<int> _resultsWritingChannel;
-        private Channel<int> _fileEnumerationChannel;
+        private Channel<int> readyToScanChannel;
         private IDictionary<string, HashData> _pathToHashDataMap;
         private ConcurrentDictionary<int, TContext> _fileContexts;
 
```
```diff
@@ -190,12 +189,18 @@ private void MultithreadedAnalyzeTargets(TOptions options,
                 : (Debugger.IsAttached) ? 1 : Environment.ProcessorCount;
 
             var channelOptions = new BoundedChannelOptions(2000)
+            {
+                SingleWriter = true,
+                SingleReader = true,
+            };
+            readyToHashChannel = Channel.CreateBounded<int>(channelOptions);
+
+            channelOptions = new BoundedChannelOptions(2000)
             {
                 SingleWriter = true,
                 SingleReader = false,
             };
-            _fileEnumerationChannel = Channel.CreateBounded<int>(channelOptions);
-            _hashChannel = Channel.CreateBounded<int>(channelOptions);
+            readyToScanChannel = Channel.CreateBounded<int>(channelOptions);
 
             channelOptions = new BoundedChannelOptions(2000)
             {
```

Review comment on `readyToHashChannel = Channel.CreateBounded<int>(channelOptions);`: Should we have a line break here and six lines down to improve readability?
```diff
@@ -206,26 +211,39 @@ private void MultithreadedAnalyzeTargets(TOptions options,
             var sw = Stopwatch.StartNew();
 
-            var workers = new Task<bool>[options.Threads];
+            // 1: First we initiate an asynchronous operation to locate disk files for
+            // analysis, as specified in analysis configuration (file names, wildcards).
+            Task<bool> enumerateFilesOnDisk = EnumerateFilesOnDiskAsync(options, rootContext);
+
+            // 2: Files found on disk are put in a specific sort order, after which a
+            // reference to each scan target is put into a channel for hashing,
+            // if hashing is enabled.
+            Task<bool> hashFilesAndPutInAnalysisQueue = HashFilesAndPutInAnalysisQueueAsnc();
+
+            // 3: A dedicated set of threads pull scan targets and analyze them.
+            // On completing a scan, the thread writes the index of the
+            // scanned item to a channel that drives logging.
+            var workers = new Task<bool>[options.Threads];
             for (int i = 0; i < options.Threads; i++)
             {
-                workers[i] = AnalyzeTargetAsync(skimmers, disabledSkimmers);
+                workers[i] = ScanTargetsAsync(skimmers, disabledSkimmers);
             }
 
-            Task<bool> hashFiles = HashAsync();
-            Task<bool> findFiles = FindFilesAsync(options, rootContext);
-            Task<bool> writeResults = WriteResultsAsync(rootContext);
-
-            // FindFiles is single-thread and will close its write channel
-            findFiles.Wait();
-            hashFiles.Wait();
+            // 4: A single-threaded consumer watches for completed scans
+            // and logs results, if any. This operation is single-threaded
+            // in order to help ensure determinism in log output. i.e., any
+            // scan of the same targets using the same production code
+            // should produce a log file that is byte-for-byte identical
+            // to the previous output.
+            Task<bool> logScanResults = LogScanResultsAsync(rootContext);
 
             Task.WhenAll(workers)
                 .ContinueWith(_ => _resultsWritingChannel.Writer.Complete())
                 .Wait();
 
-            writeResults.Wait();
+            enumerateFilesOnDisk.Wait();
+            hashFilesAndPutInAnalysisQueue.Wait();
+            logScanResults.Wait();
 
             Console.WriteLine();
```
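The numbered comments above describe a four-stage producer/consumer pipeline: enumerate, hash, scan, log. The sketch below is a rough illustration of that shape only; the type and member names (`PipelineSketch`, `RunAsync`) are hypothetical, the payloads are simplified, and it is not the SDK's production code.

```csharp
// Rough sketch of a four-stage pipeline wired with bounded channels:
// enumerate -> hash -> scan (N workers) -> log (single consumer).
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PipelineSketch
{
    public static async Task RunAsync(string[] files, int scanThreads)
    {
        Channel<int> readyToHash = Channel.CreateBounded<int>(
            new BoundedChannelOptions(2000) { SingleWriter = true, SingleReader = true });
        Channel<int> readyToScan = Channel.CreateBounded<int>(
            new BoundedChannelOptions(2000) { SingleWriter = true, SingleReader = false });
        Channel<int> readyToLog = Channel.CreateBounded<int>(
            new BoundedChannelOptions(2000) { SingleWriter = false, SingleReader = true });

        // 1: Enumerate targets and publish their indices for hashing.
        Task enumerate = Task.Run(async () =>
        {
            for (int i = 0; i < files.Length; i++) { await readyToHash.Writer.WriteAsync(i); }
            readyToHash.Writer.Complete();
        });

        // 2: Hash each target (a single consumer here), then queue it for scanning.
        Task hash = Task.Run(async () =>
        {
            await foreach (int i in readyToHash.Reader.ReadAllAsync())
            {
                // Hash computation and per-hash logger caching would happen here.
                await readyToScan.Writer.WriteAsync(i);
            }
            readyToScan.Writer.Complete();
        });

        // 3: A pool of workers scans targets and reports completed indices.
        Task[] scanners = Enumerable.Range(0, scanThreads)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (int i in readyToScan.Reader.ReadAllAsync())
                {
                    // Analysis of files[i] would happen here.
                    await readyToLog.Writer.WriteAsync(i);
                }
            }))
            .ToArray();

        // Close the logging channel only after every scan worker has finished.
        Task closeLog = Task.WhenAll(scanners).ContinueWith(_ => readyToLog.Writer.Complete());

        // 4: A single consumer drains completed scans and logs them; the production
        // code additionally replays results in index order so output stays deterministic.
        Task log = Task.Run(async () =>
        {
            await foreach (int i in readyToLog.Reader.ReadAllAsync())
            {
                Console.WriteLine($"scanned: {files[i]}");
            }
        });

        await Task.WhenAll(enumerate, hash, closeLog, log);
    }
}
```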
```diff
@@ -239,7 +257,7 @@ private void MultithreadedAnalyzeTargets(TOptions options,
             }
         }
 
-        private async Task<bool> WriteResultsAsync(TContext rootContext)
+        private async Task<bool> LogScanResultsAsync(TContext rootContext)
         {
             int currentIndex = 0;
 
@@ -265,15 +283,7 @@ private async Task<bool> WriteResultsAsync(TContext rootContext)
 
                 while (context?.AnalysisComplete == true)
                 {
-                    if (_computeHashes)
-                    {
-                        bool cache = _analysisLoggerCache.TryGetValue(context.Hashes.Sha256, out IAnalysisLogger logger);
-                        LogCachingLogger(rootContext, logger ?? context.Logger, context, clone: cache);
-                    }
-                    else
-                    {
-                        LogCachingLogger(rootContext, context.Logger, context);
-                    }
+                    LogCachingLogger(rootContext, context, clone: _computeHashes);
 
                     RuntimeErrors |= context.RuntimeErrors;
 
@@ -300,9 +310,9 @@ private async Task<bool> WriteResultsAsync(TContext rootContext)
             return true;
         }
 
-        private void LogCachingLogger(TContext rootContext, IAnalysisLogger logger, TContext context, bool clone = false)
+        private void LogCachingLogger(TContext rootContext, TContext context, bool clone = false)
         {
-            var cachingLogger = (CachingLogger)logger;
+            var cachingLogger = (CachingLogger)context.Logger;
             IDictionary<ReportingDescriptor, IList<Result>> results = cachingLogger.Results;
 
             if (results?.Count > 0)
@@ -352,7 +362,7 @@ private void LogCachingLogger(TContext rootContext, IAnalysisLogger logger, TCon
             }
         }
 
-        private async Task<bool> FindFilesAsync(TOptions options, TContext rootContext)
+        private async Task<bool> EnumerateFilesOnDiskAsync(TOptions options, TContext rootContext)
         {
             this._fileContextsCount = 0;
             this._fileContexts = new ConcurrentDictionary<int, TContext>();
@@ -432,12 +442,12 @@ private async Task<bool> FindFilesAsync(TOptions options, TContext rootContext)
                                 filePath: file)
                         );
 
-                        await _hashChannel.Writer.WriteAsync(_fileContextsCount++);
+                        await readyToHashChannel.Writer.WriteAsync(_fileContextsCount++);
                     }
                 }
             }
 
-            _hashChannel.Writer.Complete();
+            readyToHashChannel.Writer.Complete();
 
             if (_fileContextsCount == 0)
             {
```
```diff
@@ -464,9 +474,11 @@ private void EnqueueAllDirectories(Queue<string> queue, string directory)
             }
         }
 
-        private async Task<bool> HashAsync()
+        private async Task<bool> HashFilesAndPutInAnalysisQueueAsnc()
        {
-            ChannelReader<int> reader = _hashChannel.Reader;
+            ChannelReader<int> reader = readyToHashChannel.Reader;
+
+            Dictionary<string, CachingLogger> loggerCache = null;
 
             // Wait until there is work or the channel is closed.
             while (await reader.WaitToReadAsync())
@@ -487,20 +499,28 @@ private async Task<bool> HashAsync()
                        {
                            _pathToHashDataMap.Add(localPath, hashData);
                        }
+
+                        loggerCache ??= new Dictionary<string, CachingLogger>();
+
+                        if (!loggerCache.TryGetValue(hashData.Sha256, out CachingLogger logger))
+                        {
+                            logger = loggerCache[hashData.Sha256] = (CachingLogger)context.Logger;
+                        }
+                        context.Logger = logger;
                     }
 
-                    await _fileEnumerationChannel.Writer.WriteAsync(index);
+                    await readyToScanChannel.Writer.WriteAsync(index);
                 }
             }
 
-            _fileEnumerationChannel.Writer.Complete();
+            readyToScanChannel.Writer.Complete();
 
             return true;
         }
 
-        private async Task<bool> AnalyzeTargetAsync(IEnumerable<Skimmer<TContext>> skimmers, ISet<string> disabledSkimmers)
+        private async Task<bool> ScanTargetsAsync(IEnumerable<Skimmer<TContext>> skimmers, ISet<string> disabledSkimmers)
        {
-            ChannelReader<int> reader = _fileEnumerationChannel.Reader;
+            ChannelReader<int> reader = readyToScanChannel.Reader;
 
             // Wait until there is work or the channel is closed.
             while (await reader.WaitToReadAsync())
```
```diff
@@ -532,9 +552,6 @@ private async Task<bool> AnalyzeTargetAsync(IEnumerable<Skimmer<TContext>> skimm
 
         protected virtual void ValidateOptions(TOptions options, TContext context)
         {
-            _dataToInsert = options.DataToInsert.ToFlags();
-            _computeHashes = (_dataToInsert & OptionallyEmittedData.Hashes) != 0;
-
             bool succeeded = true;
 
             succeeded &= ValidateFile(context, options.OutputFilePath, DefaultPolicyName, shouldExist: null);
@@ -564,10 +581,8 @@ internal AggregatingLogger InitializeLogger(AnalyzeOptionsBase analyzeOptions)
                 logger.Loggers.Add(_consoleLogger);
             }
 
-            if ((analyzeOptions.DataToInsert.ToFlags() & OptionallyEmittedData.Hashes) != 0)
-            {
-                _analysisLoggerCache = new ConcurrentDictionary<string, IAnalysisLogger>();
-            }
+            _dataToInsert = analyzeOptions.DataToInsert.ToFlags();
+            _computeHashes = (_dataToInsert & OptionallyEmittedData.Hashes) != 0;
 
             return logger;
         }
```
```diff
@@ -868,29 +883,20 @@ protected virtual TContext DetermineApplicabilityAndAnalyze(
                 return context;
             }
 
-            IAnalysisLogger logger = context.Logger;
-
-            if (_computeHashes)
-            {
-                if (_analysisLoggerCache.ContainsKey(context.Hashes.Sha256))
-                {
-                    return context;
-                }
-            }
-
-            context.Logger.AnalyzingTarget(context);
+            CachingLogger logger = (CachingLogger)context.Logger;
+            logger.AnalyzingTarget(context);
 
-            if (_computeHashes)
+            if (logger.CacheFinalized)
             {
-                if (!_analysisLoggerCache.TryAdd(context.Hashes.Sha256, logger))
-                {
-                    return context;
-                }
+                context.Logger = logger;
+                logger.ReleaseLock();
+                return context;
             }
 
             IEnumerable<Skimmer<TContext>> applicableSkimmers = DetermineApplicabilityForTarget(context, skimmers, disabledSkimmers);
             AnalyzeTarget(context, applicableSkimmers, disabledSkimmers);
 
+            logger.ReleaseLock();
             return context;
         }
 
```
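The `CacheFinalized`/`ReleaseLock` pattern above, together with the "Introduce slim semaphore to enable per-file-hash results caching" commit, suggests a per-hash gate: the first thread to analyze a given hash holds a lock while it populates the shared logger, and later threads with the same hash wait, then replay the cached results. The sketch below is an assumption about the shape of that mechanism, not `CachingLogger`'s actual source; every name in it (`PerHashResultCache`, `AnalyzingTarget`, `Record`) is hypothetical apart from `CacheFinalized` and `ReleaseLock`, which mirror the diff.

```csharp
// Hypothetical per-hash result cache gated by a SemaphoreSlim.
using System.Collections.Generic;
using System.Threading;

public class PerHashResultCache<TResult>
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private readonly List<TResult> _results = new List<TResult>();

    // True once the first analysis of a target with this hash has completed.
    public bool CacheFinalized { get; private set; }

    public IReadOnlyList<TResult> Results => _results;

    // Called before scanning a target; blocks while another thread is still
    // analyzing a target with the same hash.
    public void AnalyzingTarget() => _gate.Wait();

    public void Record(TResult result) => _results.Add(result);

    // Called when the owning thread finishes its scan, or when a waiter finds
    // the cache already finalized; marks the cache complete and wakes the next waiter.
    public void ReleaseLock()
    {
        CacheFinalized = true;
        _gate.Release();
    }
}
```

Under this reading, a scan worker calls `AnalyzingTarget()`, then either replays `Results` when `CacheFinalized` is true or runs the skimmers and `Record`s new results, always ending with `ReleaseLock()` — which is the control flow visible in `DetermineApplicabilityAndAnalyze` above.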
Review comment: Currently the hashing channel is single-threaded. There is an opportunity here, I think, to improve performance by making this phase multi-threaded.
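One possible shape for that suggestion, purely hypothetical and not part of this PR: give the hashing channel multiple readers (`SingleReader = false`), run several hash workers, and keep the per-hash logger cache in a thread-safe map. All names here (`ParallelHashingSketch`, `HashWorkersAsync`, `perHashLoggerCache`) are illustrative.

```csharp
// Hypothetical multi-threaded hashing stage: N workers drain the ready-to-hash
// channel, compute SHA-256, and converge on one cache entry per hash.
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ParallelHashingSketch
{
    public static Task HashWorkersAsync(
        ChannelReader<string> readyToHash,
        ChannelWriter<string> readyToScan,
        ConcurrentDictionary<string, object> perHashLoggerCache,
        int workerCount)
    {
        Task[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (string path in readyToHash.ReadAllAsync())
                {
                    string sha256;
                    using (var sha = SHA256.Create())
                    using (FileStream stream = File.OpenRead(path))
                    {
                        byte[] hash = sha.ComputeHash(stream);
                        sha256 = BitConverter.ToString(hash).Replace("-", string.Empty);
                    }

                    // GetOrAdd is atomic, so two workers hashing identical content
                    // still end up sharing a single cached entry for that hash.
                    perHashLoggerCache.GetOrAdd(sha256, _ => new object());

                    await readyToScan.WriteAsync(path);
                }
            }))
            .ToArray();

        // Only close the scan channel once every hashing worker is done.
        return Task.WhenAll(workers).ContinueWith(_ => readyToScan.Complete());
    }
}
```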