This repository has been archived by the owner on Mar 16, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Package Renames 1] Add popularity transfers data client (#765)
The `auxiliary2azuresearch` job needs to know which popularity transfers have changed in order to properly update the search index. To do this, the last seen popularity transfer data needs to be persisted so that it can be compared against the latest data. This adds new types `IPopularityTransferDataClient` and `PopularityTransferDataClient` that are unused for now. They will be added to the dependency injection container later. Part of NuGet/NuGetGallery#7898
- Loading branch information
1 parent
6da1018
commit 11e04f1
Showing
11 changed files
with
760 additions
and
39 deletions.
There are no files selected for viewing
36 changes: 36 additions & 0 deletions
36
src/NuGet.Services.AzureSearch/AuxiliaryFiles/IPopularityTransferDataClient.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
using NuGetGallery; | ||
|
||
namespace NuGet.Services.AzureSearch.AuxiliaryFiles | ||
{ | ||
/// <summary> | ||
/// The purpose of this interface is allow reading and writing populairty transfer information from storage. | ||
/// The Auxiliary2AzureSearch job does a comparison of latest popularity transfer data from the database with | ||
/// a snapshot of information stored in Azure Blob Storage. This interface handles the reading and writing of | ||
/// that snapshot from storage. | ||
/// </summary> | ||
public interface IPopularityTransferDataClient | ||
{ | ||
/// <summary> | ||
/// Read all of the latest indexed popularity transfers from storage. Also, return the current etag to allow | ||
/// optimistic concurrency checks on the writing of the file. The returned dictionary's key is the | ||
/// package ID that is transferring away its popularity, and the values are the package IDs receiving popularity. | ||
/// The dictionary and the sets are case-insensitive. | ||
/// </summary> | ||
Task<ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>> ReadLatestIndexedAsync(); | ||
|
||
/// <summary> | ||
/// Replace the existing latest indexed popularity transfers file (i.e. "popularityTransfers.v1.json" file). | ||
/// </summary> | ||
/// <param name="newData">The new data to be serialized into storage.</param> | ||
/// <param name="accessCondition">The access condition (i.e. etag) to use during the upload.</param> | ||
Task ReplaceLatestIndexedAsync( | ||
SortedDictionary<string, SortedSet<string>> newData, | ||
IAccessCondition accessCondition); | ||
} | ||
} | ||
|
139 changes: 139 additions & 0 deletions
139
src/NuGet.Services.AzureSearch/AuxiliaryFiles/PopularityTransferDataClient.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Diagnostics; | ||
using System.IO; | ||
using System.Net; | ||
using System.Threading.Tasks; | ||
using Microsoft.Extensions.Logging; | ||
using Microsoft.Extensions.Options; | ||
using Microsoft.WindowsAzure.Storage; | ||
using Newtonsoft.Json; | ||
using NuGetGallery; | ||
|
||
namespace NuGet.Services.AzureSearch.AuxiliaryFiles | ||
{ | ||
public class PopularityTransferDataClient : IPopularityTransferDataClient | ||
{ | ||
private static readonly JsonSerializer Serializer = new JsonSerializer(); | ||
|
||
private readonly ICloudBlobClient _cloudBlobClient; | ||
private readonly IOptionsSnapshot<AzureSearchConfiguration> _options; | ||
private readonly IAzureSearchTelemetryService _telemetryService; | ||
private readonly ILogger<PopularityTransferDataClient> _logger; | ||
private readonly Lazy<ICloudBlobContainer> _lazyContainer; | ||
|
||
public PopularityTransferDataClient( | ||
ICloudBlobClient cloudBlobClient, | ||
IOptionsSnapshot<AzureSearchConfiguration> options, | ||
IAzureSearchTelemetryService telemetryService, | ||
ILogger<PopularityTransferDataClient> logger) | ||
{ | ||
_cloudBlobClient = cloudBlobClient ?? throw new ArgumentNullException(nameof(cloudBlobClient)); | ||
_options = options ?? throw new ArgumentNullException(nameof(options)); | ||
_telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService)); | ||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||
|
||
_lazyContainer = new Lazy<ICloudBlobContainer>( | ||
() => _cloudBlobClient.GetContainerReference(_options.Value.StorageContainer)); | ||
} | ||
|
||
private ICloudBlobContainer Container => _lazyContainer.Value; | ||
|
||
public async Task<ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>> ReadLatestIndexedAsync() | ||
{ | ||
var stopwatch = Stopwatch.StartNew(); | ||
var blobName = GetLatestIndexedBlobName(); | ||
var blobReference = Container.GetBlobReference(blobName); | ||
|
||
_logger.LogInformation("Reading the latest indexed popularity transfers from {BlobName}.", blobName); | ||
|
||
var builder = new PackageIdToPopularityTransfersBuilder(_logger); | ||
IAccessCondition accessCondition; | ||
try | ||
{ | ||
using (var stream = await blobReference.OpenReadAsync(AccessCondition.GenerateEmptyCondition())) | ||
{ | ||
accessCondition = AccessConditionWrapper.GenerateIfMatchCondition(blobReference.ETag); | ||
ReadStream(stream, builder.Add); | ||
} | ||
} | ||
catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotFound) | ||
{ | ||
accessCondition = AccessConditionWrapper.GenerateIfNotExistsCondition(); | ||
_logger.LogInformation("The blob {BlobName} does not exist.", blobName); | ||
} | ||
|
||
var output = new ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>( | ||
builder.GetResult(), | ||
accessCondition); | ||
|
||
stopwatch.Stop(); | ||
_telemetryService.TrackReadLatestIndexedPopularityTransfers(output.Result.Count, stopwatch.Elapsed); | ||
|
||
return output; | ||
} | ||
|
||
public async Task ReplaceLatestIndexedAsync( | ||
SortedDictionary<string, SortedSet<string>> newData, | ||
IAccessCondition accessCondition) | ||
{ | ||
using (_telemetryService.TrackReplaceLatestIndexedPopularityTransfers(newData.Count)) | ||
{ | ||
var blobName = GetLatestIndexedBlobName(); | ||
_logger.LogInformation("Replacing the latest indexed popularity transfers from {BlobName}.", blobName); | ||
|
||
var mappedAccessCondition = new AccessCondition | ||
{ | ||
IfNoneMatchETag = accessCondition.IfNoneMatchETag, | ||
IfMatchETag = accessCondition.IfMatchETag, | ||
}; | ||
|
||
var blobReference = Container.GetBlobReference(blobName); | ||
|
||
using (var stream = await blobReference.OpenWriteAsync(mappedAccessCondition)) | ||
using (var streamWriter = new StreamWriter(stream)) | ||
using (var jsonTextWriter = new JsonTextWriter(streamWriter)) | ||
{ | ||
blobReference.Properties.ContentType = "application/json"; | ||
Serializer.Serialize(jsonTextWriter, newData); | ||
} | ||
} | ||
} | ||
|
||
private static void ReadStream(Stream stream, Action<string, IReadOnlyList<string>> add) | ||
{ | ||
using (var textReader = new StreamReader(stream)) | ||
using (var jsonReader = new JsonTextReader(textReader)) | ||
{ | ||
Guard.Assert(jsonReader.Read(), "The blob should be readable."); | ||
Guard.Assert(jsonReader.TokenType == JsonToken.StartObject, "The first token should be the start of an object."); | ||
Guard.Assert(jsonReader.Read(), "There should be a second token."); | ||
while (jsonReader.TokenType == JsonToken.PropertyName) | ||
{ | ||
var id = (string)jsonReader.Value; | ||
|
||
Guard.Assert(jsonReader.Read(), "There should be a token after the property name."); | ||
Guard.Assert(jsonReader.TokenType == JsonToken.StartArray, "The token after the property name should be the start of an object."); | ||
|
||
var transfers = Serializer.Deserialize<List<string>>(jsonReader); | ||
add(id, transfers); | ||
|
||
Guard.Assert(jsonReader.TokenType == JsonToken.EndArray, "The token after reading the array should be the end of an array."); | ||
Guard.Assert(jsonReader.Read(), "There should be a token after the end of the array."); | ||
} | ||
|
||
Guard.Assert(jsonReader.TokenType == JsonToken.EndObject, "The last token should be the end of an object."); | ||
Guard.Assert(!jsonReader.Read(), "There should be no token after the end of the object."); | ||
} | ||
} | ||
|
||
private string GetLatestIndexedBlobName() | ||
{ | ||
return $"{_options.Value.NormalizeStoragePath()}popularity-transfers/popularity-transfers.v1.json"; | ||
} | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
89 changes: 89 additions & 0 deletions
89
src/NuGet.Services.AzureSearch/PackageIdToPopularityTransfersBuilder.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using Microsoft.Extensions.Logging; | ||
|
||
namespace NuGet.Services.AzureSearch | ||
{ | ||
public class PackageIdToPopularityTransfersBuilder | ||
{ | ||
private readonly ILogger _logger; | ||
private int _addCount; | ||
private readonly Dictionary<string, string> _idInternPool; | ||
private readonly SortedDictionary<string, SortedSet<string>> _result; | ||
|
||
public PackageIdToPopularityTransfersBuilder(ILogger logger) | ||
{ | ||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||
_addCount = 0; | ||
_idInternPool = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase); | ||
_result = new SortedDictionary<string, SortedSet<string>>(StringComparer.OrdinalIgnoreCase); | ||
} | ||
|
||
/// <summary> | ||
/// Add multiple popularity transfers. | ||
/// </summary> | ||
/// <param name="fromId">The package that is transferring its popularity away.</param> | ||
/// <param name="toIds">The packages that are receiving the transferred popularity.</param> | ||
public void Add(string fromId, IReadOnlyList<string> toIds) | ||
{ | ||
foreach (var toId in toIds) | ||
{ | ||
Add(fromId, toId); | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Add a popularity transfers | ||
/// </summary> | ||
/// <param name="fromId">The package that is transferring its popularity away.</param> | ||
/// <param name="toId">The package that is receiving the transferred popularity.</param> | ||
public void Add(string fromId, string toId) | ||
{ | ||
_addCount++; | ||
if (_addCount % 10000 == 0) | ||
{ | ||
_logger.LogInformation("{AddCount} popularity transfers have been added so far.", _addCount); | ||
} | ||
|
||
// Use a single instance of each "toId" string. | ||
toId = InternId(toId); | ||
|
||
if (!_result.TryGetValue(fromId, out var toIds)) | ||
{ | ||
toIds = new SortedSet<string>(StringComparer.OrdinalIgnoreCase); | ||
fromId = InternId(fromId); | ||
|
||
_result.Add(fromId, toIds); | ||
} | ||
|
||
toIds.Add(toId); | ||
} | ||
|
||
/// <summary> | ||
/// Get the popularity transfers. | ||
/// </summary> | ||
/// <returns>A map of packages transferring popularity away to the packages receiving the popularity.</returns> | ||
public SortedDictionary<string, SortedSet<string>> GetResult() | ||
{ | ||
_logger.LogInformation("{RecordCount} popularity transfers were found.", _addCount); | ||
_logger.LogInformation("{FromTransfers} packages transfer popularity away.", _result.Count); | ||
_logger.LogInformation("{UniqueIds} unique package IDs.", _idInternPool.Count); | ||
|
||
return _result; | ||
} | ||
|
||
private string InternId(string id) | ||
{ | ||
if (_idInternPool.TryGetValue(id, out var existingId)) | ||
{ | ||
return existingId; | ||
} | ||
|
||
_idInternPool.Add(id, id); | ||
return id; | ||
} | ||
} | ||
} |
Oops, something went wrong.