Skip to content

Commit

Permalink
🐛 Chunk sending results by size
Browse files Browse the repository at this point in the history
There is an upper limit to what is accepted by the platform per request. Make sure we do not go past it when sending data.
  • Loading branch information
jaym committed Oct 3, 2024
1 parent 693c111 commit 1656191
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 12 deletions.
43 changes: 32 additions & 11 deletions explorer/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.mondoo.com/cnquery/v11/explorer"
"go.mondoo.com/cnquery/v11/llx"
"go.mondoo.com/cnquery/v11/mqlc"
"go.mondoo.com/cnquery/v11/utils/iox"
"go.mondoo.com/cnquery/v11/utils/multierr"
)

Expand Down Expand Up @@ -235,24 +236,24 @@ func (e *instance) WaitUntilDone(timeout time.Duration) error {
}
}

func (e *instance) snapshotResults() map[string]*llx.Result {
func (e *instance) snapshotResults() []*llx.Result {
if e.datapoints != nil {
e.mutex.Lock()
results := make(map[string]*llx.Result, len(e.datapoints))
results := make([]*llx.Result, 0, len(e.datapoints))
for id := range e.datapoints {
c := e.results[id]
if c != nil {
results[id] = c.Result()
results = append(results, c.Result())
}
}
e.mutex.Unlock()
return results
}

e.mutex.Lock()
results := make(map[string]*llx.Result, len(e.results))
for id, v := range e.results {
results[id] = v.Result()
results := make([]*llx.Result, 0, len(e.results))
for _, v := range e.results {
results = append(results, v.Result())
}
e.mutex.Unlock()
return results
Expand All @@ -263,12 +264,32 @@ func (e *instance) StoreQueryData() error {
return errors.New("cannot store data, no collector provided")
}

_, err := e.collector.StoreResults(context.Background(), &explorer.StoreResultsReq{
AssetMrn: e.assetMrn,
Data: e.snapshotResults(),
})
results := e.snapshotResults()

return err
if len(results) > 0 {
err := iox.ChunkMessages(func(chunk []*llx.Result) error {
log.Debug().Msg("Sending datapoints")
resultsToSend := make(map[string]*llx.Result, len(results))
for _, rr := range chunk {
resultsToSend[rr.CodeId] = rr
}
_, err := e.collector.StoreResults(context.Background(), &explorer.StoreResultsReq{
AssetMrn: e.assetMrn,
Data: resultsToSend,
})
if err != nil {
return err
}
return nil
}, func(item *llx.Result, msgSize int) {
log.Warn().Msgf("Data %s (%d) exceeds maximum message size", item.CodeId, msgSize)
}, results...)
if err != nil {
return err
}
}

return nil
}

func (e *instance) isCollected(query *llx.CodeBundle) bool {
Expand Down
3 changes: 2 additions & 1 deletion internal/datalakes/inmemory/query_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ func (db *Db) GetReport(ctx context.Context, assetMrn string, packMrn string) (*
Data: llx.NilPrimitive,
CodeId: id,
}
} else {
data[id] = datum.(*llx.Result)
}
data[id] = datum.(*llx.Result)
}

return &explorer.Report{
Expand Down
41 changes: 41 additions & 0 deletions utils/iox/chunker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Using license identifier: BUSL-1.1
// Using copyright holder: Mondoo, Inc.

package iox

import (
"google.golang.org/protobuf/proto"
)

var maxMessageSize = 6 * (1 << 20)

func ChunkMessages[T proto.Message](sendFunc func([]T) error, onTooLarge func(T, int), items ...T) error {
idx := 0
for {
buffer := make([]T, 0, len(items))

if idx >= len(items) {
break
}
size := 0
for i := idx; i < len(items); i++ {
msgSize := proto.Size(items[i])
if msgSize > maxMessageSize {
onTooLarge(items[i], msgSize)
idx++
continue
}
size += proto.Size(items[i])
if size > maxMessageSize {
break
}
buffer = append(buffer, items[i])
idx++
}
if err := sendFunc(buffer); err != nil {
return err
}
}

return nil
}
65 changes: 65 additions & 0 deletions utils/iox/chunker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Using license identifier: BUSL-1.1
// Using copyright holder: Mondoo, Inc.

package iox

import (
"bytes"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/interop/grpc_testing"
)

func TestChunkerIgnoreTooLargeMessages(t *testing.T) {
payloads := []*grpc_testing.Payload{
{
Body: bytes.Repeat([]byte{0x01}, maxMessageSize+1),
},
{
Body: bytes.Repeat([]byte{0x02}, maxMessageSize/2),
},
}

var chunks [][]*grpc_testing.Payload
err := ChunkMessages(func(chunk []*grpc_testing.Payload) error {
chunks = append(chunks, chunk)
return nil
}, func(*grpc_testing.Payload, int) {}, payloads...)
require.NoError(t, err)
require.Len(t, chunks, 1)
require.Len(t, chunks[0], 1)
require.Equal(t, payloads[1], chunks[0][0])
}

func TestChunker(t *testing.T) {
maxMessageSize = 100
payloads := []*grpc_testing.Payload{
{
Body: bytes.Repeat([]byte{0x01}, maxMessageSize-10),
},
{
Body: bytes.Repeat([]byte{0x02}, maxMessageSize-10),
},
{
Body: bytes.Repeat([]byte{0x03}, 10),
},
{
Body: bytes.Repeat([]byte{0x04}, 10),
},
}

var chunks [][]*grpc_testing.Payload
err := ChunkMessages(func(chunk []*grpc_testing.Payload) error {
chunks = append(chunks, chunk)
return nil
}, func(*grpc_testing.Payload, int) {}, payloads...)
require.NoError(t, err)
require.Len(t, chunks, 3)
require.Len(t, chunks[0], 1)
require.Equal(t, payloads[0], chunks[0][0])
require.Len(t, chunks[1], 1)
require.Equal(t, payloads[1], chunks[1][0])
require.Len(t, chunks[2], 2)
require.Equal(t, payloads[2], chunks[2][0])
}

0 comments on commit 1656191

Please sign in to comment.