graphdb batch writer resiliency #295

Merged · 13 commits · Dec 2, 2024
chore(ci): fix linter findings.
Zenithar committed Nov 28, 2024

commit 99e875f406e96362ffa15e721f4c4b84c9f19407
8 changes: 4 additions & 4 deletions pkg/kubehound/storage/graphdb/errors.go
@@ -2,21 +2,21 @@ package graphdb
 
 import "fmt"
 
-// errBatchWriter is an error type that wraps an error and indicates whether the
+// batchWriterError is an error type that wraps an error and indicates whether the
 // error is retryable.
-type errBatchWriter struct {
+type batchWriterError struct {
 	err       error
 	retryable bool
 }
 
-func (e errBatchWriter) Error() string {
+func (e batchWriterError) Error() string {
 	if e.err == nil {
 		return fmt.Sprintf("batch writer error (retriable:%v)", e.retryable)
 	}
 
 	return fmt.Sprintf("batch writer error (retriable:%v): %v", e.retryable, e.err.Error())
 }
 
-func (e errBatchWriter) Unwrap() error {
+func (e batchWriterError) Unwrap() error {
 	return e.err
 }
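
To illustrate the rename, here is a minimal, self-contained sketch of how the wrapped error is meant to be consumed by callers. The type is redeclared locally because the package keeps it unexported, and `failingWrite` is a hypothetical stand-in for a batch write that times out.

```go
package main

import (
	"errors"
	"fmt"
)

// batchWriterError mirrors the shape of the unexported type in errors.go so
// this sketch compiles on its own.
type batchWriterError struct {
	err       error
	retryable bool
}

func (e batchWriterError) Error() string {
	return fmt.Sprintf("batch writer error (retriable:%v): %v", e.retryable, e.err)
}

func (e batchWriterError) Unwrap() error { return e.err }

// failingWrite is a hypothetical write that fails with a retryable error.
func failingWrite() error {
	return &batchWriterError{err: errors.New("write timed out"), retryable: true}
}

func main() {
	err := failingWrite()

	// errors.As walks the chain and exposes the retryable flag, which is how
	// the background writers in this PR decide whether to requeue a batch.
	var e *batchWriterError
	if errors.As(err, &e) && e.retryable {
		fmt.Println("retryable failure:", e)
	}
}
```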
9 changes: 6 additions & 3 deletions pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go
@@ -83,23 +83,26 @@ func (jgv *JanusGraphEdgeWriter) startBackgroundWriter(ctx context.Context) {
 				// If the channel is closed, return.
 				if !ok {
 					log.Trace(ctx).Info("Closed background janusgraph worker on channel close")
+
 					return
 				}
 
 				// If the batch is empty, return.
 				if len(batch.data) == 0 {
 					log.Trace(ctx).Warn("Empty edge batch received in background janusgraph worker, skipping")
+
 					return
 				}
 
 				_ = statsd.Count(ctx, metric.BackgroundWriterCall, 1, jgv.tags, 1)
 				err := jgv.batchWrite(ctx, batch.data)
 				if err != nil {
-					var e *errBatchWriter
+					var e *batchWriterError
 					if errors.As(err, &e) {
 						// If the error is retryable, retry the write operation with a smaller batch.
 						if e.retryable && batch.retryCount < jgv.maxRetry {
 							jgv.retrySplitAndRequeue(ctx, &batch, e)
+
 							continue
 						}
 
@@ -120,7 +123,7 @@ func (jgv *JanusGraphEdgeWriter) startBackgroundWriter(ctx context.Context) {
 }
 
 // retrySplitAndRequeue will split the batch into smaller chunks and requeue them for writing.
-func (jgv *JanusGraphEdgeWriter) retrySplitAndRequeue(ctx context.Context, batch *batchItem, e *errBatchWriter) {
+func (jgv *JanusGraphEdgeWriter) retrySplitAndRequeue(ctx context.Context, batch *batchItem, e *batchWriterError) {
 	_ = statsd.Count(ctx, metric.RetryWriterCall, 1, jgv.tags, 1)
 
 	// Compute the new batch size.
@@ -168,7 +171,7 @@ func (jgv *JanusGraphEdgeWriter) batchWrite(ctx context.Context, data []any) error {
 		return ctx.Err()
 	case <-time.After(jgv.writerTimeout):
 		// If the write operation takes too long, return an error.
-		return &errBatchWriter{
+		return &batchWriterError{
 			err:       errors.New("edge write operation timed out"),
 			retryable: true,
 		}
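
The retry path is easier to follow with a reduced sketch of the split-and-requeue idea: on a retryable failure, the worker shrinks the batch and puts the pieces back on the queue with a bumped retry counter. The halving policy and the channel-based queue below are assumptions for illustration only; the PR's actual resize logic lives in the elided "Compute the new batch size" section.

```go
package main

import "fmt"

// batchItem carries the fields the diff references: the payload and how many
// times it has already been retried.
type batchItem struct {
	data       []any
	retryCount int
}

// retrySplitAndRequeue halves a failed batch and requeues both halves with an
// incremented retry counter (hypothetical resize policy, not the PR's exact one).
func retrySplitAndRequeue(queue chan<- batchItem, batch batchItem) {
	half := len(batch.data) / 2
	if half == 0 {
		half = 1
	}

	for _, chunk := range [][]any{batch.data[:half], batch.data[half:]} {
		if len(chunk) == 0 {
			continue
		}
		queue <- batchItem{data: chunk, retryCount: batch.retryCount + 1}
	}
}

func main() {
	queue := make(chan batchItem, 8)
	retrySplitAndRequeue(queue, batchItem{data: []any{"e1", "e2", "e3", "e4"}, retryCount: 0})
	close(queue)

	for b := range queue {
		fmt.Printf("requeued %d items (retry %d)\n", len(b.data), b.retryCount)
	}
}
```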
11 changes: 8 additions & 3 deletions pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go
@@ -94,23 +94,26 @@ func (jgv *JanusGraphVertexWriter) startBackgroundWriter(ctx context.Context) {
 				// If the channel is closed, return.
 				if !ok {
 					log.Trace(ctx).Info("Closed background janusgraph worker on channel close")
+
 					return
 				}
 
 				// If the batch is empty, return.
 				if len(batch.data) == 0 {
 					log.Trace(ctx).Warn("Empty vertex batch received in background janusgraph worker, skipping")
+
 					return
 				}
 
 				_ = statsd.Count(ctx, metric.BackgroundWriterCall, 1, jgv.tags, 1)
 				err := jgv.batchWrite(ctx, batch.data)
 				if err != nil {
-					var e *errBatchWriter
+					var e *batchWriterError
 					if errors.As(err, &e) {
 						// If the error is retryable, retry the write operation with a smaller batch.
 						if e.retryable && batch.retryCount < jgv.maxRetry {
 							jgv.retrySplitAndRequeue(ctx, &batch, e)
+
 							continue
 						}
 
@@ -131,7 +134,7 @@ func (jgv *JanusGraphVertexWriter) startBackgroundWriter(ctx context.Context) {
 }
 
 // retrySplitAndRequeue will split the batch into smaller chunks and requeue them for writing.
-func (jgv *JanusGraphVertexWriter) retrySplitAndRequeue(ctx context.Context, batch *batchItem, e *errBatchWriter) {
+func (jgv *JanusGraphVertexWriter) retrySplitAndRequeue(ctx context.Context, batch *batchItem, e *batchWriterError) {
 	_ = statsd.Count(ctx, metric.RetryWriterCall, 1, jgv.tags, 1)
 
 	// Compute the new batch size.
@@ -207,6 +210,7 @@ func (jgv *JanusGraphVertexWriter) batchWrite(ctx context.Context, data []any) error {
 			ToList()
 		if err != nil {
 			errChan <- fmt.Errorf("%s vertex insert: %w", jgv.builder, err)
+
 			return
 		}
 
@@ -215,6 +219,7 @@ func (jgv *JanusGraphVertexWriter) batchWrite(ctx context.Context, data []any) error {
 		// We need to parse each map entry and add to our cache.
 		if err = jgv.cacheIds(ctx, raw); err != nil {
 			errChan <- fmt.Errorf("cache ids: %w", err)
+
 			return
 		}
 
@@ -228,7 +233,7 @@ func (jgv *JanusGraphVertexWriter) batchWrite(ctx context.Context, data []any) error {
 		return ctx.Err()
 	case <-time.After(jgv.writerTimeout):
 		// If the write operation takes too long, return an error.
-		return &errBatchWriter{
+		return &batchWriterError{
 			err:       errors.New("vertex write operation timed out"),
 			retryable: true,
 		}
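
The timeout branch in both writers follows the same pattern, shown below as a reduced, self-contained sketch: the write runs in a goroutine while the caller waits on completion, context cancellation, or the writer timeout, and a stalled write surfaces as a retryable batchWriterError so the background worker can split and requeue the batch. `slowInsert` and the 500ms timeout are illustrative stand-ins for the real gremlin traversal and the configured writerTimeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// batchWriterError is redeclared locally so the sketch compiles on its own.
type batchWriterError struct {
	err       error
	retryable bool
}

func (e batchWriterError) Error() string {
	return fmt.Sprintf("batch writer error (retriable:%v): %v", e.retryable, e.err)
}

// slowInsert stands in for the gremlin traversal submission.
func slowInsert(data []any) error {
	time.Sleep(time.Second) // simulate a write that exceeds the timeout

	return nil
}

func batchWrite(ctx context.Context, data []any, writerTimeout time.Duration) error {
	errChan := make(chan error, 1)

	go func() {
		errChan <- slowInsert(data)
	}()

	select {
	case err := <-errChan:
		return err
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(writerTimeout):
		// A stalled write becomes a retryable error instead of hanging the worker.
		return &batchWriterError{
			err:       errors.New("write operation timed out"),
			retryable: true,
		}
	}
}

func main() {
	err := batchWrite(context.Background(), []any{"v1", "v2"}, 500*time.Millisecond)
	fmt.Println(err)
}
```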