Skip to content

Commit

Permalink
kvserver: Add migration for interleaved intents to separated
Browse files Browse the repository at this point in the history
Adds a long running migration, SeparatedIntentsMigration,
that does the following steps:

1) Iterates through all range descriptors using a RangeIterator
2) Calls a new ranged write command, Barrier, that is a no-op
  during evaluation but waits on any other writes on that range
  to finish before returning
3) Calls ScanInterleavedIntents, a new ranged read command, to
  return interleaved intents encountered over the key range
4) Iterates over the returned intents, pushes all of their txns,
  and resolves the intents.

The barrier in step 2 ensures that no future writes after that
point will write interleaved intents. The actual disabling
of the setting to write interleaved intents happens in the
next commit.

Part of cockroachdb#41720.

Release note: None.
  • Loading branch information
itsbilal committed Aug 6, 2021
1 parent 6fafc56 commit c3f5976
Show file tree
Hide file tree
Showing 45 changed files with 4,695 additions and 884 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-124 set the active cluster version in the format '<major>.<minor>'
version version 21.1-126 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-124</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-126</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
96 changes: 96 additions & 0 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -1307,6 +1309,99 @@ func runDebugMergeLogs(cmd *cobra.Command, args []string) error {
return writeLogStream(s, cmd.OutOrStdout(), o.filter, o.prefix, o.keepRedactable)
}

// debugIntentCount defines the `intent-count` debug command. It requires at
// least one argument (the store directory) and delegates the actual work to
// runDebugIntentCount.
var debugIntentCount = &cobra.Command{
	Args: cobra.MinimumNArgs(1),
	RunE: runDebugIntentCount,
	Use:  "intent-count <store directory>",
	Short: "return a count of intents in directory",
	Long: `
Returns a count of interleaved and separated intents in the store directory.
Used to investigate stores with lots of unresolved intents, or to confirm
if the migration away from interleaved intents was successful.
`,
}

// runDebugIntentCount implements the intent-count debug command. It opens the
// store at args[0] in read-only mode, walks every engine key, and prints how
// many separated intents (lock table keys) and interleaved intents (non-inline
// MVCCMetadata stored at an MVCC key with an empty timestamp) it found.
//
// While the scan runs, a background task prints the number of keys scanned so
// far every 10 seconds. Any error from opening the store, starting the
// progress task, iterating, or decoding a key/value is returned as-is.
func runDebugIntentCount(cmd *cobra.Command, args []string) error {
	stopper := stop.NewStopper()
	ctx := context.Background()
	defer stopper.Stop(ctx)

	db, err := OpenExistingStore(args[0], stopper, true /* readOnly */)
	if err != nil {
		return err
	}
	defer db.Close()

	var interleavedIntentCount, separatedIntentCount int
	// keysCount is shared with the progress task below, so it is only ever
	// touched through the atomic package.
	var keysCount uint64
	var wg sync.WaitGroup
	closer := make(chan bool)

	wg.Add(1)
	if err := stopper.RunAsyncTask(ctx, "intent-count-progress-indicator", func(ctx context.Context) {
		defer wg.Done()
		ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
		defer cancel()

		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()

		// Keep reporting on every tick until the scan finishes (closer) or the
		// stopper quiesces (ctx). Without the loop only a single progress line
		// would ever be printed.
		for {
			select {
			case <-ticker.C:
				fmt.Printf("scanned %d keys\n", atomic.LoadUint64(&keysCount))
			case <-ctx.Done():
				return
			case <-closer:
				return
			}
		}
	}); err != nil {
		// Bail out rather than continue to the wg.Wait below, which would
		// otherwise wait on a Done that may never come.
		return err
	}

	iter := db.NewEngineIterator(storage.IterOptions{
		LowerBound: roachpb.KeyMin,
		UpperBound: roachpb.KeyMax,
	})
	defer iter.Close()
	valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin})
	var meta enginepb.MVCCMetadata
	for ; valid && err == nil; valid, err = iter.NextEngineKey() {
		key, err := iter.EngineKey()
		if err != nil {
			return err
		}
		atomic.AddUint64(&keysCount, 1)
		// Every lock table key is counted as one separated intent.
		if key.IsLockTableKey() {
			separatedIntentCount++
			continue
		}
		if !key.IsMVCCKey() {
			continue
		}
		mvccKey, err := key.ToMVCCKey()
		if err != nil {
			return err
		}
		// Only keys with an empty timestamp are decoded as MVCCMetadata;
		// timestamped keys are skipped.
		if !mvccKey.Timestamp.IsEmpty() {
			continue
		}
		val := iter.UnsafeValue()
		if err := protoutil.Unmarshal(val, &meta); err != nil {
			return err
		}
		// Inline metadata is not an intent.
		if meta.IsInline() {
			continue
		}
		interleavedIntentCount++
	}
	if err != nil {
		return err
	}
	// Stop the progress task before printing the totals so that its output
	// cannot interleave with the final report.
	close(closer)
	wg.Wait()
	fmt.Printf("interleaved intents: %d\nseparated intents: %d\n",
		interleavedIntentCount, separatedIntentCount)
	return nil
}

