Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Jul 11, 2024
1 parent d8aa7a7 commit a629015
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
8 changes: 6 additions & 2 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
return it.r, nil
}

func (m *Manager) Get() (*PendingItem, error) {
// NextPending returns the next segment to be flushed. It returns nil if the
// pending list is empty.
func (m *Manager) NextPending() (*PendingItem, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.pending.Len() == 0 {
Expand All @@ -190,7 +192,7 @@ func (m *Manager) Get() (*PendingItem, error) {
m.available.Remove(el)
}
}
// If there are still no pending items, return nil.
// If the pending list is still empty return nil.
if m.pending.Len() == 0 {
return nil, nil
}
Expand All @@ -201,6 +203,8 @@ func (m *Manager) Get() (*PendingItem, error) {
return &PendingItem{Result: it.r, Writer: it.w}, nil
}

// Put resets the segment and puts it back in the available list to accept
// writes. A PendingItem should not be put back until it has been flushed.
func (m *Manager) Put(it *PendingItem) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
16 changes: 8 additions & 8 deletions pkg/storage/wal/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestManager_Append(t *testing.T) {
}

// Flush the data and broadcast that the flush is successful.
it, err := m.Get()
it, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
it.Result.SetDone(nil)
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestManager_Append(t *testing.T) {
require.NotNil(t, res)

// Flush the data, but this time broadcast an error that the flush failed.
it, err = m.Get()
it, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
it.Result.SetDone(errors.New("failed to flush"))
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestManager_Append_ErrFull(t *testing.T) {
require.Nil(t, res)
}

func TestManager_Get(t *testing.T) {
func TestManager_NextPending(t *testing.T) {
m, err := NewManager(Config{
MaxAge: DefaultMaxAge,
MaxSegments: 1,
Expand All @@ -144,7 +144,7 @@ func TestManager_Get(t *testing.T) {
require.NoError(t, err)

// There should be no items as no data has been written.
it, err := m.Get()
it, err := m.NextPending()
require.NoError(t, err)
require.Nil(t, it)

Expand All @@ -165,7 +165,7 @@ func TestManager_Get(t *testing.T) {
Entries: entries,
})
require.NoError(t, err)
it, err = m.Get()
it, err = m.NextPending()
require.NoError(t, err)
require.Nil(t, it)

Expand All @@ -181,12 +181,12 @@ func TestManager_Get(t *testing.T) {
Entries: entries,
})
require.NoError(t, err)
it, err = m.Get()
it, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)

// Should not get the same item more than once.
it, err = m.Get()
it, err = m.NextPending()
require.NoError(t, err)
require.Nil(t, it)
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestManager_Put(t *testing.T) {
require.Equal(t, 1, m.pending.Len())

// Getting the pending segment should remove it from the list.
it, err := m.Get()
it, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, it)
require.Equal(t, 9, m.available.Len())
Expand Down

0 comments on commit a629015

Please sign in to comment.