Skip to content

Commit

Permalink
[receiver/awss3receiver] Initial implementation (#32222)
Browse files Browse the repository at this point in the history
**Description:** This is the initial implementation of the AWS S3
receiver. The receiver can load trace from an S3 bucket starting at the
configured time until the stop time. Json and protobuf formats are
supported along with gzip compression.

**Link to tracking Issue:** #30750

**Testing:** Unit tests added and read real trace from an S3 bucket.

**Documentation:** None added

---------

Co-authored-by: Antoine Toulme <antoine@toulme.name>
  • Loading branch information
adcharre and atoulme authored May 7, 2024
1 parent cdefd69 commit 4ae20e9
Show file tree
Hide file tree
Showing 15 changed files with 979 additions and 75 deletions.
27 changes: 27 additions & 0 deletions .chloggen/awss3receiver_impl_mvp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Initial implementation of the AWS S3 receiver."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30750]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
25 changes: 13 additions & 12 deletions receiver/awss3receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ Receiver for retrieving trace previously stored in S3 by the [AWS S3 Exporter](.
## Configuration
The following exporter configuration parameters are supported.

| Name | Description | Default | Required |
|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|----------|
| `starttime` | The time at which to start retrieving data. | | Required |
| `endtime` | The time at which to stop retrieving data. | | Required |
| `s3downloader:` | | | |
| `region` | AWS region. | "us-east-1" | Optional |
| `s3_bucket` | S3 bucket | | Required |
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | Required |
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" | Optional |
| `file_prefix` | file prefix defined by user | | Optional |
| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | Optional |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | Optional |
| Name | Description | Default | Required |
|:------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|----------|
| `starttime` | The time at which to start retrieving data. | | Required |
| `endtime` | The time at which to stop retrieving data. | | Required |
| `s3downloader:` | | | |
| `region` | AWS region. | "us-east-1" | Optional |
| `s3_bucket` | S3 bucket | | Required |
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | Required |
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" | Optional |
| `file_prefix` | file prefix defined by user | | Optional |
| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | Optional |
| `endpoint_partition_id` | partition id to use if `endpoint` is specified. | "aws" | Optional |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | Optional |

### Time format for `starttime` and `endtime`
The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data.
Expand Down
51 changes: 32 additions & 19 deletions receiver/awss3receiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package awss3receiver // import "github.com/open-telemetry/opentelemetry-collect

import (
"errors"
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -14,13 +16,14 @@ import (
// S3DownloaderConfig contains aws s3 downloader related config to controls things
// like bucket, prefix, batching, connections, retries, etc.
type S3DownloaderConfig struct {
Region string `mapstructure:"region"`
S3Bucket string `mapstructure:"s3_bucket"`
S3Prefix string `mapstructure:"s3_prefix"`
S3Partition string `mapstructure:"s3_partition"`
FilePrefix string `mapstructure:"file_prefix"`
Endpoint string `mapstructure:"endpoint"`
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
Region string `mapstructure:"region"`
S3Bucket string `mapstructure:"s3_bucket"`
S3Prefix string `mapstructure:"s3_prefix"`
S3Partition string `mapstructure:"s3_partition"`
FilePrefix string `mapstructure:"file_prefix"`
Endpoint string `mapstructure:"endpoint"`
EndpointPartitionID string `mapstructure:"endpoint_partition_id"`
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
}

// Config defines the configuration for the file receiver.
Expand All @@ -30,11 +33,17 @@ type Config struct {
EndTime string `mapstructure:"endtime"`
}

const (
S3PartitionMinute = "minute"
S3PartitionHour = "hour"
)

func createDefaultConfig() component.Config {
return &Config{
S3Downloader: S3DownloaderConfig{
Region: "us-east-1",
S3Partition: "minute",
Region: "us-east-1",
S3Partition: S3PartitionMinute,
EndpointPartitionID: "aws",
},
}
}
Expand All @@ -44,29 +53,33 @@ func (c Config) Validate() error {
if c.S3Downloader.S3Bucket == "" {
errs = multierr.Append(errs, errors.New("bucket is required"))
}
if c.S3Downloader.S3Partition != S3PartitionHour && c.S3Downloader.S3Partition != S3PartitionMinute {
errs = multierr.Append(errs, errors.New("s3_partition must be either 'hour' or 'minute'"))
}
if c.StartTime == "" {
errs = multierr.Append(errs, errors.New("start time is required"))
errs = multierr.Append(errs, errors.New("starttime is required"))
} else {
if err := validateTime(c.StartTime); err != nil {
errs = multierr.Append(errs, errors.New("unable to parse start date"))
if _, err := parseTime(c.StartTime, "starttime"); err != nil {
errs = multierr.Append(errs, err)
}
}
if c.EndTime == "" {
errs = multierr.Append(errs, errors.New("end time is required"))
errs = multierr.Append(errs, errors.New("endtime is required"))
} else {
if err := validateTime(c.EndTime); err != nil {
errs = multierr.Append(errs, errors.New("unable to parse end time"))
if _, err := parseTime(c.EndTime, "endtime"); err != nil {
errs = multierr.Append(errs, err)
}
}
return errs
}

func validateTime(str string) error {
func parseTime(timeStr, configName string) (time.Time, error) {
layouts := []string{"2006-01-02 15:04", time.DateOnly}

for _, layout := range layouts {
if _, err := time.Parse(layout, str); err == nil {
return nil
if t, err := time.Parse(layout, timeStr); err == nil {
return t, nil
}
}
return errors.New("unable to parse time string")
return time.Time{}, fmt.Errorf("unable to parse %s (%s), accepted formats: %s", configName, timeStr, strings.Join(layouts, ", "))
}
26 changes: 14 additions & 12 deletions receiver/awss3receiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ func TestLoadConfig_Validate_Invalid(t *testing.T) {
func TestConfig_Validate_Valid(t *testing.T) {
cfg := Config{
S3Downloader: S3DownloaderConfig{
Region: "",
S3Bucket: "abucket",
S3Prefix: "",
S3Partition: "",
FilePrefix: "",
Endpoint: "",
S3ForcePathStyle: false,
Region: "",
S3Bucket: "abucket",
S3Prefix: "",
S3Partition: "minute",
FilePrefix: "",
Endpoint: "",
EndpointPartitionID: "aws",
S3ForcePathStyle: false,
},
StartTime: "2024-01-01",
EndTime: "2024-01-01",
Expand All @@ -48,19 +49,20 @@ func TestLoadConfig(t *testing.T) {
}{
{
id: component.NewIDWithName(metadata.Type, ""),
errorMessage: "bucket is required; start time is required; end time is required",
errorMessage: "bucket is required; starttime is required; endtime is required",
},
{
id: component.NewIDWithName(metadata.Type, "1"),
errorMessage: "unable to parse start date; unable to parse end time",
errorMessage: "s3_partition must be either 'hour' or 'minute'; unable to parse starttime (a date), accepted formats: 2006-01-02 15:04, 2006-01-02; unable to parse endtime (2024-02-03a), accepted formats: 2006-01-02 15:04, 2006-01-02",
},
{
id: component.NewIDWithName(metadata.Type, "2"),
expected: &Config{
S3Downloader: S3DownloaderConfig{
Region: "us-east-1",
S3Bucket: "abucket",
S3Partition: "minute",
Region: "us-east-1",
S3Bucket: "abucket",
S3Partition: "minute",
EndpointPartitionID: "aws",
},
StartTime: "2024-01-31 15:00",
EndTime: "2024-02-03",
Expand Down
4 changes: 2 additions & 2 deletions receiver/awss3receiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ func NewFactory() receiver.Factory {
)
}

func createTracesReceiver(_ context.Context, settings receiver.CreateSettings, cc component.Config, consumer consumer.Traces) (receiver.Traces, error) {
return newAWSS3TraceReceiver(cc.(*Config), consumer, settings.Logger)
func createTracesReceiver(ctx context.Context, settings receiver.CreateSettings, cc component.Config, consumer consumer.Traces) (receiver.Traces, error) {
return newAWSS3TraceReceiver(ctx, cc.(*Config), consumer, settings.Logger)
}
24 changes: 0 additions & 24 deletions receiver/awss3receiver/factory_test.go

This file was deleted.

23 changes: 22 additions & 1 deletion receiver/awss3receiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3r
go 1.21.0

require (
github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go-v2/config v1.27.11
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.100.0
go.opentelemetry.io/collector/confmap v0.100.0
go.opentelemetry.io/collector/consumer v0.100.0
go.opentelemetry.io/collector/pdata v1.7.0
go.opentelemetry.io/collector/receiver v0.100.0
go.opentelemetry.io/collector/semconv v0.100.0
go.opentelemetry.io/otel/metric v1.26.0
go.opentelemetry.io/otel/trace v1.26.0
go.uber.org/goleak v1.3.0
Expand All @@ -16,6 +22,21 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.11 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -24,6 +45,7 @@ require (
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand All @@ -38,7 +60,6 @@ require (
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.100.0 // indirect
go.opentelemetry.io/collector/pdata v1.7.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.48.0 // indirect
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
Expand Down
Loading

0 comments on commit 4ae20e9

Please sign in to comment.