Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

[Package Renames] Expose popularity transfers in search #774

Merged
merged 4 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Diagnostics;
using System.Linq;
using Microsoft.Extensions.Logging;
using NuGet.Services.AzureSearch.AuxiliaryFiles;

namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch
{
Expand All @@ -28,6 +29,16 @@ public SortedDictionary<string, string[]> CompareOwners(
SortedDictionary<string, SortedSet<string>> oldData,
SortedDictionary<string, SortedSet<string>> newData)
{
if (oldData.Comparer != StringComparer.OrdinalIgnoreCase)
{
throw new ArgumentException("The old data should have a case-insensitive comparer.", nameof(oldData));
}

if (newData.Comparer != StringComparer.OrdinalIgnoreCase)
{
throw new ArgumentException("The new data should have a case-insensitive comparer.", nameof(newData));
}

// Use ordinal comparison to allow username case changes to flow through.
var stopwatch = Stopwatch.StartNew();
var result = CompareData(
Expand All @@ -44,8 +55,8 @@ public SortedDictionary<string, string[]> CompareOwners(
}

public SortedDictionary<string, string[]> ComparePopularityTransfers(
SortedDictionary<string, SortedSet<string>> oldData,
SortedDictionary<string, SortedSet<string>> newData)
PopularityTransferData oldData,
PopularityTransferData newData)
{
// Ignore case changes in popularity transfers.
var stopwatch = Stopwatch.StartNew();
Expand All @@ -63,22 +74,12 @@ public SortedDictionary<string, string[]> ComparePopularityTransfers(
}

private SortedDictionary<string, string[]> CompareData(
SortedDictionary<string, SortedSet<string>> oldData,
SortedDictionary<string, SortedSet<string>> newData,
IReadOnlyDictionary<string, SortedSet<string>> oldData,
IReadOnlyDictionary<string, SortedSet<string>> newData,
string keyName,
string valuesName,
StringComparer valuesComparer)
{
if (oldData.Comparer != StringComparer.OrdinalIgnoreCase)
{
throw new ArgumentException("The old data should have a case-insensitive comparer.", nameof(oldData));
}

if (newData.Comparer != StringComparer.OrdinalIgnoreCase)
{
throw new ArgumentException("The new data should have a case-insensitive comparer.", nameof(newData));
}

// We use a very simplistic algorithm here. Perform one pass on the new data to find the added or changed
// values. Then perform a second pass on the old data to find removed keys. We can optimize
// this later if necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using NuGet.Services.AzureSearch.AuxiliaryFiles;

namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch
{
Expand Down Expand Up @@ -32,7 +33,7 @@ SortedDictionary<string, string[]> CompareOwners(
/// <param name="oldData">The old popularity transfers, typically from storage.</param>
/// <param name="newData">The new popularity transfers, typically from gallery DB.</param>
SortedDictionary<string, string[]> ComparePopularityTransfers(
SortedDictionary<string, SortedSet<string>> oldData,
SortedDictionary<string, SortedSet<string>> newData);
PopularityTransferData oldData,
PopularityTransferData newData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ private async Task<bool> PushIndexChangesAsync()
// The "old" data is the popularity transfers data that was last indexed by this job (or
// initialized by Db2AzureSearch).
_logger.LogInformation("Fetching old popularity transfer data from blob storage.");
var oldTransfers = await _popularityTransferDataClient.ReadLatestIndexedAsync();
var oldTransfers = await _popularityTransferDataClient.ReadLatestIndexedAsync(
AccessConditionWrapper.GenerateEmptyCondition(),
_stringCache);

// The "new" data is the latest popularity transfers data from the database.
_logger.LogInformation("Fetching new popularity transfer data from database.");
Expand All @@ -143,7 +145,7 @@ private async Task<bool> PushIndexChangesAsync()
_logger.LogInformation("Applying download transfers to download changes.");
ApplyDownloadTransfers(
newData,
oldTransfers.Result,
oldTransfers.Data,
newTransfers,
downloadOverrides,
changes);
Expand All @@ -170,27 +172,27 @@ await ParallelAsync.Repeat(
_logger.LogInformation("Uploading the new popularity transfer data to blob storage.");
await _popularityTransferDataClient.ReplaceLatestIndexedAsync(
newTransfers,
oldTransfers.AccessCondition);
oldTransfers.Metadata.GetIfMatchCondition());
return true;
}

private async Task<SortedDictionary<string, SortedSet<string>>> GetPopularityTransfersAsync()
private async Task<PopularityTransferData> GetPopularityTransfersAsync()
{
if (!_featureFlags.IsPopularityTransferEnabled())
{
_logger.LogWarning(
"Popularity transfers feature flag is disabled. " +
"All popularity transfers will be removed.");
return new SortedDictionary<string, SortedSet<string>>(StringComparer.OrdinalIgnoreCase);
return new PopularityTransferData();
}

return await _databaseFetcher.GetPackageIdToPopularityTransfersAsync();
return await _databaseFetcher.GetPopularityTransfersAsync();
}

private void ApplyDownloadTransfers(
DownloadData newData,
SortedDictionary<string, SortedSet<string>> oldTransfers,
SortedDictionary<string, SortedSet<string>> newTransfers,
PopularityTransferData oldTransfers,
PopularityTransferData newTransfers,
IReadOnlyDictionary<string, long> downloadOverrides,
SortedDictionary<string, long> downloadChanges)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Threading.Tasks;
using NuGetGallery;

Expand All @@ -17,19 +16,19 @@ public interface IPopularityTransferDataClient
{
/// <summary>
/// Read all of the latest indexed popularity transfers from storage. Also, return the current etag to allow
/// optimistic concurrency checks on the writing of the file. The returned dictionary's key is the
/// package ID that is transferring away its popularity, and the values are the package IDs receiving popularity.
/// The dictionary and the sets are case-insensitive.
/// optimistic concurrency checks on the writing of the file.
/// </summary>
Task<ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>> ReadLatestIndexedAsync();
Task<AuxiliaryFileResult<PopularityTransferData>> ReadLatestIndexedAsync(
IAccessCondition accessCondition,
StringCache stringCache);

/// <summary>
/// Replace the existing latest indexed popularity transfers file (i.e. "popularityTransfers.v1.json" file).
/// </summary>
/// <param name="newData">The new data to be serialized into storage.</param>
/// <param name="accessCondition">The access condition (i.e. etag) to use during the upload.</param>
Task ReplaceLatestIndexedAsync(
SortedDictionary<string, SortedSet<string>> newData,
PopularityTransferData newData,
IAccessCondition accessCondition);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections;
using System.Collections.Generic;

namespace NuGet.Services.AzureSearch.AuxiliaryFiles
{
/// <summary>
/// Maps packages that transfer their popularity away to the
/// set of packages receiving the popularity. Both the "from" package IDs (the keys)
/// and each set of "to" package IDs are compared using
/// <see cref="StringComparer.OrdinalIgnoreCase"/>.
/// </summary>
public class PopularityTransferData : IReadOnlyDictionary<string, SortedSet<string>>
{
    private readonly SortedDictionary<string, SortedSet<string>> _transfers =
        new SortedDictionary<string, SortedSet<string>>(StringComparer.OrdinalIgnoreCase);

    /// <summary>
    /// Records that <paramref name="fromId"/> transfers its popularity to <paramref name="toId"/>.
    /// Adding a pair that is already present (ignoring case) leaves the data unchanged.
    /// </summary>
    /// <param name="fromId">The package ID that is transferring its popularity away.</param>
    /// <param name="toId">The package ID that is receiving the popularity.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="fromId"/> or <paramref name="toId"/> is null.
    /// </exception>
    public void AddTransfer(string fromId, string toId)
    {
        // Validate both arguments before touching the dictionary so that a rejected call
        // never leaves an empty set behind for "fromId". Without the "toId" check a null
        // would be silently stored, because the string comparer orders nulls instead of
        // rejecting them.
        if (fromId == null)
        {
            throw new ArgumentNullException(nameof(fromId));
        }

        if (toId == null)
        {
            throw new ArgumentNullException(nameof(toId));
        }

        if (!_transfers.TryGetValue(fromId, out var toIds))
        {
            toIds = new SortedSet<string>(StringComparer.OrdinalIgnoreCase);
            _transfers.Add(fromId, toIds);
        }

        toIds.Add(toId);
    }

    /// <summary>
    /// Gets the set of package IDs receiving popularity from the package ID <paramref name="key"/>.
    /// </summary>
    /// <exception cref="KeyNotFoundException">Thrown when <paramref name="key"/> has no transfers.</exception>
    public SortedSet<string> this[string key] => _transfers[key];

    /// <summary>The package IDs that transfer their popularity away.</summary>
    public IEnumerable<string> Keys => _transfers.Keys;

    /// <summary>The sets of package IDs receiving popularity, one set per key.</summary>
    public IEnumerable<SortedSet<string>> Values => _transfers.Values;

    /// <summary>The number of package IDs that transfer their popularity away.</summary>
    public int Count => _transfers.Count;

    /// <summary>Determines whether <paramref name="key"/> transfers its popularity away (ignoring case).</summary>
    public bool ContainsKey(string key) => _transfers.ContainsKey(key);

    /// <summary>Enumerates the transfers, one pair per "from" package ID.</summary>
    public IEnumerator<KeyValuePair<string, SortedSet<string>>> GetEnumerator() => _transfers.GetEnumerator();

    /// <summary>
    /// Attempts to get the set of package IDs receiving popularity from <paramref name="key"/>.
    /// </summary>
    /// <returns>True when <paramref name="key"/> has at least one recorded transfer entry; otherwise false.</returns>
    public bool TryGetValue(string key, out SortedSet<string> value) => _transfers.TryGetValue(key, out value);

    IEnumerator IEnumerable.GetEnumerator() => _transfers.GetEnumerator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
Expand Down Expand Up @@ -42,42 +41,51 @@ public PopularityTransferDataClient(

private ICloudBlobContainer Container => _lazyContainer.Value;

public async Task<ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>> ReadLatestIndexedAsync()
public async Task<AuxiliaryFileResult<PopularityTransferData>> ReadLatestIndexedAsync(
IAccessCondition accessCondition,
StringCache stringCache)
{
var stopwatch = Stopwatch.StartNew();
var blobName = GetLatestIndexedBlobName();
var blobReference = Container.GetBlobReference(blobName);

_logger.LogInformation("Reading the latest indexed popularity transfers from {BlobName}.", blobName);

var builder = new PackageIdToPopularityTransfersBuilder(_logger);
IAccessCondition accessCondition;
bool modified;
var data = new PopularityTransferData();
AuxiliaryFileMetadata metadata;
try
{
using (var stream = await blobReference.OpenReadAsync(AccessCondition.GenerateEmptyCondition()))
using (var stream = await blobReference.OpenReadAsync(accessCondition))
{
accessCondition = AccessConditionWrapper.GenerateIfMatchCondition(blobReference.ETag);
ReadStream(stream, builder.Add);
ReadStream(stream, (from, to) => data.AddTransfer(stringCache.Dedupe(from), stringCache.Dedupe(to)));
modified = true;
metadata = new AuxiliaryFileMetadata(
lastModified: new DateTimeOffset(blobReference.LastModifiedUtc, TimeSpan.Zero),
loadDuration: stopwatch.Elapsed,
fileSize: blobReference.Properties.Length,
etag: blobReference.ETag);
}
}
catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotFound)
catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotModified)
{
accessCondition = AccessConditionWrapper.GenerateIfNotExistsCondition();
_logger.LogInformation("The blob {BlobName} does not exist.", blobName);
_logger.LogInformation("The blob {BlobName} has not changed.", blobName);
modified = false;
data = null;
metadata = null;
}

var output = new ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>(
builder.GetResult(),
accessCondition);

stopwatch.Stop();
_telemetryService.TrackReadLatestIndexedPopularityTransfers(output.Result.Count, stopwatch.Elapsed);
_telemetryService.TrackReadLatestIndexedPopularityTransfers(data?.Count, modified, stopwatch.Elapsed);

return output;
return new AuxiliaryFileResult<PopularityTransferData>(
modified,
data,
metadata);
}

public async Task ReplaceLatestIndexedAsync(
SortedDictionary<string, SortedSet<string>> newData,
PopularityTransferData newData,
IAccessCondition accessCondition)
{
using (_telemetryService.TrackReplaceLatestIndexedPopularityTransfers(newData.Count))
Expand All @@ -103,23 +111,29 @@ public async Task ReplaceLatestIndexedAsync(
}
}

private static void ReadStream(Stream stream, Action<string, IReadOnlyList<string>> add)
private static void ReadStream(Stream stream, Action<string, string> add)
{
using (var textReader = new StreamReader(stream))
using (var jsonReader = new JsonTextReader(textReader))
{
Guard.Assert(jsonReader.Read(), "The blob should be readable.");
Guard.Assert(jsonReader.TokenType == JsonToken.StartObject, "The first token should be the start of an object.");
Guard.Assert(jsonReader.Read(), "There should be a second token.");

while (jsonReader.TokenType == JsonToken.PropertyName)
{
var id = (string)jsonReader.Value;
var fromId = (string)jsonReader.Value;

Guard.Assert(jsonReader.Read(), "There should be a token after the property name.");
Guard.Assert(jsonReader.TokenType == JsonToken.StartArray, "The token after the property name should be the start of an object.");
Guard.Assert(jsonReader.TokenType == JsonToken.StartArray, "The token after the property name should be the start of an array.");
Guard.Assert(jsonReader.Read(), "There should be a token after the start of the transfer array.");

while (jsonReader.TokenType == JsonToken.String)
{
add(fromId, (string)jsonReader.Value);

var transfers = Serializer.Deserialize<List<string>>(jsonReader);
add(id, transfers);
Guard.Assert(jsonReader.Read(), "There should be a token after the 'to' package ID.");
}

Guard.Assert(jsonReader.TokenType == JsonToken.EndArray, "The token after reading the array should be the end of an array.");
Guard.Assert(jsonReader.Read(), "There should be a token after the end of the array.");
Expand Down
12 changes: 8 additions & 4 deletions src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,18 @@ public void TrackOwnerSetComparison(int oldCount, int newCount, int changeCount,
});
}

public void TrackReadLatestIndexedPopularityTransfers(int outgoingTransfers, TimeSpan elapsed)
public void TrackReadLatestIndexedPopularityTransfers(
int? outgoingTransfers,
bool modified,
TimeSpan elapsed)
{
_telemetryClient.TrackMetric(
Prefix + "ReadLatestIndexedPopularityTransfersSeconds",
elapsed.TotalSeconds,
new Dictionary<string, string>
{
{ "OutgoingTransfers", outgoingTransfers.ToString() }
{ "OutgoingTransfers", outgoingTransfers.ToString() },
{ "Modified", modified.ToString() }
});
}

Expand Down Expand Up @@ -403,15 +407,15 @@ public void TrackV2GetDocumentWithHijackIndex(TimeSpan elapsed)
elapsed.TotalMilliseconds);
}

public void TrackReadLatestVerifiedPackages(int? packageIdCount, bool notModified, TimeSpan elapsed)
public void TrackReadLatestVerifiedPackages(int? packageIdCount, bool modified, TimeSpan elapsed)
{
_telemetryClient.TrackMetric(
Prefix + "ReadLatestVerifiedPackagesSeconds",
elapsed.TotalSeconds,
new Dictionary<string, string>
{
{ "PackageIdCount", packageIdCount?.ToString() },
{ "NotModified", notModified.ToString() },
{ "Modified", modified.ToString() },
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property's name was incorrect. See:

bool modified;
var data = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
AuxiliaryFileMetadata metadata;
try
{
using (var stream = await blobReference.OpenReadAsync(accessCondition))
{
ReadStream(stream, id => data.Add(stringCache.Dedupe(id)));
modified = true;
metadata = new AuxiliaryFileMetadata(
lastModified: new DateTimeOffset(blobReference.LastModifiedUtc, TimeSpan.Zero),
loadDuration: stopwatch.Elapsed,
fileSize: blobReference.Properties.Length,
etag: blobReference.ETag);
}
}
catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotModified)
{
_logger.LogInformation("The blob {BlobName} has not changed.", blobName);
modified = false;
data = null;
metadata = null;
}
stopwatch.Stop();
_telemetryService.TrackReadLatestVerifiedPackages(data?.Count, modified, stopwatch.Elapsed);

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Microsoft.Extensions.Logging;
using NuGet.Jobs;
using NuGet.Jobs.Configuration;
using NuGet.Services.AzureSearch.AuxiliaryFiles;

namespace NuGet.Services.AzureSearch
{
Expand Down Expand Up @@ -153,10 +154,10 @@ public async Task<SortedDictionary<string, SortedSet<string>>> GetPackageIdToOwn
}
}

public async Task<SortedDictionary<string, SortedSet<string>>> GetPackageIdToPopularityTransfersAsync()
public async Task<PopularityTransferData> GetPopularityTransfersAsync()
{
var stopwatch = Stopwatch.StartNew();
var builder = new PackageIdToPopularityTransfersBuilder(_logger);
var output = new PopularityTransferData();
using (var connection = await _connectionFactory.OpenAsync())
using (var command = connection.CreateCommand())
{
Expand All @@ -183,15 +184,14 @@ public async Task<SortedDictionary<string, SortedSet<string>>> GetPackageIdToPop
var fromId = reader.GetString(0);
var toId = reader.GetString(1);

builder.Add(fromId, toId);
output.AddTransfer(fromId, toId);
}
}

totalResults += currentPageResults;
}
while (currentPageResults == GetPopularityTransfersPageSize);

var output = builder.GetResult();
stopwatch.Stop();
_telemetryService.TrackReadLatestPopularityTransfersFromDatabase(output.Count, stopwatch.Elapsed);

Expand Down
Loading