Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

Commit

Permalink
Complete Owners2AzureSearch job and wire-up the entry point (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
joelverhagen committed May 7, 2019
1 parent 21a2e98 commit 4d230f1
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 2 deletions.
5 changes: 4 additions & 1 deletion src/NuGet.Jobs.Owners2AzureSearch/Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using NuGet.Services.AzureSearch;
using NuGet.Services.AzureSearch.Owners2AzureSearch;

namespace NuGet.Jobs
{
Expand All @@ -19,7 +20,9 @@ public override async Task Run()
ServicePointManager.DefaultConnectionLimit = 64;
ServicePointManager.MaxServicePointIdleTime = 10000;

await Task.Yield();
await _serviceProvider
.GetRequiredService<Owners2AzureSearchCommand>()
.ExecuteAsync();
}

protected override void ConfigureAutofacServices(ContainerBuilder containerBuilder)
Expand Down
2 changes: 1 addition & 1 deletion src/NuGet.Services.AzureSearch/BatchPusher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void EnqueueIndexActions(string packageId, IndexActions indexActions)
throw new ArgumentException("This package ID has already been enqueued.", nameof(packageId));
}

if (!indexActions.Hijack.Any() && !indexActions.Search.Any())
if (indexActions.IsEmpty)
{
throw new ArgumentException("There must be at least one index action.", nameof(indexActions));
}
Expand Down
11 changes: 11 additions & 0 deletions src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ private static void RegisterAzureSearchJobStorageServices(ContainerBuilder conta
c.ResolveKeyed<IStorageFactory>(key),
c.Resolve<IOptionsSnapshot<Db2AzureSearchConfiguration>>(),
c.Resolve<ILogger<Db2AzureSearchCommand>>()));

containerBuilder
.Register(c => new Owners2AzureSearchCommand(
c.Resolve<IDatabaseOwnerFetcher>(),
c.Resolve<IOwnerDataClient>(),
c.Resolve<IOwnerSetComparer>(),
c.Resolve<IOwnerIndexActionBuilder>(),
c.Resolve<Func<IBatchPusher>>(),
c.Resolve<IOptionsSnapshot<AzureSearchJobConfiguration>>(),
c.Resolve<ILogger<Owners2AzureSearchCommand>>()));
}

