Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions PatchNotes.Sync/SyncPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,19 @@ private async Task ProduceAsync(

_logger.LogInformation("Pipeline: syncing {Count} packages", packages.Count);

var enqueuedPackageIds = new HashSet<string>();

foreach (var package in packages)
{
if (ct.IsCancellationRequested) break;

try
{
// Check for pre-existing stale releases BEFORE syncing,
// while the consumer cannot yet be modifying this package's data
var hadStaleReleases = await db.Releases
.AnyAsync(r => r.PackageId == package.Id && r.SummaryStale, ct);

var packageResult = await syncService.SyncPackageAsync(package, cancellationToken: ct);
result.PackagesSynced++;
result.ReleasesAdded += packageResult.ReleasesAdded;
Expand All @@ -86,12 +93,10 @@ private async Task ProduceAsync(
_logger.LogDebug("Synced {Package}: no new releases", package.Name);
}

// Enqueue if new releases need summaries or existing releases have stale summaries
var hasStaleReleases = packageResult.ReleasesNeedingSummary.Count > 0
|| await db.Releases.AnyAsync(
r => r.PackageId == package.Id && r.SummaryStale, ct);

if (hasStaleReleases)
// Enqueue if new releases need summaries or pre-existing stale summaries.
// HashSet.Add returns false if already present, preventing duplicate enqueues.
if ((packageResult.ReleasesNeedingSummary.Count > 0 || hadStaleReleases)
&& enqueuedPackageIds.Add(package.Id))
{
await writer.WriteAsync(package.Id, ct);
}
Expand Down