Pipeline failures #443

Closed
wants to merge 2 commits into from
14 changes: 14 additions & 0 deletions service/Abstractions/Models/DataPipelineStatus.cs
@@ -67,4 +67,18 @@ public class DataPipelineStatus
[JsonPropertyOrder(17)]
[JsonPropertyName("completed_steps")]
public List<string> CompletedSteps { get; set; } = new();

/// <summary>
/// The failed step, if any.
/// </summary>
[JsonPropertyOrder(18)]
[JsonPropertyName("failed_step")]
public string? FailedStep { get; set; } = null;

/// <summary>
/// The error that caused the pipeline to fail, if any.
/// </summary>
[JsonPropertyOrder(19)]
[JsonPropertyName("logs")]
public string? Logs { get; set; } = null;
Review comment (Collaborator):

I would avoid the "string" type, which limits the field to a single unstructured entry, and use List<PipelineLogEntry> LogEntries if this is really needed. Thinking about the scenarios, though, I still don't see a use case.

}
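
To make the reviewer's suggestion concrete, here is a minimal sketch of a status object that carries structured entries instead of one joined string. The type names `PipelineLogEntrySketch` and `DataPipelineStatusSketch` and the `log_entries` JSON name are hypothetical; only the `Source` and `Text` members mirror what the diff reads from the existing log entries, and the real `PipelineLogEntry` may look different.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json.Serialization;

// Hypothetical stand-in for the repository's PipelineLogEntry type.
public sealed class PipelineLogEntrySketch
{
    // Name of the handler or component that produced the entry.
    [JsonPropertyName("source")]
    public string Source { get; set; } = string.Empty;

    // Human-readable message.
    [JsonPropertyName("text")]
    public string Text { get; set; } = string.Empty;
}

// Hypothetical status shape: a list keeps each entry separate and queryable.
public sealed class DataPipelineStatusSketch
{
    [JsonPropertyName("log_entries")]
    public List<PipelineLogEntrySketch> LogEntries { get; set; } = new();
}
```

With a list, a client can filter entries by `Source` rather than parsing a comma-separated string.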
19 changes: 18 additions & 1 deletion service/Abstractions/Pipeline/DataPipeline.cs
@@ -249,6 +249,13 @@
[JsonPropertyName("completed_steps")]
public List<string> CompletedSteps { get; set; } = new();

/// <summary>
/// Tells if the pipeline failed.
/// </summary>
[JsonPropertyOrder(6)]
[JsonPropertyName("failed")]
public bool Failed { get; set; } = false;
Review comment by @dluc (Collaborator), Apr 29, 2024:

How do we ensure this is consistent?

For instance, if ContentStorage becomes unavailable and the pipeline job expires, the value will remain false and never change to true, because there's nothing left running to retry.


/// <summary>
/// Document tags
/// </summary>
@@ -268,6 +275,13 @@
[JsonPropertyName("files")]
public List<FileDetails> Files { get; set; } = new();

/// <summary>
/// The error logs that caused the pipeline to fail, if any.
/// </summary>
[JsonPropertyOrder(19)]
[JsonPropertyName("logs")]
public string? Logs { get; set; } = null;
Review comment by @dluc (Collaborator), Apr 29, 2024:

Ditto about consistency. I think a better approach would be to assign an approximate completion time and use it to establish whether the pipeline failed (one scenario would be "it failed because it timed out").


/// <summary>
/// Unstructured dictionary available to support custom tasks and business logic.
/// The orchestrator doesn't use this property, and it's up to custom handlers to manage it.
@@ -434,7 +448,7 @@
return new DataPipelineStatus
{
Completed = this.Complete,
Failed = false, // TODO
Failed = this.Failed,
Empty = this.Files.Count == 0,
Index = this.Index,
DocumentId = this.DocumentId,
@@ -444,6 +458,9 @@
Steps = this.Steps,
RemainingSteps = this.RemainingSteps,
CompletedSteps = this.CompletedSteps,
// If the pipeline failed, the first remaining step is the one that failed.
FailedStep = this.Failed ? this.RemainingSteps.FirstOrDefault() : null,
Logs = this.Logs,
};
}
}
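
The review comments above on `Failed` and `Logs` propose deriving failure from an approximate completion time instead of a stored flag. A rough sketch of that idea follows; `PipelineHealthSketch`, `HasTimedOut`, and the `expectedCompletion` parameter are hypothetical names that do not exist in the PR, and how the deadline would be estimated is left open.

```csharp
using System;

// Hypothetical helper, not part of the PR.
public static class PipelineHealthSketch
{
    // A pipeline is treated as timed out when it is not complete and the
    // (assumed) expected completion time has already passed.
    public static bool HasTimedOut(bool complete, DateTimeOffset expectedCompletion, DateTimeOffset now)
    {
        return !complete && now > expectedCompletion;
    }
}
```

Because the result is computed when the status is read, it stays correct even if nothing is left running to write `Failed = true`, which is the scenario the reviewer describes.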
43 changes: 35 additions & 8 deletions service/Core/Pipeline/DistributedPipelineOrchestrator.cs
@@ -220,21 +220,48 @@

string currentStepName = pipeline.RemainingSteps.First();

// Execute the business logic - exceptions are automatically handled by IQueue
(bool success, DataPipeline updatedPipeline) = await handler.InvokeAsync(pipeline, cancellationToken).ConfigureAwait(false);
if (success)
var success = false;
#pragma warning disable CA1031 // Do not catch general exception types
try
{
// Execute the business logic - exceptions are automatically handled by IQueue
(success, DataPipeline updatedPipeline) = await handler.InvokeAsync(pipeline, cancellationToken).ConfigureAwait(false);

pipeline = updatedPipeline;
pipeline.LastUpdate = DateTimeOffset.UtcNow;

this.Log.LogInformation("Handler {0} processed pipeline {1} successfully", currentStepName, pipeline.DocumentId);
pipeline.MoveToNextStep();
await this.MoveForwardAsync(pipeline, cancellationToken).ConfigureAwait(false);
if (success)
{
// If the pipeline succeeds, reset the failed flag and the failure reason that a previous failure may have set.
pipeline.Failed = false;
pipeline.Logs = null;

this.Log.LogInformation("Handler {0} processed pipeline {1} successfully", currentStepName, pipeline.DocumentId);
pipeline.MoveToNextStep();
await this.MoveForwardAsync(pipeline, cancellationToken).ConfigureAwait(false);
}
else
{
pipeline.Failed = true;
pipeline.Logs = string.Join(", ", pipeline.Files.Where(f => f.LogEntries is not null).SelectMany(f => f.LogEntries!).Select(l => $"{l.Source}: {l.Text}"));

this.Log.LogError("Handler {0} failed to process pipeline {1}", currentStepName, pipeline.DocumentId);
}
}
else
catch (Exception ex)
Review comment (Collaborator):

Which code is throwing this exception? Why stop retrying?

{
this.Log.LogError("Handler {0} failed to process pipeline {1}", currentStepName, pipeline.DocumentId);
// Get the exception message and, if present, the inner exception message, which in some cases is more descriptive.
var failureReason = ex.Message;
if (ex.InnerException is not null)
{
failureReason += $" ({ex.InnerException.Message})";
}

pipeline.Failed = true;
pipeline.Logs = failureReason;
await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);
}
#pragma warning restore CA1031 // Do not catch general exception types

// Note: returning True, the message is removed from the queue
// Note: returning False, the message is put back in the queue and processed again
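
One possible way to address the reviewer's question about retries, given the return-value semantics in the two notes above, is to mark the pipeline as failed only for errors that retrying cannot fix and let everything else reach the queue. This is a sketch under assumptions: `PermanentStepException`, `QueueCallbackSketch`, `runStep`, and `markFailed` are hypothetical names, and the PR itself does not distinguish transient from permanent errors.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical marker for errors that a retry cannot fix.
public sealed class PermanentStepException : Exception
{
    public PermanentStepException(string message) : base(message) { }
}

public static class QueueCallbackSketch
{
    // Returns true to remove the message from the queue, false to requeue it.
    public static async Task<bool> ProcessAsync(Func<Task<bool>> runStep, Action<string> markFailed)
    {
        try
        {
            // A false result means the step did not succeed: requeue and retry.
            return await runStep().ConfigureAwait(false);
        }
        catch (PermanentStepException ex)
        {
            // Retrying cannot help: record the reason and drop the message.
            markFailed(ex.Message);
            return true;
        }
        // Any other exception propagates, so the queue's own handling applies.
    }
}
```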
56 changes: 44 additions & 12 deletions service/Core/Pipeline/InProcessPipelineOrchestrator.cs
@@ -165,25 +165,57 @@

if (!this._handlers.TryGetValue(currentStepName, out var stepHandler))
{
throw new OrchestrationException($"No handlers found for step '{currentStepName}'");
pipeline.LastUpdate = DateTimeOffset.UtcNow;
pipeline.Failed = true;
pipeline.Logs = $"No handler found for step '{currentStepName}'";
await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

this.Log.LogError("No handler found for step '{0}'", currentStepName);
throw new OrchestrationException($"No handler found for step '{currentStepName}'");
}

// Run handler
(bool success, DataPipeline updatedPipeline) = await stepHandler
.InvokeAsync(pipeline, this.CancellationTokenSource.Token)
.ConfigureAwait(false);
if (success)
try
{
// Run handler
(bool success, DataPipeline updatedPipeline) = await stepHandler
.InvokeAsync(pipeline, this.CancellationTokenSource.Token)
.ConfigureAwait(false);

pipeline = updatedPipeline;
pipeline.LastUpdate = DateTimeOffset.UtcNow;
this.Log.LogInformation("Handler '{0}' processed pipeline '{1}/{2}' successfully", currentStepName, pipeline.Index, pipeline.DocumentId);
pipeline.MoveToNextStep();
await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

if (success)
{
this.Log.LogInformation("Handler '{0}' processed pipeline '{1}/{2}' successfully", currentStepName, pipeline.Index, pipeline.DocumentId);
pipeline.MoveToNextStep();
await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);
}
else
{
pipeline.Failed = true;
pipeline.Logs = string.Join(", ", pipeline.Files.Where(f => f.LogEntries is not null).SelectMany(f => f.LogEntries!).Select(l => $"{l.Source}: {l.Text}"));
await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
throw new OrchestrationException($"Pipeline error, step {currentStepName} failed");
}
}
else
catch (Exception ex)
Review comment (Collaborator):

Ditto, where is the exception coming from? For example, if UpdatePipelineStatusAsync fails in the try block, it will likely also fail in the catch block.

{
this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
throw new OrchestrationException($"Pipeline error, step {currentStepName} failed");
// Get the exception message and, if present, the inner exception message, which in some cases is more descriptive.
var failureReason = ex.Message;
if (ex.InnerException is not null)
{
failureReason += $" ({ex.InnerException.Message})";
}

pipeline.LastUpdate = DateTimeOffset.UtcNow;
pipeline.Failed = true;
pipeline.Logs = failureReason;
await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

this.Log.LogError(ex, "Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
throw new OrchestrationException($"Pipeline error, step {currentStepName} failed", ex);
}
}
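
The reviewer's concern is that a status update which fails inside the try block will probably fail again inside the catch block, hiding the original error. A minimal sketch of one way to guard against that, assuming a hypothetical `FailureRecorderSketch` helper where `saveStatus` stands in for a call such as `UpdatePipelineStatusAsync`:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of the PR.
public static class FailureRecorderSketch
{
    // Tries to persist the failure status. When storage is unavailable it
    // reports the problem through the callback and returns false instead of
    // throwing, so the caller can still rethrow the original handler error.
    public static async Task<bool> TryRecordFailureAsync(Func<Task> saveStatus, Action<Exception> logSaveError)
    {
        try
        {
            await saveStatus().ConfigureAwait(false);
            return true;
        }
        catch (Exception ex)
        {
            logSaveError(ex);
            return false;
        }
    }
}
```

A catch block could call this helper and then throw the `OrchestrationException` that wraps the handler's exception, whether or not the status was saved. Note also that, as the diff reads, the `OrchestrationException` thrown in the `else` branch sits inside the same try block and would itself be caught by `catch (Exception ex)`.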
