From 165619143adf6efddf1a1bc1dafaf92eb54967c3 Mon Sep 17 00:00:00 2001 From: Jay Mundrawala Date: Thu, 3 Oct 2024 04:37:01 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Chunk=20sending=20results=20by?= =?UTF-8?q?=20size?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There is an upper limit to what is accepted by the platform per request. Make sure we do not go past it when sending data. --- explorer/executor/executor.go | 43 ++++++++---- .../datalakes/inmemory/query_conductor.go | 3 +- utils/iox/chunker.go | 41 ++++++++++++ utils/iox/chunker_test.go | 65 +++++++++++++++++++ 4 files changed, 140 insertions(+), 12 deletions(-) create mode 100644 utils/iox/chunker.go create mode 100644 utils/iox/chunker_test.go diff --git a/explorer/executor/executor.go b/explorer/executor/executor.go index 8c0d6b3799..becb0364d7 100644 --- a/explorer/executor/executor.go +++ b/explorer/executor/executor.go @@ -15,6 +15,7 @@ import ( "go.mondoo.com/cnquery/v11/explorer" "go.mondoo.com/cnquery/v11/llx" "go.mondoo.com/cnquery/v11/mqlc" + "go.mondoo.com/cnquery/v11/utils/iox" "go.mondoo.com/cnquery/v11/utils/multierr" ) @@ -235,14 +236,14 @@ func (e *instance) WaitUntilDone(timeout time.Duration) error { } } -func (e *instance) snapshotResults() map[string]*llx.Result { +func (e *instance) snapshotResults() []*llx.Result { if e.datapoints != nil { e.mutex.Lock() - results := make(map[string]*llx.Result, len(e.datapoints)) + results := make([]*llx.Result, 0, len(e.datapoints)) for id := range e.datapoints { c := e.results[id] if c != nil { - results[id] = c.Result() + results = append(results, c.Result()) } } e.mutex.Unlock() @@ -250,9 +251,9 @@ func (e *instance) snapshotResults() map[string]*llx.Result { } e.mutex.Lock() - results := make(map[string]*llx.Result, len(e.results)) - for id, v := range e.results { - results[id] = v.Result() + results := make([]*llx.Result, 0, len(e.results)) + for _, v := range e.results { + results = append(results, v.Result()) } e.mutex.Unlock() return results @@ -263,12 +264,32 @@ func (e *instance) StoreQueryData() error { return errors.New("cannot store data, no collector provided") } - _, err := e.collector.StoreResults(context.Background(), &explorer.StoreResultsReq{ - AssetMrn: e.assetMrn, - Data: e.snapshotResults(), - }) + results := e.snapshotResults() - return err + if len(results) > 0 { + err := iox.ChunkMessages(func(chunk []*llx.Result) error { + log.Debug().Msg("Sending datapoints") + resultsToSend := make(map[string]*llx.Result, len(results)) + for _, rr := range chunk { + resultsToSend[rr.CodeId] = rr + } + _, err := e.collector.StoreResults(context.Background(), &explorer.StoreResultsReq{ + AssetMrn: e.assetMrn, + Data: resultsToSend, + }) + if err != nil { + return err + } + return nil + }, func(item *llx.Result, msgSize int) { + log.Warn().Msgf("Data %s (%d) exceeds maximum message size", item.CodeId, msgSize) + }, results...) + if err != nil { + return err + } + } + + return nil } func (e *instance) isCollected(query *llx.CodeBundle) bool { diff --git a/internal/datalakes/inmemory/query_conductor.go b/internal/datalakes/inmemory/query_conductor.go index 25ae710f22..88dbed41f8 100644 --- a/internal/datalakes/inmemory/query_conductor.go +++ b/internal/datalakes/inmemory/query_conductor.go @@ -172,8 +172,9 @@ func (db *Db) GetReport(ctx context.Context, assetMrn string, packMrn string) (* Data: llx.NilPrimitive, CodeId: id, } + } else { + data[id] = datum.(*llx.Result) } - data[id] = datum.(*llx.Result) } return &explorer.Report{ diff --git a/utils/iox/chunker.go b/utils/iox/chunker.go new file mode 100644 index 0000000000..2838d6f973 --- /dev/null +++ b/utils/iox/chunker.go @@ -0,0 +1,41 @@ +// Using license identifier: BUSL-1.1 +// Using copyright holder: Mondoo, Inc. + +package iox + +import ( + "google.golang.org/protobuf/proto" +) + +var maxMessageSize = 6 * (1 << 20) + +func ChunkMessages[T proto.Message](sendFunc func([]T) error, onTooLarge func(T, int), items ...T) error { + idx := 0 + for { + buffer := make([]T, 0, len(items)) + + if idx >= len(items) { + break + } + size := 0 + for i := idx; i < len(items); i++ { + msgSize := proto.Size(items[i]) + if msgSize > maxMessageSize { + onTooLarge(items[i], msgSize) + idx++ + continue + } + size += proto.Size(items[i]) + if size > maxMessageSize { + break + } + buffer = append(buffer, items[i]) + idx++ + } + if err := sendFunc(buffer); err != nil { + return err + } + } + + return nil +} diff --git a/utils/iox/chunker_test.go b/utils/iox/chunker_test.go new file mode 100644 index 0000000000..e2ac2fc9e8 --- /dev/null +++ b/utils/iox/chunker_test.go @@ -0,0 +1,65 @@ +// Using license identifier: BUSL-1.1 +// Using copyright holder: Mondoo, Inc. + +package iox + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/interop/grpc_testing" +) + +func TestChunkerIgnoreTooLargeMessages(t *testing.T) { + payloads := []*grpc_testing.Payload{ + { + Body: bytes.Repeat([]byte{0x01}, maxMessageSize+1), + }, + { + Body: bytes.Repeat([]byte{0x02}, maxMessageSize/2), + }, + } + + var chunks [][]*grpc_testing.Payload + err := ChunkMessages(func(chunk []*grpc_testing.Payload) error { + chunks = append(chunks, chunk) + return nil + }, func(*grpc_testing.Payload, int) {}, payloads...) + require.NoError(t, err) + require.Len(t, chunks, 1) + require.Len(t, chunks[0], 1) + require.Equal(t, payloads[1], chunks[0][0]) +} + +func TestChunker(t *testing.T) { + maxMessageSize = 100 + payloads := []*grpc_testing.Payload{ + { + Body: bytes.Repeat([]byte{0x01}, maxMessageSize-10), + }, + { + Body: bytes.Repeat([]byte{0x02}, maxMessageSize-10), + }, + { + Body: bytes.Repeat([]byte{0x03}, 10), + }, + { + Body: bytes.Repeat([]byte{0x04}, 10), + }, + } + + var chunks [][]*grpc_testing.Payload + err := ChunkMessages(func(chunk []*grpc_testing.Payload) error { + chunks = append(chunks, chunk) + return nil + }, func(*grpc_testing.Payload, int) {}, payloads...) + require.NoError(t, err) + require.Len(t, chunks, 3) + require.Len(t, chunks[0], 1) + require.Equal(t, payloads[0], chunks[0][0]) + require.Len(t, chunks[1], 1) + require.Equal(t, payloads[1], chunks[1][0]) + require.Len(t, chunks[2], 2) + require.Equal(t, payloads[2], chunks[2][0]) +}