// DebugCmdsForRocksDB lists debug commands that access rocksdb through the engine
// and need encryption flags (injected by CCL code).
// Note: do NOT include commands that just call rocksdb code without setting up an engine.
Expand All @@ -1318,6 +1413,7 @@ var DebugCmdsForRocksDB = []*cobra.Command{
debugRaftLogCmd,
debugRangeDataCmd,
debugRangeDescriptorsCmd,
debugIntentCount,
}

// All other debug commands go here.
Expand Down
16 changes: 15 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ const (
SQLInstancesTable
// Can return new retryable rangefeed errors without crashing the client
NewRetryableRangefeedErrors
// SeparatedIntentsMigration adds the migration to move over all remaining
// intents to the separated lock table space.
SeparatedIntentsMigration
// PostSeparatedIntentsMigration runs a cleanup migration after the main
// SeparatedIntentsMigration.
PostSeparatedIntentsMigration

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -434,12 +440,20 @@ var versionsSingleton = keyedVersions{
},
{
Key: SQLInstancesTable,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 122},
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 120},
},
{
Key: NewRetryableRangefeedErrors,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 122},
},
{
Key: SeparatedIntentsMigration,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 124},
},
{
Key: PostSeparatedIntentsMigration,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 126},
},

// Step (2): Add new versions here.
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.AddSSTableRequest:
case *roachpb.MigrateRequest:
case *roachpb.QueryResolvedTimestampRequest:
case *roachpb.BarrierRequest:
case *roachpb.ScanInterleavedIntentsRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
Expand Down Expand Up @@ -843,3 +845,45 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) {
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// scanInterleavedIntents appends a ScanInterleavedIntentsRequest for the key
// range from s to e to the batch and initializes a result slot for it. If
// either key cannot be marshaled, the error is recorded through initResult
// and no request is appended.
func (b *Batch) scanInterleavedIntents(s, e interface{}) {
	startKey, err := marshalKey(s)
	if err != nil {
		b.initResult(0, 0, notRaw, err)
		return
	}
	endKey, err := marshalKey(e)
	if err != nil {
		b.initResult(0, 0, notRaw, err)
		return
	}
	header := roachpb.RequestHeader{Key: startKey, EndKey: endKey}
	b.appendReqs(&roachpb.ScanInterleavedIntentsRequest{RequestHeader: header})
	b.initResult(1, 0, notRaw, nil)
}

// barrier appends a BarrierRequest for the key range from s to e to the batch
// and initializes a result slot for it. If either key cannot be marshaled,
// the error is recorded through initResult and no request is appended.
func (b *Batch) barrier(s, e interface{}) {
	startKey, err := marshalKey(s)
	if err != nil {
		b.initResult(0, 0, notRaw, err)
		return
	}
	endKey, err := marshalKey(e)
	if err != nil {
		b.initResult(0, 0, notRaw, err)
		return
	}
	header := roachpb.RequestHeader{Key: startKey, EndKey: endKey}
	b.appendReqs(&roachpb.BarrierRequest{RequestHeader: header})
	b.initResult(1, 0, notRaw, nil)
}
45 changes: 45 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,51 @@ func (db *DB) QueryResolvedTimestamp(
return r.ResolvedTS, nil
}

// ScanInterleavedIntents sends a ScanInterleavedIntentsRequest over the span
// from begin to end, with ts set as the batch header timestamp, and returns
// the interleaved intents reported in the response. A resume span is returned
// if the entirety of the request span was not scanned.
//
// An error is returned if the batch fails, if the response list is empty, or
// if the first response is not a ScanInterleavedIntentsResponse.
func (db *DB) ScanInterleavedIntents(
	ctx context.Context, begin, end interface{}, ts hlc.Timestamp,
) ([]roachpb.Intent, *roachpb.Span, error) {
	batch := &Batch{}
	batch.Header.Timestamp = ts
	batch.scanInterleavedIntents(begin, end)

	res, err := getOneResult(db.Run(ctx, batch), batch)
	if err != nil {
		return nil, nil, err
	}
	if len(batch.response.Responses) == 0 {
		return nil, nil, errors.Errorf("unexpected empty response for ScanInterleavedIntents")
	}

	inner := batch.response.Responses[0].GetInner()
	switch resp := inner.(type) {
	case *roachpb.ScanInterleavedIntentsResponse:
		return resp.Intents, res.ResumeSpan, nil
	default:
		return nil, nil, errors.Errorf("unexpected response of type %T for ScanInterleavedIntents",
			inner)
	}
}

// Barrier sends a BarrierRequest over the span from begin to end. The command
// waits for conflicting operations, such as earlier writes on the specified
// key range, to finish. On success it returns the timestamp carried in the
// BarrierResponse.
//
// An error (with a zero timestamp) is returned if the batch fails, if the
// response list is empty, or if the first response is not a BarrierResponse.
func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
	batch := &Batch{}
	batch.barrier(begin, end)

	if err := getOneErr(db.Run(ctx, batch), batch); err != nil {
		return hlc.Timestamp{}, err
	}
	if len(batch.response.Responses) == 0 {
		return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier")
	}

	inner := batch.response.Responses[0].GetInner()
	switch resp := inner.(type) {
	case *roachpb.BarrierResponse:
		return resp.Timestamp, nil
	default:
		return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier",
			inner)
	}
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "batcheval",
srcs = [
"cmd_add_sstable.go",
"cmd_barrier.go",
"cmd_clear_range.go",
"cmd_compute_checksum.go",
"cmd_conditional_put.go",
Expand Down Expand Up @@ -36,6 +37,7 @@ go_library(
"cmd_reverse_scan.go",
"cmd_revert_range.go",
"cmd_scan.go",
"cmd_scan_interleaved_intents.go",
"cmd_subsume.go",
"cmd_truncate_log.go",
"command.go",
Expand Down
47 changes: 47 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_barrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
)

// init registers the roachpb.Barrier method as a read-write command, handing
// RegisterReadWriteCommand the declareKeysBarrier and Barrier functions
// defined in this file.
func init() {
RegisterReadWriteCommand(roachpb.Barrier, declareKeysBarrier, Barrier)
}

// declareKeysBarrier declares a read-write MVCC latch span covering the
// request's key span at the batch header's timestamp.
//
// The concurrency manager special-cases Barrier so that these latches are
// *not* actually grabbed: any latches that conflict with them are waited on,
// but no new latches are inserted.
func declareKeysBarrier(
	rs ImmutableRangeState,
	h roachpb.Header,
	req roachpb.Request,
	latchSpans, lockSpans *spanset.SpanSet,
) {
	barrierSpan := req.Header().Span()
	latchSpans.AddMVCC(spanset.SpanReadWrite, barrierSpan, h.Timestamp)
}

// Barrier evaluates a Barrier request. Evaluation itself is a no-op, as all
// the latch waiting happens in the latch manager; the only thing done here is
// stamping the response with the current reading of the evaluation context's
// clock. The response must be a *roachpb.BarrierResponse.
func Barrier(
	_ context.Context, _ storage.ReadWriter, cArgs CommandArgs, response roachpb.Response,
) (result.Result, error) {
	barrierResp := response.(*roachpb.BarrierResponse)
	barrierResp.Timestamp = cArgs.EvalCtx.Clock().Now()

	var noResult result.Result
	return noResult, nil
}
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Re

// init associates each cluster version key below with the migration function
// to run for it, via registerMigration.
func init() {
registerMigration(clusterversion.TruncatedAndRangeAppliedStateMigration, truncatedAndAppliedStateMigration)
registerMigration(clusterversion.PostSeparatedIntentsMigration, postSeparatedIntentsMigration)
}

func registerMigration(key clusterversion.Key, migration migration) {
Expand Down Expand Up @@ -130,6 +131,16 @@ func truncatedAndAppliedStateMigration(
return pd, nil
}

// postSeparatedIntentsMigration is the below-raft half of the interleaved to
// separated intents migration. It deliberately does nothing and returns an
// empty result: the Migrate command is run here only so that any orphaned
// replicas with interleaved intents get cleared out.
func postSeparatedIntentsMigration(
	ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
	var noop result.Result
	return noop, nil
}

// TestingRegisterMigrationInterceptor is used in tests to register an
// interceptor for a below-raft migration.
//
Expand Down
Loading

0 comments on commit c3f5976

Please sign in to comment.