Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop writing channel logs to S3 #332

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 2 additions & 49 deletions core/models/channel_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"path"
"time"

s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/nyaruka/gocommon/aws/s3x"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/clogs"
)
Expand Down Expand Up @@ -85,21 +81,6 @@ type dbChannelLog struct {
CreatedOn time.Time `db:"created_on"`
}

// channel log to be written to logs storage
type stChannelLog struct {
UUID clogs.LogUUID `json:"uuid"`
Type clogs.LogType `json:"type"`
HTTPLogs []*httpx.Log `json:"http_logs"`
Errors []*clogs.LogError `json:"errors"`
ElapsedMS int `json:"elapsed_ms"`
CreatedOn time.Time `json:"created_on"`
ChannelUUID assets.ChannelUUID `json:"-"`
}

func (l *stChannelLog) path() string {
return path.Join("channels", string(l.ChannelUUID), string(l.UUID[:4]), fmt.Sprintf("%s.json", l.UUID))
}

// InsertChannelLogs writes the given channel logs to the db
func InsertChannelLogs(ctx context.Context, rt *runtime.Runtime, logs []*ChannelLog) error {
// write all logs to DynamoDB
Expand All @@ -111,23 +92,11 @@ func InsertChannelLogs(ctx context.Context, rt *runtime.Runtime, logs []*Channel
return fmt.Errorf("error writing channel logs: %w", err)
}

attached := make([]*stChannelLog, 0, len(logs))
unattached := make([]*dbChannelLog, 0, len(logs))

for _, l := range logs {
if l.attached {
// if log is attached to a call or message, only write to storage
attached = append(attached, &stChannelLog{
UUID: l.UUID,
Type: l.Type,
HTTPLogs: l.HttpLogs,
Errors: l.Errors,
ElapsedMS: int(l.Elapsed / time.Millisecond),
CreatedOn: l.CreatedOn,
ChannelUUID: l.channel.UUID(),
})
} else {
// otherwise write to database so it's retrievable
if !l.attached {
// if log isn't attached to a message or call we need to write it to the db so that it's retrievable
unattached = append(unattached, &dbChannelLog{
UUID: l.UUID,
ChannelID: l.channel.ID(),
Expand All @@ -141,22 +110,6 @@ func InsertChannelLogs(ctx context.Context, rt *runtime.Runtime, logs []*Channel
}
}

if len(attached) > 0 {
uploads := make([]*s3x.Upload, len(attached))
for i, l := range attached {
uploads[i] = &s3x.Upload{
Bucket: rt.Config.S3LogsBucket,
Key: l.path(),
ContentType: "application/json",
Body: jsonx.MustMarshal(l),
ACL: s3types.ObjectCannedACLPrivate,
}
}
if err := rt.S3.BatchPut(ctx, uploads, 32); err != nil {
return fmt.Errorf("error writing attached channel logs to storage: %w", err)
}
}

if len(unattached) > 0 {
err := BulkQuery(ctx, "insert channel log", rt.DB, sqlInsertChannelLog, unattached)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/models/channel_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func TestChannelLogsOutgoing(t *testing.T) {
ctx, rt := testsuite.Runtime()

defer rt.DB.MustExec(`DELETE FROM channels_channellog`)
defer testsuite.Reset(testsuite.ResetData | testsuite.ResetDynamo)

defer httpx.SetRequestor(httpx.DefaultRequestor)
httpx.SetRequestor(httpx.NewMockRequestor(map[string][]*httpx.MockResponse{
Expand Down
5 changes: 0 additions & 5 deletions mailroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ func (mr *Mailroom) Start() error {
} else {
log.Info("sessions bucket ok")
}
if err := mr.rt.S3.Test(mr.ctx, c.S3LogsBucket); err != nil {
log.Error("logs bucket not accessible", "error", err)
} else {
log.Info("logs bucket ok")
}

// initialize our elastic client
mr.rt.ES, err = elasticsearch.NewTypedClient(elasticsearch.Config{Addresses: []string{c.Elastic}, Username: c.ElasticUsername, Password: c.ElasticPassword})
Expand Down
2 changes: 0 additions & 2 deletions runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ type Config struct {
S3Endpoint string `help:"S3 service endpoint, e.g. https://s3.amazonaws.com"`
S3AttachmentsBucket string `help:"S3 bucket to write attachments to"`
S3SessionsBucket string `help:"S3 bucket to write flow sessions to"`
S3LogsBucket string `help:"S3 bucket to write channel logs to"`
S3Minio bool `help:"S3 is actually Minio or other compatible service"`

CourierAuthToken string `help:"the authentication token used for requests to Courier"`
Expand Down Expand Up @@ -128,7 +127,6 @@ func NewDefaultConfig() *Config {
S3Endpoint: "https://s3.amazonaws.com",
S3AttachmentsBucket: "temba-attachments",
S3SessionsBucket: "temba-sessions",
S3LogsBucket: "temba-logs",

InstanceID: hostname,
LogLevel: slog.LevelWarn,
Expand Down
3 changes: 1 addition & 2 deletions testsuite/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func Runtime() (context.Context, *runtime.Runtime) {
cfg.S3Endpoint = "http://localhost:9000"
cfg.S3AttachmentsBucket = "test-attachments"
cfg.S3SessionsBucket = "test-sessions"
cfg.S3LogsBucket = "test-logs"
cfg.S3Minio = true
cfg.DynamoEndpoint = "http://localhost:6000"
cfg.DynamoTablePrefix = "Test"
Expand Down Expand Up @@ -213,7 +212,6 @@ func resetRedis() {
func resetStorage(ctx context.Context, rt *runtime.Runtime) {
rt.S3.EmptyBucket(ctx, rt.Config.S3AttachmentsBucket)
rt.S3.EmptyBucket(ctx, rt.Config.S3SessionsBucket)
rt.S3.EmptyBucket(ctx, rt.Config.S3LogsBucket)
}

// clears indexed data in Elastic
Expand Down Expand Up @@ -279,6 +277,7 @@ DELETE FROM triggers_trigger WHERE id >= 30000;
DELETE FROM channels_channel WHERE id >= 30000;
DELETE FROM channels_channelcount;
DELETE FROM channels_channelevent;
DELETE FROM channels_channellog;
DELETE FROM msgs_msg;
DELETE FROM flows_flowrun;
DELETE FROM flows_flowpathcount;
Expand Down
2 changes: 1 addition & 1 deletion utils/clogs/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

const (
dynamoTTL = 14 * 24 * time.Hour
dynamoTTL = 7 * 24 * time.Hour // 1 week
)

// LogUUID is the type of a channel log UUID (should be v7)
Expand Down
25 changes: 16 additions & 9 deletions web/ivr/ivr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
"sync"
"testing"

"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/goflow/test"
_ "github.com/nyaruka/mailroom/core/handlers"
"github.com/nyaruka/mailroom/core/models"
Expand Down Expand Up @@ -330,10 +331,12 @@ func TestTwilioIVR(t *testing.T) {
AND ((status = 'H' AND direction = 'I') OR (status = 'W' AND direction = 'O'))`, testdata.Bob.ID).Returns(2)

// check the generated channel logs
logs := getCallLogs(t, rt, testdata.TwilioChannel.UUID)
logs := getCallLogs(t, ctx, rt)
assert.Len(t, logs, 17)
for _, log := range logs {
assert.NotContains(t, string(log), "sesame") // auth token redacted
for _, httpLog := range log.HttpLogs {
assert.NotContains(t, string(jsonx.MustMarshal(httpLog)), "sesame") // auth token redacted
}
}
}

Expand Down Expand Up @@ -629,28 +632,32 @@ func TestVonageIVR(t *testing.T) {
assertdb.Query(t, rt.DB, `SELECT count(*) FROM ivr_call WHERE status = 'D' AND contact_id = $1`, testdata.George.ID).Returns(1)

// check the generated channel logs
logs := getCallLogs(t, rt, testdata.VonageChannel.UUID)
logs := getCallLogs(t, ctx, rt)
assert.Len(t, logs, 16)
for _, log := range logs {
assert.NotContains(t, string(log), "BEGIN PRIVATE KEY") // private key redacted
for _, httpLog := range log.HttpLogs {
assert.NotContains(t, string(jsonx.MustMarshal(httpLog)), "BEGIN PRIVATE KEY") // private key redacted
}
}

// and 2 unattached logs in the database
assertdb.Query(t, rt.DB, `SELECT count(*) FROM channels_channellog WHERE channel_id = $1`, testdata.VonageChannel.ID).Returns(2)
assertdb.Query(t, rt.DB, `SELECT array_agg(log_type ORDER BY id) FROM channels_channellog WHERE channel_id = $1`, testdata.VonageChannel.ID).Returns([]byte(`{ivr_status,ivr_status}`))
}

func getCallLogs(t *testing.T, rt *runtime.Runtime, channelUUID assets.ChannelUUID) [][]byte {
func getCallLogs(t *testing.T, ctx context.Context, rt *runtime.Runtime) []*clogs.Log {
var logUUIDs []clogs.LogUUID
err := rt.DB.Select(&logUUIDs, `SELECT unnest(log_uuids) FROM ivr_call ORDER BY id`)
require.NoError(t, err)

logs := make([][]byte, len(logUUIDs))
logs := make([]*clogs.Log, len(logUUIDs))

for i, logUUID := range logUUIDs {
_, body, err := rt.S3.GetObject(context.Background(), rt.Config.S3LogsBucket, fmt.Sprintf("channels/%s/%s/%s.json", channelUUID, logUUID[0:4], logUUID))
log := &clogs.Log{}
err = rt.Dynamo.GetItem(ctx, "ChannelLogs", map[string]types.AttributeValue{"UUID": &types.AttributeValueMemberS{Value: string(logUUID)}}, log)
require.NoError(t, err)
logs[i] = body
logs[i] = log
}

return logs
}
Loading