Skip to content

Commit

Permalink
closedts: fix bug ignoring zero MLIAs to stabilize roachtest cdc/rang…
Browse files Browse the repository at this point in the history
…efeed

Before this PR, ranges which emitted min lease applied indices of zero were
ignored. This happened because the logic to track and store MLAIs takes care to
not allow storing indices less than the currently stored value. Go's maps return
zero values when queried for non-existing keys and because it is not the case
that 0 < 0, we never added zero entries to the map.

This PR was motivated by the formerly unstable cdc/rangefeed roachtest.
The author has verified that the formerly flakey cdc rangefeed roachtest now
reliably passes after making minor changes to the test. The change updates the
target steady latency from 1m to 2m to adjust for the increasing of
kv.closedts.target_duration from 5s to 30s (see #31837). It also adds a short
sleep between installing and running the TPCC workload which seems to deflake
some errors which @Danhz had observed. Lastly, it increase the replica_rangefeed
channel buffer size from 512 to 4096 to mitigate errors obserted related to
"buffer capacity exceeded due to slow consumer".

Release note: None
  • Loading branch information
ajwerner committed Dec 18, 2018
1 parent c0b0309 commit 69add28
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 18 deletions.
16 changes: 5 additions & 11 deletions pkg/cmd/roachtest/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,11 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) {
sqlNodes: crdbNodes,
workloadNodes: workloadNode,
tpccWarehouseCount: args.tpccWarehouseCount,
// TODO(dan): Applying tolerateErrors to all tests is unfortunate, but we're
// seeing all sorts of "error in newOrder: missing stock row" from tpcc. I'm
// debugging it, but in the meantime, we need to be getting data from these
// roachtest runs. In ideal usage, this should only be enabled for tests
// with CRDB chaos enabled.
/* tolerateErrors */
tolerateErrors: true,
}
tpcc.install(ctx, c)

// TODO(dan,ajwerner): sleeping momentarily before running the workload
// mitigates errors like "error in newOrder: missing stock row" from tpcc.
time.Sleep(2 * time.Second)
t.Status("initiating workload")
m.Go(func(ctx context.Context) error {
defer func() { close(workloadCompleteCh) }()
Expand Down Expand Up @@ -350,8 +345,7 @@ func registerCDC(r *registry) {
},
})
r.Add(testSpec{
Name: "cdc/rangefeed-unstable",
Skip: `resolved timestamps are not yet reliable with RangeFeed`,
Name: "cdc/rangefeed",
MinVersion: "v2.2.0",
Nodes: nodes(4, cpu(16)),
Run: func(ctx context.Context, t *test, c *cluster) {
Expand All @@ -363,7 +357,7 @@ func registerCDC(r *registry) {
rangefeed: true,
kafkaChaos: false,
targetInitialScanLatency: 30 * time.Minute,
targetSteadyLatency: time.Minute,
targetSteadyLatency: 2 * time.Minute,
})
},
})
Expand Down
5 changes: 2 additions & 3 deletions pkg/storage/closedts/minprop/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (t *Tracker) Track(
return
}

if curLAI := t.mu.leftMLAI[rangeID]; curLAI < lai {
if curLAI, found := t.mu.leftMLAI[rangeID]; !found || curLAI < lai {
t.mu.leftMLAI[rangeID] = lai
}
} else if minProp == t.mu.next.Next() {
Expand All @@ -249,8 +249,7 @@ func (t *Tracker) Track(
if rangeID == 0 {
return
}

if curLAI := t.mu.rightMLAI[rangeID]; curLAI < lai {
if curLAI, found := t.mu.rightMLAI[rangeID]; !found || curLAI < lai {
t.mu.rightMLAI[rangeID] = lai
}
} else {
Expand Down
23 changes: 23 additions & 0 deletions pkg/storage/closedts/minprop/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,29 @@ func TestTrackerDoubleRelease(t *testing.T) {
}
}

func TestTrackerReleaseZero(t *testing.T) {
ctx := context.Background()
tracker := NewTracker()
trackedTs1, release1 := tracker.Track(ctx)
trackedTs2, release2 := tracker.Track(ctx)
release2(ctx, 2, 0)
leftTs, _ := tracker.Close(trackedTs2)
leftTs.Logical += 2
release1(ctx, 1, 0)
closedTs, mlais := tracker.Close(leftTs)
if closedTs != trackedTs1 {
t.Fatalf("expected to have closed %v, got %v %v", trackedTs1, closedTs, mlais)
} else if mlai1, found := mlais[1]; !found {
t.Fatalf("expected to find mlai for range 1")
} else if mlai1 != 0 {
t.Fatalf("expected to find zero mlai for range 1, got %v", mlai1)
} else if mlai2, found := mlais[2]; !found {
t.Fatalf("expected to find mlai for range 2")
} else if mlai2 != 0 {
t.Fatalf("expected to find zero mlai for range 2, got %v", mlai2)
}
}

type modelClient struct {
lai map[roachpb.RangeID]*int64 // read-only map, values accessed atomically
mu struct {
Expand Down
4 changes: 1 addition & 3 deletions pkg/storage/closedts/storage/storage_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,13 @@ func merge(e, ee ctpb.Entry) ctpb.Entry {
}
// The result is full if either operand is.
re.Full = e.Full || ee.Full

// Use the larger of both timestamps with the union of the MLAIs, preferring larger
// ones on conflict.
re.ClosedTimestamp.Forward(ee.ClosedTimestamp)
for rangeID, mlai := range ee.MLAI {
if re.MLAI[rangeID] < mlai {
if cur, found := re.MLAI[rangeID]; !found || cur < mlai {
re.MLAI[rangeID] = mlai
}
}

return re
}
34 changes: 34 additions & 0 deletions pkg/storage/closedts/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -333,6 +334,39 @@ func ExampleMultiStorage_epoch() {
// +-----+---------------------+----------------------+
}

func TestZeroValueGetsStored(t *testing.T) {
defer leaktest.AfterTest(t)()
// This test ensures that a zero values MLAI is stored for an epoch especially
// after we've already stored a non-zero MLAI for a different range in the
// same epoch. See #32904.
ms := NewMultiStorage(func() SingleStorage {
return NewMemStorage(time.Millisecond, 10)
})
e := ctpb.Entry{
Epoch: 1,
ClosedTimestamp: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
MLAI: map[roachpb.RangeID]ctpb.LAI{1: 1},
}
ms.Add(1, e)
e.ClosedTimestamp.WallTime++
r := roachpb.RangeID(2)
e.MLAI = map[roachpb.RangeID]ctpb.LAI{r: 0}
ms.Add(1, e)
var seen bool
ms.VisitDescending(1, func(e ctpb.Entry) (done bool) {
for rr, mlai := range e.MLAI {
if rr == r && mlai == 0 {
seen = true
return true
}
}
return false
})
if !seen {
t.Fatalf("Failed to see added zero value MLAI for range %v", r)
}
}

// TestConcurrent runs a very basic sanity check against a Storage, verifiying
// that the bucketed Entries don't regress in obvious ways.
func TestConcurrent(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (r *Replica) maybeInitRangefeedRaftMuLocked() *rangefeed.Processor {
Clock: r.Clock(),
Span: desc.RSpan(),
TxnPusher: &tp,
EventChanCap: 512,
EventChanCap: 4096,
EventChanTimeout: 50 * time.Millisecond,
}
r.raftMu.rangefeed = rangefeed.NewProcessor(cfg)
Expand Down

0 comments on commit 69add28

Please sign in to comment.