Skip to content

Commit

Permalink
add a new class TaskQueue that can be used to control the parallellis…
Browse files Browse the repository at this point in the history
…m of git commands within a single git directory
  • Loading branch information
scbedd committed Sep 12, 2022
1 parent 547343a commit 8d150f6
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 51 deletions.
129 changes: 78 additions & 51 deletions tools/test-proxy/Azure.Sdk.Tools.TestProxy/Store/GitProcessHandler.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Azure.Sdk.Tools.TestProxy.Common;
using Azure.Sdk.Tools.TestProxy.Common.Exceptions;

Expand Down Expand Up @@ -36,6 +38,8 @@ class GitMinVersion
public static string minVersionString = $"{Major}.{Minor}.{Patch}";
}

private ConcurrentDictionary<string, TaskQueue> AssetTasks = new ConcurrentDictionary<string, TaskQueue>();

/// <summary>
/// Create a ProcessStartInfo that's exclusively used for execution of git commands
/// </summary>
Expand Down Expand Up @@ -106,43 +110,52 @@ public virtual CommandResult Run(string arguments, string workingDirectory)
Arguments = arguments
};

try
if (!AssetTasks.TryGetValue(workingDirectory, out var value))
{
DebugLogger.LogInformation($"git {arguments}");
var process = Process.Start(processStartInfo);
string stdOut = process.StandardOutput.ReadToEnd();
string stdErr = process.StandardError.ReadToEnd();
process.WaitForExit();
value = new TaskQueue();
// only add a new task queue if it doesn't exist
// if it does exist, just use the old one so that pending tasks will still be able to complete
AssetTasks.AddOrUpdate(workingDirectory, value, (key, oldValue) => oldValue);
}

int returnCode = process.ExitCode;
value.Enqueue(() =>
{
try
{
DebugLogger.LogInformation($"git {arguments}");
var process = Process.Start(processStartInfo);
string stdOut = process.StandardOutput.ReadToEnd();
string stdErr = process.StandardError.ReadToEnd();
process.WaitForExit();

DebugLogger.LogDebug($"StdOut: {stdOut}");
DebugLogger.LogDebug($"StdErr: {stdErr}");
DebugLogger.LogDebug($"ExitCode: {process.ExitCode}");
int returnCode = process.ExitCode;

DebugLogger.LogDebug($"StdOut: {stdOut}");
DebugLogger.LogDebug($"StdErr: {stdErr}");
DebugLogger.LogDebug($"ExitCode: {process.ExitCode}");

result.ExitCode = process.ExitCode;
result.StdErr = stdErr;
result.StdOut = stdOut;

result.ExitCode = process.ExitCode;
result.StdErr = stdErr;
result.StdOut = stdOut;

if (result.ExitCode == 0){
return result;
if (result.ExitCode != 0)
{
throw new GitProcessException(result);
}
}
else
catch (Exception e)
{
DebugLogger.LogDebug(e.Message);

result.ExitCode = -1;
result.CommandException = e;

throw new GitProcessException(result);
}
}
catch (Exception e)
{
DebugLogger.LogDebug(e.Message);
});

result.ExitCode = -1;
result.CommandException = e;

throw new GitProcessException(result);
}
return result;
}

/// <summary>
Expand All @@ -156,39 +169,53 @@ public virtual bool TryRun(string arguments, GitAssetsConfiguration config, out
{
ProcessStartInfo processStartInfo = CreateGitProcessInfo(config.AssetsRepoLocation);
processStartInfo.Arguments = arguments;
var commandResult = new CommandResult();

try
if (!AssetTasks.TryGetValue(config.AssetsRepoLocation, out var queue))
{
DebugLogger.LogInformation($"git {arguments}");
var process = Process.Start(processStartInfo);
string stdOut = process.StandardOutput.ReadToEnd();
string stdErr = process.StandardError.ReadToEnd();
process.WaitForExit();
queue = new TaskQueue();
// only add a new task queue if it doesn't exist
// if it does exist, just use the old one so that pending tasks will still be able to complete
AssetTasks.AddOrUpdate(config.AssetsRepoLocation, queue, (key, oldValue) => oldValue);
}

int returnCode = process.ExitCode;
queue.Enqueue(() =>
{
try
{
DebugLogger.LogInformation($"git {arguments}");
var process = Process.Start(processStartInfo);
string stdOut = process.StandardOutput.ReadToEnd();
string stdErr = process.StandardError.ReadToEnd();
process.WaitForExit();

DebugLogger.LogDebug($"StdOut: {stdOut}");
DebugLogger.LogDebug($"StdErr: {stdErr}");
DebugLogger.LogDebug($"ExitCode: {process.ExitCode}");
int returnCode = process.ExitCode;

result = new CommandResult()
{
ExitCode = process.ExitCode,
StdErr = stdErr,
StdOut = stdOut,
Arguments = arguments
};
}
catch (Exception e)
{
DebugLogger.LogDebug(e.Message);
DebugLogger.LogDebug($"StdOut: {stdOut}");
DebugLogger.LogDebug($"StdErr: {stdErr}");
DebugLogger.LogDebug($"ExitCode: {process.ExitCode}");

result = new CommandResult()
commandResult = new CommandResult()
{
ExitCode = process.ExitCode,
StdErr = stdErr,
StdOut = stdOut,
Arguments = arguments
};
}
catch (Exception e)
{
ExitCode = -1,
CommandException = e
};
}
DebugLogger.LogDebug(e.Message);

commandResult = new CommandResult()
{
ExitCode = -1,
CommandException = e
};
}
});

result = commandResult;

if (result.ExitCode != 0)
{
Expand Down
76 changes: 76 additions & 0 deletions tools/test-proxy/Azure.Sdk.Tools.TestProxy/Store/TaskQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System.Threading.Tasks;
using System.Threading;
using System;

namespace Azure.Sdk.Tools.TestProxy.Store
{
/// <summary>
/// This class is used to control access to a directory. Within the GitProcessHandler, a queue is used per targeted git directory. This ensures
/// that multiple Async requests hitting the asset store functionality will NEVER be able to stomp on each other.
/// </summary>
public class TaskQueue
{
private SemaphoreSlim semaphore;

public TaskQueue()
{
semaphore = new SemaphoreSlim(1);
}

/// <summary>
/// Used to await the running of a single block of code. Returns a value of type T.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="incomingTask"></param>
/// <returns></returns>
public async Task<T> EnqueueAsync<T>(Func<Task<T>> incomingTask)
{
await semaphore.WaitAsync();
try
{
return await incomingTask();
}
finally
{
semaphore.Release();
}
}

/// <summary>
/// Used to await the running of a single block of code. No incoming arguments, no return types.
/// </summary>
/// <param name="incomingTask"></param>
/// <returns></returns>
public async Task EnqueueAsync(Func<Task> incomingTask)
{
await semaphore.WaitAsync();

try
{
await incomingTask();
}
finally
{
semaphore.Release();
}
}

/// <summary>
/// Used to invoke a block of code. No incoming arguments, no output arguments.
/// </summary>
/// <param name="incomingTask"></param>
public void Enqueue(Action incomingTask)
{
semaphore.Wait();

try
{
incomingTask();
}
finally
{
semaphore.Release();
}
}
}
}

0 comments on commit 8d150f6

Please sign in to comment.