Skip to content

Commit

Permalink
kvserver: add cluster setting to disable queues
Browse files Browse the repository at this point in the history
This patch introduces a new set of cluster settings to disable work
queues in the `Store` object. It makes use of the underlying `disabled`
property of the `BaseQueue`.

Fixes: #102031

Release note: None
  • Loading branch information
aadityasondhi committed Jun 7, 2023
1 parent 618a893 commit 2b91c38
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 60 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
Expand Down Expand Up @@ -100,6 +101,7 @@ func newConsistencyQueue(store *Store) *consistencyQueue {
pending: store.metrics.ConsistencyQueuePending,
processingNanos: store.metrics.ConsistencyQueueProcessingNanos,
processTimeoutFunc: makeRateLimitedTimeoutFunc(consistencyCheckRate),
disabledConfig: kvserverbase.ConsistencyQueueEnabled,
},
)
return q
Expand Down
45 changes: 45 additions & 0 deletions pkg/kv/kvserver/kvserverbase/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,51 @@ var ReplicateQueueEnabled = settings.RegisterBoolSetting(
true,
)

// ReplicaGCQueueEnabled is a setting that controls whether the replica GC queue
// is enabled.
var ReplicaGCQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.replica_gc_queue.enabled",
"whether the replica gc queue is enabled",
true,
)

// RaftLogQueueEnabled is a setting that controls whether the raft log queue is
// enabled.
var RaftLogQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.raft_log_queue.enabled",
"whether the raft log queue is enabled",
true,
)

// RaftSnapshotQueueEnabled is a setting that controls whether the raft snapshot
// queue is enabled.
var RaftSnapshotQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.raft_snapshot_queue.enabled",
"whether the raft snapshot queue is enabled",
true,
)

// ConsistencyQueueEnabled is a setting that controls whether the consistency
// queue is enabled.
var ConsistencyQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.consistency_queue.enabled",
"whether the consistency queue is enabled",
true,
)

// TimeSeriesMaintenanceQueueEnabled is a setting that controls whether the
// timeseries maintenance queue is enabled.
var TimeSeriesMaintenanceQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.timeseries_maintenance_queue.enabled",
"whether the timeseries maintenance queue is enabled",
true,
)

// SplitQueueEnabled is a setting that controls whether the split queue is
// enabled.
var SplitQueueEnabled = settings.RegisterBoolSetting(
Expand Down
14 changes: 1 addition & 13 deletions pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,15 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
pending: store.metrics.MergeQueuePending,
processingNanos: store.metrics.MergeQueueProcessingNanos,
purgatory: store.metrics.MergeQueuePurgatory,
disabledConfig: kvserverbase.MergeQueueEnabled,
},
)
return mq
}

func (mq *mergeQueue) enabled() bool {
st := mq.store.ClusterSettings()
return kvserverbase.MergeQueueEnabled.Get(&st.SV)
}

