Handle pipeline failure #621
Replies: 3 comments
-
What about using DataPipeline.Files.LogEntries to know what happened? Work is always done on a file, and a global log can be extracted by iterating over the list of files. FailedStep can be calculated from Steps and RemainingSteps; there's also CompletedSteps if needed.
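To make that suggestion concrete, here is a minimal sketch of how both values could be derived. The types `PipelineSketch`, `FileDetails`, and `LogEntry` and the helper class `PipelineDiagnostics` are simplified stand-ins invented for illustration, not the real DataPipeline classes; only the member names Steps, RemainingSteps, Files, LogEntries, Source, and Text come from this thread.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-ins: only the members discussed in this thread are modeled.
public sealed record LogEntry(string Source, string Text);

public sealed class FileDetails
{
    public List<LogEntry>? LogEntries { get; set; }
}

public sealed class PipelineSketch
{
    public List<string> Steps { get; set; } = new();
    public List<string> RemainingSteps { get; set; } = new();
    public List<FileDetails> Files { get; set; } = new();
}

public static class PipelineDiagnostics
{
    // Assumes RemainingSteps is always the tail of Steps, so the step that
    // was running sits at index (Steps.Count - RemainingSteps.Count).
    // Returns null when no step remains or the two lists are inconsistent.
    public static string? GetFailedStep(PipelineSketch pipeline)
    {
        int index = pipeline.Steps.Count - pipeline.RemainingSteps.Count;
        if (pipeline.RemainingSteps.Count == 0 || index < 0)
        {
            return null;
        }

        return pipeline.Steps[index];
    }

    // Flattens the per-file log entries into one list, skipping files without logs.
    public static List<string> GetGlobalLog(PipelineSketch pipeline)
    {
        return pipeline.Files
            .Where(f => f.LogEntries is not null)
            .SelectMany(f => f.LogEntries!)
            .Select(l => $"{l.Source}: {l.Text}")
            .ToList();
    }
}
```

Note that the first remaining step is only the failed step if the pipeline is already known to have failed; on a pipeline that is still running it is simply the current step.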
-
I was thinking about a generic

    public override async Task RunPipelineAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
    {
        // Files must be uploaded before starting any other task
        await this.UploadFilesAsync(pipeline, cancellationToken).ConfigureAwait(false);
        await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

        while (!pipeline.Complete)
        {
            string currentStepName = pipeline.RemainingSteps.First();

            if (!this._handlers.TryGetValue(currentStepName, out var stepHandler))
            {
                pipeline.LastUpdate = DateTimeOffset.UtcNow;
                pipeline.Failed = true;
                pipeline.Logs = $"No handler found for step '{currentStepName}'";
                await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

                this.Log.LogError("No handler found for step '{0}'", currentStepName);
                throw new OrchestrationException($"No handler found for step '{currentStepName}'");
            }

            try
            {
                // Run handler
                (bool success, DataPipeline updatedPipeline) = await stepHandler
                    .InvokeAsync(pipeline, this.CancellationTokenSource.Token)
                    .ConfigureAwait(false);

                pipeline = updatedPipeline;
                pipeline.LastUpdate = DateTimeOffset.UtcNow;

                if (success)
                {
                    this.Log.LogInformation("Handler '{0}' processed pipeline '{1}/{2}' successfully", currentStepName, pipeline.Index, pipeline.DocumentId);
                    pipeline.MoveToNextStep();
                    await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    pipeline.Failed = true;
                    pipeline.Logs = string.Join(", ", pipeline.Files.Where(f => f.LogEntries is not null).SelectMany(f => f.LogEntries!).Select(l => $"{l.Source}: {l.Text}"));
                    await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

                    this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
                    throw new OrchestrationException($"Pipeline error, step {currentStepName} failed");
                }
            }
            catch (Exception ex)
            {
                // Get the exception message and the inner exception message, which in some cases is more descriptive.
                var failureReason = ex.Message;
                if (ex.InnerException is not null)
                {
                    failureReason += $" ({ex.InnerException.Message})";
                }

                pipeline.LastUpdate = DateTimeOffset.UtcNow;
                pipeline.Failed = true;
                pipeline.Logs = failureReason;
                await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

                this.Log.LogError(ex, "Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
                throw new OrchestrationException($"Pipeline error, step {currentStepName} failed", ex);
            }
        }

        await this.CleanUpAfterCompletionAsync(pipeline, cancellationToken).ConfigureAwait(false);

        this.Log.LogInformation("Pipeline '{0}/{1}' complete", pipeline.Index, pipeline.DocumentId);
    }

Note: I have renamed the
-
I have tried to put my idea in a PR: #443, so you can see whether it is the approach you're thinking about.
-
Context / Scenario
Currently, pipeline failures aren't handled at all:
kernel-memory/service/Abstractions/Pipeline/DataPipeline.cs
Lines 434 to 439 in 775301a
The problem
It is important to keep track of errors that occur during pipeline execution.
Proposed solution
We can add a couple of properties to DataPipeline.cs:
FailureReason can be useful to immediately obtain information about the problem, but it is not strictly necessary to implement this feature.
Then, we need to handle exceptions during pipeline execution, both in InProcessPipelineOrchestrator.cs and in DistributedPipelineOrchestrator.cs.
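As a rough illustration of the two steps described so far, the sketch below records a failure on a pipeline-like object when a step throws. Everything in it is an assumption made for the example: `PipelineStateSketch` is a hypothetical stand-in rather than the real DataPipeline class, `Failed` is a guessed name for the second property, and `FailureTracking.TryRunStep` is an invented helper, not an orchestrator API.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for the state the proposal wants to track.
public sealed class PipelineStateSketch
{
    public bool Failed { get; set; }
    public string? FailureReason { get; set; }
    public DateTimeOffset LastUpdate { get; set; }
}

public static class FailureTracking
{
    // Combines the exception message with the inner exception message, if any.
    public static string Describe(Exception ex)
    {
        return ex.InnerException is null
            ? ex.Message
            : $"{ex.Message} ({ex.InnerException.Message})";
    }

    // Runs one step; on an exception, marks the pipeline as failed and stores
    // the reason instead of losing it. Returns true when the step succeeded.
    public static bool TryRunStep(PipelineStateSketch pipeline, Action step)
    {
        try
        {
            step();
            return true;
        }
        catch (Exception ex)
        {
            pipeline.Failed = true;
            pipeline.FailureReason = Describe(ex);
            pipeline.LastUpdate = DateTimeOffset.UtcNow;
            return false;
        }
    }
}
```

A real orchestrator would presumably also persist the updated status and rethrow or stop processing; the sketch swallows the exception only to keep the example small.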
Finally, after updating the DataPipelineStatus.cs class accordingly, we just need this code:
Importance
would be great to have