Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
134430: roachtest: disk bandwidth limiter test should only asssert on writes r=sumeerbhola a=aadityasondhi

Since we do not pace reads yet, the test will remain flaky in this assertion, as the system can see unbounded read bandwidth usage and fail the assertion even if writes are paced.

Fixes #131484

Release note: None

134527: roachtest: add debugging to gossip/chaos r=tbg a=tbg

This test has had a string of weird failures where
either a `t.L().Printf` call or `time.Sleep(1s)`
take dozens of seconds.

This PR adds a goroutine that gets spawned right
before and, unless signaled within 2s by both
the Printf and the Sleep having completed, dumps
stacks to stderr.

See the main issue #130737.
Closes the duplicates across various branches:

Closes #132651.
Closes #134495.

Epic: none
Release note: None


134751: lease: dump stacks if TestDescriptorRefreshOnRetry fails r=rafiss a=rafiss

We added additional logging to help debug a source of flakiness in which
the acquisition counts exceed the number of release counts. For that
logging to be useful, we need to know the goroutine IDs and stacks.

Marking this as fixing the linked issue so that the next time it fails,
we are reminded to look at the logs.

fixes: #134695
Release note: None

134953: kvserver/rangefeed: rename Disconnect to SendError for stream interface r=tbg,stevendanna a=wenyihu6

This patch renames `Disconnect` to `SendError` in the
`rangefeed.Stream` interface to clarify its role for sending
errors, distinguishing it from other similarly named
functions like `registration.disconnect`.

Part of: #110432
Release note: none

Co-authored-by: Steven Danna danna@cockroachlabs.com

Co-authored-by: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
Co-authored-by: Wenyi Hu <wenyi@cockroachlabs.com>
  • Loading branch information
5 people committed Nov 12, 2024
5 parents deb2445 + 4ae299b + e97b598 + cfea1c2 + cd47a66 commit 6d18a89
Show file tree
Hide file tree
Showing 18 changed files with 88 additions and 78 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ go_library(
"//pkg/ts/tspb",
"//pkg/ui/workspaces/e2e-tests",
"//pkg/util",
"//pkg/util/allstacks",
"//pkg/util/cancelchecker",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ func registerDiskBandwidthOverload(r registry.Registry) {
continue
}
totalBW := writeVal + readVal
if totalBW > bandwidthThreshold {
t.Fatalf("write + read bandwidth %f (%f + %f) exceeded threshold of %f", totalBW, writeVal, readVal, bandwidthThreshold)
// TODO(aaditya): We should be asserting on total bandwidth once reads
// are being paced.
if writeVal > bandwidthThreshold {
t.Fatalf("write bandwidth %f exceeded threshold of %f, read bandwidth: %f, total bandwidth: %f", writeVal, bandwidthThreshold, readVal, totalBW)
}
numSuccesses++
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/cmd/roachtest/tests/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"net"
"net/url"
"os"
"sort"
"strconv"
"strings"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/allstacks"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -126,8 +128,18 @@ SELECT node_id
return
}
const sleepDur = 1 * time.Second
timer := time.AfterFunc(2*time.Second, func() {
// This is an attempt to debug a rare issue in which either the `Printf`
// or the `time.Sleep()` surprisingly take >>20s which causes the test
// to fail.
//
// See https://github.com/cockroachdb/cockroach/issues/130737#issuecomment-2352473436.
_, _ = fmt.Fprintf(os.Stderr, "%s", allstacks.Get())
t.L().Printf("sleep took too long, dumped stacks to Stderr")
})
t.L().Printf("sleeping for %s (%.0fs)\n", sleepDur, timeutil.Since(start).Seconds())
time.Sleep(sleepDur)
timer.Stop()
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1645,9 +1645,9 @@ func (c *channelSink) Error() error {
}
}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// SendError implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (c *channelSink) Disconnect(err *kvpb.Error) {
func (c *channelSink) SendError(err *kvpb.Error) {
c.done <- err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,9 @@ func (s *dummyStream) SendUnbuffered(ev *kvpb.RangeFeedEvent) error {
}
}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// SendError implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (s *dummyStream) Disconnect(err *kvpb.Error) {
func (s *dummyStream) SendError(err *kvpb.Error) {
s.done <- err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ func (s *noopStream) SendUnbuffered(*kvpb.RangeFeedEvent) error {
// thread-safety.
func (s *noopStream) SendUnbufferedIsThreadSafe() {}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// SendError implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (s *noopStream) Disconnect(error *kvpb.Error) {
func (s *noopStream) SendError(error *kvpb.Error) {
s.done <- error
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/buffered_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (br *bufferedRegistration) disconnect(pErr *kvpb.Error) {
br.mu.outputLoopCancelFn()
}
br.mu.disconnected = true
br.stream.Disconnect(pErr)
br.stream.SendError(pErr)
}
}

Expand Down
55 changes: 28 additions & 27 deletions pkg/kv/kvserver/rangefeed/buffered_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,35 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// ┌─────────────────────────────────────────┐ MuxRangefeedEvent
// Node.MuxRangeFeed │◄──────────────────────────────────────────────────┐
// └─────────────────┬──────────────────────┘ ▲ │
// Sender.AddStream │ │LockedMuxStream.Send │ │
// ┌────────────▼───┴──────────┐ │ │
// │ Buffered/Unbuffered Sender├───────────┐ │ │
// └────────────┬──────────────┘ │ │ │
// │ │ │ │
// ┌────────▼─────────┐ │ │ │
// │ Stores.Rangefeed │ │ │ │
// └────────┬─────────┘ │ │ │
// │ │ │ │
// ┌───────▼─────────┐ BufferedSender BufferedSender │
// │ Store.Rangefeed │ SendUnbuffered/SendBuffered SendBufferedError ─────► BufferedSender.run
// └───────┬─────────┘ (catch-up scan)(live raft) ▲
//
// ┌────────▼──────────┐
// │ Replica.Rangefeed │
// └────────┬──────────┘
//
// ┌───────▼──────┐
// │ Registration │
// └──────┬───────┘
//
//
// └─────────────────────────┘─────────────────┘
// BufferedPerRangeEventSink.Send BufferedPerRangeEventSink.Disconnect
// ┌─────────────────
// Node.MuxRangefeed│
// └─────────┬──────
// Sender.AddStream │ │LockedMuxStream.Send ───────────────────┐────────────────────────────────┐
// ┌──────────┴─▼─┴────────────┐ │ │
// │ Buffered/Unbuffered Sender├───────────┐ │ │
// └────────────┬──────────────┘ │ │ │
// │ │ │ │
// ┌────────▼─────────┐ │ │ │
// │ Stores.Rangefeed │ │ │ │
// └────────┬─────────┘ │ │ │
// │ │ │ │
// ┌───────▼─────────┐ BufferedSender BufferedSender │
// │ Store.Rangefeed │ SendUnbuffered/SendBuffered SendBufferedError ─────► BufferedSender.run
// └───────┬─────────┘ (catch-up scan)(live raft) ▲
//
// ┌────────▼──────────┐
// │ Replica.Rangefeed │
// └────────┬──────────┘
//
// ┌───────▼──────┐
// │ Registration │
// └──────┬───────┘
//
//
// └───────────────────────────┘───────────────┘
// BufferedPerRangeEventSink.Send BufferedPerRangeEventSink.SendError
//

// BufferedSender is embedded in every rangefeed.BufferedPerRangeEventSink,
// serving as a helper which buffers events before forwarding events to the
// underlying gRPC stream.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/buffered_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ func (s *BufferedPerRangeEventSink) SendUnbuffered(event *kvpb.RangeFeedEvent) e
return s.wrapped.SendUnbuffered(response, nil)
}

