Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
123178: streamingccl/streamingest: consume all kvs in event r=dt a=dt

This switching from passing single KV events from the subscription to the ingest processor to passing batches of KVs. This avoids needing to go back to the channel and select for each KV, but rather for each batch of KVs.

Release note: none.
Epic: none.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
  • Loading branch information
craig[bot] and dt committed May 2, 2024
2 parents 5b1a3c1 + 51feb0f commit d20a00e
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 90 deletions.
6 changes: 4 additions & 2 deletions pkg/ccl/cmdccl/clusterrepl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,10 @@ func subscriptionConsumer(
}
switch event.Type() {
case streamingccl.KVEvent:
kv := event.GetKV()
sz = kv.Size()
sz = 0
for _, kv := range event.GetKVs() {
sz += kv.Size()
}
case streamingccl.SSTableEvent:
ssTab := event.GetSSTable()
sz = ssTab.Size()
Expand Down
32 changes: 16 additions & 16 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type Event interface {
// Type specifies which accessor will be meaningful.
Type() EventType

// GetKV returns a KV event if the EventType is KVEvent.
GetKV() *roachpb.KeyValue
// GetKVs returns a KV event if the EventType is KVEvent.
GetKVs() []roachpb.KeyValue

// GetSSTable returns a AddSSTable event if the EventType is SSTableEvent.
GetSSTable() *kvpb.RangeFeedSSTable
Expand All @@ -68,7 +68,7 @@ type Event interface {

// kvEvent is a key value pair that needs to be ingested.
type kvEvent struct {
kv roachpb.KeyValue
kv []roachpb.KeyValue
}

var _ Event = kvEvent{}
Expand All @@ -78,9 +78,9 @@ func (kve kvEvent) Type() EventType {
return KVEvent
}

// GetKV implements the Event interface.
func (kve kvEvent) GetKV() *roachpb.KeyValue {
return &kve.kv
// GetKVs implements the Event interface.
func (kve kvEvent) GetKVs() []roachpb.KeyValue {
return kve.kv
}

// GetSSTable implements the Event interface.
Expand Down Expand Up @@ -118,8 +118,8 @@ func (sste sstableEvent) Type() EventType {
return SSTableEvent
}

// GetKV implements the Event interface.
func (sste sstableEvent) GetKV() *roachpb.KeyValue {
// GetKVs implements the Event interface.
func (sste sstableEvent) GetKVs() []roachpb.KeyValue {
return nil
}

Expand Down Expand Up @@ -160,8 +160,8 @@ func (dre delRangeEvent) Type() EventType {
return DeleteRangeEvent
}

// GetKV implements the Event interface.
func (dre delRangeEvent) GetKV() *roachpb.KeyValue {
// GetKVs implements the Event interface.
func (dre delRangeEvent) GetKVs() []roachpb.KeyValue {
return nil
}

Expand Down Expand Up @@ -205,8 +205,8 @@ func (ce checkpointEvent) Type() EventType {
return CheckpointEvent
}

// GetKV implements the Event interface.
func (ce checkpointEvent) GetKV() *roachpb.KeyValue {
// GetKVs implements the Event interface.
func (ce checkpointEvent) GetKVs() []roachpb.KeyValue {
return nil
}

Expand Down Expand Up @@ -246,8 +246,8 @@ func (spe spanConfigEvent) Type() EventType {
return SpanConfigEvent
}

// GetKV implements the Event interface.
func (spe spanConfigEvent) GetKV() *roachpb.KeyValue {
// GetKVs implements the Event interface.
func (spe spanConfigEvent) GetKVs() []roachpb.KeyValue {
return nil
}

Expand Down Expand Up @@ -288,7 +288,7 @@ func (se splitEvent) Type() EventType {
}

// GetKV implements the Event interface.
func (se splitEvent) GetKV() *roachpb.KeyValue {
func (se splitEvent) GetKVs() []roachpb.KeyValue {
return nil
}

Expand Down Expand Up @@ -318,7 +318,7 @@ func (se splitEvent) GetSplitEvent() *roachpb.Key {
}

// MakeKVEvent creates an Event from a KV.
func MakeKVEvent(kv roachpb.KeyValue) Event {
func MakeKVEvent(kv []roachpb.KeyValue) Event {
return kvEvent{kv: kv}
}

Expand Down
14 changes: 10 additions & 4 deletions pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,18 @@ type FeedEventPredicate func(message streamingccl.Event) bool
// FeedErrorPredicate allows tests to match an error from ReplicationFeed.
type FeedErrorPredicate func(err error) bool

// KeyMatches makes a FeedEventPredicate that matches a given key.
// KeyMatches makes a FeedEventPredicate that matches a given key in a kv batch.
func KeyMatches(key roachpb.Key) FeedEventPredicate {
return func(msg streamingccl.Event) bool {
if msg.Type() != streamingccl.KVEvent {
return false
}
return bytes.Equal(key, msg.GetKV().Key)
for _, kv := range msg.GetKVs() {
if bytes.Equal(key, kv.Key) {
return true
}
}
return false
}
}

Expand Down Expand Up @@ -107,14 +112,15 @@ func MakeReplicationFeed(t *testing.T, f FeedSource) *ReplicationFeed {
}
}

// ObserveKey consumes the feed until requested key has been seen (or deadline expired).
// ObserveKey consumes the feed until requested key has been seen (or deadline
// expired) in a batch (all of which, including subsequent keys, is consumed).
// Note: we don't do any buffering here. Therefore, it is required that the key
// we want to observe will arrive at some point in the future.
func (rf *ReplicationFeed) ObserveKey(ctx context.Context, key roachpb.Key) roachpb.KeyValue {
rf.consumeUntil(ctx, KeyMatches(key), func(err error) bool {
return false
})
return *rf.msg.GetKV()
return rf.msg.GetKVs()[0]
}

// ObserveAnySpanConfigRecord consumes the feed until any span config record is observed.
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
"//pkg/util/bufalloc",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/streamingccl/streamclient/client_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func subscribeInternal(
ctx context.Context, feed pgx.Rows, eventsChan chan streamingccl.Event, closeChan chan struct{},
ctx context.Context, feed pgx.Rows, eventCh chan streamingccl.Event, closeCh chan struct{},
) error {
// Get the next event from the cursor.
var bufferedEvent *streampb.StreamEvent
Expand Down Expand Up @@ -51,8 +51,8 @@ func subscribeInternal(
return err
}
select {
case eventsChan <- event:
case <-closeChan:
case eventCh <- event:
case <-closeCh:
// Exit quietly to not cause other subscriptions in the same
// ctxgroup.Group to exit.
return nil
Expand Down Expand Up @@ -81,8 +81,8 @@ func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event {
event = streamingccl.MakeSSTableEvent(streamEvent.Batch.Ssts[0])
streamEvent.Batch.Ssts = streamEvent.Batch.Ssts[1:]
case len(streamEvent.Batch.KeyValues) > 0:
event = streamingccl.MakeKVEvent(streamEvent.Batch.KeyValues[0])
streamEvent.Batch.KeyValues = streamEvent.Batch.KeyValues[1:]
event = streamingccl.MakeKVEvent(streamEvent.Batch.KeyValues)
streamEvent.Batch.KeyValues = nil
case len(streamEvent.Batch.DelRanges) > 0:
event = streamingccl.MakeDeleteRangeEvent(streamEvent.Batch.DelRanges[0])
streamEvent.Batch.DelRanges = streamEvent.Batch.DelRanges[1:]
Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (sc testStreamClient) Subscribe(
}

events := make(chan streamingccl.Event, 2)
events <- streamingccl.MakeKVEvent(sampleKV)
events <- streamingccl.MakeKVEvent([]roachpb.KeyValue{sampleKV})
events <- streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{sampleResolvedSpan})
close(events)

Expand Down Expand Up @@ -303,8 +303,10 @@ func ExampleClient() {
for event := range sub.Events() {
switch event.Type() {
case streamingccl.KVEvent:
kv := event.GetKV()
fmt.Printf("kv: %s->%s@%d\n", kv.Key.String(), string(kv.Value.RawBytes), kv.Value.Timestamp.WallTime)
kvs := event.GetKVs()
for _, kv := range kvs {
fmt.Printf("kv: %s->%s@%d\n", kv.Key.String(), string(kv.Value.RawBytes), kv.Value.Timestamp.WallTime)
}
case streamingccl.SSTableEvent:
sst := event.GetSSTable()
fmt.Printf("sst: %s->%s@%d\n", sst.Span.String(), string(sst.Data), sst.WriteTS.WallTime)
Expand Down
23 changes: 11 additions & 12 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
Expand Down Expand Up @@ -280,7 +281,7 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
if len(r.systemKVs) > 0 {
systemKV := r.systemKVs[0]
systemKV.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
event = streamingccl.MakeKVEvent(systemKV)
event = streamingccl.MakeKVEvent([]roachpb.KeyValue{systemKV})
r.systemKVs = r.systemKVs[1:]
return event
}
Expand All @@ -295,7 +296,7 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
}
event = streamingccl.MakeSSTableEvent(r.sstMaker(keyVals))
} else {
event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.codec, r.tableDesc))
event = streamingccl.MakeKVEvent([]roachpb.KeyValue{makeRandomKey(r.rng, r.config, r.codec, r.tableDesc)})
}
r.numEventsSinceLastResolved++
}
Expand Down Expand Up @@ -674,17 +675,15 @@ func duplicateEvent(event streamingccl.Event) streamingccl.Event {
copy(resolvedSpans, event.GetResolvedSpans())
dup = streamingccl.MakeCheckpointEvent(resolvedSpans)
case streamingccl.KVEvent:
eventKV := event.GetKV()
rawBytes := make([]byte, len(eventKV.Value.RawBytes))
copy(rawBytes, eventKV.Value.RawBytes)
keyVal := roachpb.KeyValue{
Key: event.GetKV().Key.Clone(),
Value: roachpb.Value{
RawBytes: rawBytes,
Timestamp: eventKV.Value.Timestamp,
},
kvs := event.GetKVs()
res := make([]roachpb.KeyValue, len(kvs))
var a bufalloc.ByteAllocator
for i := range kvs {
res[i].Key = kvs[i].Key.Clone()
res[i].Value.Timestamp = kvs[i].Value.Timestamp
a, res[i].Value.RawBytes = a.Copy(kvs[i].Value.RawBytes, 0)
}
dup = streamingccl.MakeKVEvent(keyVal)
dup = streamingccl.MakeKVEvent(res)
case streamingccl.SSTableEvent:
sst := event.GetSSTable()
dataCopy := make([]byte, len(sst.Data))
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/streamingccl/streamingest/merged_subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func TestMergeSubscriptionsRun(t *testing.T) {
ctx := context.Background()
events := func(partition string) []streamingccl.Event {
return []streamingccl.Event{
streamingccl.MakeKVEvent(roachpb.KeyValue{
streamingccl.MakeKVEvent([]roachpb.KeyValue{{
Key: []byte(partition + "_key1"),
}),
streamingccl.MakeKVEvent(roachpb.KeyValue{
}}),
streamingccl.MakeKVEvent([]roachpb.KeyValue{{
Key: []byte(partition + "_key2"),
}),
}}),
}
}
mockClient := &mockStreamClient{
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestMergeSubscriptionsRun(t *testing.T) {
events := []string{}
g.Go(func() error {
for ev := range merged.Events() {
events = append(events, string(ev.GetKV().Key))
events = append(events, string(ev.GetKVs()[0].Key))
}
return nil
})
Expand Down
53 changes: 27 additions & 26 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func (sip *streamIngestionProcessor) handleEvent(event partitionEvent) error {

if event.Type() == streamingccl.KVEvent {
sip.metrics.AdmitLatency.RecordValue(
timeutil.Since(event.GetKV().Value.Timestamp.GoTime()).Nanoseconds())
timeutil.Since(event.GetKVs()[0].Value.Timestamp.GoTime()).Nanoseconds())
}

if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
Expand All @@ -720,7 +720,7 @@ func (sip *streamIngestionProcessor) handleEvent(event partitionEvent) error {

switch event.Type() {
case streamingccl.KVEvent:
if err := sip.bufferKV(event.GetKV()); err != nil {
if err := sip.bufferKVs(event.GetKVs()); err != nil {
return err
}
case streamingccl.SSTableEvent:
Expand Down Expand Up @@ -792,13 +792,13 @@ func (sip *streamIngestionProcessor) bufferSST(sst *kvpb.RangeFeedSSTable) error
return err
}

return sip.bufferKV(&roachpb.KeyValue{
return sip.bufferKVs([]roachpb.KeyValue{{
Key: keyVal.Key.Key,
Value: roachpb.Value{
RawBytes: mvccValue.RawBytes,
Timestamp: keyVal.Key.Timestamp,
},
})
}})
}, func(rangeKeyVal storage.MVCCRangeKeyValue) error {
return sip.bufferRangeKeyVal(rangeKeyVal)
})
Expand Down Expand Up @@ -870,36 +870,37 @@ func (sip *streamIngestionProcessor) handleSplitEvent(key *roachpb.Key) error {
return kvDB.AdminSplit(ctx, rekey, expiration)
}

func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error {
func (sip *streamIngestionProcessor) bufferKVs(kvs []roachpb.KeyValue) error {
// TODO: In addition to flushing when receiving a checkpoint event, we
// should also flush when we've buffered sufficient KVs. A buffering adder
// would save us here.
if kv == nil {
if kvs == nil {
return errors.New("kv event expected to have kv")
}
for _, kv := range kvs {
var err error
var ok bool
kv.Key, ok, err = sip.rekey(kv.Key)
if err != nil {
return err
}
if !ok {
continue
}

var err error
var ok bool
kv.Key, ok, err = sip.rekey(kv.Key)
if err != nil {
return err
}
if !ok {
return nil
}
if sip.rewriteToDiffKey {
kv.Value.ClearChecksum()
kv.Value.InitChecksum(kv.Key)
}

if sip.rewriteToDiffKey {
kv.Value.ClearChecksum()
kv.Value.InitChecksum(kv.Key)
sip.buffer.addKV(storage.MVCCKeyValue{
Key: storage.MVCCKey{
Key: kv.Key,
Timestamp: kv.Value.Timestamp,
},
Value: kv.Value.RawBytes,
})
}

sip.buffer.addKV(storage.MVCCKeyValue{
Key: storage.MVCCKey{
Key: kv.Key,
Timestamp: kv.Value.Timestamp,
},
Value: kv.Value.RawBytes,
})
return nil
}

Expand Down
Loading

0 comments on commit d20a00e

Please sign in to comment.