From 4d230f1ddaac88375dd44a9af273eed60a34b1a9 Mon Sep 17 00:00:00 2001 From: Joel Verhagen Date: Sat, 6 Apr 2019 15:44:00 -0700 Subject: [PATCH] Complete Owners2AzureSearch job and wire-up the entry point (#515) Address https://github.com/NuGet/NuGetGallery/issues/6475 --- src/NuGet.Jobs.Owners2AzureSearch/Job.cs | 5 +- src/NuGet.Services.AzureSearch/BatchPusher.cs | 2 +- .../DependencyInjectionExtensions.cs | 11 + .../IndexActions.cs | 2 + .../NuGet.Services.AzureSearch.csproj | 1 + .../Owners2AzureSearchCommand.cs | 106 +++++++++ .../NuGet.Services.AzureSearch.Tests.csproj | 1 + .../Owners2AzureSearchCommandFacts.cs | 204 ++++++++++++++++++ 8 files changed, 330 insertions(+), 2 deletions(-) create mode 100644 src/NuGet.Services.AzureSearch/Owners2AzureSearch/Owners2AzureSearchCommand.cs create mode 100644 tests/NuGet.Services.AzureSearch.Tests/Owners2AzureSearch/Owners2AzureSearchCommandFacts.cs diff --git a/src/NuGet.Jobs.Owners2AzureSearch/Job.cs b/src/NuGet.Jobs.Owners2AzureSearch/Job.cs index 3af311fdf..6cf9be27d 100644 --- a/src/NuGet.Jobs.Owners2AzureSearch/Job.cs +++ b/src/NuGet.Jobs.Owners2AzureSearch/Job.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using NuGet.Services.AzureSearch; +using NuGet.Services.AzureSearch.Owners2AzureSearch; namespace NuGet.Jobs { @@ -19,7 +20,9 @@ public override async Task Run() ServicePointManager.DefaultConnectionLimit = 64; ServicePointManager.MaxServicePointIdleTime = 10000; - await Task.Yield(); + await _serviceProvider + .GetRequiredService() + .ExecuteAsync(); } protected override void ConfigureAutofacServices(ContainerBuilder containerBuilder) diff --git a/src/NuGet.Services.AzureSearch/BatchPusher.cs b/src/NuGet.Services.AzureSearch/BatchPusher.cs index 8453ba3a9..d3054f8df 100644 --- a/src/NuGet.Services.AzureSearch/BatchPusher.cs +++ b/src/NuGet.Services.AzureSearch/BatchPusher.cs @@ -69,7 +69,7 @@ public void EnqueueIndexActions(string packageId, IndexActions indexActions) throw new ArgumentException("This package ID has already been enqueued.", nameof(packageId)); } - if (!indexActions.Hijack.Any() && !indexActions.Search.Any()) + if (indexActions.IsEmpty) { throw new ArgumentException("There must be at least one index action.", nameof(indexActions)); } diff --git a/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs b/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs index 86156e5d1..de5f9d3a6 100644 --- a/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs +++ b/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs @@ -165,6 +165,16 @@ private static void RegisterAzureSearchJobStorageServices(ContainerBuilder conta c.ResolveKeyed(key), c.Resolve>(), c.Resolve>())); + + containerBuilder + .Register(c => new Owners2AzureSearchCommand( + c.Resolve(), + c.Resolve(), + c.Resolve(), + c.Resolve(), + c.Resolve>(), + c.Resolve>(), + c.Resolve>())); } private static void RegisterAuxiliaryDataStorageServices(ContainerBuilder containerBuilder, string key) @@ -229,6 +239,7 @@ public static IServiceCollection AddAzureSearch(this IServiceCollection services services.AddTransient(); services.AddTransient(); services.AddTransient(); + services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); diff --git a/src/NuGet.Services.AzureSearch/IndexActions.cs b/src/NuGet.Services.AzureSearch/IndexActions.cs index 715b04827..19f1ee3c9 100644 --- a/src/NuGet.Services.AzureSearch/IndexActions.cs +++ b/src/NuGet.Services.AzureSearch/IndexActions.cs @@ -26,5 +26,7 @@ public IndexActions( public IReadOnlyList> Search { get; } public IReadOnlyList> Hijack { get; } public ResultAndAccessCondition VersionListDataResult { get; } + + public bool IsEmpty => Search.Count == 0 && Hijack.Count == 0; } } diff --git a/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj b/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj index 0eb679542..79bfcab76 100644 --- a/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj +++ b/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj @@ -57,6 +57,7 @@ + diff --git a/src/NuGet.Services.AzureSearch/Owners2AzureSearch/Owners2AzureSearchCommand.cs b/src/NuGet.Services.AzureSearch/Owners2AzureSearch/Owners2AzureSearchCommand.cs new file mode 100644 index 000000000..f72701554 --- /dev/null +++ b/src/NuGet.Services.AzureSearch/Owners2AzureSearch/Owners2AzureSearchCommand.cs @@ -0,0 +1,106 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace NuGet.Services.AzureSearch.Owners2AzureSearch +{ + public class Owners2AzureSearchCommand + { + private readonly IDatabaseOwnerFetcher _databaseOwnerFetcher; + private readonly IOwnerDataClient _ownerDataClient; + private readonly IOwnerSetComparer _ownerSetComparer; + private readonly IOwnerIndexActionBuilder _indexActionBuilder; + private readonly Func _batchPusherFactory; + private readonly IOptionsSnapshot _options; + private readonly ILogger _logger; + + public Owners2AzureSearchCommand( + IDatabaseOwnerFetcher databaseOwnerFetcher, + IOwnerDataClient ownerDataClient, + IOwnerSetComparer ownerSetComparer, + IOwnerIndexActionBuilder indexActionBuilder, + Func batchPusherFactory, + IOptionsSnapshot options, + ILogger logger) + { + _databaseOwnerFetcher = databaseOwnerFetcher ?? throw new ArgumentNullException(nameof(databaseOwnerFetcher)); + _ownerDataClient = ownerDataClient ?? throw new ArgumentNullException(nameof(ownerDataClient)); + _ownerSetComparer = ownerSetComparer ?? throw new ArgumentNullException(nameof(ownerSetComparer)); + _indexActionBuilder = indexActionBuilder ?? throw new ArgumentNullException(nameof(indexActionBuilder)); + _batchPusherFactory = batchPusherFactory ?? throw new ArgumentNullException(nameof(batchPusherFactory)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + if (_options.Value.MaxConcurrentBatches <= 0) + { + throw new ArgumentException( + $"The {nameof(AzureSearchJobConfiguration.MaxConcurrentBatches)} must be greater than zero.", + nameof(options)); + } + } + + public async Task ExecuteAsync() + { + _logger.LogInformation("Fetching old owner data from blob storage."); + var storageResult = await _ownerDataClient.ReadLatestIndexedAsync(); + + _logger.LogInformation("Fetching new owner data from the database."); + var databaseResult = await _databaseOwnerFetcher.GetPackageIdToOwnersAsync(); + + _logger.LogInformation("Detecting owner changes."); + var changes = _ownerSetComparer.Compare(storageResult.Result, databaseResult); + var changesBag = new ConcurrentBag>(changes.Select(x => new IdAndValue(x.Key, x.Value))); + _logger.LogInformation("{Count} package IDs have owner changes.", changesBag.Count); + + if (!changes.Any()) + { + return; + } + + _logger.LogInformation( + "Starting {Count} workers pushing owners changes to Azure Search.", + _options.Value.MaxConcurrentBatches); + var workerTasks = Enumerable + .Range(0, _options.Value.MaxConcurrentBatches) + .Select(x => WorkAsync(changesBag)) + .ToList(); + await Task.WhenAll(workerTasks); + _logger.LogInformation("All of the owner changes have been pushed to Azure Search."); + + // Persist in storage the list of all package IDs that have owner changes. This allows debugging and future + // analytics on frequency of ownership changes. + _logger.LogInformation("Uploading the package IDs that have owner changes to blob storage."); + await _ownerDataClient.UploadChangeHistoryAsync(changes.Keys.ToList()); + + _logger.LogInformation("Uploading the new owner data to blob storage."); + await _ownerDataClient.ReplaceLatestIndexedAsync(databaseResult, storageResult.AccessCondition); + } + + private async Task WorkAsync(ConcurrentBag> changesBag) + { + await Task.Yield(); + + var batchPusher = _batchPusherFactory(); + while (changesBag.TryTake(out var changes)) + { + var indexActions = await _indexActionBuilder.UpdateOwnersAsync(changes.Id, changes.Value); + if (indexActions.IsEmpty) + { + continue; + } + + batchPusher.EnqueueIndexActions(changes.Id, indexActions); + await batchPusher.PushFullBatchesAsync(); + } + + await batchPusher.FinishAsync(); + } + } +} + diff --git a/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj b/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj index 38153cb73..b53451dcb 100644 --- a/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj +++ b/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj @@ -60,6 +60,7 @@ + diff --git a/tests/NuGet.Services.AzureSearch.Tests/Owners2AzureSearch/Owners2AzureSearchCommandFacts.cs b/tests/NuGet.Services.AzureSearch.Tests/Owners2AzureSearch/Owners2AzureSearchCommandFacts.cs new file mode 100644 index 000000000..fd015cced --- /dev/null +++ b/tests/NuGet.Services.AzureSearch.Tests/Owners2AzureSearch/Owners2AzureSearchCommandFacts.cs @@ -0,0 +1,204 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Search.Models; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; +using NuGet.Services.AzureSearch.Support; +using NuGetGallery; +using Xunit; +using Xunit.Abstractions; + +namespace NuGet.Services.AzureSearch.Owners2AzureSearch +{ + public class Owners2AzureSearchCommandFacts + { + public class ExecuteAsync : Facts + { + public ExecuteAsync(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task DoesNotPushWhenThereAreNoChanges() + { + await Target.ExecuteAsync(); + + Pusher.Verify( + x => x.EnqueueIndexActions(It.IsAny(), It.IsAny()), + Times.Never); + Pusher.Verify(x => x.PushFullBatchesAsync(), Times.Never); + Pusher.Verify(x => x.FinishAsync(), Times.Never); + OwnerDataClient.Verify(x => x.UploadChangeHistoryAsync(It.IsAny>()), Times.Never); + OwnerDataClient.Verify( + x => x.ReplaceLatestIndexedAsync( + It.IsAny>>(), + It.IsAny()), + Times.Never); + } + + [Fact] + public async Task ComparesInTheRightOrder() + { + await Target.ExecuteAsync(); + + OwnerSetComparer.Verify( + x => x.Compare( + It.IsAny>>(), + It.IsAny>>()), + Times.Once); + OwnerSetComparer.Verify( + x => x.Compare(StorageResult.Result, DatabaseResult), + Times.Once); + } + + [Fact] + public async Task PushesAllChanges() + { + Changes["NuGet.Core"] = new string[0]; + Changes["NuGet.Versioning"] = new string[0]; + Changes["EntityFramework"] = new string[0]; + + await Target.ExecuteAsync(); + + Pusher.Verify( + x => x.EnqueueIndexActions(It.IsAny(), It.IsAny()), + Times.Exactly(3)); + Pusher.Verify( + x => x.EnqueueIndexActions("NuGet.Core", It.IsAny()), + Times.Once); + Pusher.Verify( + x => x.EnqueueIndexActions("NuGet.Versioning", It.IsAny()), + Times.Once); + Pusher.Verify( + x => x.EnqueueIndexActions("EntityFramework", It.IsAny()), + Times.Once); + Pusher.Verify(x => x.PushFullBatchesAsync(), Times.Exactly(3)); + Pusher.Verify(x => x.FinishAsync(), Times.Once); + } + + [Fact] + public async Task UpdatesBlobStorageAfterIndexing() + { + var actions = new List(); + Pusher + .Setup(x => x.FinishAsync()) + .Returns(Task.CompletedTask) + .Callback(() => actions.Add(nameof(IBatchPusher.FinishAsync))); + OwnerDataClient + .Setup(x => x.UploadChangeHistoryAsync(It.IsAny>())) + .Returns(Task.CompletedTask) + .Callback(() => actions.Add(nameof(IOwnerDataClient.UploadChangeHistoryAsync))); + OwnerDataClient + .Setup(x => x.ReplaceLatestIndexedAsync(It.IsAny>>(), It.IsAny())) + .Returns(Task.CompletedTask) + .Callback(() => actions.Add(nameof(IOwnerDataClient.ReplaceLatestIndexedAsync))); + + Changes["NuGet.Core"] = new string[0]; + + await Target.ExecuteAsync(); + + Assert.Equal( + new[] { nameof(IBatchPusher.FinishAsync), nameof(IOwnerDataClient.UploadChangeHistoryAsync), nameof(IOwnerDataClient.ReplaceLatestIndexedAsync) }, + actions.ToArray()); + } + + [Fact] + public async Task UpdatesBlobStorage() + { + IReadOnlyList changeHistory = null; + OwnerDataClient + .Setup(x => x.UploadChangeHistoryAsync(It.IsAny>())) + .Returns(Task.CompletedTask) + .Callback>(x => changeHistory = x); + + Changes["NuGet.Versioning"] = new string[0]; + Changes["NuGet.Core"] = new string[0]; + + await Target.ExecuteAsync(); + + Assert.Equal(new[] { "NuGet.Core", "NuGet.Versioning" }, changeHistory.ToArray()); + OwnerDataClient.Verify( + x => x.ReplaceLatestIndexedAsync(DatabaseResult, StorageResult.AccessCondition), + Times.Once); + } + } + + public abstract class Facts + { + public Facts(ITestOutputHelper output) + { + DatabaseOwnerFetcher = new Mock(); + OwnerDataClient = new Mock(); + OwnerSetComparer = new Mock(); + OwnerIndexActionBuilder = new Mock(); + Pusher = new Mock(); + Options = new Mock>(); + Logger = output.GetLogger(); + + Configuration = new AzureSearchJobConfiguration + { + MaxConcurrentBatches = 1, + }; + DatabaseResult = new SortedDictionary>(); + StorageResult = new ResultAndAccessCondition>>( + new SortedDictionary>(), + new Mock().Object); + Changes = new SortedDictionary(); + IndexActions = new IndexActions( + new List> { IndexAction.Merge(new KeyedDocument()) }, + new List> { IndexAction.Merge(new KeyedDocument()) }, + new ResultAndAccessCondition( + new VersionListData(new Dictionary()), + new Mock().Object)); + + Options + .Setup(x => x.Value) + .Returns(() => Configuration); + DatabaseOwnerFetcher + .Setup(x => x.GetPackageIdToOwnersAsync()) + .ReturnsAsync(() => DatabaseResult); + OwnerDataClient + .Setup(x => x.ReadLatestIndexedAsync()) + .ReturnsAsync(() => StorageResult); + OwnerSetComparer + .Setup(x => x.Compare( + It.IsAny>>(), + It.IsAny>>())) + .Returns(() => Changes); + OwnerIndexActionBuilder + .Setup(x => x.UpdateOwnersAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(() => IndexActions); + + Target = new Owners2AzureSearchCommand( + DatabaseOwnerFetcher.Object, + OwnerDataClient.Object, + OwnerSetComparer.Object, + OwnerIndexActionBuilder.Object, + () => Pusher.Object, + Options.Object, + Logger); + } + + public Mock DatabaseOwnerFetcher { get; } + public Mock OwnerDataClient { get; } + public Mock OwnerSetComparer { get; } + public Mock OwnerIndexActionBuilder { get; } + public Mock Pusher { get; } + public Mock> Options { get; } + public RecordingLogger Logger { get; } + public AzureSearchJobConfiguration Configuration { get; } + public SortedDictionary> DatabaseResult { get; } + public ResultAndAccessCondition>> StorageResult { get; } + public SortedDictionary Changes { get; } + public IndexActions IndexActions { get; } + public Owners2AzureSearchCommand Target { get; } + } + } +}