Skip to content

Commit

Permalink
[receiver/awss3receiver]: Logging improvements (#34621)
Browse files Browse the repository at this point in the history
**Description:** Enhance the logging of the AWS S3 Receiver in normal
operation to make it easier for users to debug what is happening.

**Link to tracking Issue:** #30750

**Testing:** Confirmed that logging appears when run as part of the full
collector build.

**Documentation:** N/A
  • Loading branch information
adcharre authored Aug 20, 2024
1 parent eba20f5 commit 98f6c3d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 13 deletions.
27 changes: 27 additions & 0 deletions .chloggen/awss3receiver_logging.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: 'Enhance the logging of the AWS S3 Receiver in normal operation to make it easier for users to debug what is happening.'

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30750]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
5 changes: 4 additions & 1 deletion receiver/awss3receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type awss3Receiver struct {
}

func newAWSS3Receiver(ctx context.Context, cfg *Config, telemetryType string, settings receiver.Settings, processor receiverProcessor) (*awss3Receiver, error) {
reader, err := newS3Reader(ctx, cfg)
reader, err := newS3Reader(ctx, settings.Logger, cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -139,6 +139,7 @@ func (r *traceReceiver) processReceivedData(ctx context.Context, rcvr *awss3Rece
rcvr.logger.Warn("Unsupported file format", zap.String("key", key))
return nil
}
rcvr.logger.Debug("Processing trace file", zap.String("key", key), zap.String("format", format))
traces, err := unmarshaler.UnmarshalTraces(data)
if err != nil {
return err
Expand Down Expand Up @@ -180,6 +181,7 @@ func (r *metricsReceiver) processReceivedData(ctx context.Context, rcvr *awss3Re
rcvr.logger.Warn("Unsupported file format", zap.String("key", key))
return nil
}
rcvr.logger.Debug("Processing metric file", zap.String("key", key), zap.String("format", format))
metrics, err := unmarshaler.UnmarshalMetrics(data)
if err != nil {
return err
Expand Down Expand Up @@ -221,6 +223,7 @@ func (r *logsReceiver) processReceivedData(ctx context.Context, rcvr *awss3Recei
rcvr.logger.Warn("Unsupported file format", zap.String("key", key))
return nil
}
rcvr.logger.Debug("Processing log file", zap.String("key", key), zap.String("format", format))
logs, err := unmarshaler.UnmarshalLogs(data)
if err != nil {
return err
Expand Down
37 changes: 26 additions & 11 deletions receiver/awss3receiver/s3reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/service/s3"
"go.uber.org/zap"
)

type s3Reader struct {
logger *zap.Logger

listObjectsClient ListObjectsAPI
getObjectClient GetObjectAPI
s3Bucket string
Expand All @@ -26,7 +29,7 @@ type s3Reader struct {

type s3ReaderDataCallback func(context.Context, string, []byte) error

func newS3Reader(ctx context.Context, cfg *Config) (*s3Reader, error) {
func newS3Reader(ctx context.Context, logger *zap.Logger, cfg *Config) (*s3Reader, error) {
listObjectsClient, getObjectClient, err := newS3Client(ctx, cfg.S3Downloader)
if err != nil {
return nil, err
Expand All @@ -44,6 +47,7 @@ func newS3Reader(ctx context.Context, cfg *Config) (*s3Reader, error) {
}

return &s3Reader{
logger: logger,
listObjectsClient: listObjectsClient,
getObjectClient: getObjectClient,
s3Bucket: cfg.S3Downloader.S3Bucket,
Expand All @@ -62,17 +66,21 @@ func (s3Reader *s3Reader) readAll(ctx context.Context, telemetryType string, dat
} else {
timeStep = time.Minute
}

s3Reader.logger.Info("Start reading telemetry", zap.Time("start_time", s3Reader.startTime), zap.Time("end_time", s3Reader.endTime))
for currentTime := s3Reader.startTime; currentTime.Before(s3Reader.endTime); currentTime = currentTime.Add(timeStep) {
select {
case <-ctx.Done():
return nil
s3Reader.logger.Error("Context cancelled, stopping reading telemetry", zap.Time("time", currentTime))
return ctx.Err()
default:
s3Reader.logger.Info("Reading telemetry", zap.Time("time", currentTime))
if err := s3Reader.readTelemetryForTime(ctx, currentTime, telemetryType, dataCallback); err != nil {
s3Reader.logger.Error("Error reading telemetry", zap.Error(err), zap.Time("time", currentTime))
return err
}
}
}
s3Reader.logger.Info("Finished reading telemetry", zap.Time("start_time", s3Reader.startTime), zap.Time("end_time", s3Reader.endTime))
return nil
}

Expand All @@ -82,23 +90,30 @@ func (s3Reader *s3Reader) readTelemetryForTime(ctx context.Context, t time.Time,
}
prefix := s3Reader.getObjectPrefixForTime(t, telemetryType)
params.Prefix = &prefix

s3Reader.logger.Debug("Finding telemetry with prefix", zap.String("prefix", prefix))
p := s3Reader.listObjectsClient.NewListObjectsV2Paginator(params)

firstPage := true
for p.HasMorePages() {
page, err := p.NextPage(ctx)
if err != nil {
return err
}
for _, obj := range page.Contents {
data, err := s3Reader.retrieveObject(ctx, *obj.Key)
if err != nil {
return err
}
if err := dataCallback(ctx, *obj.Key, data); err != nil {
return err
if firstPage && len(page.Contents) == 0 {
s3Reader.logger.Info("No telemetry found for time", zap.String("prefix", prefix), zap.Time("time", t))
} else {
for _, obj := range page.Contents {
data, err := s3Reader.retrieveObject(ctx, *obj.Key)
if err != nil {
return err
}
s3Reader.logger.Debug("Retrieved telemetry", zap.String("key", *obj.Key))
if err := dataCallback(ctx, *obj.Key, data); err != nil {
return err
}
}
}
firstPage = false
}
return nil
}
Expand Down
10 changes: 9 additions & 1 deletion receiver/awss3receiver/s3reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

var testTime = time.Date(2021, 02, 01, 17, 32, 00, 00, time.UTC)
Expand Down Expand Up @@ -85,6 +86,7 @@ func Test_s3Reader_getObjectPrefixForTime(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
reader := s3Reader{
logger: zap.NewNop(),
s3Prefix: test.args.s3Prefix,
s3Partition: test.args.s3Partition,
filePrefix: test.args.filePrefix,
Expand Down Expand Up @@ -166,6 +168,7 @@ func Test_readTelemetryForTime(t *testing.T) {
Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))),
}, nil
}),
logger: zap.NewNop(),
s3Bucket: "bucket",
s3Partition: "minute",
s3Prefix: "",
Expand Down Expand Up @@ -214,6 +217,7 @@ func Test_readTelemetryForTime_GetObjectError(t *testing.T) {
require.Equal(t, testKey, *params.Key)
return nil, testError
}),
logger: zap.NewNop(),
s3Bucket: "bucket",
s3Partition: "minute",
s3Prefix: "",
Expand Down Expand Up @@ -248,6 +252,7 @@ func Test_readTelemetryForTime_ListObjectsNoResults(t *testing.T) {
Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))),
}, nil
}),
logger: zap.NewNop(),
s3Bucket: "bucket",
s3Partition: "minute",
s3Prefix: "",
Expand Down Expand Up @@ -294,6 +299,7 @@ func Test_readTelemetryForTime_NextPageError(t *testing.T) {
Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))),
}, nil
}),
logger: zap.NewNop(),
s3Bucket: "bucket",
s3Partition: "minute",
s3Prefix: "",
Expand Down Expand Up @@ -335,6 +341,7 @@ func Test_readAll(t *testing.T) {
Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))),
}, nil
}),
logger: zap.NewNop(),
s3Bucket: "bucket",
s3Prefix: "",
s3Partition: "minute",
Expand Down Expand Up @@ -381,6 +388,7 @@ func Test_readAll_ContextDone(t *testing.T) {
Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))),
}, nil
}),
logger: zap.NewNop(),
s3Bucket: "bucket",
s3Prefix: "",
s3Partition: "minute",
Expand All @@ -397,6 +405,6 @@ func Test_readAll_ContextDone(t *testing.T) {
dataCallbackKeys = append(dataCallbackKeys, key)
return nil
})
require.NoError(t, err)
require.Error(t, err)
require.Len(t, dataCallbackKeys, 0)
}

0 comments on commit 98f6c3d

Please sign in to comment.