Skip to content

Commit

Permalink
[chore] prometheusremotewrite: simplify wal initialization
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jan 17, 2024
1 parent e5b8f44 commit 41d9456
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 30 deletions.
8 changes: 1 addition & 7 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,8 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
SendMetadata: cfg.SendMetadata,
},
}
if cfg.WAL == nil {
return prwe, nil
}

prwe.wal, err = newWAL(cfg.WAL, prwe.export)
if err != nil {
return nil, err
}
prwe.wal = newWAL(cfg.WAL, prwe.export)
return prwe, nil
}

Expand Down
7 changes: 3 additions & 4 deletions exporter/prometheusremotewriteexporter/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func (wc *WALConfig) truncateFrequency() time.Duration {
return defaultWALTruncateFrequency
}

func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.WriteRequest) error) (*prweWAL, error) {
func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.WriteRequest) error) *prweWAL {
if walConfig == nil {
// There are cases for which the WAL can be disabled.
// TODO: Perhaps log that the WAL wasn't enabled.
return nil, errNilConfig
return nil
}

return &prweWAL{
Expand All @@ -72,7 +72,7 @@ func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.Wri
stopChan: make(chan struct{}),
rWALIndex: &atomic.Uint64{},
wWALIndex: &atomic.Uint64{},
}, nil
}
}

func (wc *WALConfig) createWAL() (*wal.Log, string, error) {
Expand All @@ -90,7 +90,6 @@ func (wc *WALConfig) createWAL() (*wal.Log, string, error) {
var (
errAlreadyClosed = errors.New("already closed")
errNilWAL = errors.New("wal is nil")
errNilConfig = errors.New("expecting a non-nil configuration")
)

// retrieveWALIndices queries the WriteAheadLog for its current first and last indices.
Expand Down
31 changes: 12 additions & 19 deletions exporter/prometheusremotewriteexporter/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ func doNothingExportSink(_ context.Context, reqL []*prompb.WriteRequest) error {

func TestWALCreation_nilConfig(t *testing.T) {
config := (*WALConfig)(nil)
pwal, err := newWAL(config, doNothingExportSink)
require.Equal(t, err, errNilConfig)
pwal := newWAL(config, doNothingExportSink)
require.Nil(t, pwal)
}

func TestWALCreation_nonNilConfig(t *testing.T) {
config := &WALConfig{Directory: t.TempDir()}
pwal, err := newWAL(config, doNothingExportSink)
pwal := newWAL(config, doNothingExportSink)
require.NotNil(t, pwal)
assert.Nil(t, err)
assert.NoError(t, pwal.stop())
}

Expand Down Expand Up @@ -80,27 +78,24 @@ func TestWALStopManyTimes(t *testing.T) {
TruncateFrequency: 60 * time.Microsecond,
BufferSize: 1,
}
pwal, err := newWAL(config, doNothingExportSink)
require.Nil(t, err)
pwal := newWAL(config, doNothingExportSink)
require.NotNil(t, pwal)

// Ensure that invoking .stop() multiple times doesn't cause a panic, but actually
// First close should NOT return an error.
err = pwal.stop()
require.Nil(t, err)
require.NoError(t, pwal.stop())
for i := 0; i < 4; i++ {
// Every invocation to .stop() should return an errAlreadyClosed.
err = pwal.stop()
require.Equal(t, err, errAlreadyClosed)
require.ErrorIs(t, pwal.stop(), errAlreadyClosed)
}
}

func TestWAL_persist(t *testing.T) {
// Unit tests that requests written to the WAL persist.
config := &WALConfig{Directory: t.TempDir()}

pwal, err := newWAL(config, doNothingExportSink)
require.Nil(t, err)
pwal := newWAL(config, doNothingExportSink)
require.NotNil(t, pwal)

// 1. Write out all the entries.
reqL := []*prompb.WriteRequest{
Expand All @@ -127,27 +122,25 @@ func TestWAL_persist(t *testing.T) {
}

ctx := context.Background()
err = pwal.retrieveWALIndices()
require.Nil(t, err)
require.NoError(t, pwal.retrieveWALIndices())
t.Cleanup(func() {
assert.NoError(t, pwal.stop())
})

err = pwal.persistToWAL(reqL)
require.Nil(t, err)
require.NoError(t, pwal.persistToWAL(reqL))

// 2. Read all the entries from the WAL itself, guided by the indices available,
// and ensure that they are exactly in order as we'd expect them.
wal := pwal.wal
start, err := wal.FirstIndex()
require.Nil(t, err)
require.NoError(t, err)
end, err := wal.LastIndex()
require.Nil(t, err)
require.NoError(t, err)

var reqLFromWAL []*prompb.WriteRequest
for i := start; i <= end; i++ {
req, err := pwal.readPrompbFromWAL(ctx, i)
require.Nil(t, err)
require.NoError(t, err)
reqLFromWAL = append(reqLFromWAL, req)
}

Expand Down

0 comments on commit 41d9456

Please sign in to comment.