-
Notifications
You must be signed in to change notification settings - Fork 645
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Automatic failover to other endpoints when possible #643
Automatic failover to other endpoints when possible #643 - see NuGet/Home#643
- Loading branch information
Showing
12 changed files
with
644 additions
and
162 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
81 changes: 81 additions & 0 deletions
81
src/NuGet.Services.Search.Client/Client/BaseUrlHealthIndicatorStore.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Concurrent; | ||
|
||
namespace NuGet.Services.Search.Client | ||
{ | ||
public class BaseUrlHealthIndicatorStore : IEndpointHealthIndicatorStore | ||
{ | ||
private static readonly int[] HealthIndicatorRange = { 100, 90, 75, 50, 25, 20, 15, 10, 5, 1 }; | ||
private readonly ConcurrentDictionary<string, int> _healthIndicators = new ConcurrentDictionary<string, int>(); | ||
|
||
public int GetHealth(Uri endpoint) | ||
{ | ||
int health; | ||
if (!_healthIndicators.TryGetValue(GetBaseUrl(endpoint), out health)) | ||
{ | ||
health = HealthIndicatorRange[0]; | ||
} | ||
return health; | ||
} | ||
|
||
public void DecreaseHealth(Uri endpoint) | ||
{ | ||
var queryLessUri = GetBaseUrl(endpoint); | ||
|
||
_healthIndicators.AddOrUpdate(queryLessUri, HealthIndicatorRange[1], (key, currentValue) => | ||
{ | ||
if (currentValue <= HealthIndicatorRange[HealthIndicatorRange.Length - 1]) | ||
{ | ||
return HealthIndicatorRange[HealthIndicatorRange.Length - 1]; | ||
} | ||
|
||
for (int i = 0; i < HealthIndicatorRange.Length; i++) | ||
{ | ||
if (HealthIndicatorRange[i] < currentValue) | ||
{ | ||
return HealthIndicatorRange[i]; | ||
} | ||
} | ||
|
||
return HealthIndicatorRange[HealthIndicatorRange.Length - 1]; | ||
}); | ||
} | ||
|
||
public void IncreaseHealth(Uri endpoint) | ||
{ | ||
var queryLessUri = GetBaseUrl(endpoint); | ||
|
||
_healthIndicators.AddOrUpdate(queryLessUri, HealthIndicatorRange[0], (key, currentValue) => | ||
{ | ||
if (currentValue >= HealthIndicatorRange[0]) | ||
{ | ||
return HealthIndicatorRange[0]; | ||
} | ||
|
||
for (int i = HealthIndicatorRange.Length - 1; i >= 0; i--) | ||
{ | ||
if (HealthIndicatorRange[i] > currentValue) | ||
{ | ||
return HealthIndicatorRange[i]; | ||
} | ||
} | ||
|
||
return HealthIndicatorRange[0]; | ||
}); | ||
} | ||
|
||
private static string GetBaseUrl(Uri uri) | ||
{ | ||
var uriString = uri.ToString(); | ||
var queryStart = uriString.IndexOf("?", StringComparison.Ordinal); | ||
if (queryStart >= 0) | ||
{ | ||
return uriString.Substring(0, queryStart); | ||
} | ||
return uriString; | ||
} | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
src/NuGet.Services.Search.Client/Client/IEndpointHealthIndicatorStore.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
|
||
namespace NuGet.Services.Search.Client | ||
{ | ||
public interface IEndpointHealthIndicatorStore | ||
{ | ||
int GetHealth(Uri endpoint); | ||
void DecreaseHealth(Uri endpoint); | ||
void IncreaseHealth(Uri endpoint); | ||
} | ||
} |
212 changes: 212 additions & 0 deletions
212
src/NuGet.Services.Search.Client/Client/RetryingHttpClientWrapper.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Net; | ||
using System.Net.Http; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace NuGet.Services.Search.Client | ||
{ | ||
public sealed class RetryingHttpClientWrapper | ||
{ | ||
private readonly HttpClient _httpClient; | ||
private readonly IEndpointHealthIndicatorStore _endpointHealthIndicatorStore; | ||
|
||
private static readonly Random Random = new Random((int) DateTime.UtcNow.Ticks); | ||
private static readonly int PeriodToDelayAlternateRequest = 4000; | ||
private static readonly IComparer<int> HealthComparer; | ||
|
||
static RetryingHttpClientWrapper() | ||
{ | ||
HealthComparer = new WeightedRandomComparer(Random); | ||
} | ||
|
||
public RetryingHttpClientWrapper(HttpClient httpClient) | ||
: this (httpClient, new BaseUrlHealthIndicatorStore()) | ||
{ | ||
_httpClient = httpClient; | ||
} | ||
|
||
public RetryingHttpClientWrapper(HttpClient httpClient, IEndpointHealthIndicatorStore endpointHealthIndicatorStore) | ||
{ | ||
_httpClient = httpClient; | ||
_endpointHealthIndicatorStore = endpointHealthIndicatorStore; | ||
} | ||
|
||
public async Task<string> GetStringAsync(IEnumerable<Uri> endpoints) | ||
{ | ||
return await GetWithRetry(endpoints, (client, uri, cancellationToken) => _httpClient.GetStringAsync(uri)); | ||
} | ||
|
||
public async Task<HttpResponseMessage> GetAsync(IEnumerable<Uri> endpoints) | ||
{ | ||
return await GetWithRetry(endpoints, (client, uri, cancellationToken) => _httpClient.GetAsync(uri, cancellationToken)); | ||
} | ||
|
||
private async Task<TResponseType> GetWithRetry<TResponseType>(IEnumerable<Uri> endpoints, Func<HttpClient, Uri, CancellationToken, Task<TResponseType>> run) | ||
{ | ||
// Build endpoints, ordered by health (with a chance of less health) | ||
var healthyEndpoints = endpoints.OrderByDescending(e => _endpointHealthIndicatorStore.GetHealth(e), HealthComparer).ToList(); | ||
|
||
// Make all requests cancellable using this CancellationTokenSource | ||
var cancellationTokenSource = new CancellationTokenSource(); | ||
|
||
// Create requests queue | ||
var tasks = CreateRequestQueue(healthyEndpoints, run, cancellationTokenSource); | ||
|
||
// When the first succesful task comes in, return it. If no succesfull tasks are returned, throw an AggregateException. | ||
var exceptions = new List<Exception>(); | ||
|
||
var taskList = tasks.ToList(); | ||
Task<TResponseType> completedTask = null; | ||
do | ||
{ | ||
completedTask = await Task.WhenAny(taskList); | ||
taskList.Remove(completedTask); | ||
|
||
if (completedTask.Exception != null) | ||
{ | ||
exceptions.AddRange(completedTask.Exception.InnerExceptions); | ||
} | ||
} while ((completedTask.IsFaulted || completedTask.IsCanceled) && taskList.Any()); | ||
|
||
cancellationTokenSource.Cancel(false); | ||
|
||
if (completedTask == null || completedTask.IsFaulted || completedTask.IsCanceled) | ||
{ | ||
throw new AggregateException(exceptions); | ||
} | ||
return await completedTask; | ||
} | ||
|
||
private List<Task<TResponseType>> CreateRequestQueue<TResponseType>(List<Uri> endpoints, Func<HttpClient, Uri, CancellationToken, Task<TResponseType>> run, CancellationTokenSource cancellatonTokenSource) | ||
{ | ||
// Queue up a series of requests. Make each request wait a little longer. | ||
var tasks = new List<Task<TResponseType>>(endpoints.Count); | ||
|
||
for (var i = 0; i < endpoints.Count; i++) | ||
{ | ||
var endpoint = endpoints[i]; | ||
|
||
tasks.Add(Task.Delay(i * PeriodToDelayAlternateRequest, cancellatonTokenSource.Token) | ||
.ContinueWith(task => | ||
{ | ||
try | ||
{ | ||
var response = run(_httpClient, endpoint, cancellatonTokenSource.Token).Result; | ||
|
||
var responseMessage = response as HttpResponseMessage; | ||
if (responseMessage != null && !responseMessage.IsSuccessStatusCode) | ||
{ | ||
if (ShouldTryOther(responseMessage)) | ||
{ | ||
_endpointHealthIndicatorStore.DecreaseHealth(endpoint); | ||
throw new HttpRequestException(responseMessage.ReasonPhrase); | ||
} | ||
else | ||
{ | ||
cancellatonTokenSource.Cancel(); | ||
} | ||
} | ||
|
||
_endpointHealthIndicatorStore.IncreaseHealth(endpoint); | ||
|
||
return response; | ||
} | ||
catch (Exception ex) | ||
{ | ||
if (ShouldTryOther(ex)) | ||
{ | ||
_endpointHealthIndicatorStore.DecreaseHealth(endpoint); | ||
} | ||
else | ||
{ | ||
cancellatonTokenSource.Cancel(); | ||
} | ||
throw; | ||
} | ||
}, cancellatonTokenSource.Token)); | ||
} | ||
|
||
return tasks; | ||
} | ||
|
||
private static bool ShouldTryOther(Exception ex) | ||
{ | ||
var aex = ex as AggregateException; | ||
if (aex != null) | ||
{ | ||
ex = aex.InnerExceptions.FirstOrDefault(); | ||
} | ||
|
||
var wex = ex as WebException; | ||
if (wex == null) | ||
{ | ||
wex = ex.InnerException as WebException; | ||
} | ||
if (wex != null && ( | ||
wex.Status == WebExceptionStatus.UnknownError | ||
|| wex.Status == WebExceptionStatus.ConnectFailure | ||
|| (int)wex.Status == 1 // NameResolutionFailure | ||
)) | ||
{ | ||
return true; | ||
} | ||
|
||
var reqex = ex as HttpRequestException; | ||
if (reqex != null) | ||
{ | ||
return true; | ||
} | ||
|
||
if (ex is TaskCanceledException) | ||
{ | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
private static bool ShouldTryOther(HttpResponseMessage response) | ||
{ | ||
if (response.IsSuccessStatusCode | ||
|| response.StatusCode == HttpStatusCode.BadGateway | ||
|| response.StatusCode == HttpStatusCode.GatewayTimeout | ||
|| response.StatusCode == HttpStatusCode.ServiceUnavailable | ||
|| response.StatusCode == HttpStatusCode.RequestTimeout | ||
|| response.StatusCode == HttpStatusCode.InternalServerError) | ||
{ | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
class WeightedRandomComparer | ||
: IComparer<int> | ||
{ | ||
private readonly Random _random; | ||
|
||
public WeightedRandomComparer(Random random) | ||
{ | ||
_random = random; | ||
} | ||
|
||
public int Compare(int x, int y) | ||
{ | ||
var totalWeight = x + y; | ||
var randomNumber = _random.Next(0, totalWeight); | ||
|
||
if (randomNumber < x) | ||
{ | ||
return 1; | ||
} | ||
return -1; | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.