Skip to content

Commit

Permalink
changes to support specific run service URL (actions#2158)
Browse files Browse the repository at this point in the history
* changes to support run service url

* feedback
  • Loading branch information
yaananth authored and nikola-jokic committed May 12, 2023
1 parent d719a7b commit d977f4c
Show file tree
Hide file tree
Showing 15 changed files with 1,408 additions and 96 deletions.
51 changes: 51 additions & 0 deletions src/Runner.Common/ActionsRunServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Services.Common;
using GitHub.Services.WebApi;

namespace GitHub.Runner.Common
{
[ServiceLocator(Default = typeof(ActionsRunServer))]
public interface IActionsRunServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);

Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken token);
}

public sealed class ActionsRunServer : RunnerService, IActionsRunServer
{
private bool _hasConnection;
private VssConnection _connection;
private TaskAgentHttpClient _taskAgentClient;

public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials)
{
_connection = await EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
_hasConnection = true;
}

private void CheckConnection()
{
if (!_hasConnection)
{
throw new InvalidOperationException($"SetConnection");
}
}

public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken cancellationToken)
{
CheckConnection();
var jobMessage = RetryRequest<AgentJobRequestMessage>(async () =>
{
return await _taskAgentClient.GetJobMessageAsync(id, cancellationToken);
}, cancellationToken);

return jobMessage;
}
}
}
78 changes: 17 additions & 61 deletions src/Runner.Common/RunServer.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using Sdk.WebApi.WebApi.RawClient;

namespace GitHub.Runner.Common
{
Expand All @@ -20,40 +21,17 @@ public interface IRunServer : IRunnerService
public sealed class RunServer : RunnerService, IRunServer
{
private bool _hasConnection;
private VssConnection _connection;
private TaskAgentHttpClient _taskAgentClient;
private Uri requestUri;
private RawConnection _connection;
private RunServiceHttpClient _runServiceHttpClient;

public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials)
public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
{
_connection = await EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
_hasConnection = true;
}

private async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
{
Trace.Info($"EstablishVssConnection");
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);

await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}
requestUri = serverUri;

// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
_connection = VssUtil.CreateRawConnection(new Uri(serverUri.Authority), credentials);
_runServiceHttpClient = await _connection.GetClientAsync<RunServiceHttpClient>();
_hasConnection = true;
}

private void CheckConnection()
Expand All @@ -67,37 +45,15 @@ private void CheckConnection()
public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken cancellationToken)
{
CheckConnection();
var jobMessage = RetryRequest<AgentJobRequestMessage>(async () =>
{
return await _taskAgentClient.GetJobMessageAsync(id, cancellationToken);
}, cancellationToken);
return jobMessage;
}

private async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5
)
{
var retryCount = 0;
while (true)
var jobMessage = RetryRequest<AgentJobRequestMessage>(
async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, cancellationToken), cancellationToken);
if (jobMessage == null)
{
retryCount++;
cancellationToken.ThrowIfCancellationRequested();
try
{
return await func();
}
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount)
{
Trace.Error("Catch exception during get full job message");
Trace.Error(ex);
var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left.");
await Task.Delay(backOff, cancellationToken);
}
throw new TaskOrchestrationJobNotFoundException(id);
}

return jobMessage;
}

}
}
25 changes: 0 additions & 25 deletions src/Runner.Common/RunnerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,31 +179,6 @@ public void SetConnectionTimeout(RunnerConnectionType connectionType, TimeSpan t
}
}

private async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
{
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);

await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}

// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
}

private void CheckConnection(RunnerConnectionType connectionType)
{
switch (connectionType)
Expand Down
62 changes: 60 additions & 2 deletions src/Runner.Common/RunnerService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using Sdk.WebApi.WebApi.RawClient;

namespace GitHub.Runner.Common
{
Expand All @@ -21,9 +27,9 @@ public abstract class RunnerService
protected IHostContext HostContext { get; private set; }
protected Tracing Trace { get; private set; }

public string TraceName
public string TraceName
{
get
get
{
return GetType().Name;
}
Expand All @@ -35,5 +41,57 @@ public virtual void Initialize(IHostContext hostContext)
Trace = HostContext.GetTrace(TraceName);
Trace.Entering();
}

protected async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
{
Trace.Info($"EstablishVssConnection");
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);

await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}

// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
}

