1 change: 1 addition & 0 deletions changelogs/8.16.asciidoc
@@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/v8.15.2\...v8.16.0[View commits]
- Track all bulk request response status codes {pull}13574[13574]
- Fix a concurrent map write panic in monitoring middleware {pull}14335[14335]
- Apply shutdown timeout to http server {pull}14339[14339]
- Tail-based sampling: Fix rare GC goroutine failure after EA hot reload that caused storage not to be reclaimed, leaving the server stuck with "storage limit reached" {pull}13574[13574]

[float]
==== Breaking Changes
28 changes: 27 additions & 1 deletion x-pack/apm-server/sampling/processor.go
@@ -40,6 +40,11 @@ const (
shutdownGracePeriod = 5 * time.Second
)

var (
// gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload
gcCh = make(chan struct{}, 1)
)

// Processor is a tail-sampling event processor.
type Processor struct {
config Config
@@ -386,6 +391,16 @@ func (p *Processor) Run() error {
}
})
g.Go(func() error {
// Protect this goroutine from running concurrently when 2 TBS processors are active
// as badger GC is not concurrent safe.
select {
case <-p.stopping:
return nil
case gcCh <- struct{}{}:
Member Author:
[to reviewer] Used a channel here instead of a sync.Mutex, just to avoid blocking the goroutine shutdown in case Stop is called. I cannot imagine a case where mu.Lock() will block the shutdown, but just to err on the safe side.
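A minimal standalone sketch of the pattern under discussion (my illustration, not apm-server code; `runGC`, `stopping`, and the timings are made up): a buffered channel of capacity 1 acts as the lock, and because the acquire happens inside a `select` it can be abandoned as soon as a stop signal arrives, which a plain `mu.Lock()` could not.

```go
package main

import (
	"fmt"
	"time"
)

// gcCh has capacity 1, so at most one goroutine "holds" it at a time.
var gcCh = make(chan struct{}, 1)

// runGC acquires the channel-based lock, but gives up immediately if the
// stopping channel is closed first, so shutdown is never blocked on the lock.
func runGC(id int, stopping <-chan struct{}) {
	select {
	case <-stopping:
		fmt.Printf("gc %d: stop received before acquiring the lock\n", id)
		return
	case gcCh <- struct{}{}: // acquire
	}
	defer func() { <-gcCh }() // release

	fmt.Printf("gc %d: running value-log GC\n", id)
	time.Sleep(100 * time.Millisecond) // stand-in for the actual GC work
}

func main() {
	stopping := make(chan struct{})
	go runGC(1, stopping) // acquires the lock and starts "GC"
	time.Sleep(10 * time.Millisecond)
	go runGC(2, stopping) // blocks in the select, waiting for the lock
	time.Sleep(10 * time.Millisecond)
	close(stopping) // the waiting goroutine exits promptly instead of blocking shutdown
	time.Sleep(200 * time.Millisecond)
}
```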

}
defer func() {
<-gcCh
}()
// This goroutine is responsible for periodically garbage
// collecting the Badger value log, using the recommended
// discard ratio of 0.5.
@@ -411,7 +426,9 @@ func (p *Processor) Run() error {
})
g.Go(func() error {
// Subscribe to remotely sampled trace IDs. This is cancelled immediately when
// Stop is called. The next subscriber will pick up from the previous position.
// Stop is called. But it is possible that both old and new subscriber goroutines
// run concurrently, before the old one eventually receives the Stop call.
// The next subscriber will pick up from the previous position.
defer close(remoteSampledTraceIDs)
defer close(subscriberPositions)
ctx, cancel := context.WithCancel(context.Background())
@@ -558,7 +575,13 @@ func (p *Processor) Run() error {
return nil
}

// subscriberPositionFileMutex protects the subscriber file from concurrent RW, in case of hot reload.
var subscriberPositionFileMutex sync.Mutex
Member Author:
[to reviewer] This ended up as a mutex over the subscriber file only, but not the subscriber goroutine. Although this means possibly duplicate work (e.g. searching in ES) during the overlap, any position written to subscriber file is a position that is processed. Running 2 subscriber goroutines concurrently does not present any correctness issues.
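As a minimal standalone sketch of the file-locking part only (hypothetical names — `position`, `writePosition`, `readPosition` — not the apm-server code): a package-level `sync.Mutex` serializes reads and writes of the shared position file, so two overlapping subscribers can never interleave a read with a partially written file; the last complete write simply wins.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// position is a stand-in for the persisted subscriber position.
type position struct {
	Offset int `json:"offset"`
}

// posFileMu protects the position file from concurrent read/write,
// mirroring the role of subscriberPositionFileMutex above.
var posFileMu sync.Mutex

func writePosition(dir string, pos position) error {
	data, err := json.Marshal(pos)
	if err != nil {
		return err
	}
	posFileMu.Lock()
	defer posFileMu.Unlock()
	return os.WriteFile(filepath.Join(dir, "position.json"), data, 0644)
}

func readPosition(dir string) (position, error) {
	posFileMu.Lock()
	defer posFileMu.Unlock()
	var pos position
	data, err := os.ReadFile(filepath.Join(dir, "position.json"))
	if err != nil {
		return pos, err
	}
	return pos, json.Unmarshal(data, &pos)
}

func main() {
	dir, err := os.MkdirTemp("", "pos")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	var wg sync.WaitGroup
	for i := 1; i <= 2; i++ { // two subscribers overlapping during a hot reload
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			_ = writePosition(dir, position{Offset: id * 100})
		}(i)
	}
	wg.Wait()

	pos, err := readPosition(dir)
	fmt.Println(pos, err) // one complete write wins; never a torn file
}
```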


func readSubscriberPosition(logger *logp.Logger, storageDir string) (pubsub.SubscriberPosition, error) {
subscriberPositionFileMutex.Lock()
defer subscriberPositionFileMutex.Unlock()

var pos pubsub.SubscriberPosition
data, err := os.ReadFile(filepath.Join(storageDir, subscriberPositionFile))
if errors.Is(err, os.ErrNotExist) {
@@ -579,6 +602,9 @@ func writeSubscriberPosition(storageDir string, pos pubsub.SubscriberPosition) error {
if err != nil {
return err
}

subscriberPositionFileMutex.Lock()
defer subscriberPositionFileMutex.Unlock()
return os.WriteFile(filepath.Join(storageDir, subscriberPositionFile), data, 0644)
}

26 changes: 26 additions & 0 deletions x-pack/apm-server/sampling/processor_test.go
@@ -22,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/model/modelpb"
@@ -668,6 +669,31 @@ func TestStorageGC(t *testing.T) {
t.Fatal("timed out waiting for value log garbage collection")
}

func TestStorageGCConcurrency(t *testing.T) {
Member Author:
[to reviewer] Added this test to reproduce the issue. I don't know a simpler way to reproduce it than setting a short GC interval and sleeping for a second.

// This test ensures that TBS processor does not return an error
// even when run concurrently e.g. in hot reload
if testing.Short() {
t.Skip("skipping slow test")
}

config := newTempdirConfig(t)
config.TTL = 10 * time.Millisecond
config.FlushInterval = 10 * time.Millisecond
config.StorageGCInterval = 10 * time.Millisecond

g := errgroup.Group{}
for i := 0; i < 2; i++ {
processor, err := sampling.NewProcessor(config)
require.NoError(t, err)
g.Go(processor.Run)
go func() {
time.Sleep(time.Second)
assert.NoError(t, processor.Stop(context.Background()))
}()
}
assert.NoError(t, g.Wait())
}
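As a usage note (assuming the repository's standard Go package layout), the new test can be exercised on its own with `go test -race -run TestStorageGCConcurrency ./x-pack/apm-server/sampling`; it is skipped under `-short` because of the one-second sleeps.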

func TestStorageLimit(t *testing.T) {
// This test ensures that when tail sampling is configured with a hard
// storage limit, the limit is respected once the size is available.