Skip to content

Commit

Permalink
Merge pull request #40 from BloodHoundAD/BED-3285
Browse files Browse the repository at this point in the history
Add backoff retry logic for ingest + send proper v2 messages
  • Loading branch information
ddlees authored May 11, 2023
2 parents 2379ab5 + b2652ff commit 18e629e
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 19 deletions.
81 changes: 62 additions & 19 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
Expand All @@ -42,6 +44,10 @@ const (
BHEAuthSignature string = "bhesignature"
)

var (
ErrExceededRetryLimit = errors.New("exceeded max retry limit for ingest due to 504s")
)

func init() {
configs := append(config.AzureConfig, config.BloodHoundEnterpriseConfig...)
config.Init(startCmd, configs)
Expand Down Expand Up @@ -86,7 +92,10 @@ func start(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

var currentTask *models.ClientTask
var (
currentTask *models.ClientTask
message string
)

for {
select {
Expand Down Expand Up @@ -134,13 +143,17 @@ func start(ctx context.Context) {
// Batch data out for ingestion
stream := listAll(ctx, azClient)
batches := pipeline.Batch(ctx.Done(), stream, 999, 10*time.Second)
if err := ingest(ctx, *bheInstance, bheClient, batches); err != nil {
log.Error(err, "ingestion failed")
}
hasIngestErr := ingest(ctx, *bheInstance, bheClient, batches)

// Notify BHE instance of task end
duration := time.Since(start)
if err := endTask(ctx, *bheInstance, bheClient); err != nil {

if hasIngestErr {
message = "Collection completed with errors during ingest."
} else {
message = "Collection completed successfully."
}
if err := endTask(ctx, *bheInstance, bheClient, models.JobStatusComplete, message); err != nil {
log.Error(err, "failed to end task")
} else {
log.Info("finished collection task", "id", currentTask.Id, "duration", duration.String())
Expand All @@ -158,8 +171,13 @@ func start(ctx context.Context) {
}
}

func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) error {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v1/ingest"})
func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-chan []interface{}) bool {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v2/ingest"})

var (
hasErrors = false
maxRetries = 3
)

for data := range pipeline.OrDone(ctx.Done(), in) {
body := models.IngestRequest{
Expand All @@ -170,18 +188,38 @@ func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-ch
}

if req, err := rest.NewRequest(ctx, "POST", endpoint, body, nil, nil); err != nil {
return fmt.Errorf("unable to create ingest HTTP request: %w", err)
} else if response, err := bheClient.Do(req); err != nil {
return fmt.Errorf("unable to send data to BHE ingest endpoint %v: %w", bheUrl, err)
} else if response.StatusCode != http.StatusAccepted {
if bodyBytes, err := io.ReadAll(response.Body); err != nil {
return fmt.Errorf("BHE returned HTTP status %d(%s). Failure reading response body: %w", response.StatusCode, response.Status, err)
} else {
return fmt.Errorf("BHE returned error %d(%s): %s", response.StatusCode, response.Status, string(bodyBytes))
log.Error(err, "unable to create ingest HTTP request")
hasErrors = true
} else {
for retry := 0; retry < maxRetries; retry++ {
//No retries on regular err cases, only on HTTP 504 timeouts
if response, err := bheClient.Do(req); err != nil {
log.Error(err, fmt.Sprintf("unable to send data to BHE ingest endpoint %v", bheUrl))
hasErrors = true
break
} else if response.StatusCode == http.StatusGatewayTimeout {
backoff := math.Pow(5, float64(retry+1))
time.Sleep(time.Second * time.Duration(backoff))
if retry == maxRetries-1 {
log.Error(ErrExceededRetryLimit, "")
hasErrors = true
}
continue
} else if response.StatusCode != http.StatusAccepted {
if bodyBytes, err := io.ReadAll(response.Body); err != nil {
log.Error(err, fmt.Sprintf("BHE returned HTTP status %d(%s). Failure reading response body", response.StatusCode, response.Status))
hasErrors = true
break
} else {
log.Error(err, fmt.Sprintf("BHE returned error %d(%s): %s", response.StatusCode, response.Status, string(bodyBytes)))
hasErrors = true
break
}
}
}
}
}
return nil
return hasErrors
}

func getAvailableTasks(ctx context.Context, bheUrl url.URL, bheClient *http.Client) ([]models.ClientTask, error) {
Expand Down Expand Up @@ -233,10 +271,15 @@ func startTask(ctx context.Context, bheUrl url.URL, bheClient *http.Client, task
}
}

func endTask(ctx context.Context, bheUrl url.URL, bheClient *http.Client) error {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v1/clients/endtask"})
func endTask(ctx context.Context, bheUrl url.URL, bheClient *http.Client, status models.JobStatus, message string) error {
endpoint := bheUrl.ResolveReference(&url.URL{Path: "/api/v2/jobs/end"})

body := models.CompleteJobRequest{
Status: status.String(),
Message: message,
}

if req, err := rest.NewRequest(ctx, "POST", endpoint, nil, nil, nil); err != nil {
if req, err := rest.NewRequest(ctx, "POST", endpoint, body, nil, nil); err != nil {
return err
} else if _, err := bheClient.Do(req); err != nil {
return err
Expand Down
48 changes: 48 additions & 0 deletions models/job-complete-request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package models

type CompleteJobRequest struct {
Status string `json:"status"`
StatusEnum JobStatus `json:"-"`
Message string `json:"message"`
}

type JobStatus int

const (
JobStatusInvalid JobStatus = -1
JobStatusReady JobStatus = 0
JobStatusRunning JobStatus = 1
JobStatusComplete JobStatus = 2
JobStatusCanceled JobStatus = 3
JobStatusTimedOut JobStatus = 4
JobStatusFailed JobStatus = 5
JobStatusIngesting JobStatus = 6
)

func (s JobStatus) String() string {
switch s {
case JobStatusReady:
return "READY"

case JobStatusRunning:
return "RUNNING"

case JobStatusComplete:
return "COMPLETE"

case JobStatusCanceled:
return "CANCELED"

case JobStatusTimedOut:
return "TIMEDOUT"

case JobStatusFailed:
return "FAILED"

case JobStatusIngesting:
return "INGESTING"

default:
return "INVALIDSTATUS"
}
}

0 comments on commit 18e629e

Please sign in to comment.