protected async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5
)
{
var retryCount = 0;
while (true)
{
retryCount++;
cancellationToken.ThrowIfCancellationRequested();
try
{
return await func();
}
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount)
{
Trace.Error("Catch exception during get full job message");
Trace.Error(ex);
var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left.");
await Task.Delay(backOff, cancellationToken);
}
}
}
}
}
18 changes: 14 additions & 4 deletions src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -496,16 +496,26 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
else
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
Pipelines.AgentJobRequestMessage jobRequestMessage = null;

// Create connection
var credMgr = HostContext.GetService<ICredentialManager>();
var creds = credMgr.LoadCredentials();

var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
var jobMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
if (string.IsNullOrEmpty(messageRef.RunServiceUrl))
{
var actionsRunServer = HostContext.CreateService<IActionsRunServer>();
await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
else
{
var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}

jobDispatcher.Run(jobMessage, runOnce);
jobDispatcher.Run(jobRequestMessage, runOnce);
if (runOnce)
{
Trace.Info("One time used runner received job message.");
Expand Down
4 changes: 3 additions & 1 deletion src/Runner.Listener/RunnerJobRequestRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ public sealed class RunnerJobRequestRef
public string Id { get; set; }
[DataMember(Name = "runner_request_id")]
public string RunnerRequestId { get; set; }
[DataMember(Name = "run_service_url")]
public string RunServiceUrl { get; set; }
}
}
}
47 changes: 46 additions & 1 deletion src/Runner.Sdk/Util/VssUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Net.Http.Headers;
using System.Runtime.InteropServices;
using System.Net;
using Sdk.WebApi.WebApi.RawClient;

namespace GitHub.Runner.Sdk
{
Expand All @@ -34,7 +35,11 @@ public static void InitializeVssClientSettings(List<ProductInfoHeaderValue> addi
}
}

public static VssConnection CreateConnection(Uri serverUri, VssCredentials credentials, IEnumerable<DelegatingHandler> additionalDelegatingHandler = null, TimeSpan? timeout = null)
public static VssConnection CreateConnection(
Uri serverUri,
VssCredentials credentials,
IEnumerable<DelegatingHandler> additionalDelegatingHandler = null,
TimeSpan? timeout = null)
{
VssClientHttpRequestSettings settings = VssClientHttpRequestSettings.Default.Clone();

Expand Down Expand Up @@ -75,6 +80,46 @@ public static VssConnection CreateConnection(Uri serverUri, VssCredentials crede
return connection;
}

public static RawConnection CreateRawConnection(
Uri serverUri,
VssCredentials credentials,
IEnumerable<DelegatingHandler> additionalDelegatingHandler = null,
TimeSpan? timeout = null)
{
RawClientHttpRequestSettings settings = RawClientHttpRequestSettings.Default.Clone();

int maxRetryRequest;
if (!int.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_HTTP_RETRY") ?? string.Empty, out maxRetryRequest))
{
maxRetryRequest = 3;
}

// make sure MaxRetryRequest in range [3, 10]
settings.MaxRetryRequest = Math.Min(Math.Max(maxRetryRequest, 3), 10);

if (!int.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_HTTP_TIMEOUT") ?? string.Empty, out int httpRequestTimeoutSeconds))
{
settings.SendTimeout = timeout ?? TimeSpan.FromSeconds(100);
}
else
{
// prefer environment variable
settings.SendTimeout = TimeSpan.FromSeconds(Math.Min(Math.Max(httpRequestTimeoutSeconds, 100), 1200));
}

// Remove Invariant from the list of accepted languages.
//
// The constructor of VssHttpRequestSettings (base class of VssClientHttpRequestSettings) adds the current
// UI culture to the list of accepted languages. The UI culture will be Invariant on OSX/Linux when the
// LANG environment variable is not set when the program starts. If Invariant is in the list of accepted
// languages, then "System.ArgumentException: The value cannot be null or empty." will be thrown when the
// settings are applied to an HttpRequestMessage.
settings.AcceptLanguages.Remove(CultureInfo.InvariantCulture);

RawConnection connection = new RawConnection(serverUri, new RawHttpMessageHandler(credentials.ToOAuthCredentials(), settings), additionalDelegatingHandler);
return connection;
}

public static VssCredentials GetVssCredential(ServiceEndpoint serviceEndpoint)
{
ArgUtil.NotNull(serviceEndpoint, nameof(serviceEndpoint));
Expand Down
Loading

0 comments on commit d977f4c

Please sign in to comment.