Skip to content

Commit

Permalink
refactor(general): add epoch.Manager.MaybeGenerateRangeCheckpoint (#3727
Browse files Browse the repository at this point in the history
)

* epoch manager: factor out getRangeToCompact
* epoch manager: add epoch.Manager.MaybeGenerateRangeCheckpoint
* test epoch.Manager.MaybeGenerateRangeCheckpoint
  • Loading branch information
julio-lopez authored Mar 22, 2024
1 parent 9c99b8a commit fdb6d3c
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 9 deletions.
55 changes: 46 additions & 9 deletions internal/epoch/epoch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,19 +593,38 @@ func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *CurrentSna
return nil
}

func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) {
latestSettled := cs.WriteEpoch - numUnsettledEpochs
if latestSettled < 0 {
return
// MaybeGenerateRangeCheckpoint may create a new range index for all the
// individual epochs covered by the new range. If there are not enough epochs
// to create a new range, then a range index is not created.
func (e *Manager) MaybeGenerateRangeCheckpoint(ctx context.Context) error {
p, err := e.getParameters(ctx)
if err != nil {
return err
}

firstNonRangeCompacted := 0
if len(cs.LongestRangeCheckpointSets) > 0 {
firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1
cs, err := e.committedState(ctx, 0)
if err != nil {
return err
}

if latestSettled-firstNonRangeCompacted < p.FullCheckpointFrequency {
e.log.Debugf("not generating range checkpoint")
latestSettled, firstNonRangeCompacted, compact := getRangeToCompact(cs, *p)
if !compact {
e.log.Debug("not generating range checkpoint")

return nil
}

if err := e.generateRangeCheckpointFromCommittedState(ctx, cs, firstNonRangeCompacted, latestSettled); err != nil {
return errors.Wrap(err, "unable to generate full checkpoint, performance will be affected")
}

return nil
}

func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) {
latestSettled, firstNonRangeCompacted, compact := getRangeToCompact(cs, *p)
if !compact {
e.log.Debug("not generating range checkpoint")

return
}
Expand All @@ -624,6 +643,24 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs
})
}

func getRangeToCompact(cs CurrentSnapshot, p Parameters) (low, high int, compactRange bool) {
latestSettled := cs.WriteEpoch - numUnsettledEpochs
if latestSettled < 0 {
return -1, -1, false
}

firstNonRangeCompacted := 0
if rangeSetsLen := len(cs.LongestRangeCheckpointSets); rangeSetsLen > 0 {
firstNonRangeCompacted = cs.LongestRangeCheckpointSets[rangeSetsLen-1].MaxEpoch + 1
}

if latestSettled-firstNonRangeCompacted < p.FullCheckpointFrequency {
return -1, -1, false
}

return latestSettled, firstNonRangeCompacted, true
}

func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) {
// TODO: implement me
_ = cs
Expand Down
218 changes: 218 additions & 0 deletions internal/epoch/epoch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,224 @@ func TestMaybeCompactSingleEpoch(t *testing.T) {
require.Len(t, cs.SingleEpochCompactionSets, newestEpochToCompact)
}

func TestMaybeGenerateRangeCheckpoint_Empty(t *testing.T) {
t.Parallel()

te := newTestEnv(t)
ctx := testlogging.Context(t)

// this should be a no-op
err := te.mgr.MaybeGenerateRangeCheckpoint(ctx)

require.NoError(t, err)
}

func TestMaybeGenerateRangeCheckpoint_GetParametersError(t *testing.T) {
t.Parallel()

te := newTestEnv(t)
ctx := testlogging.Context(t)

paramsError := errors.New("no parameters error")
te.mgr.paramProvider = faultyParamsProvider{err: paramsError}

err := te.mgr.MaybeGenerateRangeCheckpoint(ctx)

require.Error(t, err)
require.ErrorIs(t, err, paramsError)
}

func TestMaybeGenerateRangeCheckpoint_FailToReadState(t *testing.T) {
t.Parallel()

te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)

ctx, cancel := context.WithCancel(ctx)

cancel()

err := te.mgr.MaybeGenerateRangeCheckpoint(ctx)

require.Error(t, err)
}

func TestMaybeGenerateRangeCheckpoint_CompactionError(t *testing.T) {
t.Parallel()

te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)

p, err := te.mgr.getParameters(ctx)
require.NoError(t, err)

epochsToWrite := p.FullCheckpointFrequency + 3
idxCount := p.GetEpochAdvanceOnCountThreshold()

var k int

