From 9db8dce163fd84be3d1f3b8e9b54bc9ac59be6b4 Mon Sep 17 00:00:00 2001 From: Irshad Ahmed Date: Thu, 12 Jan 2023 17:16:45 -0600 Subject: [PATCH] added ingest failure logging and retry logic --- cmd/start.go | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 626806a..d8f5e6d 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "net/url" "os" @@ -96,7 +97,7 @@ func start(ctx context.Context) { select { case <-ticker.C: if currentTask != nil { - log.V(1).Info("curently performing collection; continuing...") + log.V(1).Info("currently performing collection; continuing...") } else { log.V(2).Info("checking for available collection tasks") if availableTasks, err := getAvailableTasks(ctx, *bheInstance, bheClient); err != nil { @@ -129,14 +130,16 @@ func start(ctx context.Context) { // Batch data out for ingestion stream := listAll(ctx, azClient) batches := pipeline.Batch(ctx.Done(), stream, 999, 10*time.Second) - ingest(ctx, *bheInstance, bheClient, batches) - - // Notify BHE instance of task end - duration := time.Since(start) - endTask(ctx, *bheInstance, bheClient) - log.Info("finished collection task", "id", currentTask.Id, "duration", duration.String()) - - currentTask = nil + if err := ingest(ctx, *bheInstance, bheClient, batches); err != nil { + log.Error(err, "ingestion failed; collection will be re-attempted") + } else { + // Notify BHE instance of task end + duration := time.Since(start) + endTask(ctx, *bheInstance, bheClient) + log.Info("finished collection task", "id", currentTask.Id, "duration", duration.String()) + + currentTask = nil + } } } } @@ -147,7 +150,7 @@ func start(ctx context.Context) { } } -func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) { +func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) error { endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v1/ingest"}) for data := range pipeline.OrDone(ctx.Done(), in) { @@ -159,11 +162,18 @@ func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-ch } if req, err := rest.NewRequest(ctx, "POST", endpoint, body, nil, nil); err != nil { - log.Error(err, "unable to create request") - } else if _, err := bheClient.Do(req); err != nil { - log.Error(err, "unable to send data to bloodhound enterprise", "bheUrl", bheUrl) + return fmt.Errorf("unable to create ingest HTTP request: %w", err) + } else if response, err := bheClient.Do(req); err != nil { + return fmt.Errorf("unable to send data to BHE ingest endpoint %v: %w", bheUrl, err) + } else if response.StatusCode != http.StatusAccepted { + if bodyBytes, err := io.ReadAll(response.Body); err != nil { + return fmt.Errorf("BHE returned HTTP status %d(%s). Failure reading response body: %w", response.StatusCode, response.Status, err) + } else { + return fmt.Errorf("BHE returned error %d(%s): %s", response.StatusCode, response.Status, string(bodyBytes)) + } } } + return nil } func getAvailableTasks(ctx context.Context, bheUrl url.URL, bheClient *http.Client) ([]models.ClientTask, error) {