Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] sql: add metadata router #50637

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions pkg/sql/execinfra/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ import (
// StaticNodeID is the default Node ID to be used in tests.
const StaticNodeID = roachpb.NodeID(3)

type RepeatableSource interface {
RowSource
// Reset resets the RepeatableSource such that a subsequent call to Next()
// returns the first row.
Reset()
}

// RepeatableRowSource is a RowSource used in benchmarks to avoid having to
// reinitialize a new RowSource every time during multiple passes of the input.
// It is intended to be initialized with all rows.
Expand Down Expand Up @@ -66,8 +73,7 @@ func (r *RepeatableRowSource) Next() (sqlbase.EncDatumRow, *execinfrapb.Producer
return nextRow, nil
}

// Reset resets the RepeatableRowSource such that a subsequent call to Next()
// returns the first row.
// Reset is part of the RepeatableSource interface.
func (r *RepeatableRowSource) Reset() {
r.nextRowIdx = 0
}
Expand All @@ -78,6 +84,63 @@ func (r *RepeatableRowSource) ConsumerDone() {}
// ConsumerClosed is part of the RowSource interface.
func (r *RepeatableRowSource) ConsumerClosed() {}

// TODO(pbardea): Document.
func MakeMetas(numMeta, numOutputs int) []*execinfrapb.ProducerMetadata {
metas := make([]*execinfrapb.ProducerMetadata, numMeta)
uniqueMetas := make([]*execinfrapb.ProducerMetadata, numOutputs)
for i := range metas {
metas[i] = &execinfrapb.ProducerMetadata{StreamIdx: i, SamplerProgress: &execinfrapb.RemoteProducerMetadata_SamplerProgress{
RowsProcessed: 10,
}}
}
for i := range metas {
metas[i] = uniqueMetas[i%numOutputs]
}
return metas
}

type RepeatableMetaSource struct {
metas []*execinfrapb.ProducerMetadata
nextMetaIdx int
}

var _ RowSource = &RepeatableMetaSource{}

// NewRepeatableMetaSource creates a RepeatableMetaSource with the given metas.
func NewRepeatableMetaSource(metas []*execinfrapb.ProducerMetadata) *RepeatableMetaSource {
return &RepeatableMetaSource{metas: metas}
}

// OutputTypes is part of the RowSource interface.
func (r *RepeatableMetaSource) OutputTypes() []*types.T {
return []*types.T{}
}

// Start is part of the RowSource interface.
func (r *RepeatableMetaSource) Start(ctx context.Context) context.Context { return ctx }

// Next is part of the RowSource interface.
func (r *RepeatableMetaSource) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
// If we've emitted all metadata entries, signal that we have reached the end.
if r.nextMetaIdx >= len(r.metas) {
return nil, nil
}
nextMeta := r.metas[r.nextMetaIdx]
r.nextMetaIdx++
return nil, nextMeta
}

// Reset is part of the RepeatableSource interface.
func (r *RepeatableMetaSource) Reset() {
r.nextMetaIdx = 0
}

// ConsumerDone is part of the RowSource interface.
func (r *RepeatableMetaSource) ConsumerDone() {}

// ConsumerClosed is part of the RowSource interface.
func (r *RepeatableMetaSource) ConsumerClosed() {}

// NewTestMemMonitor creates and starts a new memory monitor to be used in
// tests.
// TODO(yuzefovich): consider reusing this in tree.MakeTestingEvalContext
Expand Down
11 changes: 8 additions & 3 deletions pkg/sql/execinfrapb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,14 @@ func (e *Error) ErrorDetail(ctx context.Context) (err error) {

// ProducerMetadata represents a metadata record flowing through a DistSQL flow.
type ProducerMetadata struct {
// Only one of these fields will be set. If this ever changes, note that
// there're consumers out there that extract the error and, if there is one,
// forward it in isolation and drop the rest of the record.
// StreamIdx informs DistSQL the index of the output stream that this metadata
// should be sent on when hooked up to a BY_META router. This is used by
// metadata-only streams (e.g. restore). This will be set in conjunction with
// another field.
StreamIdx int
// If StreamIdx is not set, only one of these fields will be set. If this ever
// changes, note that there're consumers out there that extract the error and,
// if there is one, forward it in isolation and drop the rest of the record.
Ranges []roachpb.RangeInfo
// TODO(vivek): change to type Error
Err error
Expand Down
Loading