Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/awss3receiver] Initial implementation #32222

Merged
merged 35 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
18dd88f
Implement MVP and add some initial tests
adcharre Apr 5, 2024
3237ff3
Add more tests for readTelemetryForTime and readAll
adcharre Apr 6, 2024
f51c7b2
rename awss3Receiver to awss3TraceReceiver
adcharre Apr 6, 2024
cb06ce7
finish unit tests
adcharre Apr 7, 2024
e9ac872
Create awss3receiver_impl_mvp.yaml
adcharre Apr 7, 2024
cf8e3a1
package dependency update
adcharre Apr 8, 2024
e1f06e8
Fix lint issues
adcharre Apr 8, 2024
edd33f2
Fix package version issue
adcharre Apr 8, 2024
c095fdd
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 8, 2024
d444c12
Improve start/end time parsing error messages
adcharre Apr 9, 2024
fcb0991
Further PR comment fixes
adcharre Apr 9, 2024
83117e4
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 9, 2024
cdd1c6a
Add nil check for data
adcharre Apr 9, 2024
34debf5
Fix lint
adcharre Apr 10, 2024
c6713e5
Add endpoint partition id
adcharre Apr 12, 2024
997cde0
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 13, 2024
0649be0
Update README.md
adcharre Apr 13, 2024
a0b617a
Fix context pass through
adcharre Apr 16, 2024
0af0a62
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 16, 2024
f1c7353
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 16, 2024
d6c6c9a
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 17, 2024
e2d2383
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 18, 2024
0dcda51
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 20, 2024
f090b47
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 22, 2024
ed8a2b0
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 23, 2024
7cb312f
Update receiver.go
adcharre Apr 29, 2024
8652490
Update receiver.go
adcharre Apr 29, 2024
e310b9a
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre Apr 30, 2024
a0a920a
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre May 2, 2024
f16319b
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre May 3, 2024
cbdbe97
Update receiver/awss3receiver/go.mod
atoulme May 3, 2024
fae5807
Update go.sum
adcharre May 3, 2024
f533058
Update go.sum
adcharre May 3, 2024
e4303d3
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre May 6, 2024
ead2cc3
Merge remote-tracking branch 'upstream/main' into awss3receiver_impl_mvp
adcharre May 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/awss3receiver_impl_mvp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Initial implementation of the AWS S3 receiver."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30750]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
12 changes: 6 additions & 6 deletions receiver/awss3receiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,26 @@ func (c Config) Validate() error {
if c.StartTime == "" {
errs = multierr.Append(errs, errors.New("start time is required"))
} else {
if err := validateTime(c.StartTime); err != nil {
if _, err := parseTime(c.StartTime); err != nil {
errs = multierr.Append(errs, errors.New("unable to parse start date"))
adcharre marked this conversation as resolved.
Show resolved Hide resolved
adcharre marked this conversation as resolved.
Show resolved Hide resolved
}
}
if c.EndTime == "" {
errs = multierr.Append(errs, errors.New("end time is required"))
} else {
if err := validateTime(c.EndTime); err != nil {
if _, err := parseTime(c.EndTime); err != nil {
errs = multierr.Append(errs, errors.New("unable to parse end time"))
adcharre marked this conversation as resolved.
Show resolved Hide resolved
}
}
return errs
}

func validateTime(str string) error {
func parseTime(str string) (time.Time, error) {
layouts := []string{"2006-01-02 15:04", time.DateOnly}
for _, layout := range layouts {
if _, err := time.Parse(layout, str); err == nil {
return nil
if t, err := time.Parse(layout, str); err == nil {
return t, nil
}
}
return errors.New("unable to parse time string")
return time.Time{}, errors.New("unable to parse time string")
}
24 changes: 0 additions & 24 deletions receiver/awss3receiver/factory_test.go

This file was deleted.

23 changes: 22 additions & 1 deletion receiver/awss3receiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3r
go 1.21

require (
github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go-v2/config v1.27.11
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.97.1-0.20240404121116-4f1a8936d26b
go.opentelemetry.io/collector/confmap v0.97.1-0.20240404121116-4f1a8936d26b
go.opentelemetry.io/collector/consumer v0.97.1-0.20240404121116-4f1a8936d26b
go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b
go.opentelemetry.io/collector/receiver v0.97.1-0.20240404121116-4f1a8936d26b
go.opentelemetry.io/collector/semconv v0.97.1-0.20240404121116-4f1a8936d26b
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/goleak v1.3.0
Expand All @@ -16,6 +22,21 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.11 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -25,6 +46,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand All @@ -39,7 +61,6 @@ require (
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240404121116-4f1a8936d26b // indirect
go.opentelemetry.io/collector/pdata v1.4.1-0.20240404121116-4f1a8936d26b // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
Expand Down
47 changes: 47 additions & 0 deletions receiver/awss3receiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions receiver/awss3receiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ status:
distributions: []
codeowners:
active: [atoulme, adcharre]
tests:
config:
starttime: "2024-01-31"
endtime: "2024-02-03"
65 changes: 60 additions & 5 deletions receiver/awss3receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,79 @@
package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver"

import (
"bytes"
"compress/gzip"
"context"
"io"
"strings"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

type awss3Receiver struct {
type awss3TraceReceiver struct {
s3Reader *s3Reader
consumer consumer.Traces
logger *zap.Logger
cancel context.CancelFunc
}

func newAWSS3TraceReceiver(_ *Config, _ consumer.Traces, _ *zap.Logger) (*awss3Receiver, error) {
return &awss3Receiver{}, nil
func newAWSS3TraceReceiver(cfg *Config, traces consumer.Traces, logger *zap.Logger) (*awss3TraceReceiver, error) {
reader, err := newS3Reader(cfg)
if err != nil {
return nil, err
}
return &awss3TraceReceiver{
s3Reader: reader,
consumer: traces,
logger: logger,
cancel: nil,
}, nil
}

func (r *awss3Receiver) Start(_ context.Context, _ component.Host) error {
func (r *awss3TraceReceiver) Start(ctx context.Context, _ component.Host) error {
crobert-1 marked this conversation as resolved.
Show resolved Hide resolved
ctx, r.cancel = context.WithCancel(ctx)
adcharre marked this conversation as resolved.
Show resolved Hide resolved
go func() {
_ = r.s3Reader.readAll(ctx, "traces", r.receiveBytes)
}()
return nil
}

func (r *awss3Receiver) Shutdown(_ context.Context) error {
func (r *awss3TraceReceiver) Shutdown(_ context.Context) error {
if r.cancel != nil {
r.cancel()
}
return nil
}

func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data []byte) error {
crobert-1 marked this conversation as resolved.
Show resolved Hide resolved
var unmarshaler ptrace.Unmarshaler
if strings.HasSuffix(key, ".gz") {
reader, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return err
}
key = strings.TrimSuffix(key, ".gz")
data, err = io.ReadAll(reader)
if err != nil {
return err
}
}

if strings.HasSuffix(key, ".json") {
unmarshaler = &ptrace.JSONUnmarshaler{}
}
if strings.HasSuffix(key, ".binpb") {
unmarshaler = &ptrace.ProtoUnmarshaler{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can maybe do something here in a future PR where the unmarshaler is dictated by config. I appreciate how this works well when you don't know what data to expect though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed - at this stage just want to get the basics in and then add the ability to use extensions to unmarshall data next.

}
if unmarshaler != nil {
adcharre marked this conversation as resolved.
Show resolved Hide resolved
traces, err := unmarshaler.UnmarshalTraces(data)
if err != nil {
return err
}
return r.consumer.ConsumeTraces(ctx, traces)
}
return nil
}
Loading
Loading