// Disconnect implements the Stream interface. BufferedSender is then
// SendError implements the Stream interface. BufferedSender is then
// responsible for canceling the context of the stream. The actual rangefeed
// disconnection from processor happens late when the error event popped from
// the queue and about to be sent to the grpc stream. So caller should not rely
// on immediate disconnection as cleanup takes place async.
func (s *BufferedPerRangeEventSink) Disconnect(err *kvpb.Error) {
func (s *BufferedPerRangeEventSink) SendError(err *kvpb.Error) {
ev := &kvpb.MuxRangeFeedEvent{
StreamID: s.streamID,
RangeID: s.rangeID,
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestProcessorBasic(t *testing.T) {
r1Stream.Cancel()
require.NotNil(t, r1Stream.WaitForError(t))

// Disconnect the registration via Disconnect should work.
// Disconnect the registration via SendError should work.
r3Stream := newTestStream()
r30K, _ := p.Register(
r3Stream.ctx,
Expand All @@ -317,7 +317,7 @@ func TestProcessorBasic(t *testing.T) {
func() {},
)
require.True(t, r30K)
r3Stream.Disconnect(kvpb.NewError(fmt.Errorf("disconnection error")))
r3Stream.SendError(kvpb.NewError(fmt.Errorf("disconnection error")))
require.NotNil(t, r3Stream.WaitForError(t))

// Stop the processor with an error.
Expand Down Expand Up @@ -1393,9 +1393,9 @@ func (c *consumer) WaitBlock() {
<-c.blocked
}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// SendError implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (c *consumer) Disconnect(error *kvpb.Error) {
func (c *consumer) SendError(error *kvpb.Error) {
c.done <- error
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/registry_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ func (s *testStream) BlockSend() func() {
}
}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// SendError implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (s *testStream) Disconnect(err *kvpb.Error) {
func (s *testStream) SendError(err *kvpb.Error) {
s.done <- err
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/rangefeed/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
// rangefeed to a client.
type Stream interface {
kvpb.RangeFeedEventSink
// Disconnect disconnects the stream with the provided error. Note that this
// function can be called by the processor worker while holding raftMu, so it
// is important that this function doesn't block IO or try acquiring locks
// that could lead to deadlocks.
Disconnect(err *kvpb.Error)
// SendError sends an error to the stream. Since this function can be called by
// the processor worker while holding raftMu as part of
// registration.Disconnect(), it is important that this function doesn't block
// IO or try acquiring locks that could lead to deadlocks.
SendError(err *kvpb.Error)
}

// PerRangeEventSink is an implementation of Stream which annotates each
Expand Down Expand Up @@ -56,11 +56,11 @@ func (s *PerRangeEventSink) SendUnbuffered(event *kvpb.RangeFeedEvent) error {
return s.wrapped.SendUnbuffered(response)
}

// Disconnect implements the Stream interface. It requests the UnbufferedSender
// SendError implements the Stream interface. It requests the UnbufferedSender
// to detach the stream. The UnbufferedSender is then responsible for handling
// the actual disconnection and additional cleanup. Note that Caller should not
// rely on immediate disconnection as cleanup takes place async.
func (s *PerRangeEventSink) Disconnect(err *kvpb.Error) {
func (s *PerRangeEventSink) SendError(err *kvpb.Error) {
ev := &kvpb.MuxRangeFeedEvent{
RangeID: s.rangeID,
StreamID: s.streamID,
Expand Down
Loading

0 comments on commit 6d18a89

Please sign in to comment.