private static void RegisterAuxiliaryDataStorageServices(ContainerBuilder containerBuilder, string key)
Expand Down Expand Up @@ -229,6 +239,7 @@ public static IServiceCollection AddAzureSearch(this IServiceCollection services
services.AddTransient<IHijackDocumentBuilder, HijackDocumentBuilder>();
services.AddTransient<IIndexBuilder, IndexBuilder>();
services.AddTransient<INewPackageRegistrationProducer, NewPackageRegistrationProducer>();
services.AddTransient<IOwnerIndexActionBuilder, OwnerIndexActionBuilder>();
services.AddTransient<IOwnerSetComparer, OwnerSetComparer>();
services.AddTransient<IPackageEntityIndexActionBuilder, PackageEntityIndexActionBuilder>();
services.AddTransient<IRegistrationClient, RegistrationClient>();
Expand Down
2 changes: 2 additions & 0 deletions src/NuGet.Services.AzureSearch/IndexActions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ public IndexActions(
public IReadOnlyList<IndexAction<KeyedDocument>> Search { get; }
public IReadOnlyList<IndexAction<KeyedDocument>> Hijack { get; }
public ResultAndAccessCondition<VersionListData> VersionListDataResult { get; }

public bool IsEmpty => Search.Count == 0 && Hijack.Count == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<Compile Include="Owners2AzureSearch\IOwnerDataClient.cs" />
<Compile Include="Owners2AzureSearch\OwnerDataClient.cs" />
<Compile Include="ScoringProfiles\DownloadCountBoosterProfile.cs" />
<Compile Include="Owners2AzureSearch\Owners2AzureSearchCommand.cs" />
<Compile Include="SearchService\AzureSearchQueryBuilder.cs" />
<Compile Include="BlobContainerBuilder.cs" />
<Compile Include="IBlobContainerBuilder.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace NuGet.Services.AzureSearch.Owners2AzureSearch
{
public class Owners2AzureSearchCommand
{
private readonly IDatabaseOwnerFetcher _databaseOwnerFetcher;
private readonly IOwnerDataClient _ownerDataClient;
private readonly IOwnerSetComparer _ownerSetComparer;
private readonly IOwnerIndexActionBuilder _indexActionBuilder;
private readonly Func<IBatchPusher> _batchPusherFactory;
private readonly IOptionsSnapshot<AzureSearchJobConfiguration> _options;
private readonly ILogger<Owners2AzureSearchCommand> _logger;

public Owners2AzureSearchCommand(
IDatabaseOwnerFetcher databaseOwnerFetcher,
IOwnerDataClient ownerDataClient,
IOwnerSetComparer ownerSetComparer,
IOwnerIndexActionBuilder indexActionBuilder,
Func<IBatchPusher> batchPusherFactory,
IOptionsSnapshot<AzureSearchJobConfiguration> options,
ILogger<Owners2AzureSearchCommand> logger)
{
_databaseOwnerFetcher = databaseOwnerFetcher ?? throw new ArgumentNullException(nameof(databaseOwnerFetcher));
_ownerDataClient = ownerDataClient ?? throw new ArgumentNullException(nameof(ownerDataClient));
_ownerSetComparer = ownerSetComparer ?? throw new ArgumentNullException(nameof(ownerSetComparer));
_indexActionBuilder = indexActionBuilder ?? throw new ArgumentNullException(nameof(indexActionBuilder));
_batchPusherFactory = batchPusherFactory ?? throw new ArgumentNullException(nameof(batchPusherFactory));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

if (_options.Value.MaxConcurrentBatches <= 0)
{
throw new ArgumentException(
$"The {nameof(AzureSearchJobConfiguration.MaxConcurrentBatches)} must be greater than zero.",
nameof(options));
}
}

public async Task ExecuteAsync()
{
_logger.LogInformation("Fetching old owner data from blob storage.");
var storageResult = await _ownerDataClient.ReadLatestIndexedAsync();

_logger.LogInformation("Fetching new owner data from the database.");
var databaseResult = await _databaseOwnerFetcher.GetPackageIdToOwnersAsync();

_logger.LogInformation("Detecting owner changes.");
var changes = _ownerSetComparer.Compare(storageResult.Result, databaseResult);
var changesBag = new ConcurrentBag<IdAndValue<string[]>>(changes.Select(x => new IdAndValue<string[]>(x.Key, x.Value)));
_logger.LogInformation("{Count} package IDs have owner changes.", changesBag.Count);

if (!changes.Any())
{
return;
}

_logger.LogInformation(
"Starting {Count} workers pushing owners changes to Azure Search.",
_options.Value.MaxConcurrentBatches);
var workerTasks = Enumerable
.Range(0, _options.Value.MaxConcurrentBatches)
.Select(x => WorkAsync(changesBag))
.ToList();
await Task.WhenAll(workerTasks);
_logger.LogInformation("All of the owner changes have been pushed to Azure Search.");

// Persist in storage the list of all package IDs that have owner changes. This allows debugging and future
// analytics on frequency of ownership changes.
_logger.LogInformation("Uploading the package IDs that have owner changes to blob storage.");
await _ownerDataClient.UploadChangeHistoryAsync(changes.Keys.ToList());

_logger.LogInformation("Uploading the new owner data to blob storage.");
await _ownerDataClient.ReplaceLatestIndexedAsync(databaseResult, storageResult.AccessCondition);
}

private async Task WorkAsync(ConcurrentBag<IdAndValue<string[]>> changesBag)
{
await Task.Yield();

var batchPusher = _batchPusherFactory();
while (changesBag.TryTake(out var changes))
{
var indexActions = await _indexActionBuilder.UpdateOwnersAsync(changes.Id, changes.Value);
if (indexActions.IsEmpty)
{
continue;
}

batchPusher.EnqueueIndexActions(changes.Id, indexActions);
await batchPusher.PushFullBatchesAsync();
}

await batchPusher.FinishAsync();
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<Compile Include="Models\CommittedDocumentFacts.cs" />
<Compile Include="Owners2AzureSearch\OwnerDataClientFacts.cs" />
<Compile Include="Owners2AzureSearch\OwnerIndexActionBuilderFacts.cs" />
<Compile Include="Owners2AzureSearch\Owners2AzureSearchCommandFacts.cs" />
<Compile Include="Owners2AzureSearch\OwnerSetComparerFacts.cs" />
<Compile Include="Registration\RegistrationUrlBuilderFacts.cs" />
<Compile Include="SearchDocumentBuilderFacts.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Search.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Moq;
using NuGet.Services.AzureSearch.Support;
using NuGetGallery;
using Xunit;
using Xunit.Abstractions;

namespace NuGet.Services.AzureSearch.Owners2AzureSearch
{
public class Owners2AzureSearchCommandFacts
{
public class ExecuteAsync : Facts
{
public ExecuteAsync(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task DoesNotPushWhenThereAreNoChanges()
{
await Target.ExecuteAsync();

Pusher.Verify(
x => x.EnqueueIndexActions(It.IsAny<string>(), It.IsAny<IndexActions>()),
Times.Never);
Pusher.Verify(x => x.PushFullBatchesAsync(), Times.Never);
Pusher.Verify(x => x.FinishAsync(), Times.Never);
OwnerDataClient.Verify(x => x.UploadChangeHistoryAsync(It.IsAny<IReadOnlyList<string>>()), Times.Never);
OwnerDataClient.Verify(
x => x.ReplaceLatestIndexedAsync(
It.IsAny<SortedDictionary<string, SortedSet<string>>>(),
It.IsAny<IAccessCondition>()),
Times.Never);
}

[Fact]
public async Task ComparesInTheRightOrder()
{
await Target.ExecuteAsync();

OwnerSetComparer.Verify(
x => x.Compare(
It.IsAny<SortedDictionary<string, SortedSet<string>>>(),
It.IsAny<SortedDictionary<string, SortedSet<string>>>()),
Times.Once);
OwnerSetComparer.Verify(
x => x.Compare(StorageResult.Result, DatabaseResult),
Times.Once);
}

[Fact]
public async Task PushesAllChanges()
{
Changes["NuGet.Core"] = new string[0];
Changes["NuGet.Versioning"] = new string[0];
Changes["EntityFramework"] = new string[0];

await Target.ExecuteAsync();

Pusher.Verify(
x => x.EnqueueIndexActions(It.IsAny<string>(), It.IsAny<IndexActions>()),
Times.Exactly(3));
Pusher.Verify(
x => x.EnqueueIndexActions("NuGet.Core", It.IsAny<IndexActions>()),
Times.Once);
Pusher.Verify(
x => x.EnqueueIndexActions("NuGet.Versioning", It.IsAny<IndexActions>()),
Times.Once);
Pusher.Verify(
x => x.EnqueueIndexActions("EntityFramework", It.IsAny<IndexActions>()),
Times.Once);
Pusher.Verify(x => x.PushFullBatchesAsync(), Times.Exactly(3));
Pusher.Verify(x => x.FinishAsync(), Times.Once);
}

[Fact]
public async Task UpdatesBlobStorageAfterIndexing()
{
var actions = new List<string>();
Pusher
.Setup(x => x.FinishAsync())
.Returns(Task.CompletedTask)
.Callback(() => actions.Add(nameof(IBatchPusher.FinishAsync)));
OwnerDataClient
.Setup(x => x.UploadChangeHistoryAsync(It.IsAny<IReadOnlyList<string>>()))
.Returns(Task.CompletedTask)
.Callback(() => actions.Add(nameof(IOwnerDataClient.UploadChangeHistoryAsync)));
OwnerDataClient
.Setup(x => x.ReplaceLatestIndexedAsync(It.IsAny<SortedDictionary<string, SortedSet<string>>>(), It.IsAny<IAccessCondition>()))
.Returns(Task.CompletedTask)
.Callback(() => actions.Add(nameof(IOwnerDataClient.ReplaceLatestIndexedAsync)));

Changes["NuGet.Core"] = new string[0];

await Target.ExecuteAsync();

Assert.Equal(
new[] { nameof(IBatchPusher.FinishAsync), nameof(IOwnerDataClient.UploadChangeHistoryAsync), nameof(IOwnerDataClient.ReplaceLatestIndexedAsync) },
actions.ToArray());
}

[Fact]
public async Task UpdatesBlobStorage()
{
IReadOnlyList<string> changeHistory = null;
OwnerDataClient
.Setup(x => x.UploadChangeHistoryAsync(It.IsAny<IReadOnlyList<string>>()))
.Returns(Task.CompletedTask)
.Callback<IReadOnlyList<string>>(x => changeHistory = x);

Changes["NuGet.Versioning"] = new string[0];
Changes["NuGet.Core"] = new string[0];

await Target.ExecuteAsync();

Assert.Equal(new[] { "NuGet.Core", "NuGet.Versioning" }, changeHistory.ToArray());
OwnerDataClient.Verify(
x => x.ReplaceLatestIndexedAsync(DatabaseResult, StorageResult.AccessCondition),
Times.Once);
}
}

public abstract class Facts
{
public Facts(ITestOutputHelper output)
{
DatabaseOwnerFetcher = new Mock<IDatabaseOwnerFetcher>();
OwnerDataClient = new Mock<IOwnerDataClient>();
OwnerSetComparer = new Mock<IOwnerSetComparer>();
OwnerIndexActionBuilder = new Mock<IOwnerIndexActionBuilder>();
Pusher = new Mock<IBatchPusher>();
Options = new Mock<IOptionsSnapshot<AzureSearchJobConfiguration>>();
Logger = output.GetLogger<Owners2AzureSearchCommand>();

Configuration = new AzureSearchJobConfiguration
{
MaxConcurrentBatches = 1,
};
DatabaseResult = new SortedDictionary<string, SortedSet<string>>();
StorageResult = new ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>(
new SortedDictionary<string, SortedSet<string>>(),
new Mock<IAccessCondition>().Object);
Changes = new SortedDictionary<string, string[]>();
IndexActions = new IndexActions(
new List<IndexAction<KeyedDocument>> { IndexAction.Merge(new KeyedDocument()) },
new List<IndexAction<KeyedDocument>> { IndexAction.Merge(new KeyedDocument()) },
new ResultAndAccessCondition<VersionListData>(
new VersionListData(new Dictionary<string, VersionPropertiesData>()),
new Mock<IAccessCondition>().Object));

Options
.Setup(x => x.Value)
.Returns(() => Configuration);
DatabaseOwnerFetcher
.Setup(x => x.GetPackageIdToOwnersAsync())
.ReturnsAsync(() => DatabaseResult);
OwnerDataClient
.Setup(x => x.ReadLatestIndexedAsync())
.ReturnsAsync(() => StorageResult);
OwnerSetComparer
.Setup(x => x.Compare(
It.IsAny<SortedDictionary<string, SortedSet<string>>>(),
It.IsAny<SortedDictionary<string, SortedSet<string>>>()))
.Returns(() => Changes);
OwnerIndexActionBuilder
.Setup(x => x.UpdateOwnersAsync(It.IsAny<string>(), It.IsAny<string[]>()))
.ReturnsAsync(() => IndexActions);

Target = new Owners2AzureSearchCommand(
DatabaseOwnerFetcher.Object,
OwnerDataClient.Object,
OwnerSetComparer.Object,
OwnerIndexActionBuilder.Object,
() => Pusher.Object,
Options.Object,
Logger);
}

public Mock<IDatabaseOwnerFetcher> DatabaseOwnerFetcher { get; }
public Mock<IOwnerDataClient> OwnerDataClient { get; }
public Mock<IOwnerSetComparer> OwnerSetComparer { get; }
public Mock<IOwnerIndexActionBuilder> OwnerIndexActionBuilder { get; }
public Mock<IBatchPusher> Pusher { get; }
public Mock<IOptionsSnapshot<AzureSearchJobConfiguration>> Options { get; }
public RecordingLogger<Owners2AzureSearchCommand> Logger { get; }
public AzureSearchJobConfiguration Configuration { get; }
public SortedDictionary<string, SortedSet<string>> DatabaseResult { get; }
public ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>> StorageResult { get; }
public SortedDictionary<string, string[]> Changes { get; }
public IndexActions IndexActions { get; }
public Owners2AzureSearchCommand Target { get; }
}
}
}

0 comments on commit 4d230f1

Please sign in to comment.