Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove build log queue based fanout #3789

Merged
merged 1 commit into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class BlobUploadProcessor
private readonly BlobContainerClient testRunsContainerClient;
private readonly BlobContainerClient buildDefinitionsContainerClient;
private readonly BlobContainerClient buildFailuresContainerClient;
private readonly QueueClient queueClient;
private readonly IOptions<PipelineWitnessSettings> options;
private readonly Dictionary<string, int?> cachedDefinitionRevisions = new();
private readonly IFailureAnalyzer failureAnalyzer;
Expand All @@ -61,7 +60,6 @@ public BlobUploadProcessor(
ILogger<BlobUploadProcessor> logger,
BuildLogProvider logProvider,
BlobServiceClient blobServiceClient,
QueueServiceClient queueServiceClient,
BuildHttpClient buildClient,
TestResultsHttpClient testResultsClient,
IOptions<PipelineWitnessSettings> options,
Expand All @@ -72,11 +70,6 @@ public BlobUploadProcessor(
throw new ArgumentNullException(nameof(blobServiceClient));
}

if (queueServiceClient == null)
{
throw new ArgumentNullException(nameof(queueServiceClient));
}

this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.options = options ?? throw new ArgumentNullException(nameof(options));
this.logProvider = logProvider ?? throw new ArgumentNullException(nameof(logProvider));
Expand All @@ -89,8 +82,6 @@ public BlobUploadProcessor(
this.buildFailuresContainerClient = blobServiceClient.GetBlobContainerClient(BuildFailuresContainerName);
this.testRunsContainerClient = blobServiceClient.GetBlobContainerClient(TestRunsContainerName);
this.buildDefinitionsContainerClient = blobServiceClient.GetBlobContainerClient(BuildDefinitionsContainerName);
this.queueClient = queueServiceClient.GetQueueClient(this.options.Value.BuildLogBundlesQueueName);
this.queueClient.CreateIfNotExists();
this.failureAnalyzer = failureAnalyzer;
}

Expand Down Expand Up @@ -178,15 +169,12 @@ public async Task UploadBuildBlobsAsync(string account, Guid projectId, int buil
logger.LogWarning("No logs available for build {Project}: {BuildId}", build.Project.Name, build.Id);
return;
}

var bundles = BuildLogBundles(account, build, timeline, logs);

// We no longer process log bundles on separate messages.
// During zero downtime upgrade phase, process all the bundles sequentially but allow for processing message in the log bundle queue
// After the upgrade phase, this should be rewritten to remove bundling but keep the log -> timeline record association
foreach(var bundle in bundles)
var buildLogInfos = GetBuildLogInfos(account, build, timeline, logs);

foreach (var log in buildLogInfos)
{
await ProcessBuildLogBundleAsync(bundle);
await UploadLogLinesBlobAsync(account, build, log);
}
}

Expand Down Expand Up @@ -246,14 +234,6 @@ private async Task UploadBuildFailureBlobAsync(string account, Build build, Time
}
}

public async Task ProcessBuildLogBundleAsync(BuildLogBundle buildLogBundle)
{
foreach (var log in buildLogBundle.TimelineLogs)
{
await UploadLogLinesBlobAsync(buildLogBundle, log);
}
}

public async Task UploadBuildDefinitionBlobsAsync(string account, string projectName)
{
var definitions = await buildClient.GetFullDefinitionsAsync2(project: projectName);
Expand Down Expand Up @@ -361,35 +341,14 @@ private async Task UploadBuildDefinitionBlobAsync(string account, BuildDefinitio
}
}

