From 18dd88f874af36377a98ac243dbe99db37a2aa27 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Fri, 5 Apr 2024 18:19:56 +0100 Subject: [PATCH 01/20] Implement MVP and add some initial tests --- receiver/awss3receiver/config.go | 12 +- receiver/awss3receiver/go.mod | 20 ++- receiver/awss3receiver/go.sum | 36 +++++ receiver/awss3receiver/receiver.go | 47 ++++++- receiver/awss3receiver/s3intf.go | 64 +++++++++ receiver/awss3receiver/s3reader.go | 140 ++++++++++++++++++++ receiver/awss3receiver/s3reader_test.go | 166 ++++++++++++++++++++++++ 7 files changed, 474 insertions(+), 11 deletions(-) create mode 100644 receiver/awss3receiver/s3intf.go create mode 100644 receiver/awss3receiver/s3reader.go create mode 100644 receiver/awss3receiver/s3reader_test.go diff --git a/receiver/awss3receiver/config.go b/receiver/awss3receiver/config.go index 400c5a8a31a7..06bd9144fa04 100644 --- a/receiver/awss3receiver/config.go +++ b/receiver/awss3receiver/config.go @@ -47,26 +47,26 @@ func (c Config) Validate() error { if c.StartTime == "" { errs = multierr.Append(errs, errors.New("start time is required")) } else { - if err := validateTime(c.StartTime); err != nil { + if _, err := parseTime(c.StartTime); err != nil { errs = multierr.Append(errs, errors.New("unable to parse start date")) } } if c.EndTime == "" { errs = multierr.Append(errs, errors.New("end time is required")) } else { - if err := validateTime(c.EndTime); err != nil { + if _, err := parseTime(c.EndTime); err != nil { errs = multierr.Append(errs, errors.New("unable to parse end time")) } } return errs } -func validateTime(str string) error { +func parseTime(str string) (time.Time, error) { layouts := []string{"2006-01-02 15:04", time.DateOnly} for _, layout := range layouts { - if _, err := time.Parse(layout, str); err == nil { - return nil + if t, err := time.Parse(layout, str); err == nil { + return t, nil } } - return errors.New("unable to parse time string") + return time.Time{}, errors.New("unable to parse time string") } diff --git a/receiver/awss3receiver/go.mod b/receiver/awss3receiver/go.mod index 67f833de0ad0..a8d715d67b2b 100644 --- a/receiver/awss3receiver/go.mod +++ b/receiver/awss3receiver/go.mod @@ -3,10 +3,14 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3r go 1.21 require ( + github.com/aws/aws-sdk-go-v2 v1.26.1 + github.com/aws/aws-sdk-go-v2/config v1.27.10 + github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.97.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/collector/confmap v0.97.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/collector/consumer v0.97.1-0.20240404121116-4f1a8936d26b + go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/collector/receiver v0.97.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 @@ -16,6 +20,21 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.10 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.20.4 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect + github.com/aws/smithy-go v1.20.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -39,7 +58,6 @@ require ( github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240404121116-4f1a8936d26b // indirect - go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index 2dee1e0ee992..d06affb9b699 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -1,3 +1,39 @@ +github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= +github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg= +github.com/aws/aws-sdk-go-v2/config v1.27.10 h1:PS+65jThT0T/snC5WjyfHHyUgG+eBoupSDV+f838cro= +github.com/aws/aws-sdk-go-v2/config v1.27.10/go.mod h1:BePM7Vo4OBpHreKRUMuDXX+/+JWP38FLkzl5m27/Jjs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.10 h1:qDZ3EA2lv1KangvQB6y258OssCHD0xvaGiEDkG4X/10= +github.com/aws/aws-sdk-go-v2/credentials v1.17.10/go.mod h1:6t3sucOaYDwDssHQa0ojH1RpmVmF5/jArkye1b2FKMI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 h1:81KE7vaZzrl7yHBYHVEzYB8sypz11NMOZ40YlWvPxsU= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5/go.mod h1:LIt2rg7Mcgn09Ygbdh/RdIm0rQ+3BNkbP1gyVMFtRK0= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 h1:ZMeFZ5yk+Ek+jNr1+uwCd2tG89t6oTS5yVWpa6yy2es= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7/go.mod h1:mxV05U+4JiHqIpGqqYXOHLPKUC6bDXC44bsUhNjOEwY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 h1:f9RyWNtS8oH7cZlbn+/JNPpjUk5+5fLd5lM9M0i49Ys= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5/go.mod h1:h5CoMZV2VF297/VLhRhO1WF+XYWOzXo+4HsObA4HjBQ= +github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 h1:6cnno47Me9bRykw9AEv9zkXE+5or7jz8TsskTTccbgc= +github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1/go.mod h1:qmdkIIAC+GCLASF7R2whgNrJADz0QZPX+Seiw/i4S3o= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.4 h1:WzFol5Cd+yDxPAdnzTA5LmpHYSWinhmSj4rQChV0ee8= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.4/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2KCuY61kzoCpvtvJJBtOE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index ee4c04e3e1be..a898352c50d2 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -5,23 +5,62 @@ package awss3receiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" + "strings" ) type awss3Receiver struct { + s3Reader *s3Reader + consumer consumer.Traces + logger *zap.Logger + cancel context.CancelFunc } -func newAWSS3TraceReceiver(_ *Config, _ consumer.Traces, _ *zap.Logger) (*awss3Receiver, error) { - return &awss3Receiver{}, nil +func newAWSS3TraceReceiver(cfg *Config, traces consumer.Traces, logger *zap.Logger) (*awss3Receiver, error) { + reader, err := newS3Reader(cfg) + if err != nil { + return nil, err + } + return &awss3Receiver{ + s3Reader: reader, + consumer: traces, + logger: logger, + cancel: nil, + }, nil } -func (r *awss3Receiver) Start(_ context.Context, _ component.Host) error { +func (r *awss3Receiver) Start(ctx context.Context, _ component.Host) error { + ctx, r.cancel = context.WithCancel(ctx) + go func() { + _ = r.s3Reader.readAll(ctx, "traces", r.receiveBytes) + }() return nil } func (r *awss3Receiver) Shutdown(_ context.Context) error { + if r.cancel != nil { + r.cancel() + } + return nil +} + +func (r *awss3Receiver) receiveBytes(ctx context.Context, key string, data []byte) error { + var unmarshaler ptrace.Unmarshaler + if strings.HasSuffix(key, ".json") { + unmarshaler = &ptrace.JSONUnmarshaler{} + } + if strings.HasSuffix(key, ".binpb") { + unmarshaler = &ptrace.ProtoUnmarshaler{} + } + if unmarshaler != nil { + traces, err := unmarshaler.UnmarshalTraces(data) + if err != nil { + return err + } + return r.consumer.ConsumeTraces(ctx, traces) + } return nil } diff --git a/receiver/awss3receiver/s3intf.go b/receiver/awss3receiver/s3intf.go new file mode 100644 index 000000000000..ee030f0e1ae9 --- /dev/null +++ b/receiver/awss3receiver/s3intf.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver" + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "log" +) + +type ListObjectsV2Pager interface { + HasMorePages() bool + NextPage(context.Context, ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) +} + +type ListObjectsAPI interface { + NewListObjectsV2Paginator(params *s3.ListObjectsV2Input) ListObjectsV2Pager +} + +type GetObjectAPI interface { + GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) +} + +type s3ListObjectsAPIImpl struct { + client *s3.Client +} + +func newS3Client(cfg S3DownloaderConfig) (ListObjectsAPI, GetObjectAPI, error) { + optionsFuncs := make([]func(*config.LoadOptions) error, 0) + if cfg.Region != "" { + optionsFuncs = append(optionsFuncs, config.WithRegion(cfg.Region)) + } + + if cfg.Endpoint != "" { + customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + PartitionID: "aws", + URL: cfg.Endpoint, + SigningRegion: cfg.Region, + }, nil + }) + optionsFuncs = append(optionsFuncs, config.WithEndpointResolverWithOptions(customResolver)) + } + awsCfg, err := config.LoadDefaultConfig(context.TODO(), optionsFuncs...) + if err != nil { + log.Fatalf("unable to load SDK config, %v", err) + return nil, nil, err + } + s3OptionFuncs := make([]func(options *s3.Options), 0) + if cfg.S3ForcePathStyle { + s3OptionFuncs = append(s3OptionFuncs, func(o *s3.Options) { + o.UsePathStyle = true + }) + } + client := s3.NewFromConfig(awsCfg, s3OptionFuncs...) + return &s3ListObjectsAPIImpl{client: client}, client, nil +} + +func (api *s3ListObjectsAPIImpl) NewListObjectsV2Paginator(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + return s3.NewListObjectsV2Paginator(api.client, params) +} diff --git a/receiver/awss3receiver/s3reader.go b/receiver/awss3receiver/s3reader.go new file mode 100644 index 000000000000..2e4d10962d09 --- /dev/null +++ b/receiver/awss3receiver/s3reader.go @@ -0,0 +1,140 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver" + +import ( + "context" + "fmt" + "github.com/aws/aws-sdk-go-v2/service/s3" + "io" + "time" +) + +type s3Reader struct { + listObjectsClient ListObjectsAPI + getObjectClient GetObjectAPI + s3Bucket string + s3Prefix string + filePrefix string + startTime time.Time + endTime time.Time + timeStep time.Duration + getTimeKey func(time.Time) string +} + +type s3ReaderDataCallback func(context.Context, string, []byte) error + +func newS3Reader(cfg *Config) (*s3Reader, error) { + listObjectsClient, getObjectClient, err := newS3Client(cfg.S3Downloader) + if err != nil { + return nil, err + } + startTime, err := parseTime(cfg.StartTime) + if err != nil { + return nil, err + } + endTime, err := parseTime(cfg.EndTime) + if err != nil { + return nil, err + } + + var getTimeKey func(time.Time) string + var timeStep time.Duration + if cfg.S3Downloader.S3Partition == "hour" { + getTimeKey = getTimeKeyPartitionHour + timeStep = time.Hour + } else { + getTimeKey = getTimeKeyPartitionMinute + timeStep = time.Minute + } + return &s3Reader{ + listObjectsClient: listObjectsClient, + getObjectClient: getObjectClient, + s3Bucket: cfg.S3Downloader.S3Bucket, + s3Prefix: cfg.S3Downloader.S3Prefix, + filePrefix: cfg.S3Downloader.FilePrefix, + startTime: startTime, + endTime: endTime, + timeStep: timeStep, + getTimeKey: getTimeKey, + }, nil +} + +func (s3Reader *s3Reader) readAll(ctx context.Context, telemetryType string, dataCallback s3ReaderDataCallback) error { + for currentTime := s3Reader.startTime; currentTime.Before(s3Reader.endTime); currentTime = currentTime.Add(s3Reader.timeStep) { + select { + case <-ctx.Done(): + return nil + default: + if err := s3Reader.readTelemetryForTime(ctx, currentTime, telemetryType, dataCallback); err != nil { + return err + } + } + } + return nil +} + +func (s3Reader *s3Reader) readTelemetryForTime(ctx context.Context, t time.Time, telemetryType string, dataCallback s3ReaderDataCallback) error { + params := &s3.ListObjectsV2Input{ + Bucket: &s3Reader.s3Bucket, + } + prefix := s3Reader.getObjectPrefixForTime(t, telemetryType) + params.Prefix = &prefix + + p := s3Reader.listObjectsClient.NewListObjectsV2Paginator(params) + + for p.HasMorePages() { + page, err := p.NextPage(context.TODO()) + if err != nil { + return err + } + for _, obj := range page.Contents { + if data, err := s3Reader.retrieveObject(*obj.Key); err != nil { + return err + } else { + if err := dataCallback(ctx, *obj.Key, data); err != nil { + return err + } + } + } + } + return nil +} + +func (s3Reader *s3Reader) getObjectPrefixForTime(t time.Time, telemetryType string) string { + timeKey := s3Reader.getTimeKey(t) + if s3Reader.s3Prefix != "" { + return fmt.Sprintf("%s/%s/%s%s_", s3Reader.s3Prefix, timeKey, s3Reader.filePrefix, telemetryType) + } else { + return fmt.Sprintf("%s/%s%s_", timeKey, s3Reader.filePrefix, telemetryType) + } +} + +func (s3Reader *s3Reader) retrieveObject(key string) ([]byte, error) { + params := s3.GetObjectInput{ + Bucket: &s3Reader.s3Bucket, + Key: &key, + } + if output, err := s3Reader.getObjectClient.GetObject(context.TODO(), ¶ms); err != nil { + return nil, err + } else { + contents, err := io.ReadAll(output.Body) + if err != nil { + return nil, err + } + return contents, nil + } +} + +func getTimeKeyPartitionHour(t time.Time) string { + year, month, day := t.Date() + hour := t.Hour() + return fmt.Sprintf("year=%d/month=%02d/day=%02d/hour=%02d", year, month, day, hour) +} + +func getTimeKeyPartitionMinute(t time.Time) string { + year, month, day := t.Date() + hour, minute, _ := t.Clock() + return fmt.Sprintf("year=%d/month=%02d/day=%02d/hour=%02d/minute=%02d", year, month, day, hour, minute) +} diff --git a/receiver/awss3receiver/s3reader_test.go b/receiver/awss3receiver/s3reader_test.go new file mode 100644 index 000000000000..bd96f6c7163d --- /dev/null +++ b/receiver/awss3receiver/s3reader_test.go @@ -0,0 +1,166 @@ +package awss3receiver + +import ( + "bytes" + "context" + "fmt" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/require" + "io" + "testing" + "time" +) + +var testTime = time.Date(2021, 02, 01, 17, 32, 00, 00, time.UTC) + +func Test_getTimeKeyPartitionHour(t *testing.T) { + result := getTimeKeyPartitionHour(testTime) + require.Equal(t, "year=2021/month=02/day=01/hour=17", result) +} + +func Test_getTimeKeyPartitionMinute(t *testing.T) { + result := getTimeKeyPartitionMinute(testTime) + require.Equal(t, "year=2021/month=02/day=01/hour=17/minute=32", result) +} + +func Test_s3Reader_getObjectPrefixForTime(t *testing.T) { + type args struct { + s3Prefix string + filePrefix string + getTimeKey func(t time.Time) string + telemetryType string + } + tests := []struct { + name string + args args + want string + }{ + { + name: "hour, prefix and file prefix", + args: args{ + s3Prefix: "prefix", + filePrefix: "file", + getTimeKey: getTimeKeyPartitionHour, + telemetryType: "traces", + }, + want: "prefix/year=2021/month=02/day=01/hour=17/filetraces_", + }, + { + name: "minute, prefix and file prefix", + args: args{ + s3Prefix: "prefix", + filePrefix: "file", + getTimeKey: getTimeKeyPartitionMinute, + telemetryType: "metrics", + }, + want: "prefix/year=2021/month=02/day=01/hour=17/minute=32/filemetrics_", + }, + { + name: "hour, prefix and no file prefix", + args: args{ + s3Prefix: "prefix", + filePrefix: "", + getTimeKey: getTimeKeyPartitionHour, + telemetryType: "logs", + }, + want: "prefix/year=2021/month=02/day=01/hour=17/logs_", + }, + { + name: "minute, no prefix and no file prefix", + args: args{ + s3Prefix: "", + filePrefix: "", + getTimeKey: getTimeKeyPartitionMinute, + telemetryType: "metrics", + }, + want: "year=2021/month=02/day=01/hour=17/minute=32/metrics_", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reader := s3Reader{ + s3Prefix: test.args.s3Prefix, + filePrefix: test.args.filePrefix, + getTimeKey: test.args.getTimeKey, + } + result := reader.getObjectPrefixForTime(testTime, test.args.telemetryType) + require.Equal(t, test.want, result) + }) + } +} + +type mockGetObjectAPI func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + +func (m mockGetObjectAPI) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + return m(ctx, params, optFns...) +} + +type mockListObjectsAPI func(params *s3.ListObjectsV2Input) ListObjectsV2Pager + +func (m mockListObjectsAPI) NewListObjectsV2Paginator(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + return m(params) +} + +type mockListObjectsV2Pager struct { + PageNum int + Pages []*s3.ListObjectsV2Output +} + +func (m *mockListObjectsV2Pager) HasMorePages() bool { + return m.PageNum < len(m.Pages) +} + +func (m *mockListObjectsV2Pager) NextPage(ctx context.Context, f ...func(*s3.Options)) (output *s3.ListObjectsV2Output, err error) { + if m.PageNum >= len(m.Pages) { + return nil, fmt.Errorf("no more pages") + } + output = m.Pages[m.PageNum] + m.PageNum++ + return output, nil +} + +func Test_readTelemetryForTime(t *testing.T) { + testKey := "year=2021/month=02/day=01/hour=17/minute=32/traces_1" + reader := s3Reader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, "year=2021/month=02/day=01/hour=17/minute=32/traces_", *params.Prefix) + + return &mockListObjectsV2Pager{ + Pages: []*s3.ListObjectsV2Output{ + { + Contents: []types.Object{ + { + Key: &testKey, + }, + }, + }, + }, + } + }), + getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }), + s3Bucket: "bucket", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute), + timeStep: time.Minute, + getTimeKey: getTimeKeyPartitionMinute, + } + + err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(ctx context.Context, key string, data []byte) error { + require.Equal(t, testKey, key) + require.Equal(t, "this is the body of the object", string(data)) + return nil + }) + require.NoError(t, err) +} From 3237ff3520a5c107f6b94316ac661716321cde19 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Sat, 6 Apr 2024 22:08:06 +0100 Subject: [PATCH 02/20] Add more tests for readTelemetryForTime and readAll --- receiver/awss3receiver/s3reader_test.go | 195 +++++++++++++++++++++++- 1 file changed, 194 insertions(+), 1 deletion(-) diff --git a/receiver/awss3receiver/s3reader_test.go b/receiver/awss3receiver/s3reader_test.go index bd96f6c7163d..81871219666d 100644 --- a/receiver/awss3receiver/s3reader_test.go +++ b/receiver/awss3receiver/s3reader_test.go @@ -3,9 +3,11 @@ package awss3receiver import ( "bytes" "context" + "errors" "fmt" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/require" "io" "testing" @@ -105,6 +107,7 @@ func (m mockListObjectsAPI) NewListObjectsV2Paginator(params *s3.ListObjectsV2In type mockListObjectsV2Pager struct { PageNum int Pages []*s3.ListObjectsV2Output + Error error } func (m *mockListObjectsV2Pager) HasMorePages() bool { @@ -112,6 +115,10 @@ func (m *mockListObjectsV2Pager) HasMorePages() bool { } func (m *mockListObjectsV2Pager) NextPage(ctx context.Context, f ...func(*s3.Options)) (output *s3.ListObjectsV2Output, err error) { + if m.Error != nil { + return nil, m.Error + } + if m.PageNum >= len(m.Pages) { return nil, fmt.Errorf("no more pages") } @@ -121,6 +128,108 @@ func (m *mockListObjectsV2Pager) NextPage(ctx context.Context, f ...func(*s3.Opt } func Test_readTelemetryForTime(t *testing.T) { + testKey_1 := "year=2021/month=02/day=01/hour=17/minute=32/traces_1" + testKey_2 := "year=2021/month=02/day=01/hour=17/minute=32/traces_2" + reader := s3Reader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, "year=2021/month=02/day=01/hour=17/minute=32/traces_", *params.Prefix) + + return &mockListObjectsV2Pager{ + Pages: []*s3.ListObjectsV2Output{ + { + Contents: []types.Object{ + { + Key: &testKey_1, + }, + }, + }, + { + Contents: []types.Object{ + { + Key: &testKey_2, + }, + }, + }, + }, + } + }), + getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Contains(t, []string{testKey_1, testKey_2}, *params.Key) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }), + s3Bucket: "bucket", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute), + timeStep: time.Minute, + getTimeKey: getTimeKeyPartitionMinute, + } + + dataCallbackKeys := make([]string, 0) + + err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(ctx context.Context, key string, data []byte) error { + t.Helper() + require.Equal(t, "this is the body of the object", string(data)) + dataCallbackKeys = append(dataCallbackKeys, key) + return nil + }) + require.Contains(t, dataCallbackKeys, testKey_1) + require.Contains(t, dataCallbackKeys, testKey_2) + require.NoError(t, err) +} + +func Test_readTelemetryForTime_GetObjectError(t *testing.T) { + testKey := "year=2021/month=02/day=01/hour=17/minute=32/traces_1" + testError := errors.New("test error") + reader := s3Reader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, "year=2021/month=02/day=01/hour=17/minute=32/traces_", *params.Prefix) + + return &mockListObjectsV2Pager{ + Pages: []*s3.ListObjectsV2Output{ + { + Contents: []types.Object{ + { + Key: &testKey, + }, + }, + }, + }, + } + }), + getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + return nil, testError + }), + s3Bucket: "bucket", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute), + timeStep: time.Minute, + getTimeKey: getTimeKeyPartitionMinute, + } + + err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(ctx context.Context, key string, data []byte) error { + t.Helper() + t.Fail() + return nil + }) + require.Error(t, err, "test error") +} + +func Test_readTelemetryForTime_ListObjectsNoResults(t *testing.T) { testKey := "year=2021/month=02/day=01/hour=17/minute=32/traces_1" reader := s3Reader{ listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { @@ -128,7 +237,44 @@ func Test_readTelemetryForTime(t *testing.T) { require.Equal(t, "bucket", *params.Bucket) require.Equal(t, "year=2021/month=02/day=01/hour=17/minute=32/traces_", *params.Prefix) + return &mockListObjectsV2Pager{} + }), + getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }), + s3Bucket: "bucket", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute), + timeStep: time.Minute, + getTimeKey: getTimeKeyPartitionMinute, + } + + err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(ctx context.Context, key string, data []byte) error { + t.Helper() + t.Fail() + return nil + }) + require.NoError(t, err) +} + +func Test_readTelemetryForTime_NextPageError(t *testing.T) { + testKey := "year=2021/month=02/day=01/hour=17/minute=32/traces_1" + testError := errors.New("test page error") + reader := s3Reader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, "year=2021/month=02/day=01/hour=17/minute=32/traces_", *params.Prefix) + return &mockListObjectsV2Pager{ + Error: testError, Pages: []*s3.ListObjectsV2Output{ { Contents: []types.Object{ @@ -158,9 +304,56 @@ func Test_readTelemetryForTime(t *testing.T) { } err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(ctx context.Context, key string, data []byte) error { - require.Equal(t, testKey, key) + t.Helper() + t.Fail() + return nil + }) + require.Error(t, err) +} + +func Test_readAll(t *testing.T) { + reader := s3Reader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + key := fmt.Sprintf("%s%s", *params.Prefix, "1") + return &mockListObjectsV2Pager{ + Pages: []*s3.ListObjectsV2Output{ + { + Contents: []types.Object{ + { + Key: &key, + }, + }, + }, + }, + } + }), + getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }), + s3Bucket: "bucket", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute * 2), + timeStep: time.Minute, + getTimeKey: getTimeKeyPartitionMinute, + } + + dataCallbackKeys := make([]string, 0) + + err := reader.readAll(context.Background(), "traces", func(ctx context.Context, key string, data []byte) error { + t.Helper() require.Equal(t, "this is the body of the object", string(data)) + dataCallbackKeys = append(dataCallbackKeys, key) return nil }) require.NoError(t, err) + require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=32/traces_1") + require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=33/traces_1") } From f51c7b2d20e0ceb956a93f0fbe96bdf56815a4f1 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Sat, 6 Apr 2024 22:08:39 +0100 Subject: [PATCH 03/20] rename awss3Receiver to awss3TraceReceiver --- receiver/awss3receiver/receiver.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index a898352c50d2..6a005c7213dc 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -12,19 +12,19 @@ import ( "strings" ) -type awss3Receiver struct { +type awss3TraceReceiver struct { s3Reader *s3Reader consumer consumer.Traces logger *zap.Logger cancel context.CancelFunc } -func newAWSS3TraceReceiver(cfg *Config, traces consumer.Traces, logger *zap.Logger) (*awss3Receiver, error) { +func newAWSS3TraceReceiver(cfg *Config, traces consumer.Traces, logger *zap.Logger) (*awss3TraceReceiver, error) { reader, err := newS3Reader(cfg) if err != nil { return nil, err } - return &awss3Receiver{ + return &awss3TraceReceiver{ s3Reader: reader, consumer: traces, logger: logger, @@ -32,7 +32,7 @@ func newAWSS3TraceReceiver(cfg *Config, traces consumer.Traces, logger *zap.Logg }, nil } -func (r *awss3Receiver) Start(ctx context.Context, _ component.Host) error { +func (r *awss3TraceReceiver) Start(ctx context.Context, _ component.Host) error { ctx, r.cancel = context.WithCancel(ctx) go func() { _ = r.s3Reader.readAll(ctx, "traces", r.receiveBytes) @@ -40,14 +40,14 @@ func (r *awss3Receiver) Start(ctx context.Context, _ component.Host) error { return nil } -func (r *awss3Receiver) Shutdown(_ context.Context) error { +func (r *awss3TraceReceiver) Shutdown(_ context.Context) error { if r.cancel != nil { r.cancel() } return nil } -func (r *awss3Receiver) receiveBytes(ctx context.Context, key string, data []byte) error { +func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data []byte) error { var unmarshaler ptrace.Unmarshaler if strings.HasSuffix(key, ".json") { unmarshaler = &ptrace.JSONUnmarshaler{} From cb06ce7527ef7a7734bdf5f9c830867b2cf80369 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Sun, 7 Apr 2024 18:08:45 +0100 Subject: [PATCH 04/20] finish unit tests --- receiver/awss3receiver/factory_test.go | 24 ----- receiver/awss3receiver/go.mod | 1 + receiver/awss3receiver/go.sum | 2 + receiver/awss3receiver/metadata.yaml | 4 + receiver/awss3receiver/receiver.go | 15 +++ receiver/awss3receiver/receiver_test.go | 118 ++++++++++++++++++++++++ receiver/awss3receiver/s3reader_test.go | 46 +++++++++ 7 files changed, 186 insertions(+), 24 deletions(-) delete mode 100644 receiver/awss3receiver/factory_test.go create mode 100644 receiver/awss3receiver/receiver_test.go diff --git a/receiver/awss3receiver/factory_test.go b/receiver/awss3receiver/factory_test.go deleted file mode 100644 index 18620f939462..000000000000 --- a/receiver/awss3receiver/factory_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package awss3receiver - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/receiver/receivertest" -) - -func TestNewFactory(t *testing.T) { - f := NewFactory() - _, err := f.CreateTracesReceiver( - context.Background(), - receivertest.NewNopCreateSettings(), - f.CreateDefaultConfig(), - consumertest.NewNop(), - ) - require.NoError(t, err) -} diff --git a/receiver/awss3receiver/go.mod b/receiver/awss3receiver/go.mod index a8d715d67b2b..5498121572a4 100644 --- a/receiver/awss3receiver/go.mod +++ b/receiver/awss3receiver/go.mod @@ -12,6 +12,7 @@ require ( go.opentelemetry.io/collector/consumer v0.97.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/collector/receiver v0.97.1-0.20240404121116-4f1a8936d26b + go.opentelemetry.io/collector/semconv v0.97.0 go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/goleak v1.3.0 diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index d06affb9b699..7b9f97b3141d 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -114,6 +114,8 @@ go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b h1:HQqz go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b/go.mod h1:TYj8aKRWZyT/KuKQXKyqSEvK/GV+slFaDMEI+Ke64Yw= go.opentelemetry.io/collector/receiver v0.97.1-0.20240404121116-4f1a8936d26b h1:d9xejxpSk5O46aM1X5nUb1qGQl1ToGQJy39csqnYl7c= go.opentelemetry.io/collector/receiver v0.97.1-0.20240404121116-4f1a8936d26b/go.mod h1:oj/eoc8Wf9u82gaPeRVdHmFbJ5e3m5F1v5CFTpjiVFU= +go.opentelemetry.io/collector/semconv v0.97.0 h1:iF3nTfThbiOwz7o5Pocn0dDnDoffd18ijDuf6Mwzi1s= +go.opentelemetry.io/collector/semconv v0.97.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ= diff --git a/receiver/awss3receiver/metadata.yaml b/receiver/awss3receiver/metadata.yaml index 24677eb5fa57..5bfa60a318b5 100644 --- a/receiver/awss3receiver/metadata.yaml +++ b/receiver/awss3receiver/metadata.yaml @@ -7,3 +7,7 @@ status: distributions: [] codeowners: active: [atoulme, adcharre] +tests: + config: + starttime: "2024-01-31" + endtime: "2024-02-03" diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index 6a005c7213dc..93abfe951787 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -4,11 +4,14 @@ package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver" import ( + "bytes" + "compress/gzip" "context" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" + "io" "strings" ) @@ -49,6 +52,18 @@ func (r *awss3TraceReceiver) Shutdown(_ context.Context) error { func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data []byte) error { var unmarshaler ptrace.Unmarshaler + if strings.HasSuffix(key, ".gz") { + if reader, err := gzip.NewReader(bytes.NewReader(data)); err != nil { + return err + } else { + key = strings.TrimSuffix(key, ".gz") + data, err = io.ReadAll(reader) + if err != nil { + return err + } + } + } + if strings.HasSuffix(key, ".json") { unmarshaler = &ptrace.JSONUnmarshaler{} } diff --git a/receiver/awss3receiver/receiver_test.go b/receiver/awss3receiver/receiver_test.go new file mode 100644 index 000000000000..0fd25d2fc66c --- /dev/null +++ b/receiver/awss3receiver/receiver_test.go @@ -0,0 +1,118 @@ +package awss3receiver + +import ( + "bytes" + "compress/gzip" + "context" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.22.0" + "testing" +) + +func generateTraceData() ptrace.Traces { + td := ptrace.NewTraces() + rs := td.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test") + span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) + span.SetTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 7, 6, 5, 4, 3, 2, 1, 0}) + span.SetStartTimestamp(1581452772000000000) + span.SetEndTimestamp(1581452773000000000) + return td +} + +func gzipCompress(data []byte) []byte { + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + _, _ = gz.Write(data) + _ = gz.Close() + return buf.Bytes() +} + +func Test_receiveBytes(t *testing.T) { + testTrace := generateTraceData() + + jsonTrace, err := (&ptrace.JSONMarshaler{}).MarshalTraces(testTrace) + require.NoError(t, err) + protobufTrace, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(testTrace) + require.NoError(t, err) + + type args struct { + key string + data []byte + } + tests := []struct { + name string + args args + wantErr bool + wantTrace bool + }{ + { + name: ".json", + args: args{ + key: "test.json", + data: jsonTrace, + }, + wantErr: false, + wantTrace: true, + }, + { + name: ".binpb", + args: args{ + key: "test.binpb", + data: protobufTrace, + }, + wantErr: false, + wantTrace: true, + }, + { + name: ".unknown", + args: args{ + key: "test.unknown", + data: []byte("unknown"), + }, + wantErr: false, + wantTrace: false, + }, + { + name: ".json.gz", + args: args{ + key: "test.json.gz", + data: gzipCompress(jsonTrace), + }, + wantErr: false, + wantTrace: true, + }, + { + name: ".binpb.gz", + args: args{ + key: "test.binpb.gz", + data: gzipCompress(protobufTrace), + }, + wantErr: false, + wantTrace: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tracesConsumer, _ := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + t.Helper() + if !tt.wantTrace { + t.Errorf("receiveBytes() received unexpected trace") + } else { + require.Equal(t, testTrace, td) + } + return nil + }) + r := &awss3TraceReceiver{ + consumer: tracesConsumer, + } + if err := r.receiveBytes(context.Background(), tt.args.key, tt.args.data); (err != nil) != tt.wantErr { + t.Errorf("receiveBytes() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/receiver/awss3receiver/s3reader_test.go b/receiver/awss3receiver/s3reader_test.go index 81871219666d..b84d7bd1a8ae 100644 --- a/receiver/awss3receiver/s3reader_test.go +++ b/receiver/awss3receiver/s3reader_test.go @@ -357,3 +357,49 @@ func Test_readAll(t *testing.T) { require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=32/traces_1") require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=33/traces_1") } + +func Test_readAll_ContextDone(t *testing.T) { + reader := s3Reader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + key := fmt.Sprintf("%s%s", *params.Prefix, "1") + return &mockListObjectsV2Pager{ + Pages: []*s3.ListObjectsV2Output{ + { + Contents: []types.Object{ + { + Key: &key, + }, + }, + }, + }, + } + }), + getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }), + s3Bucket: "bucket", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute * 2), + timeStep: time.Minute, + getTimeKey: getTimeKeyPartitionMinute, + } + + dataCallbackKeys := make([]string, 0) + ctx, cancelFunc := context.WithCancel(context.Background()) + cancelFunc() + err := reader.readAll(ctx, "traces", func(ctx context.Context, key string, data []byte) error { + t.Helper() + dataCallbackKeys = append(dataCallbackKeys, key) + return nil + }) + require.NoError(t, err) + require.Len(t, dataCallbackKeys, 0) +} From e9ac872903e9386a4239edf72177f3d5f9708c82 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Sun, 7 Apr 2024 18:12:34 +0100 Subject: [PATCH 05/20] Create awss3receiver_impl_mvp.yaml --- .chloggen/awss3receiver_impl_mvp.yaml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/awss3receiver_impl_mvp.yaml diff --git a/.chloggen/awss3receiver_impl_mvp.yaml b/.chloggen/awss3receiver_impl_mvp.yaml new file mode 100644 index 000000000000..ef72c41495fb --- /dev/null +++ b/.chloggen/awss3receiver_impl_mvp.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awss3receiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Initial implementation of the AWS S3 receiver." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30750] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From cf8e3a1555bf63d71c90b85935c1e5505de31c70 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Mon, 8 Apr 2024 16:20:18 +0100 Subject: [PATCH 06/20] package dependency update --- receiver/awss3receiver/go.mod | 8 +++++--- receiver/awss3receiver/go.sum | 21 +++++++++++++++------ receiver/awss3receiver/s3intf.go | 4 ++++ 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/receiver/awss3receiver/go.mod b/receiver/awss3receiver/go.mod index 5498121572a4..b384abe399fe 100644 --- a/receiver/awss3receiver/go.mod +++ b/receiver/awss3receiver/go.mod @@ -4,7 +4,8 @@ go 1.21 require ( github.com/aws/aws-sdk-go-v2 v1.26.1 - github.com/aws/aws-sdk-go-v2/config v1.27.10 + github.com/aws/aws-sdk-go-v2/config v1.27.11 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15 github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.97.1-0.20240404121116-4f1a8936d26b @@ -22,7 +23,7 @@ require ( require ( github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.17.10 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.11 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect @@ -32,7 +33,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.20.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect github.com/aws/smithy-go v1.20.2 // indirect @@ -45,6 +46,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index 7b9f97b3141d..6b6039fe8275 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -2,12 +2,14 @@ github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+ github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg= -github.com/aws/aws-sdk-go-v2/config v1.27.10 h1:PS+65jThT0T/snC5WjyfHHyUgG+eBoupSDV+f838cro= -github.com/aws/aws-sdk-go-v2/config v1.27.10/go.mod h1:BePM7Vo4OBpHreKRUMuDXX+/+JWP38FLkzl5m27/Jjs= -github.com/aws/aws-sdk-go-v2/credentials v1.17.10 h1:qDZ3EA2lv1KangvQB6y258OssCHD0xvaGiEDkG4X/10= -github.com/aws/aws-sdk-go-v2/credentials v1.17.10/go.mod h1:6t3sucOaYDwDssHQa0ojH1RpmVmF5/jArkye1b2FKMI= +github.com/aws/aws-sdk-go-v2/config v1.27.11 h1:f47rANd2LQEYHda2ddSCKYId18/8BhSRM4BULGmfgNA= +github.com/aws/aws-sdk-go-v2/config v1.27.11/go.mod h1:SMsV78RIOYdve1vf36z8LmnszlRWkwMQtomCAI0/mIE= +github.com/aws/aws-sdk-go-v2/credentials v1.17.11 h1:YuIB1dJNf1Re822rriUOTxopaHHvIq0l/pX3fwO+Tzs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.11/go.mod h1:AQtFPsDH9bI2O+71anW6EKL+NcD7LG3dpKGMV4SShgo= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15 h1:7Zwtt/lP3KNRkeZre7soMELMGNoBrutx8nobg1jKWmo= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15/go.mod h1:436h2adoHb57yd+8W+gYPrrA9U/R/SuAuOO42Ushzhw= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= @@ -26,8 +28,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 h1:f9RyWNtS8oH7cZ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5/go.mod h1:h5CoMZV2VF297/VLhRhO1WF+XYWOzXo+4HsObA4HjBQ= github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 h1:6cnno47Me9bRykw9AEv9zkXE+5or7jz8TsskTTccbgc= github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1/go.mod h1:qmdkIIAC+GCLASF7R2whgNrJADz0QZPX+Seiw/i4S3o= -github.com/aws/aws-sdk-go-v2/service/sso v1.20.4 h1:WzFol5Cd+yDxPAdnzTA5LmpHYSWinhmSj4rQChV0ee8= -github.com/aws/aws-sdk-go-v2/service/sso v1.20.4/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 h1:vN8hEbpRnL7+Hopy9dzmRle1xmDc7o8tmY0klsr175w= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.5/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2KCuY61kzoCpvtvJJBtOE= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= @@ -59,6 +61,10 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -176,5 +182,8 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/receiver/awss3receiver/s3intf.go b/receiver/awss3receiver/s3intf.go index ee030f0e1ae9..6d2a794625cb 100644 --- a/receiver/awss3receiver/s3intf.go +++ b/receiver/awss3receiver/s3intf.go @@ -7,10 +7,13 @@ import ( "context" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "log" ) +var downloadManager *manager.Downloader + type ListObjectsV2Pager interface { HasMorePages() bool NextPage(context.Context, ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) @@ -56,6 +59,7 @@ func newS3Client(cfg S3DownloaderConfig) (ListObjectsAPI, GetObjectAPI, error) { }) } client := s3.NewFromConfig(awsCfg, s3OptionFuncs...) + return &s3ListObjectsAPIImpl{client: client}, client, nil } From e1f06e86d84e78b6107b9224f3a9e6b64195408e Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Mon, 8 Apr 2024 17:24:13 +0100 Subject: [PATCH 07/20] Fix lint issues --- receiver/awss3receiver/receiver.go | 19 ++++----- receiver/awss3receiver/receiver_test.go | 10 +++-- receiver/awss3receiver/s3intf.go | 7 ++-- receiver/awss3receiver/s3reader.go | 30 +++++++------- receiver/awss3receiver/s3reader_test.go | 53 +++++++++++++------------ 5 files changed, 64 insertions(+), 55 deletions(-) diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index 93abfe951787..9b47250935a9 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -7,12 +7,13 @@ import ( "bytes" "compress/gzip" "context" + "io" + "strings" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" - "io" - "strings" ) type awss3TraceReceiver struct { @@ -53,14 +54,14 @@ func (r *awss3TraceReceiver) Shutdown(_ context.Context) error { func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data []byte) error { var unmarshaler ptrace.Unmarshaler if strings.HasSuffix(key, ".gz") { - if reader, err := gzip.NewReader(bytes.NewReader(data)); err != nil { + reader, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return err + } + key = strings.TrimSuffix(key, ".gz") + data, err = io.ReadAll(reader) + if err != nil { return err - } else { - key = strings.TrimSuffix(key, ".gz") - data, err = io.ReadAll(reader) - if err != nil { - return err - } } } diff --git a/receiver/awss3receiver/receiver_test.go b/receiver/awss3receiver/receiver_test.go index 0fd25d2fc66c..7296a93829aa 100644 --- a/receiver/awss3receiver/receiver_test.go +++ b/receiver/awss3receiver/receiver_test.go @@ -1,14 +1,18 @@ -package awss3receiver +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver" import ( "bytes" "compress/gzip" "context" + "testing" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.22.0" - "testing" ) func generateTraceData() ptrace.Traces { @@ -98,7 +102,7 @@ func Test_receiveBytes(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tracesConsumer, _ := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + tracesConsumer, _ := consumer.NewTraces(func(_ context.Context, td ptrace.Traces) error { t.Helper() if !tt.wantTrace { t.Errorf("receiveBytes() received unexpected trace") diff --git a/receiver/awss3receiver/s3intf.go b/receiver/awss3receiver/s3intf.go index 6d2a794625cb..56411a145864 100644 --- a/receiver/awss3receiver/s3intf.go +++ b/receiver/awss3receiver/s3intf.go @@ -5,14 +5,15 @@ package awss3receiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" + "log" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" - "log" ) -var downloadManager *manager.Downloader +var downloadManager *manager.Downloader //nolint:golint,unused type ListObjectsV2Pager interface { HasMorePages() bool @@ -38,7 +39,7 @@ func newS3Client(cfg S3DownloaderConfig) (ListObjectsAPI, GetObjectAPI, error) { } if cfg.Endpoint != "" { - customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + customResolver := aws.EndpointResolverWithOptionsFunc(func(_, _ string, _ ...any) (aws.Endpoint, error) { return aws.Endpoint{ PartitionID: "aws", URL: cfg.Endpoint, diff --git a/receiver/awss3receiver/s3reader.go b/receiver/awss3receiver/s3reader.go index 2e4d10962d09..80cbfc3cde3f 100644 --- a/receiver/awss3receiver/s3reader.go +++ b/receiver/awss3receiver/s3reader.go @@ -6,9 +6,10 @@ package awss3receiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" "fmt" - "github.com/aws/aws-sdk-go-v2/service/s3" "io" "time" + + "github.com/aws/aws-sdk-go-v2/service/s3" ) type s3Reader struct { @@ -90,12 +91,12 @@ func (s3Reader *s3Reader) readTelemetryForTime(ctx context.Context, t time.Time, return err } for _, obj := range page.Contents { - if data, err := s3Reader.retrieveObject(*obj.Key); err != nil { + data, err := s3Reader.retrieveObject(*obj.Key) + if err != nil { + return err + } + if err := dataCallback(ctx, *obj.Key, data); err != nil { return err - } else { - if err := dataCallback(ctx, *obj.Key, data); err != nil { - return err - } } } } @@ -106,9 +107,8 @@ func (s3Reader *s3Reader) getObjectPrefixForTime(t time.Time, telemetryType stri timeKey := s3Reader.getTimeKey(t) if s3Reader.s3Prefix != "" { return fmt.Sprintf("%s/%s/%s%s_", s3Reader.s3Prefix, timeKey, s3Reader.filePrefix, telemetryType) - } else { - return fmt.Sprintf("%s/%s%s_", timeKey, s3Reader.filePrefix, telemetryType) } + return fmt.Sprintf("%s/%s%s_", timeKey, s3Reader.filePrefix, telemetryType) } func (s3Reader *s3Reader) retrieveObject(key string) ([]byte, error) { @@ -116,15 +116,15 @@ func (s3Reader *s3Reader) retrieveObject(key string) ([]byte, error) { Bucket: &s3Reader.s3Bucket, Key: &key, } - if output, err := s3Reader.getObjectClient.GetObject(context.TODO(), ¶ms); err != nil { + output, err := s3Reader.getObjectClient.GetObject(context.TODO(), ¶ms) + if err != nil { + return nil, err + } + contents, err := io.ReadAll(output.Body) + if err != nil { return nil, err - } else { - contents, err := io.ReadAll(output.Body) - if err != nil { - return nil, err - } - return contents, nil } + return contents, nil } func getTimeKeyPartitionHour(t time.Time) string { diff --git a/receiver/awss3receiver/s3reader_test.go b/receiver/awss3receiver/s3reader_test.go index b84d7bd1a8ae..81572c0e7663 100644 --- a/receiver/awss3receiver/s3reader_test.go +++ b/receiver/awss3receiver/s3reader_test.go @@ -1,17 +1,20 @@ -package awss3receiver +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver" import ( "bytes" "context" "errors" "fmt" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" - - "github.com/stretchr/testify/require" "io" "testing" "time" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/require" ) var testTime = time.Date(2021, 02, 01, 17, 32, 00, 00, time.UTC) @@ -114,7 +117,7 @@ func (m *mockListObjectsV2Pager) HasMorePages() bool { return m.PageNum < len(m.Pages) } -func (m *mockListObjectsV2Pager) NextPage(ctx context.Context, f ...func(*s3.Options)) (output *s3.ListObjectsV2Output, err error) { +func (m *mockListObjectsV2Pager) NextPage(_ context.Context, _ ...func(*s3.Options)) (output *s3.ListObjectsV2Output, err error) { if m.Error != nil { return nil, m.Error } @@ -128,8 +131,8 @@ func (m *mockListObjectsV2Pager) NextPage(ctx context.Context, f ...func(*s3.Opt } func Test_readTelemetryForTime(t *testing.T) { - testKey_1 := "year=2021/month=02/day=01/hour=17/minute=32/traces_1" - testKey_2 := "year=2021/month=02/day=01/hour=17/minute=32/traces_2" + testKey1 := "year=2021/month=02/day=01/hour=17/minute=32/traces_1" + testKey2 := "year=2021/month=02/day=01/hour=17/minute=32/traces_2" reader := s3Reader{ listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { t.Helper() @@ -141,24 +144,24 @@ func Test_readTelemetryForTime(t *testing.T) { { Contents: []types.Object{ { - Key: &testKey_1, + Key: &testKey1, }, }, }, { Contents: []types.Object{ { - Key: &testKey_2, + Key: &testKey2, }, }, }, }, } }), - getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { t.Helper() require.Equal(t, "bucket", *params.Bucket) - require.Contains(t, []string{testKey_1, testKey_2}, *params.Key) + require.Contains(t, []string{testKey1, testKey2}, *params.Key) return &s3.GetObjectOutput{ Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), }, nil @@ -174,14 +177,14 @@ func Test_readTelemetryForTime(t *testing.T) { dataCallbackKeys := make([]string, 0) - err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(ctx context.Context, key string, data []byte) error { + err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(_ context.Context, key string, data []byte) error { t.Helper() require.Equal(t, "this is the body of the object", string(data)) dataCallbackKeys = append(dataCallbackKeys, key) return nil }) - require.Contains(t, dataCallbackKeys, testKey_1) - require.Contains(t, dataCallbackKeys, testKey_2) + require.Contains(t, dataCallbackKeys, testKey1) + require.Contains(t, dataCallbackKeys, testKey2) require.NoError(t, err) } @@ -206,7 +209,7 @@ func Test_readTelemetryForTime_GetObjectError(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { t.Helper() require.Equal(t, "bucket", *params.Bucket) require.Equal(t, testKey, *params.Key) @@ -221,7 +224,7 @@ func Test_readTelemetryForTime_GetObjectError(t *testing.T) { getTimeKey: getTimeKeyPartitionMinute, } - err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(ctx context.Context, key string, data []byte) error { + err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(_ context.Context, _ string, _ []byte) error { t.Helper() t.Fail() return nil @@ -239,7 +242,7 @@ func Test_readTelemetryForTime_ListObjectsNoResults(t *testing.T) { return &mockListObjectsV2Pager{} }), - getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { t.Helper() require.Equal(t, "bucket", *params.Bucket) require.Equal(t, testKey, *params.Key) @@ -256,7 +259,7 @@ func Test_readTelemetryForTime_ListObjectsNoResults(t *testing.T) { getTimeKey: getTimeKeyPartitionMinute, } - err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(ctx context.Context, key string, data []byte) error { + err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(_ context.Context, _ string, _ []byte) error { t.Helper() t.Fail() return nil @@ -286,7 +289,7 @@ func Test_readTelemetryForTime_NextPageError(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { t.Helper() require.Equal(t, "bucket", *params.Bucket) require.Equal(t, testKey, *params.Key) @@ -303,7 +306,7 @@ func Test_readTelemetryForTime_NextPageError(t *testing.T) { getTimeKey: getTimeKeyPartitionMinute, } - err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(ctx context.Context, key string, data []byte) error { + err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(_ context.Context, _ string, _ []byte) error { t.Helper() t.Fail() return nil @@ -329,7 +332,7 @@ func Test_readAll(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { t.Helper() require.Equal(t, "bucket", *params.Bucket) return &s3.GetObjectOutput{ @@ -347,7 +350,7 @@ func Test_readAll(t *testing.T) { dataCallbackKeys := make([]string, 0) - err := reader.readAll(context.Background(), "traces", func(ctx context.Context, key string, data []byte) error { + err := reader.readAll(context.Background(), "traces", func(_ context.Context, key string, data []byte) error { t.Helper() require.Equal(t, "this is the body of the object", string(data)) dataCallbackKeys = append(dataCallbackKeys, key) @@ -376,7 +379,7 @@ func Test_readAll_ContextDone(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { t.Helper() require.Equal(t, "bucket", *params.Bucket) return &s3.GetObjectOutput{ @@ -395,7 +398,7 @@ func Test_readAll_ContextDone(t *testing.T) { dataCallbackKeys := make([]string, 0) ctx, cancelFunc := context.WithCancel(context.Background()) cancelFunc() - err := reader.readAll(ctx, "traces", func(ctx context.Context, key string, data []byte) error { + err := reader.readAll(ctx, "traces", func(_ context.Context, key string, _ []byte) error { t.Helper() dataCallbackKeys = append(dataCallbackKeys, key) return nil From edd33f27129d71c66dc36db497613ba24923b49b Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Mon, 8 Apr 2024 17:25:37 +0100 Subject: [PATCH 08/20] Fix package version issue --- receiver/awss3receiver/go.mod | 2 +- receiver/awss3receiver/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/receiver/awss3receiver/go.mod b/receiver/awss3receiver/go.mod index b384abe399fe..a3a2760d0205 100644 --- a/receiver/awss3receiver/go.mod +++ b/receiver/awss3receiver/go.mod @@ -13,7 +13,7 @@ require ( go.opentelemetry.io/collector/consumer v0.97.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/collector/receiver v0.97.1-0.20240404121116-4f1a8936d26b - go.opentelemetry.io/collector/semconv v0.97.0 + go.opentelemetry.io/collector/semconv v0.97.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/goleak v1.3.0 diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index 6b6039fe8275..24395ad3a910 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -120,8 +120,8 @@ go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b h1:HQqz go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b/go.mod h1:TYj8aKRWZyT/KuKQXKyqSEvK/GV+slFaDMEI+Ke64Yw= go.opentelemetry.io/collector/receiver v0.97.1-0.20240404121116-4f1a8936d26b h1:d9xejxpSk5O46aM1X5nUb1qGQl1ToGQJy39csqnYl7c= go.opentelemetry.io/collector/receiver v0.97.1-0.20240404121116-4f1a8936d26b/go.mod h1:oj/eoc8Wf9u82gaPeRVdHmFbJ5e3m5F1v5CFTpjiVFU= -go.opentelemetry.io/collector/semconv v0.97.0 h1:iF3nTfThbiOwz7o5Pocn0dDnDoffd18ijDuf6Mwzi1s= -go.opentelemetry.io/collector/semconv v0.97.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= +go.opentelemetry.io/collector/semconv v0.97.1-0.20240404121116-4f1a8936d26b h1:2ApIgbCJPzABy6TDKlc9b55J/zo6ixAIMPvIUC2nB9U= +go.opentelemetry.io/collector/semconv v0.97.1-0.20240404121116-4f1a8936d26b/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ= From d444c126add7cbfb6cb18dafe1e57858d86572c4 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Tue, 9 Apr 2024 17:07:25 +0100 Subject: [PATCH 09/20] Improve start/end time parsing error messages --- receiver/awss3receiver/config.go | 21 ++++++++++++--------- receiver/awss3receiver/config_test.go | 4 ++-- receiver/awss3receiver/s3reader.go | 4 ++-- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/receiver/awss3receiver/config.go b/receiver/awss3receiver/config.go index 06bd9144fa04..02121a419f3f 100644 --- a/receiver/awss3receiver/config.go +++ b/receiver/awss3receiver/config.go @@ -5,6 +5,8 @@ package awss3receiver // import "github.com/open-telemetry/opentelemetry-collect import ( "errors" + "fmt" + "strings" "time" "go.opentelemetry.io/collector/component" @@ -45,28 +47,29 @@ func (c Config) Validate() error { errs = multierr.Append(errs, errors.New("bucket is required")) } if c.StartTime == "" { - errs = multierr.Append(errs, errors.New("start time is required")) + errs = multierr.Append(errs, errors.New("starttime is required")) } else { - if _, err := parseTime(c.StartTime); err != nil { - errs = multierr.Append(errs, errors.New("unable to parse start date")) + if _, err := parseTime(c.StartTime, "starttime"); err != nil { + errs = multierr.Append(errs, err) } } if c.EndTime == "" { - errs = multierr.Append(errs, errors.New("end time is required")) + errs = multierr.Append(errs, errors.New("endtime is required")) } else { - if _, err := parseTime(c.EndTime); err != nil { - errs = multierr.Append(errs, errors.New("unable to parse end time")) + if _, err := parseTime(c.EndTime, "endtime"); err != nil { + errs = multierr.Append(errs, err) } } return errs } -func parseTime(str string) (time.Time, error) { +func parseTime(timeStr, configName string) (time.Time, error) { layouts := []string{"2006-01-02 15:04", time.DateOnly} + for _, layout := range layouts { - if t, err := time.Parse(layout, str); err == nil { + if t, err := time.Parse(layout, timeStr); err == nil { return t, nil } } - return time.Time{}, errors.New("unable to parse time string") + return time.Time{}, fmt.Errorf("unable to parse %s (%s), accepted formats: %s", configName, timeStr, strings.Join(layouts, ", ")) } diff --git a/receiver/awss3receiver/config_test.go b/receiver/awss3receiver/config_test.go index ddb3a3f0b559..7a60bd4fdf06 100644 --- a/receiver/awss3receiver/config_test.go +++ b/receiver/awss3receiver/config_test.go @@ -48,11 +48,11 @@ func TestLoadConfig(t *testing.T) { }{ { id: component.NewIDWithName(metadata.Type, ""), - errorMessage: "bucket is required; start time is required; end time is required", + errorMessage: "bucket is required; starttime is required; endtime is required", }, { id: component.NewIDWithName(metadata.Type, "1"), - errorMessage: "unable to parse start date; unable to parse end time", + errorMessage: "unable to parse starttime (a date), accepted formats: 2006-01-02 15:04, 2006-01-02; unable to parse endtime (2024-02-03a), accepted formats: 2006-01-02 15:04, 2006-01-02", }, { id: component.NewIDWithName(metadata.Type, "2"), diff --git a/receiver/awss3receiver/s3reader.go b/receiver/awss3receiver/s3reader.go index 80cbfc3cde3f..ff0f78fd233c 100644 --- a/receiver/awss3receiver/s3reader.go +++ b/receiver/awss3receiver/s3reader.go @@ -31,11 +31,11 @@ func newS3Reader(cfg *Config) (*s3Reader, error) { if err != nil { return nil, err } - startTime, err := parseTime(cfg.StartTime) + startTime, err := parseTime(cfg.StartTime, "starttime") if err != nil { return nil, err } - endTime, err := parseTime(cfg.EndTime) + endTime, err := parseTime(cfg.EndTime, "endtime") if err != nil { return nil, err } From fcb09919e094d4fbc74decbdc58470a2c65fbce3 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Tue, 9 Apr 2024 21:30:50 +0100 Subject: [PATCH 10/20] Further PR comment fixes --- receiver/awss3receiver/config.go | 10 ++- receiver/awss3receiver/config_test.go | 4 +- receiver/awss3receiver/receiver.go | 2 + receiver/awss3receiver/s3reader.go | 45 +++++----- receiver/awss3receiver/s3reader_test.go | 94 ++++++++++----------- receiver/awss3receiver/testdata/config.yaml | 1 + 6 files changed, 84 insertions(+), 72 deletions(-) diff --git a/receiver/awss3receiver/config.go b/receiver/awss3receiver/config.go index 02121a419f3f..0bd39f18a8e5 100644 --- a/receiver/awss3receiver/config.go +++ b/receiver/awss3receiver/config.go @@ -32,11 +32,16 @@ type Config struct { EndTime string `mapstructure:"endtime"` } +const ( + S3PartitionMinute = "minute" + S3PartitionHour = "hour" +) + func createDefaultConfig() component.Config { return &Config{ S3Downloader: S3DownloaderConfig{ Region: "us-east-1", - S3Partition: "minute", + S3Partition: S3PartitionMinute, }, } } @@ -46,6 +51,9 @@ func (c Config) Validate() error { if c.S3Downloader.S3Bucket == "" { errs = multierr.Append(errs, errors.New("bucket is required")) } + if c.S3Downloader.S3Partition != S3PartitionHour && c.S3Downloader.S3Partition != S3PartitionMinute { + errs = multierr.Append(errs, errors.New("s3_partition must be either 'hour' or 'minute'")) + } if c.StartTime == "" { errs = multierr.Append(errs, errors.New("starttime is required")) } else { diff --git a/receiver/awss3receiver/config_test.go b/receiver/awss3receiver/config_test.go index 7a60bd4fdf06..b8edffd1a38b 100644 --- a/receiver/awss3receiver/config_test.go +++ b/receiver/awss3receiver/config_test.go @@ -26,7 +26,7 @@ func TestConfig_Validate_Valid(t *testing.T) { Region: "", S3Bucket: "abucket", S3Prefix: "", - S3Partition: "", + S3Partition: "minute", FilePrefix: "", Endpoint: "", S3ForcePathStyle: false, @@ -52,7 +52,7 @@ func TestLoadConfig(t *testing.T) { }, { id: component.NewIDWithName(metadata.Type, "1"), - errorMessage: "unable to parse starttime (a date), accepted formats: 2006-01-02 15:04, 2006-01-02; unable to parse endtime (2024-02-03a), accepted formats: 2006-01-02 15:04, 2006-01-02", + errorMessage: "s3_partition must be either 'hour' or 'minute'; unable to parse starttime (a date), accepted formats: 2006-01-02 15:04, 2006-01-02; unable to parse endtime (2024-02-03a), accepted formats: 2006-01-02 15:04, 2006-01-02", }, { id: component.NewIDWithName(metadata.Type, "2"), diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index 9b47250935a9..eafd954baec6 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -77,6 +77,8 @@ func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data return err } return r.consumer.ConsumeTraces(ctx, traces) + } else { + r.logger.Warn("Unsupported file format", zap.String("key", key)) } return nil } diff --git a/receiver/awss3receiver/s3reader.go b/receiver/awss3receiver/s3reader.go index ff0f78fd233c..fe66e87f142e 100644 --- a/receiver/awss3receiver/s3reader.go +++ b/receiver/awss3receiver/s3reader.go @@ -5,6 +5,7 @@ package awss3receiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" + "errors" "fmt" "io" "time" @@ -17,11 +18,10 @@ type s3Reader struct { getObjectClient GetObjectAPI s3Bucket string s3Prefix string + s3Partition string filePrefix string startTime time.Time endTime time.Time - timeStep time.Duration - getTimeKey func(time.Time) string } type s3ReaderDataCallback func(context.Context, string, []byte) error @@ -39,31 +39,31 @@ func newS3Reader(cfg *Config) (*s3Reader, error) { if err != nil { return nil, err } - - var getTimeKey func(time.Time) string - var timeStep time.Duration - if cfg.S3Downloader.S3Partition == "hour" { - getTimeKey = getTimeKeyPartitionHour - timeStep = time.Hour - } else { - getTimeKey = getTimeKeyPartitionMinute - timeStep = time.Minute + if cfg.S3Downloader.S3Partition != S3PartitionHour && cfg.S3Downloader.S3Partition != S3PartitionMinute { + return nil, errors.New("s3_partition must be either 'hour' or 'minute'") } + return &s3Reader{ listObjectsClient: listObjectsClient, getObjectClient: getObjectClient, s3Bucket: cfg.S3Downloader.S3Bucket, s3Prefix: cfg.S3Downloader.S3Prefix, filePrefix: cfg.S3Downloader.FilePrefix, + s3Partition: cfg.S3Downloader.S3Partition, startTime: startTime, endTime: endTime, - timeStep: timeStep, - getTimeKey: getTimeKey, }, nil } func (s3Reader *s3Reader) readAll(ctx context.Context, telemetryType string, dataCallback s3ReaderDataCallback) error { - for currentTime := s3Reader.startTime; currentTime.Before(s3Reader.endTime); currentTime = currentTime.Add(s3Reader.timeStep) { + var timeStep time.Duration + if s3Reader.s3Partition == "hour" { + timeStep = time.Hour + } else { + timeStep = time.Minute + } + + for currentTime := s3Reader.startTime; currentTime.Before(s3Reader.endTime); currentTime = currentTime.Add(timeStep) { select { case <-ctx.Done(): return nil @@ -86,12 +86,12 @@ func (s3Reader *s3Reader) readTelemetryForTime(ctx context.Context, t time.Time, p := s3Reader.listObjectsClient.NewListObjectsV2Paginator(params) for p.HasMorePages() { - page, err := p.NextPage(context.TODO()) + page, err := p.NextPage(ctx) if err != nil { return err } for _, obj := range page.Contents { - data, err := s3Reader.retrieveObject(*obj.Key) + data, err := s3Reader.retrieveObject(ctx, *obj.Key) if err != nil { return err } @@ -104,22 +104,29 @@ func (s3Reader *s3Reader) readTelemetryForTime(ctx context.Context, t time.Time, } func (s3Reader *s3Reader) getObjectPrefixForTime(t time.Time, telemetryType string) string { - timeKey := s3Reader.getTimeKey(t) + var timeKey string + switch s3Reader.s3Partition { + case S3PartitionMinute: + timeKey = getTimeKeyPartitionMinute(t) + case S3PartitionHour: + timeKey = getTimeKeyPartitionHour(t) + } if s3Reader.s3Prefix != "" { return fmt.Sprintf("%s/%s/%s%s_", s3Reader.s3Prefix, timeKey, s3Reader.filePrefix, telemetryType) } return fmt.Sprintf("%s/%s%s_", timeKey, s3Reader.filePrefix, telemetryType) } -func (s3Reader *s3Reader) retrieveObject(key string) ([]byte, error) { +func (s3Reader *s3Reader) retrieveObject(ctx context.Context, key string) ([]byte, error) { params := s3.GetObjectInput{ Bucket: &s3Reader.s3Bucket, Key: &key, } - output, err := s3Reader.getObjectClient.GetObject(context.TODO(), ¶ms) + output, err := s3Reader.getObjectClient.GetObject(ctx, ¶ms) if err != nil { return nil, err } + defer output.Body.Close() contents, err := io.ReadAll(output.Body) if err != nil { return nil, err diff --git a/receiver/awss3receiver/s3reader_test.go b/receiver/awss3receiver/s3reader_test.go index 81572c0e7663..7b329735cc62 100644 --- a/receiver/awss3receiver/s3reader_test.go +++ b/receiver/awss3receiver/s3reader_test.go @@ -32,8 +32,8 @@ func Test_getTimeKeyPartitionMinute(t *testing.T) { func Test_s3Reader_getObjectPrefixForTime(t *testing.T) { type args struct { s3Prefix string + s3Partition string filePrefix string - getTimeKey func(t time.Time) string telemetryType string } tests := []struct { @@ -45,8 +45,8 @@ func Test_s3Reader_getObjectPrefixForTime(t *testing.T) { name: "hour, prefix and file prefix", args: args{ s3Prefix: "prefix", + s3Partition: "hour", filePrefix: "file", - getTimeKey: getTimeKeyPartitionHour, telemetryType: "traces", }, want: "prefix/year=2021/month=02/day=01/hour=17/filetraces_", @@ -55,8 +55,8 @@ func Test_s3Reader_getObjectPrefixForTime(t *testing.T) { name: "minute, prefix and file prefix", args: args{ s3Prefix: "prefix", + s3Partition: "minute", filePrefix: "file", - getTimeKey: getTimeKeyPartitionMinute, telemetryType: "metrics", }, want: "prefix/year=2021/month=02/day=01/hour=17/minute=32/filemetrics_", @@ -65,8 +65,8 @@ func Test_s3Reader_getObjectPrefixForTime(t *testing.T) { name: "hour, prefix and no file prefix", args: args{ s3Prefix: "prefix", + s3Partition: "hour", filePrefix: "", - getTimeKey: getTimeKeyPartitionHour, telemetryType: "logs", }, want: "prefix/year=2021/month=02/day=01/hour=17/logs_", @@ -75,8 +75,8 @@ func Test_s3Reader_getObjectPrefixForTime(t *testing.T) { name: "minute, no prefix and no file prefix", args: args{ s3Prefix: "", + s3Partition: "minute", filePrefix: "", - getTimeKey: getTimeKeyPartitionMinute, telemetryType: "metrics", }, want: "year=2021/month=02/day=01/hour=17/minute=32/metrics_", @@ -85,9 +85,9 @@ func Test_s3Reader_getObjectPrefixForTime(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { reader := s3Reader{ - s3Prefix: test.args.s3Prefix, - filePrefix: test.args.filePrefix, - getTimeKey: test.args.getTimeKey, + s3Prefix: test.args.s3Prefix, + s3Partition: test.args.s3Partition, + filePrefix: test.args.filePrefix, } result := reader.getObjectPrefixForTime(testTime, test.args.telemetryType) require.Equal(t, test.want, result) @@ -166,13 +166,12 @@ func Test_readTelemetryForTime(t *testing.T) { Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), }, nil }), - s3Bucket: "bucket", - s3Prefix: "", - filePrefix: "", - startTime: testTime, - endTime: testTime.Add(time.Minute), - timeStep: time.Minute, - getTimeKey: getTimeKeyPartitionMinute, + s3Bucket: "bucket", + s3Partition: "minute", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute), } dataCallbackKeys := make([]string, 0) @@ -215,13 +214,12 @@ func Test_readTelemetryForTime_GetObjectError(t *testing.T) { require.Equal(t, testKey, *params.Key) return nil, testError }), - s3Bucket: "bucket", - s3Prefix: "", - filePrefix: "", - startTime: testTime, - endTime: testTime.Add(time.Minute), - timeStep: time.Minute, - getTimeKey: getTimeKeyPartitionMinute, + s3Bucket: "bucket", + s3Partition: "minute", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute), } err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(_ context.Context, _ string, _ []byte) error { @@ -250,13 +248,12 @@ func Test_readTelemetryForTime_ListObjectsNoResults(t *testing.T) { Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), }, nil }), - s3Bucket: "bucket", - s3Prefix: "", - filePrefix: "", - startTime: testTime, - endTime: testTime.Add(time.Minute), - timeStep: time.Minute, - getTimeKey: getTimeKeyPartitionMinute, + s3Bucket: "bucket", + s3Partition: "minute", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute), } err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(_ context.Context, _ string, _ []byte) error { @@ -297,13 +294,12 @@ func Test_readTelemetryForTime_NextPageError(t *testing.T) { Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), }, nil }), - s3Bucket: "bucket", - s3Prefix: "", - filePrefix: "", - startTime: testTime, - endTime: testTime.Add(time.Minute), - timeStep: time.Minute, - getTimeKey: getTimeKeyPartitionMinute, + s3Bucket: "bucket", + s3Partition: "minute", + s3Prefix: "", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute), } err := reader.readTelemetryForTime(context.Background(), testTime, "traces", func(_ context.Context, _ string, _ []byte) error { @@ -339,13 +335,12 @@ func Test_readAll(t *testing.T) { Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), }, nil }), - s3Bucket: "bucket", - s3Prefix: "", - filePrefix: "", - startTime: testTime, - endTime: testTime.Add(time.Minute * 2), - timeStep: time.Minute, - getTimeKey: getTimeKeyPartitionMinute, + s3Bucket: "bucket", + s3Prefix: "", + s3Partition: "minute", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute * 2), } dataCallbackKeys := make([]string, 0) @@ -386,13 +381,12 @@ func Test_readAll_ContextDone(t *testing.T) { Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), }, nil }), - s3Bucket: "bucket", - s3Prefix: "", - filePrefix: "", - startTime: testTime, - endTime: testTime.Add(time.Minute * 2), - timeStep: time.Minute, - getTimeKey: getTimeKeyPartitionMinute, + s3Bucket: "bucket", + s3Prefix: "", + s3Partition: "minute", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute * 2), } dataCallbackKeys := make([]string, 0) diff --git a/receiver/awss3receiver/testdata/config.yaml b/receiver/awss3receiver/testdata/config.yaml index d0ed4f145850..2a83432577b1 100644 --- a/receiver/awss3receiver/testdata/config.yaml +++ b/receiver/awss3receiver/testdata/config.yaml @@ -2,6 +2,7 @@ awss3: awss3/1: s3downloader: s3_bucket: abucket + s3_partition: notapartition starttime: "a date" endtime: "2024-02-03a" awss3/2: From cdd1c6a6fd00feae4ad0723af40ae616f5ef7574 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Tue, 9 Apr 2024 22:17:47 +0100 Subject: [PATCH 11/20] Add nil check for data --- receiver/awss3receiver/receiver.go | 6 +++++- receiver/awss3receiver/receiver_test.go | 11 +++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index eafd954baec6..2b7df897bd4b 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -52,7 +52,10 @@ func (r *awss3TraceReceiver) Shutdown(_ context.Context) error { } func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data []byte) error { - var unmarshaler ptrace.Unmarshaler + if data == nil { + return nil + } + if strings.HasSuffix(key, ".gz") { reader, err := gzip.NewReader(bytes.NewReader(data)) if err != nil { @@ -65,6 +68,7 @@ func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data } } + var unmarshaler ptrace.Unmarshaler if strings.HasSuffix(key, ".json") { unmarshaler = &ptrace.JSONUnmarshaler{} } diff --git a/receiver/awss3receiver/receiver_test.go b/receiver/awss3receiver/receiver_test.go index 7296a93829aa..cd62076de106 100644 --- a/receiver/awss3receiver/receiver_test.go +++ b/receiver/awss3receiver/receiver_test.go @@ -7,6 +7,7 @@ import ( "bytes" "compress/gzip" "context" + "go.uber.org/zap" "testing" "github.com/stretchr/testify/require" @@ -53,6 +54,15 @@ func Test_receiveBytes(t *testing.T) { wantErr bool wantTrace bool }{ + { + name: "nil data", + args: args{ + key: "test.json", + data: nil, + }, + wantErr: false, + wantTrace: false, + }, { name: ".json", args: args{ @@ -113,6 +123,7 @@ func Test_receiveBytes(t *testing.T) { }) r := &awss3TraceReceiver{ consumer: tracesConsumer, + logger: zap.NewNop(), } if err := r.receiveBytes(context.Background(), tt.args.key, tt.args.data); (err != nil) != tt.wantErr { t.Errorf("receiveBytes() error = %v, wantErr %v", err, tt.wantErr) From 34debf55609148fe5f1fc980a3de3a0c09d0029b Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Wed, 10 Apr 2024 10:16:02 +0100 Subject: [PATCH 12/20] Fix lint --- receiver/awss3receiver/receiver.go | 15 +++++++-------- receiver/awss3receiver/receiver_test.go | 2 +- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index 2b7df897bd4b..ce4b62fbd208 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -75,14 +75,13 @@ func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data if strings.HasSuffix(key, ".binpb") { unmarshaler = &ptrace.ProtoUnmarshaler{} } - if unmarshaler != nil { - traces, err := unmarshaler.UnmarshalTraces(data) - if err != nil { - return err - } - return r.consumer.ConsumeTraces(ctx, traces) - } else { + if unmarshaler == nil { r.logger.Warn("Unsupported file format", zap.String("key", key)) + return nil } - return nil + traces, err := unmarshaler.UnmarshalTraces(data) + if err != nil { + return err + } + return r.consumer.ConsumeTraces(ctx, traces) } diff --git a/receiver/awss3receiver/receiver_test.go b/receiver/awss3receiver/receiver_test.go index cd62076de106..0f234af01956 100644 --- a/receiver/awss3receiver/receiver_test.go +++ b/receiver/awss3receiver/receiver_test.go @@ -7,13 +7,13 @@ import ( "bytes" "compress/gzip" "context" - "go.uber.org/zap" "testing" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.22.0" + "go.uber.org/zap" ) func generateTraceData() ptrace.Traces { From c6713e545fa595d84425119ce074eebd7020235c Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Fri, 12 Apr 2024 10:15:30 +0100 Subject: [PATCH 13/20] Add endpoint partition id --- receiver/awss3receiver/README.md | 1 + receiver/awss3receiver/config.go | 20 +++++++++++--------- receiver/awss3receiver/config_test.go | 22 ++++++++++++---------- receiver/awss3receiver/s3intf.go | 2 +- 4 files changed, 25 insertions(+), 20 deletions(-) diff --git a/receiver/awss3receiver/README.md b/receiver/awss3receiver/README.md index 2a7cf295ffe5..5dd30a000953 100644 --- a/receiver/awss3receiver/README.md +++ b/receiver/awss3receiver/README.md @@ -27,6 +27,7 @@ The following exporter configuration parameters are supported. | `s3_partition` | time granularity of S3 key: hour or minute | "minute" | Optional | | `file_prefix` | file prefix defined by user | | Optional | | `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | Optional | +| `endpoint_partition_id` | partition id to use if `endpoint` is specified. | "aws" | Optional | | `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | Optional | ### Time format for `starttime` and `endtime` diff --git a/receiver/awss3receiver/config.go b/receiver/awss3receiver/config.go index 0bd39f18a8e5..60c954acc7d2 100644 --- a/receiver/awss3receiver/config.go +++ b/receiver/awss3receiver/config.go @@ -16,13 +16,14 @@ import ( // S3DownloaderConfig contains aws s3 downloader related config to controls things // like bucket, prefix, batching, connections, retries, etc. type S3DownloaderConfig struct { - Region string `mapstructure:"region"` - S3Bucket string `mapstructure:"s3_bucket"` - S3Prefix string `mapstructure:"s3_prefix"` - S3Partition string `mapstructure:"s3_partition"` - FilePrefix string `mapstructure:"file_prefix"` - Endpoint string `mapstructure:"endpoint"` - S3ForcePathStyle bool `mapstructure:"s3_force_path_style"` + Region string `mapstructure:"region"` + S3Bucket string `mapstructure:"s3_bucket"` + S3Prefix string `mapstructure:"s3_prefix"` + S3Partition string `mapstructure:"s3_partition"` + FilePrefix string `mapstructure:"file_prefix"` + Endpoint string `mapstructure:"endpoint"` + EndpointPartitionID string `mapstructure:"endpoint_partition_id"` + S3ForcePathStyle bool `mapstructure:"s3_force_path_style"` } // Config defines the configuration for the file receiver. @@ -40,8 +41,9 @@ const ( func createDefaultConfig() component.Config { return &Config{ S3Downloader: S3DownloaderConfig{ - Region: "us-east-1", - S3Partition: S3PartitionMinute, + Region: "us-east-1", + S3Partition: S3PartitionMinute, + EndpointPartitionID: "aws", }, } } diff --git a/receiver/awss3receiver/config_test.go b/receiver/awss3receiver/config_test.go index b8edffd1a38b..fd5aee3d4306 100644 --- a/receiver/awss3receiver/config_test.go +++ b/receiver/awss3receiver/config_test.go @@ -23,13 +23,14 @@ func TestLoadConfig_Validate_Invalid(t *testing.T) { func TestConfig_Validate_Valid(t *testing.T) { cfg := Config{ S3Downloader: S3DownloaderConfig{ - Region: "", - S3Bucket: "abucket", - S3Prefix: "", - S3Partition: "minute", - FilePrefix: "", - Endpoint: "", - S3ForcePathStyle: false, + Region: "", + S3Bucket: "abucket", + S3Prefix: "", + S3Partition: "minute", + FilePrefix: "", + Endpoint: "", + EndpointPartitionID: "aws", + S3ForcePathStyle: false, }, StartTime: "2024-01-01", EndTime: "2024-01-01", @@ -58,9 +59,10 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "2"), expected: &Config{ S3Downloader: S3DownloaderConfig{ - Region: "us-east-1", - S3Bucket: "abucket", - S3Partition: "minute", + Region: "us-east-1", + S3Bucket: "abucket", + S3Partition: "minute", + EndpointPartitionID: "aws", }, StartTime: "2024-01-31 15:00", EndTime: "2024-02-03", diff --git a/receiver/awss3receiver/s3intf.go b/receiver/awss3receiver/s3intf.go index 56411a145864..f5a52f4ba9d6 100644 --- a/receiver/awss3receiver/s3intf.go +++ b/receiver/awss3receiver/s3intf.go @@ -41,7 +41,7 @@ func newS3Client(cfg S3DownloaderConfig) (ListObjectsAPI, GetObjectAPI, error) { if cfg.Endpoint != "" { customResolver := aws.EndpointResolverWithOptionsFunc(func(_, _ string, _ ...any) (aws.Endpoint, error) { return aws.Endpoint{ - PartitionID: "aws", + PartitionID: cfg.EndpointPartitionID, URL: cfg.Endpoint, SigningRegion: cfg.Region, }, nil From 0649be0f3bdd23f76a00d3b46dec194b1c44582a Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Sat, 13 Apr 2024 19:19:33 +0100 Subject: [PATCH 14/20] Update README.md --- receiver/awss3receiver/README.md | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/receiver/awss3receiver/README.md b/receiver/awss3receiver/README.md index 5dd30a000953..402cb8882bf4 100644 --- a/receiver/awss3receiver/README.md +++ b/receiver/awss3receiver/README.md @@ -16,19 +16,19 @@ Receiver for retrieving trace previously stored in S3 by the [AWS S3 Exporter](. ## Configuration The following exporter configuration parameters are supported. -| Name | Description | Default | Required | -|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|----------| -| `starttime` | The time at which to start retrieving data. | | Required | -| `endtime` | The time at which to stop retrieving data. | | Required | -| `s3downloader:` | | | | -| `region` | AWS region. | "us-east-1" | Optional | -| `s3_bucket` | S3 bucket | | Required | -| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | Required | -| `s3_partition` | time granularity of S3 key: hour or minute | "minute" | Optional | -| `file_prefix` | file prefix defined by user | | Optional | -| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | Optional | -| `endpoint_partition_id` | partition id to use if `endpoint` is specified. | "aws" | Optional | -| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | Optional | +| Name | Description | Default | Required | +|:------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|----------| +| `starttime` | The time at which to start retrieving data. | | Required | +| `endtime` | The time at which to stop retrieving data. | | Required | +| `s3downloader:` | | | | +| `region` | AWS region. | "us-east-1" | Optional | +| `s3_bucket` | S3 bucket | | Required | +| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | Required | +| `s3_partition` | time granularity of S3 key: hour or minute | "minute" | Optional | +| `file_prefix` | file prefix defined by user | | Optional | +| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | Optional | +| `endpoint_partition_id` | partition id to use if `endpoint` is specified. | "aws" | Optional | +| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | Optional | ### Time format for `starttime` and `endtime` The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data. From a0b617aa5c4233fd7e5c386c60fb4002849b58d1 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Tue, 16 Apr 2024 13:02:43 +0100 Subject: [PATCH 15/20] Fix context pass through --- receiver/awss3receiver/factory.go | 4 ++-- receiver/awss3receiver/go.sum | 30 ++++++++++++++++++++++++++++++ receiver/awss3receiver/receiver.go | 4 ++-- receiver/awss3receiver/s3intf.go | 4 ++-- receiver/awss3receiver/s3reader.go | 4 ++-- 5 files changed, 38 insertions(+), 8 deletions(-) diff --git a/receiver/awss3receiver/factory.go b/receiver/awss3receiver/factory.go index cdeae17aecc3..2e3bacf48ff0 100644 --- a/receiver/awss3receiver/factory.go +++ b/receiver/awss3receiver/factory.go @@ -21,6 +21,6 @@ func NewFactory() receiver.Factory { ) } -func createTracesReceiver(_ context.Context, settings receiver.CreateSettings, cc component.Config, consumer consumer.Traces) (receiver.Traces, error) { - return newAWSS3TraceReceiver(cc.(*Config), consumer, settings.Logger) +func createTracesReceiver(ctx context.Context, settings receiver.CreateSettings, cc component.Config, consumer consumer.Traces) (receiver.Traces, error) { + return newAWSS3TraceReceiver(ctx, cc.(*Config), consumer, settings.Logger) } diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index c6436a4b6f55..b2dd4633555c 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -106,21 +106,40 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector v0.98.0 h1:O7bpARGWzNfFQEYevLl4iigDrpGTJY3vV/kKqNZzMOk= +go.opentelemetry.io/collector/component v0.98.0 h1:0TMaBOyCdABiVLFdGOgG8zd/1IeGldCinYonbY08xWk= go.opentelemetry.io/collector/component v0.98.0/go.mod h1:F6zyQLsoExl6r2q6WWZm8rmSSALbwG2zwIHLrMzZVio= +go.opentelemetry.io/collector/config/configtelemetry v0.98.0 h1:f8RNZ1l/kYPPoxFmKKvTUli8iON7CMsm85KM38PVNts= go.opentelemetry.io/collector/config/configtelemetry v0.98.0/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= +go.opentelemetry.io/collector/confmap v0.98.0 h1:qQreBlrqio1y7uhrAvr+W86YbQ6fw7StgkbYpvJ2vVc= go.opentelemetry.io/collector/confmap v0.98.0/go.mod h1:BWKPIpYeUzSG6ZgCJMjF7xsLvyrvJCfYURl57E5vhiQ= +go.opentelemetry.io/collector/consumer v0.98.0 h1:47zJ5HFKXVA0RciuwkZnPU5W8j0TYUxToB1/zzzgEhs= go.opentelemetry.io/collector/consumer v0.98.0/go.mod h1:c2edTq38uVJET/NE6VV7/Qpyznnlz8b6VE7J6TXD57c= +go.opentelemetry.io/collector/pdata v1.5.0 h1:1fKTmUpr0xCOhP/B0VEvtz7bYPQ45luQ8XFyA07j8LE= go.opentelemetry.io/collector/pdata v1.5.0/go.mod h1:TYj8aKRWZyT/KuKQXKyqSEvK/GV+slFaDMEI+Ke64Yw= +go.opentelemetry.io/collector/pdata/testdata v0.98.0 h1:8gohV+LFXqMzuDwfOOQy9GcZBOX0C9xGoQkoeXFTzmI= +go.opentelemetry.io/collector/pdata/testdata v0.98.0/go.mod h1:B/IaHcf6+RtxI292CZu9TjfYQdi1n4+v6b8rHEonpKs= +go.opentelemetry.io/collector/receiver v0.98.0 h1:qw6JYwm+sHcZvM1DByo3QlGe6yGHuwd0yW4hEPVqYKU= go.opentelemetry.io/collector/receiver v0.98.0/go.mod h1:AwIWn+KnquTR+kbhXQrMH+i2PvTCFldSIJznBWFYs0s= +go.opentelemetry.io/collector/semconv v0.98.0 h1:zO4L4TmlxXoYu8UgPeYElGY19BW7wPjM+quL5CzoOoY= go.opentelemetry.io/collector/semconv v0.98.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= +go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= +go.opentelemetry.io/otel/exporters/prometheus v0.47.0 h1:OL6yk1Z/pEGdDnrBbxSsH+t4FY1zXfBRGd7bjwhlMLU= go.opentelemetry.io/otel/exporters/prometheus v0.47.0/go.mod h1:xF3N4OSICZDVbbYZydz9MHFro1RjmkPUKEvar2utG+Q= +go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s= +go.opentelemetry.io/otel/sdk v1.25.0 h1:PDryEJPC8YJZQSyLY5eqLeafHtG+X7FWnf3aXMtxbqo= go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw= +go.opentelemetry.io/otel/sdk/metric v1.25.0 h1:7CiHOy08LbrxMAp4vWpbiPcklunUshVpAvGBrdDRlGw= go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o= +go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1Dq6RM= go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -131,6 +150,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -138,9 +158,11 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -150,11 +172,19 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= +google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index ce4b62fbd208..e2953810e8e3 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -23,8 +23,8 @@ type awss3TraceReceiver struct { cancel context.CancelFunc } -func newAWSS3TraceReceiver(cfg *Config, traces consumer.Traces, logger *zap.Logger) (*awss3TraceReceiver, error) { - reader, err := newS3Reader(cfg) +func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Traces, logger *zap.Logger) (*awss3TraceReceiver, error) { + reader, err := newS3Reader(ctx, cfg) if err != nil { return nil, err } diff --git a/receiver/awss3receiver/s3intf.go b/receiver/awss3receiver/s3intf.go index f5a52f4ba9d6..b3b73d5bfd7a 100644 --- a/receiver/awss3receiver/s3intf.go +++ b/receiver/awss3receiver/s3intf.go @@ -32,7 +32,7 @@ type s3ListObjectsAPIImpl struct { client *s3.Client } -func newS3Client(cfg S3DownloaderConfig) (ListObjectsAPI, GetObjectAPI, error) { +func newS3Client(ctx context.Context, cfg S3DownloaderConfig) (ListObjectsAPI, GetObjectAPI, error) { optionsFuncs := make([]func(*config.LoadOptions) error, 0) if cfg.Region != "" { optionsFuncs = append(optionsFuncs, config.WithRegion(cfg.Region)) @@ -48,7 +48,7 @@ func newS3Client(cfg S3DownloaderConfig) (ListObjectsAPI, GetObjectAPI, error) { }) optionsFuncs = append(optionsFuncs, config.WithEndpointResolverWithOptions(customResolver)) } - awsCfg, err := config.LoadDefaultConfig(context.TODO(), optionsFuncs...) + awsCfg, err := config.LoadDefaultConfig(ctx, optionsFuncs...) if err != nil { log.Fatalf("unable to load SDK config, %v", err) return nil, nil, err diff --git a/receiver/awss3receiver/s3reader.go b/receiver/awss3receiver/s3reader.go index fe66e87f142e..1733cdbee5d7 100644 --- a/receiver/awss3receiver/s3reader.go +++ b/receiver/awss3receiver/s3reader.go @@ -26,8 +26,8 @@ type s3Reader struct { type s3ReaderDataCallback func(context.Context, string, []byte) error -func newS3Reader(cfg *Config) (*s3Reader, error) { - listObjectsClient, getObjectClient, err := newS3Client(cfg.S3Downloader) +func newS3Reader(ctx context.Context, cfg *Config) (*s3Reader, error) { + listObjectsClient, getObjectClient, err := newS3Client(ctx, cfg.S3Downloader) if err != nil { return nil, err } From 7cb312f4f7bfc7f5a9f666fbd082776034a9f1e9 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Mon, 29 Apr 2024 17:05:58 +0100 Subject: [PATCH 16/20] Update receiver.go --- receiver/awss3receiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index e2953810e8e3..eb0e2b107c19 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -37,7 +37,7 @@ func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Tra } func (r *awss3TraceReceiver) Start(ctx context.Context, _ component.Host) error { - ctx, r.cancel = context.WithCancel(ctx) + ctx, r.cancel = context.WithCancel(context.Background()) go func() { _ = r.s3Reader.readAll(ctx, "traces", r.receiveBytes) }() From 86524905f0fda12e32bead730096e7e5274c6009 Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Mon, 29 Apr 2024 17:25:23 +0100 Subject: [PATCH 17/20] Update receiver.go --- receiver/awss3receiver/receiver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index eb0e2b107c19..06dc1ea7d678 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -36,7 +36,8 @@ func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Tra }, nil } -func (r *awss3TraceReceiver) Start(ctx context.Context, _ component.Host) error { +func (r *awss3TraceReceiver) Start(_ context.Context, _ component.Host) error { + var ctx context.Context ctx, r.cancel = context.WithCancel(context.Background()) go func() { _ = r.s3Reader.readAll(ctx, "traces", r.receiveBytes) From cbdbe972d45499b3b173cc776b2346a80746bc65 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Fri, 3 May 2024 00:17:13 -0700 Subject: [PATCH 18/20] Update receiver/awss3receiver/go.mod --- receiver/awss3receiver/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/awss3receiver/go.mod b/receiver/awss3receiver/go.mod index 0d09ac6be886..34ccdc97fb2a 100644 --- a/receiver/awss3receiver/go.mod +++ b/receiver/awss3receiver/go.mod @@ -59,7 +59,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.53.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.99.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.99.1-0.20240502202854-2875844e3c35 // indirect go.opentelemetry.io/otel v1.26.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.48.0 // indirect go.opentelemetry.io/otel/sdk v1.26.0 // indirect From fae5807af11d3ee55bc62ae1860d8666d735977b Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Fri, 3 May 2024 09:18:15 +0100 Subject: [PATCH 19/20] Update go.sum --- receiver/awss3receiver/go.sum | 1 + 1 file changed, 1 insertion(+) diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index 5bc1a2553a56..73322b93f9cd 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -107,6 +107,7 @@ go.opentelemetry.io/collector/component v0.99.1-0.20240502202854-2875844e3c35 h1 go.opentelemetry.io/collector/component v0.99.1-0.20240502202854-2875844e3c35/go.mod h1:+b56nMIvo3CO5TShFn38RwX4FsXv0lVt2HoGmsaXObo= go.opentelemetry.io/collector/config/configtelemetry v0.99.0 h1:Fks8xkTUnxw1nEcTyYOXnIHttI9BGgjOCB0bwBH3LcU= go.opentelemetry.io/collector/config/configtelemetry v0.99.0/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= +go.opentelemetry.io/collector/config/configtelemetry v0.99.1-0.20240502202854-2875844e3c35/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= go.opentelemetry.io/collector/confmap v0.99.1-0.20240502202854-2875844e3c35 h1:/dWChNi4ONcP64OcBQqAcbuD7h/Z1v7Gnikr8gsWj9k= go.opentelemetry.io/collector/confmap v0.99.1-0.20240502202854-2875844e3c35/go.mod h1:BWKPIpYeUzSG6ZgCJMjF7xsLvyrvJCfYURl57E5vhiQ= go.opentelemetry.io/collector/consumer v0.99.1-0.20240502202854-2875844e3c35 h1:pwjW04Z7re8m3CoXiC+NSQNs/BK8c4rsf/pQsTjOOA8= From f533058e069cf993312feb47aeb7167120e2937a Mon Sep 17 00:00:00 2001 From: Adam Charrett Date: Fri, 3 May 2024 09:49:09 +0100 Subject: [PATCH 20/20] Update go.sum --- receiver/awss3receiver/go.sum | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index 73322b93f9cd..a503c72da4f6 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -105,8 +105,7 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec go.opentelemetry.io/collector v0.99.0 h1:O3EtCr+Bp2FoYI4KZCcC10FbMOjtRPXN1JBgFmi2WvY= go.opentelemetry.io/collector/component v0.99.1-0.20240502202854-2875844e3c35 h1:EL4n8QWeq7fHVR0J4jTvyWYYa+/NYiIFvdmA6nWlaM4= go.opentelemetry.io/collector/component v0.99.1-0.20240502202854-2875844e3c35/go.mod h1:+b56nMIvo3CO5TShFn38RwX4FsXv0lVt2HoGmsaXObo= -go.opentelemetry.io/collector/config/configtelemetry v0.99.0 h1:Fks8xkTUnxw1nEcTyYOXnIHttI9BGgjOCB0bwBH3LcU= -go.opentelemetry.io/collector/config/configtelemetry v0.99.0/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= +go.opentelemetry.io/collector/config/configtelemetry v0.99.1-0.20240502202854-2875844e3c35 h1:mq2oX1YBRhK5Oet/7ujaw7CjKWcoAAjGzKIuEL15gXo= go.opentelemetry.io/collector/config/configtelemetry v0.99.1-0.20240502202854-2875844e3c35/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= go.opentelemetry.io/collector/confmap v0.99.1-0.20240502202854-2875844e3c35 h1:/dWChNi4ONcP64OcBQqAcbuD7h/Z1v7Gnikr8gsWj9k= go.opentelemetry.io/collector/confmap v0.99.1-0.20240502202854-2875844e3c35/go.mod h1:BWKPIpYeUzSG6ZgCJMjF7xsLvyrvJCfYURl57E5vhiQ=