Skip to content

Commit

Permalink
kvserver/rangefeed: remove context from kvpb.RangeFeedEventSink
Browse files Browse the repository at this point in the history
Previously, `node.MuxRangefeed` created a child context for each rangefeed
request, storing it in the stream interface to allow the node level to be able
to shut down registration goroutines. This patch simplifies the approach by
passing the stream context directly to `p.Register`, eliminating the need to
store context in `streamSink` or return context via the interface. So this patch
also removes context from `kvpb.RangeFeedEventSink`.

Epic: none
Release note: none
  • Loading branch information
wenyihu6 committed Oct 23, 2024
1 parent 6949837 commit a4cbf7d
Show file tree
Hide file tree
Showing 19 changed files with 85 additions and 83 deletions.
6 changes: 1 addition & 5 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,7 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) {
}
eventC := make(chan *kvpb.RangeFeedEvent)
sink := newChannelSink(ctx, eventC)
require.NoError(t, s3.RangeFeed(&req, sink)) // check if we've errored yet
require.NoError(t, s3.RangeFeed(sink.ctx, &req, sink)) // check if we've errored yet
require.NoError(t, sink.Error())
t.Logf("started rangefeed on %s", repl3)

Expand Down Expand Up @@ -1628,10 +1628,6 @@ func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channe
return &channelSink{ctx: ctx, ch: ch, done: make(chan *kvpb.Error, 1)}
}

func (c *channelSink) Context() context.Context {
return c.ctx
}

func (c *channelSink) SendUnbufferedIsThreadSafe() {}