// Create sufficient indexes blobs and move clock forward to advance epoch.
for j := 0; j < epochsToWrite; j++ {
for i := 0; i < idxCount; i++ {
if i == idxCount-1 {
// Advance the time so that the difference in times for writes will force
// new epochs.
te.ft.Advance(p.MinEpochDuration + 1*time.Hour)
}

te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k))
k++
}

err = te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)

err = te.mgr.Refresh(ctx)
require.NoError(t, err)
}

cs, err := te.mgr.Current(ctx)

require.NoError(t, err)
require.Equal(t, epochsToWrite, cs.WriteEpoch)

compactionError := errors.New("test compaction error")
te.mgr.compact = func(context.Context, []blob.ID, blob.ID) error {
return compactionError
}

err = te.mgr.MaybeGenerateRangeCheckpoint(ctx)

require.Error(t, err)
require.ErrorIs(t, err, compactionError)
}

func TestMaybeGenerateRangeCheckpoint_FromUncompactedEpochs(t *testing.T) {
t.Parallel()

te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)

p, err := te.mgr.getParameters(ctx)
require.NoError(t, err)

var k int

epochsToWrite := p.FullCheckpointFrequency + 3
idxCount := p.GetEpochAdvanceOnCountThreshold()
// Create sufficient indexes blobs and move clock forward to advance epoch.
for j := 0; j < epochsToWrite; j++ {
for i := 0; i < idxCount; i++ {
if i == idxCount-1 {
// Advance the time so that the difference in times for writes will force
// new epochs.
te.ft.Advance(p.MinEpochDuration + 1*time.Hour)
}

te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k))
}

err = te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)

err = te.mgr.Refresh(ctx)
require.NoError(t, err)
}

cs, err := te.mgr.Current(ctx)

require.NoError(t, err)
require.Equal(t, epochsToWrite, cs.WriteEpoch)
require.Empty(t, cs.LongestRangeCheckpointSets)

err = te.mgr.MaybeGenerateRangeCheckpoint(ctx)
require.NoError(t, err)

err = te.mgr.Refresh(ctx)
require.NoError(t, err)

cs, err = te.mgr.Current(ctx)

require.NoError(t, err)
require.Equal(t, epochsToWrite, cs.WriteEpoch)
require.Len(t, cs.LongestRangeCheckpointSets, 1)
}

func TestMaybeGenerateRangeCheckpoint_FromCompactedEpochs(t *testing.T) {
t.Parallel()

te := newTestEnv(t)
te.mgr.allowCleanupWritesOnIndexLoad = false
ctx := testlogging.Context(t)

p, err := te.mgr.getParameters(ctx)
require.NoError(t, err)

var k int

epochsToWrite := p.FullCheckpointFrequency + 3
idxCount := p.GetEpochAdvanceOnCountThreshold()
// Create sufficient indexes blobs and move clock forward to advance epoch.
for j := 0; j < epochsToWrite; j++ {
for i := 0; i < idxCount; i++ {
if i == idxCount-1 {
// Advance the time so that the difference in times for writes will force
// new epochs.
te.ft.Advance(p.MinEpochDuration + 1*time.Hour)
}

te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(k))
}

err = te.mgr.MaybeAdvanceWriteEpoch(ctx)
require.NoError(t, err)

err = te.mgr.Refresh(ctx)
require.NoError(t, err)
}

cs, err := te.mgr.Current(ctx)
require.NoError(t, err)

require.Equal(t, epochsToWrite, cs.WriteEpoch)

// perform single-epoch compaction for settled epochs
newestEpochToCompact := cs.WriteEpoch - numUnsettledEpochs + 1
for j := 0; j < newestEpochToCompact; j++ {
err = te.mgr.MaybeCompactSingleEpoch(ctx)
require.NoError(t, err)

err = te.mgr.Refresh(ctx) // force state refresh
require.NoError(t, err)

cs, err = te.mgr.Current(ctx)
require.NoError(t, err)

require.Len(t, cs.SingleEpochCompactionSets, j+1)
}

cs, err = te.mgr.Current(ctx)

require.NoError(t, err)
require.Equal(t, epochsToWrite, cs.WriteEpoch)
require.Empty(t, cs.LongestRangeCheckpointSets)

err = te.mgr.MaybeGenerateRangeCheckpoint(ctx)
require.NoError(t, err)

err = te.mgr.Refresh(ctx)
require.NoError(t, err)

cs, err = te.mgr.Current(ctx)

require.NoError(t, err)
require.Equal(t, epochsToWrite, cs.WriteEpoch)
require.Len(t, cs.LongestRangeCheckpointSets, 1)
}

func TestValidateParameters(t *testing.T) {
cases := []struct {
p Parameters
Expand Down

0 comments on commit fdb6d3c

Please sign in to comment.