Skip to content

Commit

Permalink
added ingest failure logging and retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
irshadaj committed Jan 13, 2023
1 parent 33bb4f0 commit 9db8dce
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -96,7 +97,7 @@ func start(ctx context.Context) {
select {
case <-ticker.C:
if currentTask != nil {
log.V(1).Info("curently performing collection; continuing...")
log.V(1).Info("currently performing collection; continuing...")
} else {
log.V(2).Info("checking for available collection tasks")
if availableTasks, err := getAvailableTasks(ctx, *bheInstance, bheClient); err != nil {
Expand Down Expand Up @@ -129,14 +130,16 @@ func start(ctx context.Context) {
// Batch data out for ingestion
stream := listAll(ctx, azClient)
batches := pipeline.Batch(ctx.Done(), stream, 999, 10*time.Second)
ingest(ctx, *bheInstance, bheClient, batches)

// Notify BHE instance of task end
duration := time.Since(start)
endTask(ctx, *bheInstance, bheClient)
log.Info("finished collection task", "id", currentTask.Id, "duration", duration.String())

currentTask = nil
if err := ingest(ctx, *bheInstance, bheClient, batches); err != nil {
log.Error(err, "ingestion failed; collection will be re-attempted")
} else {
// Notify BHE instance of task end
duration := time.Since(start)
endTask(ctx, *bheInstance, bheClient)
log.Info("finished collection task", "id", currentTask.Id, "duration", duration.String())

currentTask = nil
}
}
}
}
Expand All @@ -147,7 +150,7 @@ func start(ctx context.Context) {
}
}

func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) {
func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) error {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v1/ingest"})

for data := range pipeline.OrDone(ctx.Done(), in) {
Expand All @@ -159,11 +162,18 @@ func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-ch
}

if req, err := rest.NewRequest(ctx, "POST", endpoint, body, nil, nil); err != nil {
log.Error(err, "unable to create request")
} else if _, err := bheClient.Do(req); err != nil {
log.Error(err, "unable to send data to bloodhound enterprise", "bheUrl", bheUrl)
return fmt.Errorf("unable to create ingest HTTP request: %w", err)
} else if response, err := bheClient.Do(req); err != nil {
return fmt.Errorf("unable to send data to BHE ingest endpoint %v: %w", bheUrl, err)
} else if response.StatusCode != http.StatusAccepted {
if bodyBytes, err := io.ReadAll(response.Body); err != nil {
return fmt.Errorf("BHE returned HTTP status %d(%s). Failure reading response body: %w", response.StatusCode, response.Status, err)
} else {
return fmt.Errorf("BHE returned error %d(%s): %s", response.StatusCode, response.Status, string(bodyBytes))
}
}
}
return nil
}

func getAvailableTasks(ctx context.Context, bheUrl url.URL, bheClient *http.Client) ([]models.ClientTask, error) {
Expand Down

0 comments on commit 9db8dce

Please sign in to comment.