Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass workloads to proxy managers #4422

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal sealed class ParallelOperationManager<TManager, TEventHandler, TWorkloa
out int num)
? num
: PreStart;
private readonly Func<TestRuntimeProviderInfo, TManager> _createNewManager;
private readonly Func<TestRuntimeProviderInfo, TWorkload, TManager> _createNewManager;

/// <summary>
/// Default number of Processes
Expand All @@ -50,7 +50,7 @@ internal sealed class ParallelOperationManager<TManager, TEventHandler, TWorkloa
/// <param name="createNewManager">Creates a new manager that is responsible for running a single part of the overall workload.
/// A manager is typically a testhost, and the part of workload is discovering or running a single test dll.</param>
/// <param name="parallelLevel">Determines the maximum amount of parallel managers that can be active at the same time.</param>
public ParallelOperationManager(Func<TestRuntimeProviderInfo, TManager> createNewManager, int parallelLevel)
public ParallelOperationManager(Func<TestRuntimeProviderInfo, TWorkload, TManager> createNewManager, int parallelLevel)
{
_createNewManager = createNewManager;
MaxParallelLevel = parallelLevel;
Expand Down Expand Up @@ -144,7 +144,7 @@ private bool RunWorkInParallel()
var workload = workloadsToAdd[i];
slot.ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel;

var manager = _createNewManager(workload.Provider);
var manager = _createNewManager(workload.Provider, workload.Work);
var eventHandler = _getEventHandler(_eventHandler, manager);
slot.EventHandler = eventHandler;
slot.Manager = manager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal sealed class ParallelProxyDiscoveryManager : IParallelProxyDiscoveryMan

public ParallelProxyDiscoveryManager(
IRequestData requestData,
Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> actualProxyManagerCreator,
Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> actualProxyManagerCreator,
DiscoveryDataAggregator dataAggregator,
int parallelLevel,
List<TestRuntimeProviderInfo> testHostProviders)
Expand All @@ -53,7 +53,7 @@ public ParallelProxyDiscoveryManager(

internal ParallelProxyDiscoveryManager(
IRequestData requestData,
Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> actualProxyManagerCreator,
Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> actualProxyManagerCreator,
DiscoveryDataAggregator dataAggregator,
IDataSerializer dataSerializer,
int parallelLevel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ internal sealed class ParallelProxyExecutionManager : IParallelProxyExecutionMan

public ParallelProxyExecutionManager(
IRequestData requestData,
Func<TestRuntimeProviderInfo, IProxyExecutionManager> actualProxyManagerCreator,
Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> actualProxyManagerCreator,
int parallelLevel,
List<TestRuntimeProviderInfo> testHostProviders)
: this(requestData, actualProxyManagerCreator, JsonDataSerializer.Instance, parallelLevel, testHostProviders)
Expand All @@ -73,7 +73,7 @@ public ParallelProxyExecutionManager(

internal ParallelProxyExecutionManager(
IRequestData requestData,
Func<TestRuntimeProviderInfo, IProxyExecutionManager> actualProxyManagerCreator,
Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> actualProxyManagerCreator,
IDataSerializer dataSerializer,
int parallelLevel,
List<TestRuntimeProviderInfo> testHostProviders)
Expand Down
15 changes: 8 additions & 7 deletions src/Microsoft.TestPlatform.CrossPlatEngine/TestEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ public IProxyDiscoveryManager GetDiscoveryManager(
// discovery manager to publish its current state. But doing so we are losing the collected state of all the
// other managers.
var discoveryDataAggregator = new DiscoveryDataAggregator();
Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> proxyDiscoveryManagerCreator = runtimeProviderInfo =>
Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> proxyDiscoveryManagerCreator = (runtimeProviderInfo, discoveryCriteria) =>
{
var sources = runtimeProviderInfo.SourceDetails.Select(r => r.Source!).ToList();
var sources = discoveryCriteria.Sources.ToList();
var hostManager = _testHostProviderManager.GetTestHostManagerByRunConfiguration(runtimeProviderInfo.RunSettings, sources);
hostManager?.Initialize(TestSessionMessageLogger.Instance, runtimeProviderInfo.RunSettings!);

Expand Down Expand Up @@ -241,15 +241,16 @@ public IProxyExecutionManager GetExecutionManager(
}

// This creates a single non-parallel execution manager, based requestData, isDataCollectorEnabled and the
// overall testRunCriteria. The overall testRunCriteria are split to smaller pieces (e.g. each source from the overall
// testRunCriteria) so we can run them in parallel, and those are then passed to those non-parallel execution managers.
// split testRunCriteria. The overall testRunCriteria are split to smaller pieces (e.g. each source from the overall
// testRunCriteria) so we can run them in parallel.
//
// The function below grabs most of the parameter via closure from the local context,
// but gets the runtime provider later, because that is specific info to the source (or sources) it will be running.
// but gets the runtime provider later, as well as the discovery request, because that is specific info to the source (or sources)
// it will be running.
// This creator does not get those smaller pieces of testRunCriteria, those come later when we call a method on
// the non-parallel execution manager we create here. E.g. StartTests(<single piece of testRunCriteria>).
Func<TestRuntimeProviderInfo, IProxyExecutionManager> proxyExecutionManagerCreator = runtimeProviderInfo =>
CreateNonParallelExecutionManager(requestData, testRunCriteria, isDataCollectorEnabled, runtimeProviderInfo);
Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> proxyExecutionManagerCreator = (runtimeProviderInfo, runCriteria) =>
CreateNonParallelExecutionManager(requestData, runCriteria, isDataCollectorEnabled, runtimeProviderInfo);

var executionManager = new ParallelProxyExecutionManager(requestData, proxyExecutionManagerCreator, parallelLevel, testHostProviders);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class ParallelOperationManagerTests
public void OperationManagerShouldRunOnlyMaximumParallelLevelOfWorkInParallelEvenWhenThereAreMoreWorkloads()
{
// Arrange
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ => new SampleManager();
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) => new SampleManager();
var maxParallelLevel = 3;
var parallelOperationManager = new ParallelOperationManager<SampleManager, SampleHandler, SampleWorkload>(createNewManager, maxParallelLevel);

Expand Down Expand Up @@ -72,7 +72,7 @@ public void OperationManagerShouldRunOnlyMaximumParallelLevelOfWorkInParallelEve
public void OperationManagerShouldCreateOnlyAsManyParallelWorkersAsThereAreWorkloadsWhenTheAmountOfWorkloadsIsSmallerThanMaxParallelLevel()
{
// Arrange
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ => new SampleManager();
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) => new SampleManager();
var maxParallelLevel = 10;
var parallelOperationManager = new ParallelOperationManager<SampleManager, SampleHandler, SampleWorkload>(createNewManager, maxParallelLevel);

Expand Down Expand Up @@ -108,7 +108,7 @@ public void OperationManagerShouldCreateOnlyAsManyParallelWorkersAsThereAreWorkl
public void OperationManagerShouldCreateAsManyMaxParallelLevel()
{
// Arrange
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ => new SampleManager();
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) => new SampleManager();
var maxParallelLevel = 10;
var parallelOperationManager = new ParallelOperationManager<SampleManager, SampleHandler, SampleWorkload>(createNewManager, maxParallelLevel);

Expand Down Expand Up @@ -148,7 +148,7 @@ public void OperationManagerShouldCreateAsManyMaxParallelLevel()
public void OperationManagerMovesToTheNextWorkloadOnlyWhenRunNextWorkIsCalled()
{
// Arrange
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ => new SampleManager();
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) => new SampleManager();
var maxParallelLevel = 2;
var parallelOperationManager = new ParallelOperationManager<SampleManager, SampleHandler, SampleWorkload>(createNewManager, maxParallelLevel);

Expand Down Expand Up @@ -195,7 +195,7 @@ public void OperationManagerRunsAnOperationOnAllActiveManagersWhenDoActionOnAllM
// Arrange
var createdManagers = new List<SampleManager>();
// Store the managers we created so we can inspect them later and see if Abort was called on them.
Func<TestRuntimeProviderInfo, SampleManager> createNewManager = _ =>
Func<TestRuntimeProviderInfo, SampleWorkload, SampleManager> createNewManager = (_, _2) =>
{
var manager = new SampleManager();
createdManagers.Add(manager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ParallelProxyDiscoveryManagerTests
private const int Timeout3Seconds = 3 * 1000;
private readonly Queue<Mock<IProxyDiscoveryManager>> _preCreatedMockManagers;
private readonly List<Mock<IProxyDiscoveryManager>> _usedMockManagers;
private readonly Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> _createMockManager;
private readonly Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> _createMockManager;
private readonly Mock<ITestDiscoveryEventsHandler2> _mockEventHandler;
private readonly List<string> _sources = new() { "1.dll", "2.dll" };
private readonly DiscoveryCriteria _discoveryCriteriaWith2Sources;
Expand All @@ -55,7 +55,7 @@ public ParallelProxyDiscoveryManagerTests()
new Mock<IProxyDiscoveryManager>(),
});
_usedMockManagers = new List<Mock<IProxyDiscoveryManager>>();
_createMockManager = _ =>
_createMockManager = (_, _2) =>
{
// We create the manager at the last possible
// moment now, not when we create the parallel proxy manager class
Expand Down Expand Up @@ -163,7 +163,7 @@ public void HandlePartialDiscoveryCompleteShouldReturnTrueIfDiscoveryWasAbortedA
{
var discoveryManagerMock = new Mock<IProxyDiscoveryManager>();
_preCreatedMockManagers.Enqueue(discoveryManagerMock);
var parallelDiscoveryManager = SetupDiscoveryManager(_ => discoveryManagerMock.Object, 1, true);
var parallelDiscoveryManager = SetupDiscoveryManager((_, _2) => discoveryManagerMock.Object, 1, true);
var proxyDiscovermanager = new ProxyDiscoveryManager(_mockRequestData.Object, new Mock<ITestRequestSender>().Object, new Mock<ITestRuntimeProvider>().Object);

parallelDiscoveryManager.DiscoverTests(_discoveryCriteriaWith2Sources, _mockEventHandler.Object);
Expand All @@ -190,7 +190,7 @@ public void HandlePartialDiscoveryCompleteShouldReturnTrueIfDiscoveryWasAbortedA
{
var discoveryManagerMock = new Mock<IProxyDiscoveryManager>();
_preCreatedMockManagers.Enqueue(discoveryManagerMock);
var parallelDiscoveryManager = SetupDiscoveryManager(_ => discoveryManagerMock.Object, 1, true);
var parallelDiscoveryManager = SetupDiscoveryManager((_, _2) => discoveryManagerMock.Object, 1, true);
var proxyDiscovermanager = new ProxyDiscoveryManager(_mockRequestData.Object, new Mock<ITestRequestSender>().Object, new Mock<ITestRuntimeProvider>().Object);

parallelDiscoveryManager.DiscoverTests(_discoveryCriteriaWith2Sources, _mockEventHandler.Object);
Expand All @@ -206,7 +206,7 @@ public void DiscoveryTestsShouldStopDiscoveryIfAbortionWasRequested()
// Since the hosts are aborted, total aggregated tests sent across will be -1
var discoveryManagerMock = new Mock<IProxyDiscoveryManager>();
_preCreatedMockManagers.Enqueue(discoveryManagerMock);
var parallelDiscoveryManager = SetupDiscoveryManager(_ => discoveryManagerMock.Object, 1, true);
var parallelDiscoveryManager = SetupDiscoveryManager((_, _2) => discoveryManagerMock.Object, 1, true);

Task.Run(() =>
{
Expand All @@ -224,7 +224,7 @@ public void DiscoveryTestsShouldStopDiscoveryIfAbortionWithEventHandlerWasReques
// Since the hosts are aborted, total aggregated tests sent across will be -1
var discoveryManagerMock = new Mock<IProxyDiscoveryManager>();
_preCreatedMockManagers.Enqueue(discoveryManagerMock);
var parallelDiscoveryManager = SetupDiscoveryManager(_ => discoveryManagerMock.Object, 1, true);
var parallelDiscoveryManager = SetupDiscoveryManager((_, _2) => discoveryManagerMock.Object, 1, true);

Task.Run(() =>
{
Expand Down Expand Up @@ -331,7 +331,7 @@ public void DiscoveryTestsWithCompletionMarksAllSourcesAsFullyDiscovered()
Assert.AreEqual(0, _dataAggregator.GetSourcesWithStatus(DiscoveryStatus.NotDiscovered).Count);
}

private ParallelProxyDiscoveryManager SetupDiscoveryManager(Func<TestRuntimeProviderInfo, IProxyDiscoveryManager> getProxyManager, int parallelLevel, bool abortDiscovery)
private ParallelProxyDiscoveryManager SetupDiscoveryManager(Func<TestRuntimeProviderInfo, DiscoveryCriteria, IProxyDiscoveryManager> getProxyManager, int parallelLevel, bool abortDiscovery)
{
var parallelDiscoveryManager = new ParallelProxyDiscoveryManager(_mockRequestData.Object, getProxyManager, dataAggregator: new(), parallelLevel, _runtimeProviders);
SetupDiscoveryTests(_processedSources, abortDiscovery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ParallelProxyExecutionManagerTests
private static readonly int Timeout3Seconds = 3 * 1000; // In milliseconds

private readonly List<Mock<IProxyExecutionManager>> _usedMockManagers;
private readonly Func<TestRuntimeProviderInfo, IProxyExecutionManager> _createMockManager;
private readonly Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> _createMockManager;
private readonly Mock<IInternalTestRunEventsHandler> _mockEventHandler;

private readonly List<string> _sources;
Expand Down Expand Up @@ -59,7 +59,7 @@ public ParallelProxyExecutionManagerTests()
new Mock<IProxyExecutionManager>(),
});
_usedMockManagers = new List<Mock<IProxyExecutionManager>>();
_createMockManager = _ =>
_createMockManager = (_, _2) =>
{
_createMockManagerCalled++;
var manager = _preCreatedMockManagers.Dequeue();
Expand Down Expand Up @@ -245,7 +245,7 @@ public void StartTestRunWithTestsShouldNotSendCompleteUntilAllTestsAreProcessed(
public void StartTestRunShouldNotProcessAllSourcesOnExecutionCancelsForAnySource()
{
var executionManagerMock = new Mock<IProxyExecutionManager>();
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, _ => executionManagerMock.Object, 1, _runtimeProviders);
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, (_, _2) => executionManagerMock.Object, 1, _runtimeProviders);
_preCreatedMockManagers.Enqueue(executionManagerMock);
SetupMockManagers(_processedSources, isCanceled: true, isAborted: false);
SetupHandleTestRunComplete(_executionCompleted);
Expand All @@ -260,7 +260,7 @@ public void StartTestRunShouldNotProcessAllSourcesOnExecutionCancelsForAnySource
public void StartTestRunShouldNotProcessAllSourcesOnExecutionAborted()
{
var executionManagerMock = new Mock<IProxyExecutionManager>();
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, _ => executionManagerMock.Object, 1, _runtimeProviders);
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, (_, _2) => executionManagerMock.Object, 1, _runtimeProviders);
_preCreatedMockManagers.Enqueue(executionManagerMock);
SetupMockManagers(_processedSources, isCanceled: false, isAborted: false);
SetupHandleTestRunComplete(_executionCompleted);
Expand All @@ -276,7 +276,7 @@ public void StartTestRunShouldNotProcessAllSourcesOnExecutionAborted()
public void StartTestRunShouldProcessAllSourcesOnExecutionAbortsForAnySource()
{
var executionManagerMock = new Mock<IProxyExecutionManager>();
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, _ => executionManagerMock.Object, 1, _runtimeProviders);
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, (_, _2) => executionManagerMock.Object, 1, _runtimeProviders);
_preCreatedMockManagers.Enqueue(executionManagerMock);
SetupMockManagers(_processedSources, isCanceled: false, isAborted: true);
SetupHandleTestRunComplete(_executionCompleted);
Expand Down Expand Up @@ -432,12 +432,12 @@ public void StartTestRunShouldAggregateRunData()
AssertMissingAndDuplicateSources(_processedSources);
}

private ParallelProxyExecutionManager SetupExecutionManager(Func<TestRuntimeProviderInfo, IProxyExecutionManager> proxyManagerFunc, int parallelLevel)
private ParallelProxyExecutionManager SetupExecutionManager(Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> proxyManagerFunc, int parallelLevel)
{
return SetupExecutionManager(proxyManagerFunc, parallelLevel, false);
}

private ParallelProxyExecutionManager SetupExecutionManager(Func<TestRuntimeProviderInfo, IProxyExecutionManager> proxyManagerFunc, int parallelLevel, bool setupTestCases)
private ParallelProxyExecutionManager SetupExecutionManager(Func<TestRuntimeProviderInfo, TestRunCriteria, IProxyExecutionManager> proxyManagerFunc, int parallelLevel, bool setupTestCases)
{
var parallelExecutionManager = new ParallelProxyExecutionManager(_mockRequestData.Object, proxyManagerFunc, parallelLevel, _runtimeProviders);

Expand Down