Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: add TestStoreRangeSplitAndMergeWithGlobalReads #60753

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -90,16 +91,18 @@ func Subsume(
}

// Sanity check the caller has initiated a merge transaction by checking for
// a deletion intent on the local range descriptor.
// a deletion intent on the local range descriptor. Read inconsistently at
// the maximum timestamp to ensure that we see an intent if one exists,
// regardless of what timestamp it is written at.
descKey := keys.RangeDescriptorKey(desc.StartKey)
_, intent, err := storage.MVCCGet(ctx, readWriter, descKey, cArgs.Header.Timestamp,
_, intent, err := storage.MVCCGet(ctx, readWriter, descKey, hlc.MaxTimestamp,
storage.MVCCGetOptions{Inconsistent: true})
if err != nil {
return result.Result{}, errors.Errorf("fetching local range descriptor: %s", err)
} else if intent == nil {
return result.Result{}, errors.AssertionFailedf("range missing intent on its local descriptor")
}
val, _, err := storage.MVCCGetAsTxn(ctx, readWriter, descKey, cArgs.Header.Timestamp, intent.Txn)
val, _, err := storage.MVCCGetAsTxn(ctx, readWriter, descKey, intent.Txn.WriteTimestamp, intent.Txn)
if err != nil {
return result.Result{}, errors.Errorf("fetching local range descriptor as txn: %s", err)
} else if val != nil {
Expand Down
57 changes: 57 additions & 0 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3545,3 +3545,60 @@ func TestSplitBlocksReadsToRHS(t *testing.T) {
}
require.Nil(t, g.Wait())
}

// TestStoreRangeSplitAndMergeWithGlobalReads tests that a range configured to
// serve global reads can be split and merged. In essence, this tests whether
// the split and merge transactions can handle having their timestamp bumped by
// the closed timestamp on the ranges they're operating on.
func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
},
},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)
config.TestingSetupZoneConfigHook(s.Stopper())

// Set global reads.
descID := uint32(keys.MinUserDescID)
descKey := keys.SystemSQLCodec.TablePrefix(descID)
zoneConfig := zonepb.DefaultZoneConfig()
zoneConfig.GlobalReads = proto.Bool(true)
config.TestingSetZoneConfig(config.SystemTenantObjectID(descID), zoneConfig)

// Trigger gossip callback and wait for propagation
require.NoError(t, store.Gossip().AddInfoProto(gossip.KeySystemConfig, &config.SystemConfigEntries{}, 0))
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(roachpb.RKey(descKey))
if repl.ClosedTimestampPolicy() != roachpb.LEAD_FOR_GLOBAL_READS {
return errors.Errorf("expected LEAD_FOR_GLOBAL_READS policy")
}
return nil
})

// Split the range. Should succeed.
splitKey := append(descKey, []byte("split")...)
splitArgs := adminSplitArgs(splitKey)
_, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs)
require.Nil(t, pErr)

repl := store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, splitKey, repl.Desc().StartKey.AsRawKey())

// Merge the range. Should succeed.
mergeArgs := adminMergeArgs(descKey)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
require.Nil(t, pErr)

repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
}
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,14 @@ func (r *Replica) ReadProtectedTimestamps(ctx context.Context) {
ts = r.readProtectedTimestampsRLocked(ctx, nil /* f */)
}

// ClosedTimestampPolicy returns the closed timestamp policy of the range, which
// is updated asynchronously through gossip of zone configurations.
func (r *Replica) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
r.mu.RLock()
defer r.mu.RUnlock()
return r.closedTimestampPolicyRLocked()
}

// GetCircuitBreaker returns the circuit breaker controlling
// connection attempts to the specified node.
func (t *RaftTransport) GetCircuitBreaker(
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,10 @@ func (r *Replica) descRLocked() *roachpb.RangeDescriptor {
return r.mu.state.Desc
}

// closedTimestampPolicyRLocked returns the closed timestamp policy of the
// range, which is updated asynchronously through gossip of zone configurations.
// NOTE: an exported version of this method which does not require the replica
// lock exists in helpers_test.go. Move here if needed.
func (r *Replica) closedTimestampPolicyRLocked() roachpb.RangeClosedTimestampPolicy {
if r.mu.zone.GlobalReads != nil && *r.mu.zone.GlobalReads {
return roachpb.LEAD_FOR_GLOBAL_READS
Expand Down