Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage,kv: faster intent resolution over a key range #66268

Merged
merged 1 commit into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) {
Status: roachpb.PENDING,
}
if _, _, err := storage.MVCCResolveWriteIntentRange(
ctx, batch, nil /* ms */, intent, 0,
ctx, batch, nil /* ms */, intent, 0, eng.IsSeparatedIntentsEnabledForTesting(ctx),
); err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ go_test(
embed = [":batcheval"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,13 @@ func resolveLocalLocks(
// These transactions rely on having their locks resolved synchronously.
resolveAllowance = math.MaxInt64
}
onlySeparatedIntents := false
st := evalCtx.ClusterSettings()
// Some tests have st == nil.
if st != nil {
onlySeparatedIntents = st.Version.ActiveVersionOrEmpty(ctx).IsActive(
clusterversion.PostSeparatedIntentsMigration)
}
for _, span := range args.LockSpans {
if err := func() error {
if resolveAllowance == 0 {
Expand Down Expand Up @@ -496,7 +503,7 @@ func resolveLocalLocks(
if inSpan != nil {
update.Span = *inSpan
num, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, resolveAllowance)
ctx, readWriter, ms, update, resolveAllowance, onlySeparatedIntents)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package batcheval
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -44,8 +45,11 @@ func ResolveIntentRange(

update := args.AsLockUpdate()

onlySeparatedIntents :=
cArgs.EvalCtx.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).IsActive(
clusterversion.PostSeparatedIntentsMigration)
numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, h.MaxSpanRequestKeys)
ctx, readWriter, ms, update, h.MaxSpanRequestKeys, onlySeparatedIntents)
if err != nil {
return result.Result{}, err
}
Expand Down
36 changes: 33 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ import (
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -76,6 +79,7 @@ func TestDeclareKeysResolveIntent(t *testing.T) {
}
ctx := context.Background()
engine := storage.NewDefaultInMemForTesting()
st := makeClusterSettingsUsingEngineIntentsSetting(engine)
defer engine.Close()
testutils.RunTrueAndFalse(t, "ranged", func(t *testing.T, ranged bool) {
for _, test := range tests {
Expand Down Expand Up @@ -105,7 +109,7 @@ func TestDeclareKeysResolveIntent(t *testing.T) {
h.RangeID = desc.RangeID

cArgs := CommandArgs{Header: h}
cArgs.EvalCtx = (&MockEvalCtx{AbortSpan: as}).EvalContext()
cArgs.EvalCtx = (&MockEvalCtx{ClusterSettings: st, AbortSpan: as}).EvalContext()

if !ranged {
cArgs.Args = &ri
Expand All @@ -116,6 +120,7 @@ func TestDeclareKeysResolveIntent(t *testing.T) {
} else {
cArgs.Args = &rir
declareKeysResolveIntentRange(&desc, h, &rir, &latchSpans, &lockSpans)
addLockTableSpansForRangedIntentResolution(rir, &latchSpans)
if _, err := ResolveIntentRange(ctx, batch, cArgs, &roachpb.ResolveIntentRangeResponse{}); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -157,6 +162,7 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {
defer db.Close()
batch := db.NewBatch()
defer batch.Close()
st := makeClusterSettingsUsingEngineIntentsSetting(db)

var v roachpb.Value
// Write a first value at key.
Expand Down Expand Up @@ -185,6 +191,9 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {

var spans spanset.SpanSet
rbatch := db.NewBatch()
// The spans will be used for validating that reads and writes are
// consistent with the declared spans. We initialize spans below, before
// performing reads and writes.
rbatch = spanset.NewBatch(rbatch, &spans)
defer rbatch.Close()

Expand All @@ -202,7 +211,7 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {
if _, err := ResolveIntent(ctx, rbatch,
CommandArgs{
Header: h,
EvalCtx: (&MockEvalCtx{}).EvalContext(),
EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
Args: &ri,
},
&roachpb.ResolveIntentResponse{},
Expand All @@ -220,12 +229,13 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {
rir.EndKey = endKey

declareKeysResolveIntentRange(&desc, h, &rir, &spans, nil)
addLockTableSpansForRangedIntentResolution(rir, &spans)

h.MaxSpanRequestKeys = 10
if _, err := ResolveIntentRange(ctx, rbatch,
CommandArgs{
Header: h,
EvalCtx: (&MockEvalCtx{}).EvalContext(),
EvalCtx: (&MockEvalCtx{ClusterSettings: st}).EvalContext(),
Args: &rir,
},
&roachpb.ResolveIntentRangeResponse{},
Expand Down Expand Up @@ -261,3 +271,23 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {
}
})
}

func makeClusterSettingsUsingEngineIntentsSetting(engine storage.Engine) *cluster.Settings {
version := clusterversion.TestingBinaryVersion
if !engine.IsSeparatedIntentsEnabledForTesting(context.Background()) {
// Before SeparatedIntentsMigration, so intent resolution will not assume
// only separated intents.
version = clusterversion.ByKey(clusterversion.SeparatedIntentsMigration - 1)
}
return cluster.MakeTestingClusterSettingsWithVersions(version, version, true)
}

func addLockTableSpansForRangedIntentResolution(
rir roachpb.ResolveIntentRangeRequest, spans *spanset.SpanSet,
) {
// This is similar to the span logic in Replica.newBatchedEngine.
start, _ := keys.LockTableSingleKey(rir.Key, nil)
end, _ := keys.LockTableSingleKey(rir.EndKey, nil)
spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: start, EndKey: end})
}
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,9 +734,11 @@ func TestUpdateAbortSpan(t *testing.T) {
defer db.Close()
batch := db.NewBatch()
defer batch.Close()
st := makeClusterSettingsUsingEngineIntentsSetting(db)
evalCtx := &MockEvalCtx{
Desc: &desc,
AbortSpan: as,
ClusterSettings: st,
Desc: &desc,
AbortSpan: as,
CanCreateTxn: func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) {
return true, hlc.Timestamp{}, 0
},
Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ func (tc *testContext) Clock() *hlc.Clock {
// entire keyspace.
func (tc *testContext) Start(t testing.TB, stopper *stop.Stopper) {
tc.manualClock = hlc.NewManualClock(123)
cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
cfg := TestStoreConfigWithRandomizedClusterSeparatedIntentsMigration(
hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
// testContext tests like to move the manual clock around and assume that they can write at past
// timestamps.
cfg.TestingKnobs.DontCloseTimestamps = true
Expand Down Expand Up @@ -232,12 +233,17 @@ func (tc *testContext) StartWithStoreConfigAndVersion(
tc.gossip = gossip.NewTest(1, rpcContext, server, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
}
if tc.engine == nil {
disableSeparatedIntents :=
!cfg.Settings.Version.ActiveVersionOrEmpty(context.Background()).IsActive(
clusterversion.PostSeparatedIntentsMigration)
log.Infof(context.Background(), "engine creation is randomly setting disableSeparatedIntents: %t",
disableSeparatedIntents)
var err error
tc.engine, err = storage.Open(context.Background(),
storage.InMemory(),
storage.Attributes(roachpb.Attributes{Attrs: []string{"dc1", "mem"}}),
storage.MaxSize(1<<20),
storage.ForTesting)
storage.SetSeparatedIntents(disableSeparatedIntents))
if err != nil {
t.Fatal(err)
}
Expand Down
29 changes: 26 additions & 3 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (r *Replica) evaluateWriteBatchWrapper(
lul hlc.Timestamp,
latchSpans, lockSpans *spanset.SpanSet,
) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) {
batch, opLogger := r.newBatchedEngine(latchSpans, lockSpans)
batch, opLogger := r.newBatchedEngine(ba, latchSpans, lockSpans)
br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, lul, false /* readOnly */)
if pErr == nil {
if opLogger != nil {
Expand All @@ -671,7 +671,7 @@ func (r *Replica) evaluateWriteBatchWrapper(
// OpLogger is attached to the returned engine.Batch, recording all operations.
// Its recording should be attached to the Result of request evaluation.
func (r *Replica) newBatchedEngine(
latchSpans, lockSpans *spanset.SpanSet,
ba *roachpb.BatchRequest, latchSpans, lockSpans *spanset.SpanSet,
) (storage.Batch, *storage.OpLoggerBatch) {
batch := r.store.Engine().NewBatch()
if !batch.ConsistentIterators() {
Expand Down Expand Up @@ -720,14 +720,37 @@ func (r *Replica) newBatchedEngine(
// To account for separated intent accesses, we translate the lock spans
// to lock table spans.
spans := latchSpans.Copy()
lockSpans.Iterate(func(sa spanset.SpanAccess, _ spanset.SpanScope, span spanset.Span) {
addLockTableSpan := func(sa spanset.SpanAccess, span spanset.Span) {
ltKey, _ := keys.LockTableSingleKey(span.Key, nil)
var ltEndKey roachpb.Key
if span.EndKey != nil {
ltEndKey, _ = keys.LockTableSingleKey(span.EndKey, nil)
}
spans.AddNonMVCC(sa, roachpb.Span{Key: ltKey, EndKey: ltEndKey})
}
lockSpans.Iterate(func(sa spanset.SpanAccess, _ spanset.SpanScope, span spanset.Span) {
addLockTableSpan(sa, span)
})
// The lock spans are insufficient for ranged intent resolution, which
// does not declare lock spans and directly calls
// spanSetBatch.NewEngineIterator.
//
// TODO(sumeer): we can't keep adding additional cases here -- come up
// with something cleaner.
for _, union := range ba.Requests {
inner := union.GetInner()
switch req := inner.(type) {
case *roachpb.ResolveIntentRangeRequest:
span := req.Span()
addLockTableSpan(spanset.SpanReadWrite, spanset.Span{Span: span})
case *roachpb.EndTxnRequest:
// EndTxnRequest does local intent resolution. We don't know the
// spans up front so we just allow everything.
spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: keys.LockTableSingleKeyStart, EndKey: keys.LockTableSingleKeyEnd})
}
}

// During writes we may encounter a versioned value newer than the request
// timestamp, and may have to retry at a higher timestamp. This is still
// safe as we're only ever writing at timestamps higher than the timestamp
Expand Down
20 changes: 19 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"fmt"
"math"
"math/rand"
"path/filepath"
"runtime"
"sort"
Expand Down Expand Up @@ -194,10 +195,14 @@ var ExportRequestsLimit = settings.RegisterIntSetting(

// TestStoreConfig has some fields initialized with values relevant in tests.
func TestStoreConfig(clock *hlc.Clock) StoreConfig {
return testStoreConfig(clock, clusterversion.TestingBinaryVersion)
}

func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig {
if clock == nil {
clock = hlc.NewClock(hlc.UnixNano, time.Nanosecond)
}
st := cluster.MakeTestingClusterSettings()
st := cluster.MakeTestingClusterSettingsWithVersions(version, version, true)
sc := StoreConfig{
DefaultSpanConfig: zonepb.DefaultZoneConfigRef().AsSpanConfig(),
DefaultSystemSpanConfig: zonepb.DefaultSystemZoneConfigRef().AsSpanConfig(),
Expand All @@ -219,6 +224,19 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig {
return sc
}

// TestStoreConfigWithRandomizedClusterSeparatedIntentsMigration randomizes
// the StoreConfig to be before or after completion of the separated intents
// migration.
func TestStoreConfigWithRandomizedClusterSeparatedIntentsMigration(clock *hlc.Clock) StoreConfig {
version := clusterversion.TestingBinaryVersion
if rand.Intn(2) == 0 {
// This is before SeparatedIntentsMigration, so we may have interleaved
// intents.
version = clusterversion.ByKey(clusterversion.SeparatedIntentsMigration - 1)
}
return testStoreConfig(clock, version)
}

func newRaftConfig(
strg raft.Storage, id uint64, appliedIndex uint64, storeCfg StoreConfig, logger raft.Logger,
) *raft.Config {
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,8 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
storage.CacheSize(cfg.CacheSize),
storage.MaxSize(sizeInBytes),
storage.EncryptionAtRest(spec.EncryptionOptions),
storage.Settings(cfg.Settings))
storage.Settings(cfg.Settings),
storage.SetSeparatedIntents(disableSeparatedIntents))
if err != nil {
return Engines{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/sticky_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (registry *stickyInMemEnginesRegistryImpl) GetOrCreateStickyInMemEngine(
storage.CacheSize(cfg.CacheSize),
storage.MaxSize(spec.Size.InBytes),
storage.EncryptionAtRest(spec.EncryptionOptions),
storage.ForTesting)
storage.ForStickyEngineTesting)

engineEntry := &stickyInMemEngine{
id: spec.StickyInMemoryEngineID,
Expand Down
Loading