Fix compression in the awskinesisexporter and thread safe #5663

Merged: 6 commits, Aug 10, 2023


phoebe-canva commented Aug 8, 2023

Description

There are two main changes in this PR:

  • Fixing a compression bug so that the end-of-stream (EOF) markers are written to the compressed payload
  • Adding a sync.Pool for the compressor so that it is safe for concurrent use

1st bug

When checking the output of the gzip compressed data, there are errors such as:

EOFError: Compressed file ended before the end-of-stream marker was reached

Received unexpected error: unexpected EOF

This affects zlib, flate and gzip.

This is because Flush does not add the end-of-stream (EOF) marker. Close() must be called to write it.

Fix: Changed Flush to Close so that the EOF marker is added to the end of the compressed payload.
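
A minimal sketch of the difference, using only the standard library. The helper names compress and decompress are hypothetical and are not the exporter's code. Under these assumptions, a gzip payload finished with Flush fails to decompress with "unexpected EOF", while one finished with Close round-trips.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

// compress gzips data and finishes the stream with either Close or Flush.
// Hypothetical helper for illustration only.
func compress(data []byte, useClose bool) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if useClose {
		// Close flushes pending data and writes the gzip footer.
		if err := w.Close(); err != nil {
			return nil, err
		}
	} else {
		// Flush emits pending data but does not terminate the stream.
		if err := w.Flush(); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

// decompress reads a whole gzip stream back into memory.
func decompress(payload []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	data := []byte("hello kinesis")

	flushed, err := compress(data, false)
	if err != nil {
		fmt.Println("compress failed:", err)
		return
	}
	_, err = decompress(flushed)
	fmt.Println("flush only:", err, errors.Is(err, io.ErrUnexpectedEOF))

	closed, err := compress(data, true)
	if err != nil {
		fmt.Println("compress failed:", err)
		return
	}
	out, err := decompress(closed)
	fmt.Println("close:", string(out), err)
}
```

The same Flush versus Close distinction applies to the zlib and flate writers in the standard library.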

2nd bug

There is a race condition in the compressor because the queued retry allows multiple concurrent consumers (see the queued retry documentation).

Hence, when there is a retry in the kinesis exporter, the concurrent consumers reuse the same compressor object, which can cause a race condition:

compress/flate.(*huffmanBitWriter).indexTokens(0xc0000523c0, {0xc04ed22000, 0x4e4, 0xd?})
    GOROOT/src/compress/flate/huffman_bit_writer.go:551 +0x2a5
compress/flate.(*huffmanBitWriter).writeBlockDynamic(0xc0000523c0, {0xc04ed22000?, 0xc0580b75e0?, 0x6b90a2?}, 0x0?, {0xc04ecac000, 0x31aa, 0xffff})
    GOROOT/src/compress/flate/huffman_bit_writer.go:509 +0xc5
compress/flate.(*compressor).encSpeed(0xc04ec02000)
    GOROOT/src/compress/flate/deflate.go:362 +0x259
compress/flate.(*compressor).close(0xc04ec02000)
    GOROOT/src/compress/flate/deflate.go:638 +0x76
compress/flate.(*Writer).Close(...)
    GOROOT/src/compress/flate/deflate.go:730
compress/gzip.(*Writer).Close(0xc000a548f0)
    GOROOT/src/compress/gzip/gzip.go:242 +0x87
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress.(*compressor).Do(0xc0005205d0, {0xc07ed42000, 0x31aa, 0x31aa})
    external/com_github_open_telemetry_opentelemetry_collector_contrib_exporter_awskinesisexporter/internal/compress/compresser.go:84 +0x99
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/batch.(*Batch).AddRecord(0xc07a1f6740, {0xc07ed42000?, 0xc0580b7748?, 0xc0580b7748?}, {0xc07e7cce70, 0x24})
    external/com_github_open_telemetry_opentelemetry_collector_contrib_exporter_awskinesisexporter/internal/batch/batch.go:92 +0x57
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/batch.(*batchMarshaller).Logs(0xc00051e460, {0x49bfa0?})
    external/com_github_open_telemetry_opentelemetry_collector_contrib_exporter_awskinesisexporter/internal/batch/encode_marshaler.go:64 +0x1ef
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter.Exporter.consumeLogs({{0x4575ce0?, 0xc0002ee940?}, {0x4580b10?, 0xc00051e460?}}, {0x4594908, 0xc07625acc0}, {0x4594940?})
    external/com_github_open_telemetry_opentelemetry_collector_contrib_exporter_awskinesisexporter/exporter.go:152 +0x48
go.opentelemetry.io/collector/exporter/exporterhelper.(*logsRequest).Export(0x4594940?, {0x4594908?, 0xc07625acc0?})
    external/io_opentelemetry_go_collector_exporter/exporterhelper/logs.go:65 +0x34
go.opentelemetry.io/collector/exporter/exporterhelper.(*timeoutSender).send(0xc000443778, {0x45a4208, 0xc07ea37440})
    external/io_opentelemetry_go_collector_exporter/exporterhelper/common.go:208 +0x96
go.opentelemetry.io/collector/exporter/exporterhelper.(*retrySender).send(0xc00068f900, {0x45a4208, 0xc07ea37440})
    external/io_opentelemetry_go_collector_exporter/exporterhelper/queued_retry.go:394 +0x596
go.opentelemetry.io/collector/exporter/exporterhelper.(*logsExporterWithObservability).send(0xc0003c6b10, {0x45a4208, 0xc07ea37440})
    external/io_opentelemetry_go_collector_exporter/exporterhelper/logs.go:135 +0x88
go.opentelemetry.io/collector/exporter/exporterhelper.(*queuedRetrySender).start.func1({0x45a4208, 0xc07ea37440})
    external/io_opentelemetry_go_collector_exporter/exporterhelper/queued_retry.go:205 +0x39
go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*boundedMemoryQueue).StartConsumers.func1()
    external/io_opentelemetry_go_collector_exporter/exporterhelper/internal/bounded_memory_queue.go:58 +0xb6
created by go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*boundedMemoryQueue).StartConsumers
    external/io_opentelemetry_go_collector_exporter/exporterhelper/internal/bounded_memory_queue.go:53 +0x45

Fix: Used sync.Pool to instantiate the compression writers so that the compressor is goroutine safe.
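
A rough sketch of the pooling approach, assuming a gzip writer at gzip.BestSpeed. The names writerPool, compressGzip and decompressGzip are hypothetical and the exporter's actual code may differ. Each call takes its own writer from the pool, resets it onto a fresh buffer, and returns it afterwards, so concurrent consumers never share writer state.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"sync"
)

// writerPool hands out *gzip.Writer values so that concurrent callers
// never use the same writer at the same time. Hypothetical name.
var writerPool = sync.Pool{
	New: func() any {
		w, err := gzip.NewWriterLevel(nil, gzip.BestSpeed)
		if err != nil {
			// Return nil explicitly; compressGzip checks for it.
			return nil
		}
		return w
	},
}

// compressGzip compresses data with a pooled writer.
func compressGzip(data []byte) ([]byte, error) {
	w, ok := writerPool.Get().(*gzip.Writer)
	if !ok || w == nil {
		return nil, errors.New("could not obtain a gzip writer from the pool")
	}
	defer writerPool.Put(w)

	var buf bytes.Buffer
	w.Reset(&buf) // point the reused writer at this call's buffer
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	// Close (not Flush) so the end-of-stream marker is written.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompressGzip is a helper used only to verify the round trip.
func decompressGzip(payload []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			in := fmt.Sprintf("record-%d", id)
			out, err := compressGzip([]byte(in))
			if err != nil {
				fmt.Println("compress failed:", err)
				return
			}
			got, err := decompressGzip(out)
			fmt.Println(in, "round trip ok:", err == nil && string(got) == in)
		}(i)
	}
	wg.Wait()
}
```

Running a program like this with the race detector (go run -race) is one way to check that no writer is shared across goroutines.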

phoebe-canva changed the title from "added in compression and sync pool" to "Use sync.pool and fix compression" Aug 8, 2023
phoebe-canva marked this pull request as ready for review August 8, 2023 08:38
phoebe-canva changed the title from "Use sync.pool and fix compression" to "Fix compression in the awskinesisexporter and thread safe" Aug 8, 2023
compressionPool: sync.Pool{
	New: func() any {
		w, err := gzip.NewWriterLevel(nil, gzip.BestSpeed)
		if err != nil {

Should we return nil on errors? NewWriterLevel returns nil on error itself, but I think it's safer if we do the same too.

Author

Ah! I forgot that! Thanks for reminding me, will fix.

sasanrose left a comment

LGTM in general, just left one small comment. Will approve though.

foadnh left a comment

🚀 LGTM

foadnh commented Aug 9, 2023

A suggestion:
Break it down into 2 PRs, 1 PR per bug. Upstream is somewhat sensitive about PRs being small and addressing a single bug!
The compressor pool should be super easy to merge into upstream. The error message is self-explanatory.

@phoebe-canva
Author

A suggestion: Break it down into 2 PRs, 1 PR per bug. Upstream is somewhat sensitive about PRs being small and addressing a single bug! The compressor pool should be super easy to merge into upstream. The error message is self-explanatory.

Sounds good! Thanks Foad! I will merge this in so I can build the otel collector, and then raise 2 separate PRs for upstream.

phoebe-canva merged commit e9acd9e into main Aug 10, 2023
phoebe-canva deleted the phoebe-fix-awskinesisexporter-compression branch August 10, 2023 04:43
3 participants