Skip to content

Commit

Permalink
[exporter/prometheusremotewrite] Make WAL shutdown wait until gorouti…
Browse files Browse the repository at this point in the history
…ne exits (#37733)

#### Description

When the WAL is used, shutdown doesn't wait for the goroutine launched
to handle the WAL to terminate. This makes the shutdown non-deterministic
and causes the test failures in the issues linked below.

Although this is a bug in the component, it doesn't seem to deserve a
changelog entry, since it was detected via tests and there is no open
issue about the shutdown of the component.

#### Link to tracking issue

Fixes #9124, #37715

#### Testing

Ran the component tests locally multiple times on Windows (more likely
to surface concurrency issues).

#### Documentation

N/A
  • Loading branch information
pjanotti authored Feb 6, 2025
1 parent 007b0ae commit c33b2ad
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
3 changes: 0 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,9 +686,6 @@ func Test_PushMetrics(t *testing.T) {
name = "WAL"
}
t.Run(name, func(t *testing.T) {
if useWAL {
t.Skip("Flaky test, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9124")
}
for _, tt := range tests {
if useWAL && tt.skipForWAL {
t.Skip("test not supported when using WAL")
Expand Down
12 changes: 7 additions & 5 deletions exporter/prometheusremotewriteexporter/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
)

type prweWAL struct {
mu sync.Mutex // mu protects the fields below.
wg sync.WaitGroup // wg waits for the go routines to finish.
mu sync.Mutex // mu protects the fields below.
wal *wal.Log
walConfig *WALConfig
walPath string
Expand Down Expand Up @@ -128,10 +129,8 @@ func (prwe *prweWAL) retrieveWALIndices() (err error) {
func (prwe *prweWAL) stop() error {
err := errAlreadyClosed
prwe.stopOnce.Do(func() {
prwe.mu.Lock()
defer prwe.mu.Unlock()

close(prwe.stopChan)
prwe.wg.Wait()
err = prwe.closeWAL()
})
return err
Expand All @@ -154,9 +153,12 @@ func (prwe *prweWAL) run(ctx context.Context) (err error) {

// Start the process of exporting but wait until the exporting has started.
waitUntilStartedCh := make(chan bool)
prwe.wg.Add(1)
go func() {
signalStart := func() { close(waitUntilStartedCh) }
defer prwe.wg.Done()
defer cancel()

signalStart := func() { close(waitUntilStartedCh) }
for {
select {
case <-runCtx.Done():
Expand Down
3 changes: 2 additions & 1 deletion exporter/prometheusremotewriteexporter/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func TestWAL_persist(t *testing.T) {
}

func TestExportWithWALEnabled(t *testing.T) {
t.Skip("skipping test, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37715")
cfg := &Config{
WAL: &WALConfig{
Directory: t.TempDir(),
Expand Down Expand Up @@ -191,6 +190,8 @@ func TestExportWithWALEnabled(t *testing.T) {

assert.Len(t, writeReq.Timeseries, 1)
}))
defer server.Close()

clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Endpoint = server.URL
cfg.ClientConfig = clientConfig
Expand Down

0 comments on commit c33b2ad

Please sign in to comment.