Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added ingest failure logging and retry logic #25

Merged
merged 1 commit into from
Jan 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -96,7 +97,7 @@ func start(ctx context.Context) {
select {
case <-ticker.C:
if currentTask != nil {
log.V(1).Info("curently performing collection; continuing...")
log.V(1).Info("currently performing collection; continuing...")
} else {
log.V(2).Info("checking for available collection tasks")
if availableTasks, err := getAvailableTasks(ctx, *bheInstance, bheClient); err != nil {
Expand Down Expand Up @@ -129,14 +130,16 @@ func start(ctx context.Context) {
// Batch data out for ingestion
stream := listAll(ctx, azClient)
batches := pipeline.Batch(ctx.Done(), stream, 999, 10*time.Second)
ingest(ctx, *bheInstance, bheClient, batches)

// Notify BHE instance of task end
duration := time.Since(start)
endTask(ctx, *bheInstance, bheClient)
log.Info("finished collection task", "id", currentTask.Id, "duration", duration.String())

currentTask = nil
if err := ingest(ctx, *bheInstance, bheClient, batches); err != nil {
log.Error(err, "ingestion failed; collection will be re-attempted")
} else {
// Notify BHE instance of task end
duration := time.Since(start)
endTask(ctx, *bheInstance, bheClient)
log.Info("finished collection task", "id", currentTask.Id, "duration", duration.String())

currentTask = nil
}
}
}
}
Expand All @@ -147,7 +150,7 @@ func start(ctx context.Context) {
}
}

func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) {
func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) error {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v1/ingest"})

for data := range pipeline.OrDone(ctx.Done(), in) {
Expand All @@ -159,11 +162,18 @@ func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-ch
}

if req, err := rest.NewRequest(ctx, "POST", endpoint, body, nil, nil); err != nil {
log.Error(err, "unable to create request")
} else if _, err := bheClient.Do(req); err != nil {
log.Error(err, "unable to send data to bloodhound enterprise", "bheUrl", bheUrl)
return fmt.Errorf("unable to create ingest HTTP request: %w", err)
} else if response, err := bheClient.Do(req); err != nil {
return fmt.Errorf("unable to send data to BHE ingest endpoint %v: %w", bheUrl, err)
} else if response.StatusCode != http.StatusAccepted {
if bodyBytes, err := io.ReadAll(response.Body); err != nil {
return fmt.Errorf("BHE returned HTTP status %d(%s). Failure reading response body: %w", response.StatusCode, response.Status, err)
} else {
return fmt.Errorf("BHE returned error %d(%s): %s", response.StatusCode, response.Status, string(bodyBytes))
}
}
}
return nil
}

func getAvailableTasks(ctx context.Context, bheUrl url.URL, bheClient *http.Client) ([]models.ClientTask, error) {
Expand Down