kv/rangefeed: guarantee non-empty checkpoint before REASON_SLOW_CONSUMER error

Fixes #77696.

This commit ensures progress of per-range rangefeeds in the presence of large
catch-up scans by guaranteeing that a non-empty checkpoint is published before
a REASON_SLOW_CONSUMER error is returned to the client. As a result, the client
doesn't spin in `DistSender` on the same catch-up span without advancing its
frontier. The guarantee is implemented by having rangefeed registrations
perform an ad-hoc resolved timestamp computation when a registration's buffer
hits its memory limit before the registration has published a non-empty
checkpoint.
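
As a rough illustration of the mechanism (a minimal, self-contained sketch;
the type names, fields, and `computeAdHocResolvedTS` helper below are
simplified stand-ins, not the committed CockroachDB code):

    package main

    import "fmt"

    // timestamp is a stand-in for hlc.Timestamp.
    type timestamp struct{ wallTime int64 }

    func (t timestamp) isEmpty() bool { return t.wallTime == 0 }

    // registration is a stand-in for the rangefeed registration.
    type registration struct {
    	span                string
    	publishedCheckpoint bool
    	// computeAdHocResolvedTS stands in for the ad-hoc resolved timestamp
    	// computation this commit adds for overflowing registrations.
    	computeAdHocResolvedTS func() timestamp
    }

    // checkpointBeforeDisconnect publishes one non-empty checkpoint before the
    // registration surfaces REASON_SLOW_CONSUMER, so the client's frontier
    // advances and its retry does not re-run the same catch-up span.
    func (r *registration) checkpointBeforeDisconnect(publish func(string, timestamp)) {
    	if r.publishedCheckpoint {
    		return // a non-empty checkpoint already went out; nothing to do
    	}
    	if rts := r.computeAdHocResolvedTS(); !rts.isEmpty() {
    		publish(r.span, rts)
    	}
    }

    func main() {
    	r := &registration{
    		span:                   "[a, m)",
    		computeAdHocResolvedTS: func() timestamp { return timestamp{wallTime: 42} },
    	}
    	r.checkpointBeforeDisconnect(func(span string, ts timestamp) {
    		fmt.Printf("checkpoint %s @ %d, then REASON_SLOW_CONSUMER\n", span, ts.wallTime)
    	})
    }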

In doing so, we can make a loose guarantee (assuming timely closed timestamp
progression) that a rangefeed with a client-side retry loop will always be able
to catch up and converge towards a stable connection as long as its rate of
consumption is greater than the rate of production on the table. In other words,
if `catch_up_scan_rate > new_write_rate`, the retry loop will make forward
progress and eventually stop hitting REASON_SLOW_CONSUMER errors.
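
To make the convergence claim concrete (a back-of-the-envelope argument with
assumed constant rates, not part of the commit): let c = catch_up_scan_rate
and w = new_write_rate, in bytes per second. If a retry attempt begins with a
backlog of B_n bytes, scanning it takes B_n / c seconds, during which
w * (B_n / c) bytes of new writes accumulate, so

    B_{n+1} = B_n * (w / c)

For w < c the backlog decays geometrically, B_n = B_0 * (w/c)^n -> 0, and each
retry resumes from a later checkpoint; for w >= c the backlog never shrinks,
which is exactly the rate condition above.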

A nearly viable alternative to this ad-hoc scan is to ensure that the
processor-wide resolved timestamp tracker publishes at least one non-empty
checkpoint on each registration before the registration is allowed to fail. This
runs into the issues described in #77696 (comment).
Specifically, because this tracker is shared with other registrations, it
continues to advance even after the stream of events to an overflowing
registration has been broken. That means that a checkpoint computed after the
registration has overflowed cannot be published without violating the ordering
contract of rangefeed checkpoints ("all prior events have been seen"). The
checkpoint published after the initial catch-up scan needs to be coherent with
the state of the range that the catch-up scan saw.
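
Concretely (an illustrative timeline with assumed numbers):

    t=10  registration R's buffer overflows; R delivers no events past t=10.
    t=15  other registrations keep consuming, so the shared processor-wide
          resolved timestamp advances to 15.

A checkpoint at 15 published on R would assert "all events with ts <= 15 have
been delivered on R", yet R dropped the (10, 15] events. R's final checkpoint
must instead be computed from state coherent with what its catch-up scan
actually observed.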

This change should be followed up with system-level testing that exercises a
changefeed's ability to be unpaused after a long period of time on a table with
a high rate of writes. That is an example of the kind of situation that this
change aims to improve.

Release justification: None. Wait on this.

Release note (enterprise change): Changefeeds are now guaranteed to make forward
progress while performing a large catch-up scan on a table with a high rate of
writes.
nvanbenschoten committed Sep 23, 2022
1 parent dc8897e commit 45571ce
Showing 11 changed files with 785 additions and 209 deletions.
49 changes: 37 additions & 12 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -32,6 +32,15 @@ const (
 	// defaultPushTxnsAge is the default age at which a Processor will begin to
 	// consider a transaction old enough to push.
 	defaultPushTxnsAge = 10 * time.Second
+	// The size of an event is 72 bytes, so this will result in an allocation on
+	// the order of ~300KB per RangeFeed. That's probably ok given the number of
+	// ranges on a node that we'd like to support with active rangefeeds, but it's
+	// certainly on the upper end of the range.
+	//
+	// TODO(dan): Everyone seems to agree that this memory limit would be better
+	// set at a store-wide level, but there doesn't seem to be an easy way to
+	// accomplish that.
+	defaultEventChanCap = 4096
 	// defaultCheckStreamsInterval is the default interval at which a Processor
 	// will check all streams to make sure they have not been canceled.
 	defaultCheckStreamsInterval = 1 * time.Second
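
A quick sanity check on the new comment's estimate (simple arithmetic, not
part of the diff):

    72 bytes/event * 4096 events = 294,912 bytes ≈ 295 KB (288 KiB)

which matches the quoted "order of ~300KB per RangeFeed".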
@@ -53,6 +62,10 @@ type Config struct {
 	RangeID roachpb.RangeID
 	Span    roachpb.RSpan
 
+	// InitClosedTS is the initial closed timestamp on the range, at the time
+	// when the rangefeed Processor was started.
+	InitClosedTS hlc.Timestamp
+
 	TxnPusher TxnPusher
 	// PushTxnsInterval specifies the interval at which a Processor will push
 	// all transactions in the unresolvedIntentQueue that are above the age
@@ -74,6 +87,10 @@ type Config struct {
 	// all streams to make sure they have not been canceled.
 	CheckStreamsInterval time.Duration
 
+	// SkipInitResolvedTS is used in testing to disable the processor-wide initial
+	// resolved timestamp scan.
+	SkipInitResolvedTS bool
+
 	// Metrics is for production monitoring of RangeFeeds.
 	Metrics *Metrics
 
@@ -99,6 +116,9 @@ func (sc *Config) SetDefaults() {
 			sc.PushTxnsAge = defaultPushTxnsAge
 		}
 	}
+	if sc.EventChanCap == 0 {
+		sc.EventChanCap = defaultEventChanCap
+	}
 	if sc.CheckStreamsInterval == 0 {
 		sc.CheckStreamsInterval = defaultCheckStreamsInterval
 	}
@@ -196,7 +216,7 @@ func NewProcessor(cfg Config) *Processor {
 	p := &Processor{
 		Config: cfg,
 		reg:    makeRegistry(),
-		rts:    makeResolvedTimestamp(),
+		rts:    makeResolvedTimestamp(cfg.InitClosedTS),
 
 		regC:   make(chan registration),
 		unregC: make(chan *registration),
@@ -215,7 +235,7 @@ func NewProcessor(cfg Config) *Processor {
 // IntentScannerConstructor is used to construct an IntentScanner. It
 // should be called from underneath a stopper task to ensure that the
 // engine has not been closed.
-type IntentScannerConstructor func() IntentScanner
+type IntentScannerConstructor func(roachpb.Span) IntentScanner
 
 // CatchUpIteratorConstructor is used to construct an iterator that can be used
 // for catchup-scans. Takes the key span and exclusive start time to run the
@@ -258,15 +278,18 @@ func (p *Processor) run(
 
 	// Launch an async task to scan over the resolved timestamp iterator and
 	// initialize the unresolvedIntentQueue. Ignore error if quiescing.
-	if rtsIterFunc != nil {
-		rtsIter := rtsIterFunc()
-		initScan := newInitResolvedTSScan(p, rtsIter)
-		err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run)
-		if err != nil {
-			initScan.Cancel()
+	if !p.SkipInitResolvedTS {
+		if rtsIterFunc != nil {
+			sp := p.Span.AsRawSpanWithNoLocals()
+			rtsIter := rtsIterFunc(sp)
+			initScan := newInitResolvedTSScan(rtsIter, sp, (*processorRTSScanConsumer)(p))
+			err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run)
+			if err != nil {
+				initScan.Cancel()
+			}
+		} else {
+			p.initResolvedTS(ctx)
+		}
-	} else {
-		p.initResolvedTS(ctx)
 	}
 
 	// txnPushTicker periodically pushes the transaction record of all
@@ -465,7 +488,9 @@ func (p *Processor) sendStop(pErr *roachpb.Error) {
 func (p *Processor) Register(
 	span roachpb.RSpan,
 	startTS hlc.Timestamp,
+	initClosedTS hlc.Timestamp,
 	catchUpIterConstructor CatchUpIteratorConstructor,
+	rtsIterConstructor IntentScannerConstructor,
 	withDiff bool,
 	stream Stream,
 	errC chan<- *roachpb.Error,
@@ -476,8 +501,8 @@ func (p *Processor) Register(
 	p.syncEventC()
 
 	r := newRegistration(
-		span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
-		p.Config.EventChanCap, p.Metrics, stream, errC,
+		span.AsRawSpanWithNoLocals(), startTS, initClosedTS, catchUpIterConstructor,
+		rtsIterConstructor, withDiff, p.Config.EventChanCap, p.Metrics, stream, errC,
 	)
 	select {
 	case p.regC <- r:
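
For orientation, the widened `Register` signature now threads an initial
closed timestamp and an intent-scanner constructor through to the
registration. A sketch of a caller, assembled from the test-file call sites
below (illustrative only; the processor, stream, and error-channel setup are
assumed to exist):

    func registerExample(p *Processor, stream Stream, errC chan *roachpb.Error) bool {
    	ok, _ := p.Register(
    		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
    		hlc.Timestamp{WallTime: 1}, // startTS: where the catch-up scan resumes
    		hlc.Timestamp{},            // initClosedTS: the range's closed ts at attach time
    		nil,                        // catchUpIterConstructor: nil skips the catch-up scan
    		nil,                        // rtsIterConstructor: nil skips the ad-hoc rts scan
    		false,                      // withDiff
    		stream,
    		errC,
    	)
    	return ok
    }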
94 changes: 59 additions & 35 deletions pkg/kv/kvserver/rangefeed/processor_test.go
@@ -138,7 +138,7 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *roachpb.RangeFeed
 const testProcessorEventCCap = 16
 
 func newTestProcessorWithTxnPusher(
-	t *testing.T, rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
+	t *testing.T, rtsIter *testIterator, txnPusher TxnPusher,
 ) (*Processor, *stop.Stopper) {
 	t.Helper()
 	stopper := stop.NewStopper()
@@ -162,16 +162,14 @@
 	return p, stopper
 }
 
-func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScannerConstructor {
+func makeIntentScannerConstructor(rtsIter *testIterator) IntentScannerConstructor {
 	if rtsIter == nil {
 		return nil
 	}
-	return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) }
+	return func(roachpb.Span) IntentScanner { return NewLegacyIntentScanner(rtsIter) }
 }
 
-func newTestProcessor(
-	t *testing.T, rtsIter storage.SimpleMVCCIterator,
-) (*Processor, *stop.Stopper) {
+func newTestProcessor(t *testing.T, rtsIter *testIterator) (*Processor, *stop.Stopper) {
 	t.Helper()
 	return newTestProcessorWithTxnPusher(t, rtsIter, nil /* pusher */)
 }
@@ -207,8 +205,10 @@ func TestProcessorBasic(t *testing.T) {
 	r1OK, r1Filter := p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r1Stream,
 		r1ErrC,
 	)
@@ -328,8 +328,10 @@ func TestProcessorBasic(t *testing.T) {
 	r2OK, r1And2Filter := p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
 		hlc.Timestamp{WallTime: 1},
-		nil,  /* catchUpIter */
-		true, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		true,            /* withDiff */
 		r2Stream,
 		r2ErrC,
 	)
@@ -411,8 +413,10 @@ func TestProcessorBasic(t *testing.T) {
 	r3OK, _ := p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r3Stream,
 		r3ErrC,
 	)
@@ -438,7 +442,7 @@ func TestNilProcessor(t *testing.T) {
 	stopper := stop.NewStopper()
 	defer stopper.Stop(ctx)
 	require.Panics(t, func() { _ = p.Start(stopper, nil) })
-	require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, nil, false, nil, nil) })
+	require.Panics(t, func() { p.Register(roachpb.RSpan{}, hlc.Timestamp{}, hlc.Timestamp{}, nil, nil, false, nil, nil) })
 }
 
 func TestProcessorSlowConsumer(t *testing.T) {
@@ -453,8 +457,10 @@
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r1Stream,
 		r1ErrC,
 	)
@@ -463,8 +469,10 @@
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r2Stream,
 		r2ErrC,
 	)
@@ -565,8 +573,10 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r1Stream,
 		r1ErrC,
 	)
@@ -634,8 +644,10 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r1Stream,
 		r1ErrC,
 	)
@@ -707,8 +719,10 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r1Stream,
 		make(chan *roachpb.Error, 1),
 	)
@@ -952,7 +966,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
 		runtime.Gosched()
 		s := newTestStream()
 		errC := make(chan<- *roachpb.Error, 1)
-		p.Register(p.Span, hlc.Timestamp{}, nil, false, s, errC)
+		p.Register(p.Span, hlc.Timestamp{}, hlc.Timestamp{}, nil, nil, false, s, errC)
 	}()
 	go func() {
 		defer wg.Done()
@@ -1021,7 +1035,7 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
 			s := newTestStream()
 			regs[s] = firstIdx
 			errC := make(chan *roachpb.Error, 1)
-			p.Register(p.Span, hlc.Timestamp{}, nil, false, s, errC)
+			p.Register(p.Span, hlc.Timestamp{}, hlc.Timestamp{}, nil, nil, false, s, errC)
 			regDone <- struct{}{}
 		}
 	}()
@@ -1109,8 +1123,10 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		rStream,
 		rErrC,
 	)
@@ -1197,8 +1213,10 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		rStream,
 		rErrC,
 	)
@@ -1274,8 +1292,10 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r1Stream,
 		r1ErrC,
 	)
@@ -1285,8 +1305,10 @@
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r2Stream,
 		r2ErrC,
 	)
@@ -1442,8 +1464,10 @@ func BenchmarkProcessorWithBudget(b *testing.B) {
 	p.Register(
 		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
 		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
+		hlc.Timestamp{}, /* initClosedTS */
+		nil,             /* catchUpIterConstructor */
+		nil,             /* rtsIterConstructor */
+		false,           /* withDiff */
 		r1Stream,
 		r1ErrC,
 	)
