Skip to content

Commit

Permalink
Merge #104753
Browse files Browse the repository at this point in the history
104753: storage: remove duplicated key kind constants r=jbowens a=jbowens

This commit performs a small refactor of the batch reading interface exposed by the storage package. It removes 'Pebble' from a few names, removes the duplicated key kinds (Pebble exports these now), and adds assertions to ensure new key kinds trigger compilation failures. This motivated by #104539, in which I never even considered the batch reader or how it might need to change.

Epic: none
Release note: None

Co-authored-by: Jackson Owens <jackson@cockroachlabs.com>
  • Loading branch information
craig[bot] and jbowens committed Jun 13, 2023
2 parents ddb9303 + 1ab59eb commit c1a71cf
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 128 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (b *appBatch) addWriteBatch(
if wb == nil {
return nil
}
if mutations, err := storage.PebbleBatchCount(wb.Data); err != nil {
if mutations, err := storage.BatchCount(wb.Data); err != nil {
log.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err)
} else {
b.numMutations += mutations
Expand Down
38 changes: 27 additions & 11 deletions pkg/kv/kvserver/debug_print.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"bytes"
"encoding/binary"
"fmt"
"strings"

Expand All @@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"go.etcd.io/raft/v3/raftpb"
)

Expand Down Expand Up @@ -232,11 +234,18 @@ func tryIntent(kv storage.MVCCKeyValue) (string, error) {
}

func decodeWriteBatch(writeBatch *kvserverpb.WriteBatch) (string, error) {
// Ensure that we always update this function to consider any necessary
// updates when a new key kind is introduced. To do this, we assert
// pebble.KeyKindDeleteSized is the most recent key kind, ensuring that
// compilation will fail if it's not. Unfortunately, this doesn't protect
// against reusing a currently unused RocksDB key kind.
const _ = uint(pebble.InternalKeyKindDeleteSized - pebble.InternalKeyKindMax)

if writeBatch == nil {
return "<nil>\n", nil
}

r, err := storage.NewPebbleBatchReader(writeBatch.Data)
r, err := storage.NewBatchReader(writeBatch.Data)
if err != nil {
return "", err
}
Expand All @@ -245,32 +254,32 @@ func decodeWriteBatch(writeBatch *kvserverpb.WriteBatch) (string, error) {
// the caller all the info we have (in case the writebatch is corrupted).
var sb strings.Builder
for r.Next() {
switch r.BatchType() {
case storage.BatchTypeDeletion:
switch r.KeyKind() {
case pebble.InternalKeyKindDelete:
engineKey, err := r.EngineKey()
if err != nil {
return sb.String(), err
}
sb.WriteString(fmt.Sprintf("Delete: %s\n", SprintEngineKey(engineKey)))
case storage.BatchTypeValue:
case pebble.InternalKeyKindSet, pebble.InternalKeyKindSetWithDelete:
engineKey, err := r.EngineKey()
if err != nil {
return sb.String(), err
}
sb.WriteString(fmt.Sprintf("Put: %s\n", SprintEngineKeyValue(engineKey, r.Value())))
case storage.BatchTypeMerge:
case pebble.InternalKeyKindMerge:
engineKey, err := r.EngineKey()
if err != nil {
return sb.String(), err
}
sb.WriteString(fmt.Sprintf("Merge: %s\n", SprintEngineKeyValue(engineKey, r.Value())))
case storage.BatchTypeSingleDeletion:
case pebble.InternalKeyKindSingleDelete:
engineKey, err := r.EngineKey()
if err != nil {
return sb.String(), err
}
sb.WriteString(fmt.Sprintf("Single Delete: %s\n", SprintEngineKey(engineKey)))
case storage.BatchTypeRangeDeletion:
case pebble.InternalKeyKindRangeDelete:
engineStartKey, err := r.EngineKey()
if err != nil {
return sb.String(), err
Expand All @@ -282,7 +291,7 @@ func decodeWriteBatch(writeBatch *kvserverpb.WriteBatch) (string, error) {
sb.WriteString(fmt.Sprintf(
"Delete Range: [%s, %s)\n", SprintEngineKey(engineStartKey), SprintEngineKey(engineEndKey),
))
case storage.BatchTypeRangeKeySet:
case pebble.InternalKeyKindRangeKeySet:
engineStartKey, err := r.EngineKey()
if err != nil {
return sb.String(), err
Expand All @@ -301,7 +310,7 @@ func decodeWriteBatch(writeBatch *kvserverpb.WriteBatch) (string, error) {
"Set Range Key: %s\n", SprintEngineRangeKeyValue(span, rangeKey),
))
}
case storage.BatchTypeRangeKeyUnset:
case pebble.InternalKeyKindRangeKeyUnset:
engineStartKey, err := r.EngineKey()
if err != nil {
return sb.String(), err
Expand All @@ -320,7 +329,7 @@ func decodeWriteBatch(writeBatch *kvserverpb.WriteBatch) (string, error) {
"Unset Range Key: %s\n", SprintEngineRangeKey(span, rangeKey.Version),
))
}
case storage.BatchTypeRangeKeyDelete:
case pebble.InternalKeyKindRangeKeyDelete:
engineStartKey, err := r.EngineKey()
if err != nil {
return sb.String(), err
Expand All @@ -333,8 +342,15 @@ func decodeWriteBatch(writeBatch *kvserverpb.WriteBatch) (string, error) {
"Delete Range Keys: %s\n",
SprintKeySpan(roachpb.Span{Key: engineStartKey.Key, EndKey: engineEndKey.Key}),
))
case pebble.InternalKeyKindDeleteSized:
engineKey, err := r.EngineKey()
if err != nil {
return sb.String(), err
}
v, _ := binary.Uvarint(r.Value())
sb.WriteString(fmt.Sprintf("Delete (Sized at %d): %s\n", v, SprintEngineKey(engineKey)))
default:
sb.WriteString(fmt.Sprintf("unsupported batch type: %d\n", r.BatchType()))
sb.WriteString(fmt.Sprintf("unsupported key kind: %d\n", r.KeyKind()))
}
}
return sb.String(), r.Error()
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/redact"
"go.etcd.io/raft/v3/raftpb"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -425,16 +426,16 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(

if req.KVBatch != nil {
recordBytesReceived(int64(len(req.KVBatch)))
batchReader, err := storage.NewPebbleBatchReader(req.KVBatch)
batchReader, err := storage.NewBatchReader(req.KVBatch)
if err != nil {
return noSnap, errors.Wrap(err, "failed to decode batch")
}

timingTag.start("sst")
// All batch operations are guaranteed to be point key or range key puts.
for batchReader.Next() {
switch batchReader.BatchType() {
case storage.BatchTypeValue:
switch batchReader.KeyKind() {
case pebble.InternalKeyKindSet, pebble.InternalKeyKindSetWithDelete:
key, err := batchReader.EngineKey()
if err != nil {
return noSnap, err
Expand All @@ -443,7 +444,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
return noSnap, errors.Wrapf(err, "writing sst for raft snapshot")
}

case storage.BatchTypeRangeKeySet:
case pebble.InternalKeyKindRangeKeySet:
start, err := batchReader.EngineKey()
if err != nil {
return noSnap, err
Expand All @@ -464,7 +465,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
}

default:
return noSnap, errors.AssertionFailedf("unexpected batch entry type %d", batchReader.BatchType())
return noSnap, errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind())
}
}
timingTag.stop("sst")
Expand Down
Loading

0 comments on commit c1a71cf

Please sign in to comment.