func (mq *mergeQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, confReader spanconfig.StoreReader,
) (shouldQueue bool, priority float64) {
if !mq.enabled() {
return false, 0
}

desc := repl.Desc()

if desc.EndKey.Equal(roachpb.RKeyMax) {
Expand Down Expand Up @@ -243,10 +235,6 @@ func (mq *mergeQueue) requestRangeStats(
func (mq *mergeQueue) process(
ctx context.Context, lhsRepl *Replica, confReader spanconfig.StoreReader,
) (processed bool, err error) {
if !mq.enabled() {
log.VEventf(ctx, 2, "skipping merge: queue has been disabled")
return false, nil
}

lhsDesc := lhsRepl.Desc()
lhsStats := lhsRepl.GetMVCCStats()
Expand Down
15 changes: 1 addition & 14 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func newMVCCGCQueue(store *Store) *mvccGCQueue {
failures: store.metrics.MVCCGCQueueFailures,
pending: store.metrics.MVCCGCQueuePending,
processingNanos: store.metrics.MVCCGCQueueProcessingNanos,
disabledConfig: kvserverbase.MVCCGCQueueEnabled,
},
)
return mgcq
Expand Down Expand Up @@ -232,22 +233,13 @@ func (r mvccGCQueueScore) String() string {
humanizeutil.IBytes(r.GCByteAge), humanizeutil.IBytes(r.ExpMinGCByteAgeReduction))
}

func (mgcq *mvccGCQueue) enabled() bool {
st := mgcq.store.ClusterSettings()
return kvserverbase.MVCCGCQueueEnabled.Get(&st.SV)
}

// shouldQueue determines whether a replica should be queued for garbage
// collection, and if so, at what priority. Returns true for shouldQ
// in the event that the cumulative ages of GC'able bytes or extant
// intents exceed thresholds.
func (mgcq *mvccGCQueue) shouldQueue(
ctx context.Context, _ hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (bool, float64) {
if !mgcq.enabled() {
return false, 0
}

// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score.
conf := repl.SpanConfig()
Expand Down Expand Up @@ -681,11 +673,6 @@ func (r *replicaGCer) GC(
func (mgcq *mvccGCQueue) process(
ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
) (processed bool, err error) {
if !mgcq.enabled() {
log.VEventf(ctx, 2, "skipping mvcc gc: queue has been disabled")
return false, nil
}

// Record the CPU time processing the request for this replica. This is
// recorded regardless of errors that are encountered.
defer repl.MeasureReqCPUNanos(grunning.Time())
Expand Down
17 changes: 12 additions & 5 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ type queueConfig struct {
// want to try to replicate a range until we know which zone it is in and
// therefore how many replicas are required).
acceptsUnsplitRanges bool
// processDestroyedReplicas controls whether or not we want to process replicas
// that have been destroyed but not GCed.
// processDestroyedReplicas controls whether or not we want to process
// replicas that have been destroyed but not GCed.
processDestroyedReplicas bool
// processTimeout returns the timeout for processing a replica.
processTimeoutFunc queueProcessTimeoutFunc
Expand All @@ -343,10 +343,14 @@ type queueConfig struct {
failures *metric.Counter
// pending is a gauge measuring current replica count pending.
pending *metric.Gauge
// processingNanos is a counter measuring total nanoseconds spent processing replicas.
// processingNanos is a counter measuring total nanoseconds spent processing
// replicas.
processingNanos *metric.Counter
// purgatory is a gauge measuring current replica count in purgatory.
purgatory *metric.Gauge
// disabledConfig is a reference to the cluster setting that controls enabling
// and disabling queues.
disabledConfig *settings.BoolSetting
}

// baseQueue is the base implementation of the replicaQueue interface. Queue
Expand Down Expand Up @@ -440,8 +444,7 @@ type baseQueue struct {
priorityQ priorityQueue // The priority queue
purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors
stopped bool
// Some tests in this package disable queues.
disabled bool
disabled bool
}
}

Expand Down Expand Up @@ -494,6 +497,10 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *b
},
}
bq.mu.replicas = map[roachpb.RangeID]*replicaItem{}
bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
cfg.disabledConfig.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
})

return &bq
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/queue_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/benignerror"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -65,6 +67,7 @@ func TestBaseQueueConcurrent(t *testing.T) {
pending: metric.NewGauge(metric.Metadata{Name: "pending"}),
processingNanos: metric.NewCounter(metric.Metadata{Name: "processingnanos"}),
purgatory: metric.NewGauge(metric.Metadata{Name: "purgatory"}),
disabledConfig: &settings.BoolSetting{},
}

// Set up a fake store with just exactly what the code calls into. Ideally
Expand All @@ -75,6 +78,7 @@ func TestBaseQueueConcurrent(t *testing.T) {
Clock: hlc.NewClockForTesting(nil),
AmbientCtx: log.MakeTestingAmbientContext(tr),
DefaultSpanConfig: roachpb.TestingDefaultSpanConfig(),
Settings: cluster.MakeTestingClusterSettingsWithVersions(clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryVersion, true),
},
}

Expand Down
89 changes: 89 additions & 0 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
Expand Down Expand Up @@ -106,6 +108,7 @@ func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfi
cfg.pending = metric.NewGauge(metric.Metadata{Name: "pending"})
cfg.processingNanos = metric.NewCounter(metric.Metadata{Name: "processingnanos"})
cfg.purgatory = metric.NewGauge(metric.Metadata{Name: "purgatory"})
cfg.disabledConfig = &settings.BoolSetting{}
return newBaseQueue(name, impl, store, cfg)
}

