sql: add metadata router
This commit adds a DistSQL router that routes metadata to a given output
stream based on the metadata's StreamIdx. This is used by flows that
schedule DistSQL processors in order to coordinate work across the
cluster.
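
The routing rule itself is small. The following self-contained sketch is
illustrative only -- the types and functions below are made up and are not the
actual DistSQL router code added in this commit -- but it shows the idea: each
metadata record names the output stream that should receive it.

package main

import "fmt"

// meta is a simplified stand-in for execinfrapb.ProducerMetadata; only the
// StreamIdx field matters for the routing decision.
type meta struct {
	StreamIdx int
	payload   string
}

// routeByMeta models the core of a BY_META-style router: each metadata record
// is delivered to the output stream named by its StreamIdx, rather than being
// broadcast or hashed.
func routeByMeta(records []meta, numOutputs int) [][]meta {
	outputs := make([][]meta, numOutputs)
	for _, m := range records {
		outputs[m.StreamIdx] = append(outputs[m.StreamIdx], m)
	}
	return outputs
}

func main() {
	records := []meta{
		{StreamIdx: 1, payload: "import span A"},
		{StreamIdx: 0, payload: "import span B"},
		{StreamIdx: 1, payload: "import span C"},
	}
	for i, out := range routeByMeta(records, 2) {
		fmt.Printf("stream %d: %v\n", i, out)
	}
}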

The motivation for this change is a refactor of RESTORE that attempts to
distribute the work of performing the restore across the cluster. RESTORE
works by creating a pipeline of work with two stages. The first stage
splits and scatters the ranges we are going to import; this shuffling
means that a range could end up on a random node in the cluster. The
second stage downloads the data from the backup file and restores it,
which is accomplished through an AddSSTable request. It is beneficial
(and the motivation for this refactor) for the node that issues this
request to also be the leaseholder of the range it is trying to import.
This prevents a situation where many nodes are all waiting on one node
that had the misfortune of being the recipient of many scattered ranges
in a row.

This router would allow RESTORE to be implemented with two separate
processors: one that splits and scatters the data, and one that imports
it. Using this router, the split-and-scatter processor could determine,
on the fly, which import processor is best placed to import each piece
of data.
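
A minimal sketch of that decision, under the assumption that each router
output stream is wired to an import processor on a known node (all names below
are hypothetical and not actual RESTORE code): the split-and-scatter stage
maps the post-scatter leaseholder to its stream and stamps that index on the
metadata it emits.

package main

import "fmt"

// importTask stands in for the payload the split-and-scatter stage would hand
// to an import processor via metadata.
type importTask struct {
	span            string
	leaseholderNode int
}

func main() {
	// nodeToStream maps a node ID to the router output stream whose consumer
	// (the import processor) was scheduled on that node. Hypothetical wiring.
	nodeToStream := map[int]int{1: 0, 2: 1, 3: 2}
	tasks := []importTask{
		{span: "[a, c)", leaseholderNode: 3},
		{span: "[c, f)", leaseholderNode: 1},
	}
	for _, t := range tasks {
		// The emitted metadata would carry this StreamIdx so the BY_META
		// router delivers it to the import processor on the leaseholder node.
		streamIdx := nodeToStream[t.leaseholderNode]
		fmt.Printf("span %s -> StreamIdx %d (leaseholder n%d)\n",
			t.span, streamIdx, t.leaseholderNode)
	}
}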

Release note: None
pbardea committed Jun 25, 2020
1 parent 8523caa commit 80a4983
Showing 8 changed files with 498 additions and 164 deletions.
67 changes: 65 additions & 2 deletions pkg/sql/execinfra/testutils.go
@@ -25,6 +25,13 @@ import (
// StaticNodeID is the default Node ID to be used in tests.
const StaticNodeID = roachpb.NodeID(3)

// RepeatableSource is a RowSource that can be replayed from the beginning by
// calling Reset.
type RepeatableSource interface {
	RowSource
	// Reset resets the RepeatableSource such that a subsequent call to Next()
	// returns the first row.
	Reset()
}
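
(A compile-time assertion along these lines, not present in the diff shown
here, would document that both repeatable sources in this file satisfy the new
interface:)

var _ RepeatableSource = &RepeatableRowSource{}
var _ RepeatableSource = &RepeatableMetaSource{}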

// RepeatableRowSource is a RowSource used in benchmarks to avoid having to
// reinitialize a new RowSource every time during multiple passes of the input.
// It is intended to be initialized with all rows.
@@ -66,8 +73,7 @@ func (r *RepeatableRowSource) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
	return nextRow, nil
}

// Reset resets the RepeatableRowSource such that a subsequent call to Next()
// returns the first row.
// Reset is part of the RepeatableSource interface.
func (r *RepeatableRowSource) Reset() {
	r.nextRowIdx = 0
}
@@ -78,6 +84,63 @@ func (r *RepeatableRowSource) ConsumerDone() {}
// ConsumerClosed is part of the RowSource interface.
func (r *RepeatableRowSource) ConsumerClosed() {}

// MakeMetas creates numMeta metadata records for use in tests of metadata
// routing. Only numOutputs distinct records are allocated; each record's
// StreamIdx is in [0, numOutputs), and the records are assigned to the result
// in round-robin order.
func MakeMetas(numMeta, numOutputs int) []*execinfrapb.ProducerMetadata {
	metas := make([]*execinfrapb.ProducerMetadata, numMeta)
	uniqueMetas := make([]*execinfrapb.ProducerMetadata, numOutputs)
	for i := range uniqueMetas {
		uniqueMetas[i] = &execinfrapb.ProducerMetadata{
			StreamIdx: i,
			SamplerProgress: &execinfrapb.RemoteProducerMetadata_SamplerProgress{
				RowsProcessed: 10,
			},
		}
	}
	for i := range metas {
		metas[i] = uniqueMetas[i%numOutputs]
	}
	return metas
}

// RepeatableMetaSource is a RowSource that emits only metadata records. Like
// RepeatableRowSource, it is intended for benchmarks and can be Reset to
// replay its metadata across multiple passes.
type RepeatableMetaSource struct {
	metas       []*execinfrapb.ProducerMetadata
	nextMetaIdx int
}

var _ RowSource = &RepeatableMetaSource{}

// NewRepeatableMetaSource creates a RepeatableMetaSource with the given metas.
func NewRepeatableMetaSource(metas []*execinfrapb.ProducerMetadata) *RepeatableMetaSource {
	return &RepeatableMetaSource{metas: metas}
}

// OutputTypes is part of the RowSource interface.
func (r *RepeatableMetaSource) OutputTypes() []*types.T {
	return []*types.T{}
}

// Start is part of the RowSource interface.
func (r *RepeatableMetaSource) Start(ctx context.Context) context.Context { return ctx }

// Next is part of the RowSource interface.
func (r *RepeatableMetaSource) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
	// If we've emitted all metadata entries, signal that we have reached the end.
	if r.nextMetaIdx >= len(r.metas) {
		return nil, nil
	}
	nextMeta := r.metas[r.nextMetaIdx]
	r.nextMetaIdx++
	return nil, nextMeta
}

// Reset is part of the RepeatableSource interface.
func (r *RepeatableMetaSource) Reset() {
	r.nextMetaIdx = 0
}

// ConsumerDone is part of the RowSource interface.
func (r *RepeatableMetaSource) ConsumerDone() {}

// ConsumerClosed is part of the RowSource interface.
func (r *RepeatableMetaSource) ConsumerClosed() {}

// NewTestMemMonitor creates and starts a new memory monitor to be used in
// tests.
// TODO(yuzefovich): consider reusing this in tree.MakeTestingEvalContext
11 changes: 8 additions & 3 deletions pkg/sql/execinfrapb/data.go
@@ -231,9 +231,14 @@ func (e *Error) ErrorDetail(ctx context.Context) (err error) {

// ProducerMetadata represents a metadata record flowing through a DistSQL flow.
type ProducerMetadata struct {
	// Only one of these fields will be set. If this ever changes, note that
	// there're consumers out there that extract the error and, if there is one,
	// forward it in isolation and drop the rest of the record.
	// StreamIdx informs DistSQL of the index of the output stream that this
	// metadata should be sent on when hooked up to a BY_META router. This is
	// used by metadata-only streams (e.g. restore). This will be set in
	// conjunction with another field.
	StreamIdx int
	// If StreamIdx is not set, only one of these fields will be set. If this
	// ever changes, note that there're consumers out there that extract the
	// error and, if there is one, forward it in isolation and drop the rest of
	// the record.
	Ranges []roachpb.RangeInfo
	// TODO(vivek): change to type Error
	Err error