private List<BuildLogBundle> BuildLogBundles(string account, Build build, Timeline timeline, List<BuildLog> logs)
private List<BuildLogInfo> GetBuildLogInfos(string account, Build build, Timeline timeline, List<BuildLog> logs)
{
BuildLogBundle CreateBundle() => new BuildLogBundle
{
Account = account,
BuildId = build.Id,
ProjectId = build.Project.Id,
ProjectName = build.Project.Name,
QueueTime = build.QueueTime.Value,
StartTime = build.StartTime.Value,
FinishTime = build.FinishTime.Value,
DefinitionId = build.Definition.Id,
DefinitionName = build.Definition.Name,
DefinitionPath = build.Definition.Path
};

BuildLogBundle currentBundle;
var logBundles = new List<BuildLogBundle>();
logBundles.Add(currentBundle = CreateBundle());

var logsById = logs.ToDictionary(l => l.Id);

var buildLogInfos = new List<BuildLogInfo>();

foreach (var log in logs)
{
if(currentBundle.TimelineLogs.Count >= this.options.Value.BuildLogBundleSize)
{
logBundles.Add(currentBundle = CreateBundle());
}

var logRecords = timeline.Records.Where(x => x.Log?.Id == log.Id).ToArray();

if(logRecords.Length > 1)
Expand Down Expand Up @@ -419,7 +378,7 @@ private List<BuildLogBundle> BuildLogBundles(string account, Build build, Timeli
}
}

currentBundle.TimelineLogs.Add(new BuildLogInfo
buildLogInfos.Add(new BuildLogInfo
{
LogId = log.Id,
LineCount = log.LineCount,
Expand All @@ -430,7 +389,7 @@ private List<BuildLogBundle> BuildLogBundles(string account, Build build, Timeli
});
}

return logBundles;
return buildLogInfos;
}

private async Task UploadBuildBlobAsync(string account, Build build)
Expand Down Expand Up @@ -599,29 +558,29 @@ private async Task UploadTimelineBlobAsync(string account, Build build, Timeline
}
}