Expand Down Expand Up @@ -1224,6 +1227,92 @@ func TestBaseQueueDisable(t *testing.T) {
}
}

// TestQueueDisable verifies that setting the set of queue.enabled cluster
// settings actually disables the base queue. This test works alongside
// TestBaseQueueDisable to verify the entire disable workflow.
func TestQueueDisable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tc := testContext{}
stopper := stop.NewStopper()
ctx := context.Background()
defer stopper.Stop(ctx)
tc.Start(ctx, t, stopper)

testCases := []struct {
name string
clusterSetting *settings.BoolSetting
queue *baseQueue
}{
{
name: "Merge Queue",
clusterSetting: kvserverbase.MergeQueueEnabled,
queue: tc.store.mergeQueue.baseQueue,
},
{
name: "Replicate Queue",
clusterSetting: kvserverbase.ReplicateQueueEnabled,
queue: tc.store.replicateQueue.baseQueue,
},
{
name: "Replica GC Queue",
clusterSetting: kvserverbase.ReplicaGCQueueEnabled,
queue: tc.store.replicaGCQueue.baseQueue,
},
{
name: "Raft Log Queue",
clusterSetting: kvserverbase.RaftLogQueueEnabled,
queue: tc.store.raftLogQueue.baseQueue,
},
{
name: "Raft Snapshot Queue",
clusterSetting: kvserverbase.RaftSnapshotQueueEnabled,
queue: tc.store.raftSnapshotQueue.baseQueue,
},
{
name: "Consistency Queue",
clusterSetting: kvserverbase.ConsistencyQueueEnabled,
queue: tc.store.consistencyQueue.baseQueue,
},
{
name: "Split Queue",
clusterSetting: kvserverbase.SplitQueueEnabled,
queue: tc.store.splitQueue.baseQueue,
},
{
name: "MVCC GC Queue",
clusterSetting: kvserverbase.MVCCGCQueueEnabled,
queue: tc.store.mvccGCQueue.baseQueue,
},
}

if tc.store.tsMaintenanceQueue != nil {
testCases = append(testCases, struct {
name string
clusterSetting *settings.BoolSetting
queue *baseQueue
}{
name: "Timeseries Maintenance Queue",
clusterSetting: kvserverbase.TimeSeriesMaintenanceQueueEnabled,
queue: tc.store.tsMaintenanceQueue.baseQueue,
})
}

// Disable and verify all queues are disabled
for _, testCase := range testCases {
testCase.clusterSetting.Override(ctx, &tc.store.ClusterSettings().SV, false)
if testCase.queue == nil {
continue
}
testCase.queue.mu.Lock()
disabled := testCase.queue.mu.disabled
testCase.queue.mu.Unlock()
if disabled != true {
t.Errorf("%s should be disabled", testCase.name)
}
}
}

type parallelQueueImpl struct {
testQueueImpl
processBlocker chan struct{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -183,6 +184,7 @@ func newRaftLogQueue(store *Store, db *kv.DB) *raftLogQueue {
failures: store.metrics.RaftLogQueueFailures,
pending: store.metrics.RaftLogQueuePending,
processingNanos: store.metrics.RaftLogQueueProcessingNanos,
disabledConfig: kvserverbase.RaftLogQueueEnabled,
},
)
return rlq
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
Expand Down Expand Up @@ -57,6 +58,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue {
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos,
disabledConfig: kvserverbase.RaftSnapshotQueueEnabled,
},
)
return rq
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -106,6 +107,7 @@ func newReplicaGCQueue(store *Store, db *kv.DB) *replicaGCQueue {
failures: store.metrics.ReplicaGCQueueFailures,
pending: store.metrics.ReplicaGCQueuePending,
processingNanos: store.metrics.ReplicaGCQueueProcessingNanos,
disabledConfig: kvserverbase.ReplicaGCQueueEnabled,
},
)
return rgcq
Expand Down
Loading

0 comments on commit 2b91c38

Please sign in to comment.