Merge #83386 #83562
83386: batcheval: handle MVCC range tombstones in `RefreshRange` r=aliher1911 a=erikgrinaker

This patch adds handling of MVCC range tombstones in `RefreshRange` by
erroring on them as committed values.

Resolves #83383.

Release note: None
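
A hedged sketch of the new behavior (the test name here is illustrative; the
body is condensed from the table-driven test added in this commit and assumes
the same batcheval test harness shown in the diff below — MockEvalCtx, the
in-memory engine, and the experimental MVCC helpers):

func TestRefreshRangeSeesRangeTombstone(t *testing.T) {
	ctx := context.Background()
	eng := storage.NewDefaultInMemForTesting()
	defer eng.Close()

	// Write an MVCC range tombstone over [d-f) at timestamp 7.
	require.NoError(t, storage.ExperimentalMVCCDeleteRangeUsingTombstone(
		ctx, eng, nil, roachpb.Key("d"), roachpb.Key("f"),
		hlc.Timestamp{WallTime: 7}, hlc.ClockTimestamp{}, nil, nil, 0))

	// Refresh [a-z) from ts 6 (exclusive) to ts 8 (inclusive). The window
	// covers the tombstone, so the refresh must fail as if a committed
	// value had been found at the tombstone's start key and timestamp.
	_, err := RefreshRange(ctx, eng, CommandArgs{
		EvalCtx: (&MockEvalCtx{
			ClusterSettings: cluster.MakeTestingClusterSettings(),
		}).EvalContext(),
		Args: &roachpb.RefreshRangeRequest{
			RequestHeader: roachpb.RequestHeader{
				Key:    roachpb.Key("a"),
				EndKey: roachpb.Key("z"),
			},
			RefreshFrom: hlc.Timestamp{WallTime: 6},
		},
		Header: roachpb.Header{
			Timestamp: hlc.Timestamp{WallTime: 8},
			Txn: &roachpb.Transaction{
				TxnMeta:       enginepb.TxnMeta{WriteTimestamp: hlc.Timestamp{WallTime: 8}},
				ReadTimestamp: hlc.Timestamp{WallTime: 8},
			},
		},
	}, &roachpb.RefreshRangeResponse{})

	var refreshErr *roachpb.RefreshFailedError
	require.ErrorAs(t, err, &refreshErr)
	require.Equal(t, roachpb.RefreshFailedError_REASON_COMMITTED_VALUE, refreshErr.Reason)
	require.Equal(t, roachpb.Key("d"), refreshErr.Key)
	require.Equal(t, hlc.Timestamp{WallTime: 7}, refreshErr.Timestamp)
}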

83562: rowexec: fix recent bug of using nil context r=yuzefovich a=yuzefovich

In e7e724e we moved the creation of a
monitor for the streamer's disk usage into the constructor of the join
reader, but forgot to update the context used during that operation. The
context on a processor is only set in `Start`, which means it is still
`nil` in the processor's constructor. This commit fixes
the issue.

Fixes: #83367

Release note: None
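
A hedged, self-contained illustration of the pitfall (the names here are
hypothetical stand-ins, not the real rowexec types): a context field that is
only populated in Start is still nil inside the constructor, so the
constructor must use a context the caller already holds.

package main

import (
	"context"
	"fmt"
)

type processor struct {
	ctx context.Context // populated only in Start; nil before that
}

func (p *processor) Start(ctx context.Context) { p.ctx = ctx }

// newProcessor mirrors the bug: p.ctx is still nil here because Start has
// not run yet. The fix is to use the context the caller already holds (in
// the real code, flowCtx.EvalCtx.Ctx()) instead of the processor's own field.
func newProcessor(callerCtx context.Context) *processor {
	p := &processor{}
	fmt.Println("p.ctx in constructor:", p.ctx)                // <nil> — the bug
	fmt.Println("caller ctx available:", callerCtx != nil)     // true — the fix
	return p
}

func main() {
	p := newProcessor(context.Background())
	p.Start(context.Background())
	_ = p
}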

Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
3 people committed Jun 29, 2022
3 parents 2e1c19b + ecfed3b + 86b54f5 commit b3696e3
Showing 3 changed files with 98 additions and 68 deletions.
17 changes: 13 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range.go
@@ -73,6 +73,8 @@ func refreshRange(
// iterators will emit MVCC tombstones by default and will emit intents when
// configured to do so (see IntentPolicy).
iter := storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
StartKey: span.Key,
EndKey: span.EndKey,
StartTime: refreshFrom, // exclusive
EndTime: refreshTo, // inclusive
@@ -81,15 +83,24 @@
defer iter.Close()

var meta enginepb.MVCCMetadata
iter.SeekGE(storage.MakeMVCCMetadataKey(span.Key))
for {
for iter.SeekGE(storage.MVCCKey{Key: span.Key}); ; iter.Next() {
if ok, err := iter.Valid(); err != nil {
return err
} else if !ok {
break
}

key := iter.UnsafeKey().Clone()

if _, hasRange := iter.HasPointAndRange(); hasRange {
rangeKVs := iter.RangeKeys()
if len(rangeKVs) == 0 { // defensive
return errors.Errorf("expected range key at %s not found", key)
}
return roachpb.NewRefreshFailedError(roachpb.RefreshFailedError_REASON_COMMITTED_VALUE,
key.Key, rangeKVs[0].RangeKey.Timestamp)
}

if !key.IsValue() {
// Found an intent. Check whether it is owned by this transaction.
// If so, proceed with iteration. Otherwise, return an error.
@@ -99,7 +110,6 @@
if meta.IsInline() {
// Ignore inline MVCC metadata. We don't expect to see this in practice
// when performing a refresh of an MVCC keyspace.
iter.Next()
continue
}
if meta.Txn.ID == txnID {
@@ -116,7 +126,6 @@
return errors.Errorf("expected provisional value for intent with ts %s, found %s",
meta.Timestamp, iter.UnsafeKey().Timestamp)
}
iter.Next()
continue
}
return roachpb.NewRefreshFailedError(roachpb.RefreshFailedError_REASON_INTENT,
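The hunks above also restructure the loop: the seek and advance now live in
the for statement, so every `continue` advances the iterator and the
standalone `iter.Next()` calls before each `continue` can be dropped. A
minimal, runnable sketch of that pattern (simplified stand-in types, not the
real storage iterator, whose Valid additionally returns an error):

package main

import "fmt"

type iter struct {
	keys []string
	pos  int
}

// SeekGE positions the iterator at the first key >= the given key.
func (it *iter) SeekGE(key string) {
	for it.pos = 0; it.pos < len(it.keys) && it.keys[it.pos] < key; it.pos++ {
	}
}
func (it *iter) Next()       { it.pos++ }
func (it *iter) Valid() bool { return it.pos < len(it.keys) }
func (it *iter) Key() string { return it.keys[it.pos] }

func main() {
	it := &iter{keys: []string{"a", "b", "c", "d"}}
	// Seek and advance live in the for statement, mirroring the new loop in
	// refreshRange; a continue here advances the iterator automatically.
	for it.SeekGE("b"); it.Valid(); it.Next() {
		if it.Key() == "c" {
			continue // no explicit it.Next() needed before continuing
		}
		fmt.Println(it.Key()) // prints "b", then "d"
	}
}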
147 changes: 84 additions & 63 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range_test.go
@@ -27,6 +27,90 @@ import (
"github.com/stretchr/testify/require"
)

func TestRefreshRange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

// Write an MVCC point key at b@3, MVCC point tombstone at b@5, and MVCC range
// tombstone at [d-f)@7.
require.NoError(t, storage.MVCCPut(
ctx, eng, nil, roachpb.Key("b"), hlc.Timestamp{WallTime: 3}, hlc.ClockTimestamp{}, roachpb.MakeValueFromString("value"), nil))
require.NoError(t, storage.MVCCPut(
ctx, eng, nil, roachpb.Key("c"), hlc.Timestamp{WallTime: 5}, hlc.ClockTimestamp{}, roachpb.Value{}, nil))
require.NoError(t, storage.ExperimentalMVCCDeleteRangeUsingTombstone(
ctx, eng, nil, roachpb.Key("d"), roachpb.Key("f"), hlc.Timestamp{WallTime: 7}, hlc.ClockTimestamp{}, nil, nil, 0))

testcases := map[string]struct {
start, end string
from, to int64
expectErr error
}{
"below all": {"a", "z", 1, 2, nil},
"above all": {"a", "z", 8, 10, nil},
"between": {"a", "z", 4, 4, nil},
"beside": {"x", "z", 1, 10, nil},
"point key": {"a", "z", 2, 4, &roachpb.RefreshFailedError{
Reason: roachpb.RefreshFailedError_REASON_COMMITTED_VALUE,
Key: roachpb.Key("b"),
Timestamp: hlc.Timestamp{WallTime: 3},
}},
"point tombstone": {"a", "z", 4, 6, &roachpb.RefreshFailedError{
Reason: roachpb.RefreshFailedError_REASON_COMMITTED_VALUE,
Key: roachpb.Key("c"),
Timestamp: hlc.Timestamp{WallTime: 5},
}},
"range tombstone": {"a", "z", 6, 8, &roachpb.RefreshFailedError{
Reason: roachpb.RefreshFailedError_REASON_COMMITTED_VALUE,
Key: roachpb.Key("d"),
Timestamp: hlc.Timestamp{WallTime: 7},
}},
"to is inclusive": {"a", "z", 1, 3, &roachpb.RefreshFailedError{
Reason: roachpb.RefreshFailedError_REASON_COMMITTED_VALUE,
Key: roachpb.Key("b"),
Timestamp: hlc.Timestamp{WallTime: 3},
}},
"from is exclusive": {"a", "z", 7, 10, nil},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
_, err := RefreshRange(ctx, eng, CommandArgs{
EvalCtx: (&MockEvalCtx{
ClusterSettings: cluster.MakeTestingClusterSettings(),
}).EvalContext(),
Args: &roachpb.RefreshRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: roachpb.Key(tc.start),
EndKey: roachpb.Key(tc.end),
},
RefreshFrom: hlc.Timestamp{WallTime: tc.from},
},
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: tc.to},
Txn: &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
WriteTimestamp: hlc.Timestamp{WallTime: tc.to},
},
ReadTimestamp: hlc.Timestamp{WallTime: tc.to},
},
},
}, &roachpb.RefreshRangeResponse{})

if tc.expectErr == nil {
require.NoError(t, err)
} else {
var refreshErr *roachpb.RefreshFailedError
require.Error(t, err)
require.ErrorAs(t, err, &refreshErr)
require.Equal(t, tc.expectErr, refreshErr)
}
})
}
}

// TestRefreshRangeTimeBoundIterator is a regression test for
// https://github.com/cockroachdb/cockroach/issues/31823. RefreshRange
// uses a time-bound iterator, which has a bug that can cause old
@@ -227,66 +311,3 @@ func TestRefreshRangeError(t *testing.T) {
}
})
}

// TestRefreshRangeTimestampBounds verifies that a RefreshRange treats its
// RefreshFrom timestamp as exclusive and its txn.ReadTimestamp (i.e. its
// "RefreshTo" timestamp) as inclusive.
func TestRefreshRangeTimestampBounds(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
db := storage.NewDefaultInMemForTesting()
defer db.Close()

k := roachpb.Key("key")
v := roachpb.MakeValueFromString("val")
ts1 := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}
ts3 := hlc.Timestamp{WallTime: 3}

// Write to a key at time ts2.
require.NoError(t, storage.MVCCPut(ctx, db, nil, k, ts2, hlc.ClockTimestamp{}, v, nil))

for _, tc := range []struct {
from, to hlc.Timestamp
expErr bool
}{
// Sanity-check.
{ts1, ts3, true},
// RefreshTo is inclusive, so expect error on collision.
{ts1, ts2, true},
// RefreshFrom is exclusive, so expect no error on collision.
{ts2, ts3, false},
} {
var resp roachpb.RefreshRangeResponse
_, err := RefreshRange(ctx, db, CommandArgs{
EvalCtx: (&MockEvalCtx{
ClusterSettings: cluster.MakeTestingClusterSettings(),
}).EvalContext(),
Args: &roachpb.RefreshRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: k,
EndKey: k.Next(),
},
RefreshFrom: tc.from,
},
Header: roachpb.Header{
Txn: &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
WriteTimestamp: tc.to,
},
ReadTimestamp: tc.to,
},
Timestamp: tc.to,
},
}, &resp)

if tc.expErr {
require.Error(t, err)
require.Regexp(t, "encountered recently written committed value", err)
} else {
require.NoError(t, err)
}
}
}
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/joinreader.go
@@ -478,7 +478,7 @@ func newJoinReader(
var diskBuffer kvstreamer.ResultDiskBuffer
if jr.maintainOrdering {
jr.streamerInfo.diskMonitor = execinfra.NewMonitor(
jr.Ctx, jr.FlowCtx.DiskMonitor, "streamer-disk", /* name */
flowCtx.EvalCtx.Ctx(), jr.FlowCtx.DiskMonitor, "streamer-disk", /* name */
)
diskBuffer = rowcontainer.NewKVStreamerResultDiskBuffer(
jr.FlowCtx.Cfg.TempStorage, jr.streamerInfo.diskMonitor,