func (c *channelSink) SendUnbuffered(e *kvpb.RangeFeedEvent) error {
Expand Down
2 changes: 0 additions & 2 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2520,8 +2520,6 @@ func (s *ScanStats) String() string {

// RangeFeedEventSink is an interface for sending a single rangefeed event.
type RangeFeedEventSink interface {
// Context returns the context for this stream.
Context() context.Context
// SendUnbuffered blocks until it sends the RangeFeedEvent, the stream is
// done, or the stream breaks. Send must be safe to call on the same stream in
// different goroutines.
Expand Down
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,6 @@ func newDummyStream(ctx context.Context, name string) *dummyStream {
}
}

func (s *dummyStream) Context() context.Context {
return s.ctx
}

func (s *dummyStream) SendUnbufferedIsThreadSafe() {}

func (s *dummyStream) SendUnbuffered(ev *kvpb.RangeFeedEvent) error {
Expand Down Expand Up @@ -493,7 +489,7 @@ func waitReplicaRangeFeed(
return stream.SendUnbuffered(&event)
}

err := r.RangeFeed(req, stream, nil /* pacer */)
err := r.RangeFeed(stream.ctx, req, stream, nil /* pacer */)
if err != nil {
return sendErrToStream(kvpb.NewError(err))
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
// extra data.
const withFiltering = false
streams[i] = &noopStream{ctx: ctx, done: make(chan *kvpb.Error, 1)}
ok, _ := p.Register(span, hlc.MinTimestamp, nil,
ok, _ := p.Register(ctx, span, hlc.MinTimestamp, nil,
withDiff, withFiltering, false, /* withOmitRemote */
streams[i], nil)
require.True(b, ok)
Expand Down Expand Up @@ -192,10 +192,6 @@ type noopStream struct {
done chan *kvpb.Error
}

func (s *noopStream) Context() context.Context {
return s.ctx
}

func (s *noopStream) SendUnbuffered(*kvpb.RangeFeedEvent) error {
s.events++
return nil
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/rangefeed/buffered_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type bufferedRegistration struct {
var _ registration = &bufferedRegistration{}

func newBufferedRegistration(
streamCtx context.Context,
span roachpb.Span,
startTS hlc.Timestamp,
catchUpIter *CatchUpIterator,
Expand All @@ -84,6 +85,7 @@ func newBufferedRegistration(
) *bufferedRegistration {
br := &bufferedRegistration{
baseRegistration: baseRegistration{
streamCtx: streamCtx,
span: span,
catchUpTimestamp: startTS,
withDiff: withDiff,
Expand Down Expand Up @@ -212,8 +214,8 @@ func (br *bufferedRegistration) outputLoop(ctx context.Context) error {
}
case <-ctx.Done():
return ctx.Err()
case <-br.stream.Context().Done():
return br.stream.Context().Err()
case <-br.streamCtx.Done():
return br.streamCtx.Err()
}
}
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/kv/kvserver/rangefeed/buffered_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package rangefeed

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)
Expand All @@ -27,17 +25,15 @@ type BufferedStream interface {
// similar to PerRangeEventSink but buffers events in BufferedSender before
// forwarding events to the underlying grpc stream.
type BufferedPerRangeEventSink struct {
ctx context.Context
rangeID roachpb.RangeID
streamID int64
wrapped *BufferedSender
}

func NewBufferedPerRangeEventSink(
ctx context.Context, rangeID roachpb.RangeID, streamID int64, wrapped *BufferedSender,
rangeID roachpb.RangeID, streamID int64, wrapped *BufferedSender,
) *BufferedPerRangeEventSink {
return &BufferedPerRangeEventSink{
ctx: ctx,
rangeID: rangeID,
streamID: streamID,
wrapped: wrapped,
Expand All @@ -48,10 +44,6 @@ var _ kvpb.RangeFeedEventSink = (*BufferedPerRangeEventSink)(nil)
var _ Stream = (*BufferedPerRangeEventSink)(nil)
var _ BufferedStream = (*BufferedPerRangeEventSink)(nil)

func (s *BufferedPerRangeEventSink) Context() context.Context {
return s.ctx
}

// SendUnbufferedIsThreadSafe is a no-op declaration method. It is a contract
// that the SendUnbuffered method is thread-safe. Note that
// BufferedSender.SendBuffered and BufferedSender.SendUnbuffered are both
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ type Processor interface {
//
// NB: startTS is exclusive; the first possible event will be at startTS.Next().
Register(
streamCtx context.Context,
span roachpb.RSpan,
startTS hlc.Timestamp, // exclusive
catchUpIter *CatchUpIterator,
Expand Down Expand Up @@ -582,6 +583,7 @@ func (p *LegacyProcessor) sendStop(pErr *kvpb.Error) {

// Register implements Processor interface.
func (p *LegacyProcessor) Register(
streamCtx context.Context,
span roachpb.RSpan,
startTS hlc.Timestamp,
catchUpIter *CatchUpIterator,
Expand All @@ -598,7 +600,7 @@ func (p *LegacyProcessor) Register(

blockWhenFull := p.Config.EventChanTimeout == 0 // for testing
r := newBufferedRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote,
streamCtx, span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote,
p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn,
)
select {
Expand Down
50 changes: 38 additions & 12 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestProcessorBasic(t *testing.T) {
// Add a registration.
r1Stream := newTestStream()
r1OK, r1Filter := p.Register(
r1Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -196,6 +197,7 @@ func TestProcessorBasic(t *testing.T) {
// Add another registration with withDiff = true and withFiltering = true.
r2Stream := newTestStream()
r2OK, r1And2Filter := p.Register(
r2Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -297,29 +299,46 @@ func TestProcessorBasic(t *testing.T) {
}
require.Equal(t, valEvent3, r1Stream.Events())
// r2Stream should not see the event.

// Cancel the first registration.
r1Stream.Cancel()
require.NotNil(t, r1Stream.WaitForError(t))

// Disconnect the registration via Disconnect should work.
r3Stream := newTestStream()
r30K, _ := p.Register(
r3Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
false, /* withDiff */
false, /* withFiltering */
false, /* withOmitRemote */
r3Stream,
func() {},
)
require.True(t, r30K)
r3Stream.Disconnect(kvpb.NewError(fmt.Errorf("disconnection error")))
require.NotNil(t, r3Stream.WaitForError(t))

// Stop the processor with an error.
pErr := kvpb.NewErrorf("stop err")
p.StopWithErr(pErr)
require.NotNil(t, r2Stream.WaitForError(t))

// Adding another registration should fail.
r3Stream := newTestStream()
r3OK, _ := p.Register(
r4Stream := newTestStream()
r4OK, _ := p.Register(
r3Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
false, /* withDiff */
false, /* withFiltering */
false, /* withOmitRemote */
r3Stream,
r4Stream,
func() {},
)
require.False(t, r3OK)
require.False(t, r4OK)
})
}

Expand All @@ -335,6 +354,7 @@ func TestProcessorOmitRemote(t *testing.T) {
// Add a registration.
r1Stream := newTestStream()
r1OK, _ := p.Register(
r1Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand All @@ -360,6 +380,7 @@ func TestProcessorOmitRemote(t *testing.T) {
// Add another registration with withOmitRemote = true.
r2Stream := newTestStream()
r2OK, _ := p.Register(
r2Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -413,6 +434,7 @@ func TestProcessorSlowConsumer(t *testing.T) {
// Add a registration.
r1Stream := newTestStream()
_, _ = p.Register(
r1Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand All @@ -424,6 +446,7 @@ func TestProcessorSlowConsumer(t *testing.T) {
)
r2Stream := newTestStream()
p.Register(
r2Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -520,6 +543,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) {
// Add a registration.
r1Stream := newTestStream()
_, _ = p.Register(
r1Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -575,6 +599,7 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) {
// Add a registration.
r1Stream := newTestStream()
p.Register(
r1Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -656,6 +681,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
// Add a registration.
r1Stream := newTestStream()
p.Register(
r1Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -969,7 +995,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
defer wg.Done()
runtime.Gosched()
s := newTestStream()
p.Register(h.span, hlc.Timestamp{}, nil, /* catchUpIter */
p.Register(s.ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {})
}()
go func() {
Expand Down Expand Up @@ -1041,7 +1067,7 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
// operation is should see is firstIdx.
s := newTestStream()
regs[s] = firstIdx
p.Register(h.span, hlc.Timestamp{}, nil, /* catchUpIter */
p.Register(s.ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {})
regDone <- struct{}{}
}
Expand Down Expand Up @@ -1095,6 +1121,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
rStream := newConsumer(50)
defer func() { rStream.Resume() }()
_, _ = p.Register(
rStream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -1175,6 +1202,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
rStream := newConsumer(90)
defer func() { rStream.Resume() }()
_, _ = p.Register(
rStream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -1245,6 +1273,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
r1Stream := newConsumer(50)
defer func() { r1Stream.Resume() }()
_, _ = p.Register(
r1Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand All @@ -1258,6 +1287,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
// Non-blocking registration that would consume all events.
r2Stream := newConsumer(0)
p.Register(
r2Stream.ctx,
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
Expand Down Expand Up @@ -1355,10 +1385,6 @@ func (c *consumer) SendUnbuffered(e *kvpb.RangeFeedEvent) error {
return nil
}

func (c *consumer) Context() context.Context {
return c.ctx
}

func (c *consumer) Cancel() {
c.ctxDone()
}
Expand Down Expand Up @@ -1435,7 +1461,7 @@ func TestProcessorBackpressure(t *testing.T) {

// Add a registration.
stream := newTestStream()
ok, _ := p.Register(span, hlc.MinTimestamp, nil, /* catchUpIter */
ok, _ := p.Register(stream.ctx, span, hlc.MinTimestamp, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, stream, nil)
require.True(t, ok)

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type registration interface {
// baseRegistration is a common base for all registration types. It is intended
// to be embedded in an actual registration struct.
type baseRegistration struct {
streamCtx context.Context
span roachpb.Span
withDiff bool
withFiltering bool
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/registry_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func newTestRegistration(
) *testRegistration {
s := newTestStream()
r := newBufferedRegistration(
s.ctx,
span,
ts,
makeCatchUpIterator(catchup, span, ts),
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func TestRegistrationBasic(t *testing.T) {
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */)

streamCancelReg.Cancel()
go streamCancelReg.runOutputLoop(ctx, 0)
go streamCancelReg.runOutputLoop(streamCancelReg.ctx, 0)
require.NoError(t, streamCancelReg.waitForCaughtUp(ctx))
require.Equal(t, streamCancelReg.stream.Context().Err(), streamCancelReg.WaitForError(t))
require.Equal(t, streamCancelReg.ctx.Err(), streamCancelReg.WaitForError(t))
}

func TestRegistrationCatchUpScan(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (p *ScheduledProcessor) sendStop(pErr *kvpb.Error) {
//
// NB: startTS is exclusive; the first possible event will be at startTS.Next().
func (p *ScheduledProcessor) Register(
streamCtx context.Context,
span roachpb.RSpan,
startTS hlc.Timestamp,
catchUpIter *CatchUpIterator,
Expand All @@ -317,6 +318,7 @@ func (p *ScheduledProcessor) Register(
"unimplemented: unbuffered registrations for rangefeed, see #126560")
} else {
r = newBufferedRegistration(
streamCtx,
span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote,
p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn,
)
Expand Down
Loading

0 comments on commit a4cbf7d

Please sign in to comment.