private async Task UploadLogLinesBlobAsync(BuildLogBundle build, BuildLogInfo log)
private async Task UploadLogLinesBlobAsync(string account, Build build, BuildLogInfo log)
{
try
{
// we don't use FinishTime in the logs blob path to prevent duplicating logs when processing retries.
// i.e. logs with a given buildid/logid are immutable and retries only add new logs.
var blobPath = $"{build.ProjectName}/{build.QueueTime:yyyy/MM/dd}/{build.BuildId}-{log.LogId}.jsonl";
var blobPath = $"{build.Project.Name}/{build.QueueTime:yyyy/MM/dd}/{build.Id}-{log.LogId}.jsonl";
var blobClient = this.buildLogLinesContainerClient.GetBlobClient(blobPath);

if (await blobClient.ExistsAsync())
{
this.logger.LogInformation("Skipping existing log for build {BuildId}, record {RecordId}, log {LogId}", build.BuildId, log.RecordId, log.LogId);
this.logger.LogInformation("Skipping existing log for build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId);
return;
}

this.logger.LogInformation("Processing log for build {BuildId}, record {RecordId}, log {LogId}", build.BuildId, log.RecordId, log.LogId);
this.logger.LogInformation("Processing log for build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId);

var lineNumber = 0;
var characterCount = 0;

// Over an open read stream and an open write stream, one line at a time, read, process, and write to
// blob storage
using (var logStream = await this.logProvider.GetLogStreamAsync(build.ProjectName, build.BuildId, log.LogId))
using (var logStream = await this.logProvider.GetLogStreamAsync(build.Project.Name, build.Id, log.LogId))
using (var logReader = new StreamReader(logStream))
using (var blobStream = await blobClient.OpenWriteAsync(overwrite: true, new BlobOpenWriteOptions()))
using (var blobWriter = new StreamWriter(blobStream))
Expand Down Expand Up @@ -657,13 +616,13 @@ private async Task UploadLogLinesBlobAsync(BuildLogBundle build, BuildLogInfo lo

await blobWriter.WriteLineAsync(JsonConvert.SerializeObject(new
{
OrganizationName = build.Account,
ProjectId = build.ProjectId,
ProjectName = build.ProjectName,
BuildDefinitionId = build.DefinitionId,
BuildDefinitionPath = build.DefinitionPath,
BuildDefinitionName = build.DefinitionName,
BuildId = build.BuildId,
OrganizationName = account,
ProjectId = build.Project.Id,
ProjectName = build.Project.Name,
BuildDefinitionId = build.Definition.Id,
BuildDefinitionPath = build.Definition.Path,
BuildDefinitionName = build.Definition.Name,
BuildId = build.Id,
LogId = log.LogId,
LineNumber = lineNumber,
Length = message.Length,
Expand All @@ -674,15 +633,15 @@ await blobWriter.WriteLineAsync(JsonConvert.SerializeObject(new
}
}

logger.LogInformation("Processed {CharacterCount} characters and {LineCount} lines for build {BuildId}, record {RecordId}, log {LogId}", characterCount, lineNumber, build.BuildId, log.RecordId, log.LogId);
logger.LogInformation("Processed {CharacterCount} characters and {LineCount} lines for build {BuildId}, record {RecordId}, log {LogId}", characterCount, lineNumber, build.Id, log.RecordId, log.LogId);
}
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.Conflict)
{
this.logger.LogInformation("Ignoring existing blob exception for build {BuildId}, record {RecordId}, log {LogId}", build.BuildId, log.RecordId, log.LogId);
this.logger.LogInformation("Ignoring existing blob exception for build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId);
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error processing build {BuildId}, record {RecordId}, log {LogId}", build.BuildId, log.RecordId, log.LogId);
this.logger.LogError(ex, "Error processing build {BuildId}, record {RecordId}, log {LogId}", build.Id, log.RecordId, log.LogId);
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,25 @@ namespace Azure.Sdk.Tools.PipelineWitness
{
public class PipelineWitnessSettings
{
/// <summary>
/// Gets or sets the uri of the key vault to use
/// </summary>
public string KeyVaultUri { get; set; }

public string QueueStorageAccountUri { get; set; }


public string BlobStorageAccountUri { get; set; }

/// <summary>
/// Gets or sets the name of the build complete queue
/// Gets or sets uri of the storage account use for queue processing
/// </summary>
public string BuildCompleteQueueName { get; set; }
public string QueueStorageAccountUri { get; set; }

/// <summary>
/// Gets or sets the name of the build log bundles queue
/// Gets or sets uri of the blob storage account use for blob export
/// </summary>
public string BuildLogBundlesQueueName { get; set; }
public string BlobStorageAccountUri { get; set; }

/// <summary>
/// Gets or sets the number of build logs to add to each log bundle message
/// Gets or sets the name of the build complete queue
/// </summary>
public int BuildLogBundleSize { get; set; } = 50;
public string BuildCompleteQueueName { get; set; }

/// <summary>
/// Gets or sets the amount of time a message should be invisible in the queue while being processed
Expand Down Expand Up @@ -63,11 +61,6 @@ public class PipelineWitnessSettings
/// </summary>
public TimeSpan BuildDefinitionLoopPeriod { get; set; } = TimeSpan.FromMinutes(5);

/// <summary>
/// Gets or sets the number of concurrent log bundle queue workers to register
/// </summary>
public int BuildLogBundlesWorkerCount { get; set; } = 1;

/// <summary>
/// Gets or sets the number of concurrent build complete queue workers to register
/// </summary>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public static void Configure(WebApplicationBuilder builder)
builder.Services.Configure<PipelineWitnessSettings>(settingsSection);

builder.Services.AddHostedService<BuildCompleteQueueWorker>(settings.BuildCompleteWorkerCount);
builder.Services.AddHostedService<BuildLogBundleQueueWorker>(settings.BuildLogBundlesWorkerCount);
builder.Services.AddHostedService<AzurePipelinesBuildDefinitionWorker>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
"BlobStorageAccountUri": "https://azsdkengsyspipelinelogs.blob.core.windows.net",
"BuildCompleteQueueName": "azurepipelines-build-completed",
"BuildCompleteWorkerCount": 5,
"BuildLogBundlesQueueName": "azurepipelines-build-log-bundle",
"BuildLogBundlesWorkerCount": 5,
"BuildLogBundleSize": 50,
"MessageLeasePeriod": "00:03:00",
"MessageErrorSleepPeriod": "00:00:10",
"MaxDequeueCount": 5,
Expand Down