Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deflake TestActivityLog_MultipleFragmentsAndSegments #20930

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ type ActivityLog struct {
// Channel for sending fragment immediately
sendCh chan struct{}

// Channel for writing fragment immediately
writeCh chan struct{}

// Channel to stop background processing
doneCh chan struct{}

Expand Down Expand Up @@ -203,6 +200,8 @@ type ActivityLogCoreConfig struct {
// Enable activity log even if the feature flag not set
ForceEnable bool

DisableFragmentWorker bool

// Do not start timers to send or persist fragments.
DisableTimers bool

Expand Down Expand Up @@ -237,7 +236,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
nodeID: hostname,
newFragmentCh: make(chan struct{}, 1),
sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size
writeCh: make(chan struct{}, 1), // same for full segment
doneCh: make(chan struct{}, 1),
partialMonthClientTracker: make(map[string]*activity.EntityRecord),
CensusReportInterval: time.Hour * 1,
Expand Down Expand Up @@ -1136,9 +1134,13 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup) e
// Lock already held here, can't use .PerfStandby()
// The workers need to know the current segment time.
if c.perfStandby {
go manager.perfStandbyFragmentWorker(ctx)
if !c.activityLogConfig.DisableFragmentWorker {
go manager.perfStandbyFragmentWorker(ctx)
}
} else {
go manager.activeFragmentWorker(ctx)
if !c.activityLogConfig.DisableFragmentWorker {
go manager.activeFragmentWorker(ctx)
}

// Check for any intent log, in the background
manager.computationWorkerDone = make(chan struct{})
Expand Down Expand Up @@ -1332,16 +1334,6 @@ func (a *ActivityLog) activeFragmentWorker(ctx context.Context) {
}
a.logger.Trace("writing segment on timer expiration")
writeFunc()
case <-a.writeCh:
a.logger.Trace("writing segment on request")
writeFunc()

// Reset the schedule to wait 10 minutes from this forced write.
ticker.Stop()
ticker = a.clock.NewTicker(activitySegmentInterval)

// Simpler, but ticker.Reset was introduced in go 1.15:
// ticker.Reset(activitySegmentInterval)
case currentTime := <-endOfMonthChannel:
err := a.HandleEndOfMonth(ctx, currentTime.UTC())
if err != nil {
Expand Down
23 changes: 11 additions & 12 deletions vault/activity_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,25 +651,24 @@ func TestActivityLog_availableLogs(t *testing.T) {
}
}

// TestActivityLog_MultipleFragmentsAndSegments adds 4000 clients to a fragment and saves it and reads it. The test then
// adds 4000 more clients and calls receivedFragment with 200 more entities. The current segment is saved to storage and
// read back. The test verifies that there are 5000 clients in the first segment index, then the rest in the second index.
// TestActivityLog_MultipleFragmentsAndSegments adds 4000 clients to a fragment
// and saves it and reads it. The test then adds 4000 more clients and calls
// receivedFragment with 200 more entities. The current segment is saved to
// storage and read back. The test verifies that there are 5000 clients in the
// first segment index, then the rest in the second index.
func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
core, _, _ := TestCoreUnsealedWithConfig(t, &CoreConfig{
ActivityLogConfig: ActivityLogCoreConfig{
DisableFragmentWorker: true,
DisableTimers: true,
},
})
a := core.activityLog

// enabled check is now inside AddClientToFragment
a.SetEnable(true)
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment

// Stop timers for test purposes
close(a.doneCh)
defer func() {
a.l.Lock()
a.doneCh = make(chan struct{}, 1)
a.l.Unlock()
}()

startTimestamp := a.GetStartTimestamp()
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp)
Expand Down
6 changes: 5 additions & 1 deletion vault/logical_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/hashicorp/vault/sdk/helper/compressutil"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/helper/pluginutil"
"github.com/hashicorp/vault/sdk/helper/testhelpers/schema"
"github.com/hashicorp/vault/sdk/logical"
Expand Down Expand Up @@ -5517,7 +5518,10 @@ func TestSystemBackend_LoggersByName(t *testing.T) {
t.Run(fmt.Sprintf("loggers-by-name-%s", tc.logger), func(t *testing.T) {
t.Parallel()

core, b, _ := testCoreSystemBackend(t)
core, _, _ := TestCoreUnsealedWithConfig(t, &CoreConfig{
Logger: logging.NewVaultLogger(hclog.Trace),
})
b := core.systemBackend

// Test core overrides logging level outside of config,
// an initial delete will ensure that we do an initial read
Expand Down
2 changes: 1 addition & 1 deletion vault/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestCoreWithSealAndUI(t testing.T, opts *CoreConfig) *Core {
}

func TestCoreWithSealAndUINoCleanup(t testing.T, opts *CoreConfig) *Core {
logger := logging.NewVaultLogger(log.Trace)
logger := logging.NewVaultLogger(log.Trace).Named(t.Name())
physicalBackend, err := physInmem.NewInmem(nil, logger)
if err != nil {
t.Fatal(err)
Expand Down