Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backoff retry logic for ingest + send proper v2 messages #40

Merged
merged 1 commit into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 62 additions & 19 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
Expand All @@ -42,6 +44,10 @@ const (
BHEAuthSignature string = "bhesignature"
)

var (
ErrExceededRetryLimit = errors.New("exceeded max retry limit for ingest due to 504s")
)

func init() {
configs := append(config.AzureConfig, config.BloodHoundEnterpriseConfig...)
config.Init(startCmd, configs)
Expand Down Expand Up @@ -86,7 +92,10 @@ func start(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

var currentTask *models.ClientTask
var (
currentTask *models.ClientTask
message string
)

for {
select {
Expand Down Expand Up @@ -134,13 +143,17 @@ func start(ctx context.Context) {
// Batch data out for ingestion
stream := listAll(ctx, azClient)
batches := pipeline.Batch(ctx.Done(), stream, 999, 10*time.Second)
if err := ingest(ctx, *bheInstance, bheClient, batches); err != nil {
log.Error(err, "ingestion failed")
}
hasIngestErr := ingest(ctx, *bheInstance, bheClient, batches)

// Notify BHE instance of task end
duration := time.Since(start)
if err := endTask(ctx, *bheInstance, bheClient); err != nil {

if hasIngestErr {
message = "Collection completed with errors during ingest."
} else {
message = "Collection completed successfully."
}
if err := endTask(ctx, *bheInstance, bheClient, models.JobStatusComplete, message); err != nil {
log.Error(err, "failed to end task")
} else {
log.Info("finished collection task", "id", currentTask.Id, "duration", duration.String())
Expand All @@ -158,8 +171,13 @@ func start(ctx context.Context) {
}
}

func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) error {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v1/ingest"})
func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) bool {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v2/ingest"})

var (
hasErrors = false
maxRetries = 3
)

for data := range pipeline.OrDone(ctx.Done(), in) {
body := models.IngestRequest{
Expand All @@ -170,18 +188,38 @@ func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-ch
}

if req, err := rest.NewRequest(ctx, "POST", endpoint, body, nil, nil); err != nil {
return fmt.Errorf("unable to create ingest HTTP request: %w", err)
} else if response, err := bheClient.Do(req); err != nil {
return fmt.Errorf("unable to send data to BHE ingest endpoint %v: %w", bheUrl, err)
} else if response.StatusCode != http.StatusAccepted {
if bodyBytes, err := io.ReadAll(response.Body); err != nil {
return fmt.Errorf("BHE returned HTTP status %d(%s). Failure reading response body: %w", response.StatusCode, response.Status, err)
} else {
return fmt.Errorf("BHE returned error %d(%s): %s", response.StatusCode, response.Status, string(bodyBytes))
log.Error(err, "unable to create ingest HTTP request")
hasErrors = true
} else {
for retry := 0; retry < maxRetries; retry++ {
//No retries on regular err cases, only on HTTP 504 timeouts
if response, err := bheClient.Do(req); err != nil {
log.Error(err, fmt.Sprintf("unable to send data to BHE ingest endpoint %v", bheUrl))
hasErrors = true
break
} else if response.StatusCode == http.StatusGatewayTimeout {
backoff := math.Pow(5, float64(retry+1))
time.Sleep(time.Second * time.Duration(backoff))
if retry == maxRetries-1 {
log.Error(ErrExceededRetryLimit, "")
hasErrors = true
}
continue
} else if response.StatusCode != http.StatusAccepted {
if bodyBytes, err := io.ReadAll(response.Body); err != nil {
log.Error(err, fmt.Sprintf("BHE returned HTTP status %d(%s). Failure reading response body", response.StatusCode, response.Status))
hasErrors = true
break
} else {
log.Error(err, fmt.Sprintf("BHE returned error %d(%s): %s", response.StatusCode, response.Status, string(bodyBytes)))
hasErrors = true
break
}
}
}
}
}
return nil
return hasErrors
}

func getAvailableTasks(ctx context.Context, bheUrl url.URL, bheClient *http.Client) ([]models.ClientTask, error) {
Expand Down Expand Up @@ -233,10 +271,15 @@ func startTask(ctx context.Context, bheUrl url.URL, bheClient *http.Client, task
}
}

func endTask(ctx context.Context, bheUrl url.URL, bheClient *http.Client) error {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v1/clients/endtask"})
func endTask(ctx context.Context, bheUrl url.URL, bheClient *http.Client, status models.JobStatus, message string) error {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v2/jobs/end"})

body := models.CompleteJobRequest{
Status: status.String(),
Message: message,
}

if req, err := rest.NewRequest(ctx, "POST", endpoint, nil, nil, nil); err != nil {
if req, err := rest.NewRequest(ctx, "POST", endpoint, body, nil, nil); err != nil {
return err
} else if _, err := bheClient.Do(req); err != nil {
return err
Expand Down
48 changes: 48 additions & 0 deletions models/job-complete-request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package models

type CompleteJobRequest struct {
Status string `json:"status"`
StatusEnum JobStatus `json:"-"`
Message string `json:"message"`
}

type JobStatus int

const (
JobStatusInvalid JobStatus = -1
JobStatusReady JobStatus = 0
JobStatusRunning JobStatus = 1
JobStatusComplete JobStatus = 2
JobStatusCanceled JobStatus = 3
JobStatusTimedOut JobStatus = 4
JobStatusFailed JobStatus = 5
JobStatusIngesting JobStatus = 6
)

func (s JobStatus) String() string {
switch s {
case JobStatusReady:
return "READY"

case JobStatusRunning:
return "RUNNING"

case JobStatusComplete:
return "COMPLETE"

case JobStatusCanceled:
return "CANCELED"

case JobStatusTimedOut:
return "TIMEDOUT"

case JobStatusFailed:
return "FAILED"

case JobStatusIngesting:
return "INGESTING"

default:
return "INVALIDSTATUS"
}
}