From 3e63517472db37d9dbaecc6fbbb420daa62fd1df Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Wed, 18 Sep 2024 13:50:35 -0500 Subject: [PATCH] Log unprocessed items after dynamo batch put operation --- core/models/channel_logs.go | 31 +++++++++++++++++++++++++------ utils/clogs/batch.go | 36 ------------------------------------ utils/clogs/clog_test.go | 4 +++- 3 files changed, 28 insertions(+), 43 deletions(-) delete mode 100644 utils/clogs/batch.go diff --git a/core/models/channel_logs.go b/core/models/channel_logs.go index 4accfb79b..1425f872d 100644 --- a/core/models/channel_logs.go +++ b/core/models/channel_logs.go @@ -4,8 +4,12 @@ import ( "context" "encoding/json" "fmt" + "log/slog" + "slices" "time" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/mailroom/runtime" @@ -84,12 +88,27 @@ type dbChannelLog struct { // InsertChannelLogs writes the given channel logs to the db func InsertChannelLogs(ctx context.Context, rt *runtime.Runtime, logs []*ChannelLog) error { // write all logs to DynamoDB - cls := make([]*clogs.Log, len(logs)) - for i, l := range logs { - cls[i] = l.Log - } - if err := clogs.BatchPut(ctx, rt.Dynamo, "ChannelLogs", cls); err != nil { - return fmt.Errorf("error writing channel logs: %w", err) + for batch := range slices.Chunk(logs, 25) { + writeReqs := make([]types.WriteRequest, len(batch)) + + for i, l := range batch { + d, err := l.MarshalDynamo() + if err != nil { + return fmt.Errorf("error marshalling log: %w", err) + } + writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: d}} + } + + resp, err := rt.Dynamo.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: map[string][]types.WriteRequest{rt.Dynamo.TableName("ChannelLogs"): writeReqs}, + }) + if err != nil { + return fmt.Errorf("error writing logs to dynamo: %w", err) + } + if len(resp.UnprocessedItems) > 0 { + // TODO shouldn't happend.. but need to figure out how we would retry these + slog.Error("unprocessed items writing logs to dynamo", "count", len(resp.UnprocessedItems)) + } } unattached := make([]*dbChannelLog, 0, len(logs)) diff --git a/utils/clogs/batch.go b/utils/clogs/batch.go deleted file mode 100644 index 7d174b886..000000000 --- a/utils/clogs/batch.go +++ /dev/null @@ -1,36 +0,0 @@ -package clogs - -import ( - "context" - "fmt" - "slices" - - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/nyaruka/gocommon/aws/dynamo" -) - -// BatchPut writes multiple logs to DynamoDB in batches of 25. This should probably be a generic function in the -// gocommon/dynamo package but need to think more about errors. -func BatchPut(ctx context.Context, ds *dynamo.Service, table string, logs []*Log) error { - for batch := range slices.Chunk(logs, 25) { - writeReqs := make([]types.WriteRequest, len(batch)) - - for i, l := range batch { - d, err := l.MarshalDynamo() - if err != nil { - return fmt.Errorf("error marshalling log: %w", err) - } - writeReqs[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: d}} - } - - _, err := ds.Client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ - RequestItems: map[string][]types.WriteRequest{ds.TableName(table): writeReqs}, - }) - if err != nil { - return fmt.Errorf("error writing logs to db: %w", err) - } - } - - return nil -} diff --git a/utils/clogs/clog_test.go b/utils/clogs/clog_test.go index 2df4a929c..0a8cc1ee9 100644 --- a/utils/clogs/clog_test.go +++ b/utils/clogs/clog_test.go @@ -54,7 +54,9 @@ func TestLogs(t *testing.T) { l2.Error(clogs.NewLogError("code2", "ext", "message")) // write both logs to db - err = clogs.BatchPut(ctx, ds, "ChannelLogs", []*clogs.Log{l1, l2}) + err = ds.PutItem(ctx, "ChannelLogs", l1) + assert.NoError(t, err) + err = ds.PutItem(ctx, "ChannelLogs", l2) assert.NoError(t, err) // read log 1 back from db