From 240f96df90daeabc5c82a39c56db0fe24cebd619 Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Fri, 19 Mar 2021 21:05:58 -0300 Subject: [PATCH 1/2] colfetcher: respect limit hint Previously, the colfetcher ignored limit hints: it always fetched data from KV until its batch was full. This produces bad behavior if the batch size is larger than the limit hint. For example, if the expected row count was 500, causing us to create a 500-sized batch, but the limit hint for whatever reason was only 20, we would still go ahead and fetch 500 rows. This, in practice, does not appear to show up too easily - if the optimizer is doing its job, the batch size should always be equal to the limit hint for limited scans. Release note: None --- pkg/sql/colfetcher/cfetcher.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index cd4be2ab0f01..db5e4e9c714a 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -269,6 +269,10 @@ type cFetcher struct { // seekPrefix is the prefix to seek to in stateSeekPrefix. seekPrefix roachpb.Key + // limitHint is a hint as to the number of rows that the caller expects to + // be returned from this fetch. + limitHint int + // remainingValueColsByIdx is the set of value columns that are yet to be // seen during the decoding of the current row. remainingValueColsByIdx util.FastIntSet @@ -649,6 +653,7 @@ func (rf *cFetcher) StartScan( } rf.fetcher = f rf.machine.lastRowPrefix = nil + rf.machine.limitHint = int(limitHint) rf.machine.state[0] = stateInitFetch return nil } @@ -1038,7 +1043,19 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) { } rf.machine.rowIdx++ rf.shiftState() + + var emitBatch bool if rf.machine.rowIdx >= rf.machine.batch.Capacity() { + // We have no more room in our batch, so output it immediately. + emitBatch = true + } else if rf.machine.limitHint > 0 && rf.machine.rowIdx >= rf.machine.limitHint { + // If we made it to our limit hint, output our batch early to make sure + // that we don't bother filling in extra data if we don't need to. + emitBatch = true + rf.machine.limitHint = 0 + } + + if emitBatch { rf.pushState(stateResetBatch) rf.machine.batch.SetLength(rf.machine.rowIdx) rf.machine.rowIdx = 0 From 5041b220fe1b9ee60d882565b389e2f7d878a929 Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Fri, 19 Mar 2021 19:55:04 -0300 Subject: [PATCH 2/2] sql: hint scan batch size by expected row count Previously, the "dynamic batch size" strategy for the vectorized engine's batch allocator worked the same in every situation: batches would start at size 1, then double on each re-allocation, until they hit their maximum size of 1024. Now, to improve performance for scans that return a number of rows somewhere in between 1 and 1024, we pass in the optimizer's best guess of the number of rows that the scan will produce all the way down into the TableReader. That guess is used as the initial size of the batch if it's less than 1024. Release note (performance improvement): improve the performance for the vectorized engine when scanning fewer than 1024 rows at a time. --- pkg/BUILD.bazel | 1 + pkg/sql/colexec/colbuilder/execplan.go | 6 +- pkg/sql/colfetcher/BUILD.bazel | 23 +- pkg/sql/colfetcher/cfetcher.go | 17 +- pkg/sql/colfetcher/colbatch_scan.go | 2 + pkg/sql/colfetcher/main_test.go | 33 +++ .../colfetcher/vectorized_batch_size_test.go | 74 +++++++ pkg/sql/colmem/allocator.go | 5 +- pkg/sql/distsql_physical_planner.go | 1 + pkg/sql/execinfrapb/processors.pb.go | 205 ++++++++++-------- pkg/sql/execinfrapb/processors.proto | 4 + pkg/sql/instrumentation.go | 1 + .../testdata/logic_test/explain_analyze | 8 +- pkg/sql/opt/exec/explain/emit.go | 6 + pkg/sql/opt/exec/factory.go | 4 + pkg/sql/physicalplan/physical_plan.go | 8 +- 16 files changed, 298 insertions(+), 100 deletions(-) create mode 100644 pkg/sql/colfetcher/main_test.go create mode 100644 pkg/sql/colfetcher/vectorized_batch_size_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 887fba2e7b22..24c621de80a0 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -178,6 +178,7 @@ ALL_TESTS = [ "//pkg/sql/colexec:colexec_test", "//pkg/sql/colexecerror:colexecerror_test", "//pkg/sql/colexecop:colexecop_test", + "//pkg/sql/colfetcher:colfetcher_test", "//pkg/sql/colflow/colrpc:colrpc_test", "//pkg/sql/colflow:colflow_test", "//pkg/sql/colmem:colmem_test", diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 7b87275f3b53..7c4e71f9b4dd 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -735,7 +735,11 @@ func NewColOperator( if err := checkNumIn(inputs, 0); err != nil { return r, err } - scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post) + + estimatedRowCount := spec.EstimatedRowCount + scanOp, err := colfetcher.NewColBatchScan( + ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post, estimatedRowCount, + ) if err != nil { return r, err } diff --git a/pkg/sql/colfetcher/BUILD.bazel b/pkg/sql/colfetcher/BUILD.bazel index d23fd96b5884..0a4adda8fe1f 100644 --- a/pkg/sql/colfetcher/BUILD.bazel +++ b/pkg/sql/colfetcher/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("//build:STRINGER.bzl", "stringer") go_library( @@ -47,3 +47,24 @@ stringer( src = "cfetcher.go", typ = "fetcherState", ) + +go_test( + name = "colfetcher_test", + srcs = [ + "main_test.go", + "vectorized_batch_size_test.go", + ], + deps = [ + "//pkg/base", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/testutils/serverutils", + "//pkg/testutils/skip", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "@com_github_stretchr_testify//assert", + ], +) diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index db5e4e9c714a..7688abc9c766 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -253,6 +253,10 @@ type cFetcher struct { // fetcher is the underlying fetcher that provides KVs. fetcher *row.KVFetcher + // estimatedRowCount is the optimizer-derived number of expected rows that + // this fetch will produce, if non-zero. + estimatedRowCount uint64 + // machine contains fields that get updated during the run of the fetcher. machine struct { // state is the queue of next states of the state machine. The 0th entry @@ -310,12 +314,19 @@ type cFetcher struct { } } -const cFetcherBatchMinCapacity = 1 - func (rf *cFetcher) resetBatch(timestampOutputIdx, tableOidOutputIdx int) { var reallocated bool + var estimatedRowCount int + // We need to transform our rf.estimatedRowCount, which is a uint64, into + // an int. We have to be careful: if we just cast it directly, a giant + // estimate will wrap around and become negative. + if rf.estimatedRowCount > uint64(coldata.BatchSize()) { + estimatedRowCount = coldata.BatchSize() + } else { + estimatedRowCount = int(rf.estimatedRowCount) + } rf.machine.batch, reallocated = rf.allocator.ResetMaybeReallocate( - rf.typs, rf.machine.batch, cFetcherBatchMinCapacity, rf.memoryLimit, + rf.typs, rf.machine.batch, estimatedRowCount, rf.memoryLimit, ) if reallocated { rf.machine.colvecs = rf.machine.batch.ColVecs() diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 787a61e3734e..0efe733f323c 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -216,6 +216,7 @@ func NewColBatchScan( evalCtx *tree.EvalContext, spec *execinfrapb.TableReaderSpec, post *execinfrapb.PostProcessSpec, + estimatedRowCount uint64, ) (*ColBatchScan, error) { // NB: we hit this with a zero NodeID (but !ok) with multi-tenancy. if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok { @@ -264,6 +265,7 @@ func NewColBatchScan( } fetcher := cFetcherPool.Get().(*cFetcher) + fetcher.estimatedRowCount = estimatedRowCount if _, _, err := initCRowFetcher( flowCtx.Codec(), allocator, execinfra.GetWorkMemLimit(flowCtx.Cfg), fetcher, table, columnIdxMap, neededColumns, spec, spec.HasSystemColumns, diff --git a/pkg/sql/colfetcher/main_test.go b/pkg/sql/colfetcher/main_test.go new file mode 100644 index 000000000000..27e4a8b6b3c7 --- /dev/null +++ b/pkg/sql/colfetcher/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colfetcher_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/sql/colfetcher/vectorized_batch_size_test.go b/pkg/sql/colfetcher/vectorized_batch_size_test.go new file mode 100644 index 000000000000..40b9ca04f719 --- /dev/null +++ b/pkg/sql/colfetcher/vectorized_batch_size_test.go @@ -0,0 +1,74 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colfetcher_test + +import ( + "context" + "regexp" + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/assert" +) + +// TestScanBatchSize tests that the the cfetcher's dynamic batch size algorithm +// uses the optimizer's estimated row count for its initial batch size. This +// test sets up a scan against a table with a known row count, and makes sure +// that the optimizer uses its statistics to produce an estimated row count that +// is equal to the number of rows in the table, allowing the fetcher to create +// a single batch for the scan. +func TestScanBatchSize(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderMetamorphic(t, "This test doesn't work with metamorphic batch sizes.") + + testClusterArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + } + tc := testcluster.StartTestCluster(t, 1, testClusterArgs) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + + conn := tc.Conns[0] + + _, err := conn.ExecContext(ctx, `CREATE TABLE t (a PRIMARY KEY) AS SELECT generate_series(1, 511)`) + assert.NoError(t, err) + + rows, err := conn.QueryContext(ctx, `EXPLAIN ANALYZE (VERBOSE, DISTSQL) SELECT * FROM t`) + assert.NoError(t, err) + batchCountRegex := regexp.MustCompile(`vectorized batch count: (\d+)`) + var found bool + var sb strings.Builder + for rows.Next() { + var res string + assert.NoError(t, rows.Scan(&res)) + sb.WriteString(res) + sb.WriteByte('\n') + matches := batchCountRegex.FindStringSubmatch(res) + if len(matches) == 0 { + continue + } + foundBatches, err := strconv.Atoi(matches[1]) + assert.NoError(t, err) + assert.Equal(t, 1, foundBatches, "should use just 1 batch to scan 511 rows") + found = true + break + } + if !found { + t.Fatalf("expected to find a vectorized batch count; found nothing. text:\n%s", sb.String()) + } +} diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index 263f8ecc8fe6..db1de607d764 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -163,10 +163,9 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat func (a *Allocator) ResetMaybeReallocate( typs []*types.T, oldBatch coldata.Batch, minCapacity int, maxBatchMemSize int64, ) (newBatch coldata.Batch, reallocated bool) { - if minCapacity < 1 { + if minCapacity < 0 { colexecerror.InternalError(errors.AssertionFailedf("invalid minCapacity %d", minCapacity)) - } - if minCapacity > coldata.BatchSize() { + } else if minCapacity == 0 || minCapacity > coldata.BatchSize() { minCapacity = coldata.BatchSize() } reallocated = true diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 26c390a01fc8..834ce071ee6d 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1327,6 +1327,7 @@ func (dsp *DistSQLPlanner) planTableReaders( p.TotalEstimatedScannedRows += info.estimatedRowCount corePlacement[i].NodeID = sp.Node + corePlacement[i].EstimatedRowCount = info.estimatedRowCount corePlacement[i].Core.TableReader = tr } diff --git a/pkg/sql/execinfrapb/processors.pb.go b/pkg/sql/execinfrapb/processors.pb.go index 0676136e38be..5031c6f8bc95 100644 --- a/pkg/sql/execinfrapb/processors.pb.go +++ b/pkg/sql/execinfrapb/processors.pb.go @@ -71,13 +71,16 @@ type ProcessorSpec struct { // This can be aliased with InputSyncSpec.ColumnTypes, so it must not be // modified in-place during planning. ResultTypes []*types.T `protobuf:"bytes,7,rep,name=result_types,json=resultTypes" json:"result_types,omitempty"` + // estimated_row_count contains the number of rows that the optimizer expects + // will be emitted from this processor, or 0 if the estimate wasn't populated. + EstimatedRowCount uint64 `protobuf:"varint,8,opt,name=estimated_row_count,json=estimatedRowCount" json:"estimated_row_count"` } func (m *ProcessorSpec) Reset() { *m = ProcessorSpec{} } func (m *ProcessorSpec) String() string { return proto.CompactTextString(m) } func (*ProcessorSpec) ProtoMessage() {} func (*ProcessorSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_5e45e1d8c39180fc, []int{0} + return fileDescriptor_processors_840af64cccdf7fe1, []int{0} } func (m *ProcessorSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -141,7 +144,7 @@ func (m *ProcessorCoreUnion) Reset() { *m = ProcessorCoreUnion{} } func (m *ProcessorCoreUnion) String() string { return proto.CompactTextString(m) } func (*ProcessorCoreUnion) ProtoMessage() {} func (*ProcessorCoreUnion) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_5e45e1d8c39180fc, []int{1} + return fileDescriptor_processors_840af64cccdf7fe1, []int{1} } func (m *ProcessorCoreUnion) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -176,7 +179,7 @@ func (m *NoopCoreSpec) Reset() { *m = NoopCoreSpec{} } func (m *NoopCoreSpec) String() string { return proto.CompactTextString(m) } func (*NoopCoreSpec) ProtoMessage() {} func (*NoopCoreSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_5e45e1d8c39180fc, []int{2} + return fileDescriptor_processors_840af64cccdf7fe1, []int{2} } func (m *NoopCoreSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -218,7 +221,7 @@ func (m *LocalPlanNodeSpec) Reset() { *m = LocalPlanNodeSpec{} } func (m *LocalPlanNodeSpec) String() string { return proto.CompactTextString(m) } func (*LocalPlanNodeSpec) ProtoMessage() {} func (*LocalPlanNodeSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_5e45e1d8c39180fc, []int{3} + return fileDescriptor_processors_840af64cccdf7fe1, []int{3} } func (m *LocalPlanNodeSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -251,7 +254,7 @@ func (m *MetadataTestSenderSpec) Reset() { *m = MetadataTestSenderSpec{} func (m *MetadataTestSenderSpec) String() string { return proto.CompactTextString(m) } func (*MetadataTestSenderSpec) ProtoMessage() {} func (*MetadataTestSenderSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_5e45e1d8c39180fc, []int{4} + return fileDescriptor_processors_840af64cccdf7fe1, []int{4} } func (m *MetadataTestSenderSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -284,7 +287,7 @@ func (m *MetadataTestReceiverSpec) Reset() { *m = MetadataTestReceiverSp func (m *MetadataTestReceiverSpec) String() string { return proto.CompactTextString(m) } func (*MetadataTestReceiverSpec) ProtoMessage() {} func (*MetadataTestReceiverSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_5e45e1d8c39180fc, []int{5} + return fileDescriptor_processors_840af64cccdf7fe1, []int{5} } func (m *MetadataTestReceiverSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -390,6 +393,9 @@ func (m *ProcessorSpec) MarshalTo(dAtA []byte) (int, error) { i += n } } + dAtA[i] = 0x40 + i++ + i = encodeVarintProcessors(dAtA, i, uint64(m.EstimatedRowCount)) return i, nil } @@ -911,6 +917,7 @@ func (m *ProcessorSpec) Size() (n int) { n += 1 + l + sovProcessors(uint64(l)) } } + n += 1 + sovProcessors(uint64(m.EstimatedRowCount)) return n } @@ -1503,6 +1510,25 @@ func (m *ProcessorSpec) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EstimatedRowCount", wireType) + } + m.EstimatedRowCount = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EstimatedRowCount |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipProcessors(dAtA[iNdEx:]) @@ -3061,89 +3087,90 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/processors.proto", fileDescriptor_processors_5e45e1d8c39180fc) + proto.RegisterFile("sql/execinfrapb/processors.proto", fileDescriptor_processors_840af64cccdf7fe1) } -var fileDescriptor_processors_5e45e1d8c39180fc = []byte{ - // 1265 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x97, 0xdf, 0x72, 0x13, 0xb7, - 0x17, 0xc7, 0xe3, 0xc4, 0x24, 0xb6, 0x9c, 0x10, 0x23, 0x02, 0xe8, 0xe7, 0x5f, 0xeb, 0xa4, 0x2e, - 0xb4, 0x2e, 0x05, 0xd3, 0x32, 0x6d, 0x2f, 0x98, 0x76, 0x5a, 0x6c, 0x97, 0xc9, 0xa6, 0x34, 0xd0, - 0x75, 0x0a, 0x33, 0xdc, 0xb8, 0xca, 0xae, 0xe2, 0x2c, 0xac, 0x57, 0x8b, 0x24, 0x13, 0xa0, 0x2f, - 0xd1, 0x47, 0xe8, 0x03, 0xf4, 0x41, 0x72, 0xd5, 0xe1, 0x92, 0x2b, 0xa6, 0x35, 0x2f, 0xd2, 0xd1, - 0xd1, 0x7a, 0x77, 0xed, 0xc4, 0xbb, 0xb9, 0xc9, 0x38, 0xf2, 0xf7, 0xfb, 0xd1, 0xd1, 0x39, 0x47, - 0x7f, 0x8c, 0xb6, 0xe4, 0x73, 0xff, 0x16, 0x7b, 0xc9, 0x1c, 0x2f, 0x38, 0x10, 0x34, 0xdc, 0xbf, - 0x15, 0x0a, 0xee, 0x30, 0x29, 0xb9, 0x90, 0xad, 0x50, 0x70, 0xc5, 0x31, 0x71, 0xb8, 0xf3, 0x4c, - 0x70, 0xea, 0x1c, 0xb6, 0xe4, 0x73, 0xbf, 0xe5, 0x7a, 0x52, 0xc9, 0xe7, 0xbe, 0x18, 0x05, 0xb5, - 0xda, 0xac, 0xd7, 0xa5, 0x8a, 0x1a, 0x57, 0xed, 0xda, 0x7c, 0x6e, 0x7f, 0x9f, 0x4a, 0x16, 0xc9, - 0xae, 0x66, 0xc8, 0xf4, 0x6c, 0x46, 0xd5, 0xcc, 0x82, 0x8d, 0xfc, 0x67, 0x7d, 0x8f, 0x47, 0xca, - 0x1b, 0x19, 0x4a, 0xe7, 0x90, 0x06, 0x03, 0x76, 0xc0, 0x98, 0x2b, 0xcf, 0xa0, 0x56, 0x74, 0xdf, - 0x67, 0x7d, 0xa9, 0xa8, 0x9a, 0xa8, 0x2f, 0x69, 0xb5, 0x7a, 0x15, 0x32, 0x69, 0xfe, 0x46, 0xc3, - 0x1b, 0x03, 0x3e, 0xe0, 0xf0, 0xf1, 0x96, 0xfe, 0x64, 0x46, 0x1b, 0x7f, 0x2f, 0xa1, 0xb5, 0x87, - 0x13, 0x5a, 0x2f, 0x64, 0x0e, 0xee, 0xa0, 0x73, 0x5e, 0x10, 0x8e, 0x14, 0x29, 0x6c, 0x2d, 0x35, - 0x2b, 0xb7, 0x3f, 0x6d, 0xcd, 0xcb, 0x6b, 0xcb, 0xd2, 0xb2, 0xde, 0xab, 0xc0, 0xd1, 0xbe, 0x76, - 0xf1, 0xf8, 0xdd, 0xe6, 0x82, 0x6d, 0xbc, 0xf8, 0x1e, 0x2a, 0x3a, 0x5c, 0x30, 0xb2, 0xb8, 0x55, - 0x68, 0x56, 0x6e, 0xdf, 0x98, 0xcf, 0x88, 0xe7, 0xee, 0x70, 0xc1, 0x7e, 0x0d, 0x3c, 0x1e, 0x44, - 0x20, 0xf0, 0xe3, 0x6d, 0xb4, 0xcc, 0x47, 0x4a, 0x47, 0xb3, 0x04, 0xd1, 0x5c, 0x9f, 0x4f, 0x7a, - 0x00, 0x3a, 0x9b, 0x8f, 0x14, 0x13, 0xa9, 0x80, 0x22, 0x3f, 0xee, 0xa0, 0x62, 0xc8, 0xa5, 0x22, - 0x45, 0x88, 0xe8, 0xb3, 0x8c, 0x88, 0xb8, 0x54, 0x51, 0x54, 0x29, 0x0c, 0x98, 0xf1, 0x75, 0x54, - 0x92, 0x8a, 0x0e, 0x58, 0xdf, 0x73, 0xc9, 0xb9, 0xad, 0x42, 0xf3, 0x5c, 0x7b, 0x5d, 0x7f, 0x3b, - 0x7e, 0xb7, 0xb9, 0xd2, 0xd3, 0xe3, 0x56, 0xd7, 0x5e, 0x01, 0x81, 0xe5, 0xe2, 0x6f, 0xd0, 0x6a, - 0x5c, 0x26, 0xad, 0x5f, 0x06, 0xfd, 0xc5, 0x48, 0x5f, 0x89, 0x17, 0x6e, 0x75, 0xed, 0x4a, 0x2c, - 0xb4, 0x5c, 0xfc, 0x1d, 0x5a, 0x15, 0x4c, 0x8e, 0x7c, 0xd5, 0x87, 0xea, 0x91, 0x15, 0x58, 0x78, - 0x6d, 0x26, 0x60, 0xc9, 0x86, 0x2d, 0x53, 0xdd, 0x3d, 0xbb, 0x62, 0xf4, 0x7b, 0xfa, 0xdf, 0xc6, - 0x5f, 0x1b, 0x08, 0x9f, 0x4c, 0x2a, 0xbe, 0x83, 0x8a, 0x01, 0xe7, 0x21, 0x29, 0xc0, 0xf2, 0x3f, - 0x99, 0xbf, 0xfc, 0x5d, 0xce, 0x43, 0x6d, 0xd3, 0x6b, 0xb7, 0xc1, 0x83, 0x7f, 0x42, 0x15, 0xe8, - 0x32, 0x9b, 0x51, 0x97, 0x89, 0xa8, 0xa6, 0x19, 0x19, 0xdc, 0x4b, 0xc4, 0x40, 0x49, 0xbb, 0xf1, - 0x36, 0x42, 0x4f, 0xb9, 0x17, 0x44, 0xac, 0x25, 0x60, 0x35, 0xe7, 0xb3, 0x76, 0x62, 0x2d, 0xa0, - 0x52, 0x5e, 0xfc, 0x2d, 0x5a, 0x96, 0x5c, 0x28, 0x26, 0xa2, 0x9a, 0x5e, 0x9d, 0x4f, 0xe9, 0x81, - 0x0e, 0x08, 0x91, 0x47, 0xc7, 0x41, 0x07, 0x03, 0xc1, 0x06, 0x54, 0x71, 0x01, 0xc5, 0xcc, 0x8c, - 0xe3, 0x6e, 0xac, 0x35, 0x71, 0x24, 0x5e, 0xdc, 0x46, 0x25, 0x2d, 0xf4, 0x02, 0x47, 0x91, 0x95, - 0xbc, 0xf4, 0x76, 0x23, 0x25, 0x50, 0x62, 0x9f, 0x4e, 0xf1, 0x90, 0x89, 0x01, 0xd3, 0xcb, 0x65, - 0x82, 0x94, 0xf2, 0x52, 0xfc, 0x73, 0x22, 0x36, 0x29, 0x4e, 0xb9, 0xf5, 0xd2, 0x0e, 0xa9, 0x3c, - 0x8c, 0x58, 0xe5, 0xbc, 0xa5, 0x6d, 0xc7, 0x5a, 0xb3, 0xb4, 0xc4, 0x8b, 0x7f, 0x40, 0xcb, 0x2f, - 0xa8, 0x3f, 0x62, 0x92, 0xa0, 0x3c, 0xca, 0x23, 0xd0, 0xc5, 0x9d, 0x13, 0xf9, 0x74, 0x2c, 0xfb, - 0xd4, 0x79, 0x76, 0xe0, 0xf9, 0x3e, 0x13, 0xa4, 0x92, 0x47, 0x69, 0xc7, 0x5a, 0x13, 0x4b, 0xe2, - 0xc5, 0xf7, 0x11, 0x12, 0x8c, 0xba, 0xd6, 0x30, 0xe4, 0x42, 0x91, 0xb5, 0xbc, 0x83, 0xc5, 0x8e, - 0xb5, 0x5d, 0xaa, 0xa8, 0xa1, 0x25, 0x7e, 0xfc, 0x3d, 0x5a, 0xe9, 0xd1, 0x61, 0xa8, 0x83, 0x5a, - 0x07, 0xd4, 0xb5, 0x8c, 0xee, 0x31, 0x42, 0x60, 0x4c, 0x5c, 0xf8, 0x09, 0xaa, 0x9a, 0x8f, 0x49, - 0x67, 0x90, 0x2a, 0x90, 0x5a, 0x79, 0xa4, 0x99, 0x5e, 0x3a, 0xc1, 0xc1, 0xbf, 0x21, 0x3c, 0x64, - 0x8a, 0xea, 0x6b, 0x6a, 0x8f, 0x49, 0xd5, 0x63, 0x81, 0xde, 0x2b, 0x18, 0xe8, 0x5f, 0x64, 0x35, - 0xc5, 0xac, 0x07, 0xf8, 0xa7, 0xb0, 0xf0, 0x01, 0xda, 0x48, 0x8f, 0xda, 0xcc, 0x61, 0xde, 0x0b, - 0x26, 0xc8, 0x45, 0x98, 0xe3, 0xf6, 0xd9, 0xe6, 0x98, 0xb8, 0x60, 0x96, 0x53, 0x79, 0xf8, 0x47, - 0x54, 0xee, 0xf4, 0x1e, 0x3d, 0x16, 0x9e, 0xde, 0xa6, 0x1b, 0x00, 0xcf, 0xb8, 0x50, 0x62, 0x29, - 0x10, 0x13, 0x27, 0xde, 0x45, 0xab, 0xaf, 0xbd, 0xc1, 0x6b, 0x3a, 0x88, 0x7a, 0xfa, 0x12, 0x90, - 0x32, 0x2e, 0x83, 0x27, 0x29, 0x35, 0xc0, 0xa6, 0xfc, 0xba, 0x2b, 0x43, 0xc1, 0x9f, 0x32, 0x47, - 0xf5, 0x98, 0x22, 0x97, 0xf3, 0xba, 0xf2, 0x61, 0xac, 0x35, 0x7d, 0x94, 0x78, 0xf5, 0xe6, 0x3f, - 0xf2, 0x02, 0x97, 0x1f, 0x31, 0x41, 0xae, 0xe4, 0x6d, 0xfe, 0xc7, 0x91, 0xd2, 0x6c, 0xfe, 0x89, - 0x0f, 0xff, 0x82, 0xd6, 0x7c, 0xee, 0x50, 0xff, 0xa1, 0x4f, 0x83, 0x5d, 0xee, 0x32, 0x42, 0x00, - 0xf4, 0xf9, 0x7c, 0xd0, 0xfd, 0xb4, 0x1c, 0x68, 0xd3, 0x04, 0xdd, 0x9d, 0xe6, 0x19, 0x91, 0xea, - 0xce, 0xff, 0xe5, 0x75, 0x67, 0x67, 0xc6, 0x61, 0xba, 0x73, 0x96, 0x83, 0xf7, 0xd0, 0x79, 0x33, - 0x76, 0x4f, 0xf0, 0x40, 0x79, 0x4c, 0x90, 0x5a, 0xde, 0x66, 0xec, 0x4c, 0xe9, 0x81, 0x3b, 0xc3, - 0xd0, 0x25, 0xe1, 0xc2, 0xf5, 0x02, 0xea, 0x7b, 0xea, 0x15, 0xf9, 0x7f, 0x5e, 0x49, 0x1e, 0xc4, - 0x5a, 0x53, 0x92, 0xc4, 0xab, 0xd3, 0xa9, 0x1f, 0x5b, 0x36, 0x3f, 0x8a, 0xfa, 0xee, 0x83, 0xbc, - 0x74, 0xb6, 0xd3, 0x72, 0x93, 0xce, 0x29, 0x82, 0x4e, 0xa7, 0x17, 0xbc, 0x60, 0x42, 0x31, 0xf7, - 0x9e, 0xe7, 0x2b, 0x26, 0x98, 0x20, 0x1f, 0xe6, 0xa5, 0xd3, 0x9a, 0x71, 0x98, 0x74, 0xce, 0x72, - 0x74, 0x3a, 0x27, 0x63, 0x51, 0x77, 0xd7, 0xf3, 0xd2, 0x69, 0x4d, 0xe9, 0x4d, 0x3a, 0xa7, 0x19, - 0x93, 0x73, 0x77, 0x14, 0xea, 0xd3, 0x8f, 0x6c, 0x9e, 0xe5, 0xdc, 0x35, 0xda, 0xe4, 0xdc, 0x35, - 0xff, 0xe3, 0xc7, 0x68, 0x5d, 0x86, 0xbe, 0xa7, 0xee, 0x06, 0x6e, 0xcf, 0xa1, 0x4a, 0x27, 0x74, - 0x0b, 0x70, 0x37, 0x33, 0xce, 0xb9, 0x69, 0x03, 0x30, 0x67, 0x29, 0xfa, 0xce, 0x13, 0x4c, 0x2a, - 0x2e, 0x18, 0xc4, 0xf8, 0x51, 0xde, 0x9d, 0x67, 0x27, 0x62, 0x73, 0xe7, 0xa5, 0xdc, 0x7a, 0x1f, - 0x1e, 0x4c, 0x2a, 0xd3, 0xc8, 0xdb, 0x87, 0x53, 0x15, 0x89, 0x7d, 0xd8, 0x41, 0x17, 0xa5, 0x12, - 0x8c, 0x0e, 0xad, 0x60, 0xc0, 0xa4, 0xf2, 0x78, 0x00, 0x81, 0x7d, 0x0c, 0xb8, 0x2f, 0x33, 0x56, - 0x7b, 0xd2, 0x04, 0xe4, 0xd3, 0x68, 0x98, 0xa3, 0x2b, 0x33, 0xc3, 0xf1, 0x36, 0xba, 0x0a, 0x13, - 0x7d, 0x7d, 0xe6, 0x89, 0xa6, 0xf6, 0xd3, 0x3c, 0xea, 0x9d, 0xe2, 0xf1, 0x9f, 0x9b, 0x85, 0x9d, - 0x62, 0xe9, 0x7c, 0x75, 0x7d, 0xa7, 0x58, 0xba, 0x50, 0xc5, 0x3b, 0xc5, 0xd2, 0x72, 0x75, 0x65, - 0xa7, 0x58, 0x5a, 0xad, 0xae, 0x35, 0xce, 0xa3, 0xd5, 0xf4, 0x8b, 0xaf, 0xf1, 0x3b, 0xba, 0x70, - 0xe2, 0x70, 0xc1, 0x4d, 0xb4, 0x6a, 0xf3, 0xa3, 0x1e, 0x1f, 0x09, 0x87, 0x59, 0xee, 0x4b, 0x78, - 0x44, 0xae, 0x45, 0x0f, 0xe3, 0xa9, 0x6f, 0x70, 0x03, 0x95, 0x77, 0x47, 0x43, 0xf8, 0x61, 0x20, - 0xe1, 0xa1, 0x38, 0x91, 0x25, 0xc3, 0x98, 0xa0, 0xe2, 0x2e, 0x1d, 0x32, 0x78, 0xfb, 0x95, 0x27, - 0xcf, 0x6b, 0x3d, 0xd2, 0xf8, 0x0a, 0x5d, 0x3e, 0xfd, 0x0e, 0xc3, 0x35, 0xb4, 0xe8, 0xb9, 0x30, - 0x6f, 0xb9, 0x8d, 0xa2, 0x27, 0xf4, 0xa2, 0xd5, 0xb5, 0x17, 0x3d, 0xb7, 0xb1, 0x8d, 0xc8, 0xbc, - 0x5b, 0x09, 0xdf, 0x40, 0x48, 0x02, 0xa5, 0xef, 0xb9, 0x12, 0x7e, 0xd1, 0x94, 0xdb, 0x6b, 0xe3, - 0x77, 0x9b, 0x65, 0xc3, 0xb6, 0xba, 0xd2, 0x2e, 0x1b, 0x81, 0xe5, 0xca, 0xf6, 0xcd, 0xe3, 0x7f, - 0xeb, 0x0b, 0xc7, 0xe3, 0x7a, 0xe1, 0xcd, 0xb8, 0x5e, 0x78, 0x3b, 0xae, 0x17, 0xfe, 0x19, 0xd7, - 0x0b, 0x7f, 0xbc, 0xaf, 0x2f, 0xbc, 0x79, 0x5f, 0x5f, 0x78, 0xfb, 0xbe, 0xbe, 0xf0, 0xa4, 0x92, - 0xfa, 0x15, 0xf6, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x34, 0x76, 0x7e, 0x5f, 0x94, 0x0e, 0x00, - 0x00, +var fileDescriptor_processors_840af64cccdf7fe1 = []byte{ + // 1296 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x97, 0xdf, 0x72, 0xd3, 0x46, + 0x14, 0xc6, 0xe3, 0x20, 0x12, 0x7b, 0x9d, 0x90, 0xb0, 0x09, 0xb0, 0x4d, 0x5b, 0x27, 0x75, 0xa1, + 0x75, 0x29, 0x98, 0x96, 0xa1, 0xbd, 0x60, 0xda, 0x69, 0xb1, 0x53, 0x26, 0x4a, 0x69, 0xa0, 0x72, + 0x0a, 0x33, 0xdc, 0xb8, 0x1b, 0x69, 0xe3, 0x08, 0x64, 0xad, 0xd8, 0x5d, 0x13, 0xa0, 0xb7, 0x7d, + 0x80, 0x3e, 0x42, 0x1f, 0xa0, 0x0f, 0x92, 0x4b, 0x2e, 0xb9, 0x62, 0x5a, 0xe7, 0x45, 0x3a, 0x7b, + 0x56, 0x96, 0x64, 0x27, 0x96, 0x72, 0xe3, 0x91, 0x57, 0xdf, 0xf7, 0xdb, 0xb3, 0x67, 0xcf, 0xfe, + 0x11, 0xda, 0x90, 0x2f, 0x82, 0x5b, 0xec, 0x15, 0x73, 0xfd, 0x70, 0x5f, 0xd0, 0x68, 0xef, 0x56, + 0x24, 0xb8, 0xcb, 0xa4, 0xe4, 0x42, 0x36, 0x23, 0xc1, 0x15, 0xc7, 0xc4, 0xe5, 0xee, 0x73, 0xc1, + 0xa9, 0x7b, 0xd0, 0x94, 0x2f, 0x82, 0xa6, 0xe7, 0x4b, 0x25, 0x5f, 0x04, 0x62, 0x10, 0xae, 0xad, + 0x4d, 0x7a, 0x3d, 0xaa, 0xa8, 0x71, 0xad, 0x5d, 0x9b, 0xce, 0xed, 0xee, 0x51, 0xc9, 0x62, 0xd9, + 0xd5, 0x1c, 0x99, 0xee, 0xcd, 0xa8, 0x1a, 0x79, 0xb0, 0x41, 0xf0, 0xbc, 0xeb, 0xf3, 0x58, 0x79, + 0x23, 0x47, 0xe9, 0x1e, 0xd0, 0xb0, 0xc7, 0xf6, 0x19, 0xf3, 0xe4, 0x19, 0xd4, 0x8a, 0xee, 0x05, + 0xac, 0x2b, 0x15, 0x55, 0x23, 0xf5, 0x25, 0xad, 0x56, 0xaf, 0x23, 0x26, 0xcd, 0x6f, 0xdc, 0xbc, + 0xda, 0xe3, 0x3d, 0x0e, 0x8f, 0xb7, 0xf4, 0x93, 0x69, 0xad, 0xff, 0x69, 0xa1, 0xc5, 0x47, 0x23, + 0x5a, 0x27, 0x62, 0x2e, 0x6e, 0xa3, 0xf3, 0x7e, 0x18, 0x0d, 0x14, 0x29, 0x6d, 0x9c, 0x6b, 0x54, + 0x6f, 0x7f, 0xde, 0x9c, 0x96, 0xd7, 0xa6, 0xad, 0x65, 0x9d, 0xd7, 0xa1, 0xab, 0x7d, 0x2d, 0xeb, + 0xe8, 0xfd, 0xfa, 0x8c, 0x63, 0xbc, 0xf8, 0x3e, 0xb2, 0x5c, 0x2e, 0x18, 0x99, 0xdd, 0x28, 0x35, + 0xaa, 0xb7, 0x6f, 0x4c, 0x67, 0x24, 0x7d, 0xb7, 0xb9, 0x60, 0xbf, 0x85, 0x3e, 0x0f, 0x63, 0x10, + 0xf8, 0xf1, 0x16, 0x9a, 0xe3, 0x03, 0xa5, 0xa3, 0x39, 0x07, 0xd1, 0x5c, 0x9f, 0x4e, 0x7a, 0x08, + 0x3a, 0x87, 0x0f, 0x14, 0x13, 0x99, 0x80, 0x62, 0x3f, 0x6e, 0x23, 0x2b, 0xe2, 0x52, 0x11, 0x0b, + 0x22, 0xfa, 0x22, 0x27, 0x22, 0x2e, 0x55, 0x1c, 0x55, 0x06, 0x03, 0x66, 0x7c, 0x1d, 0x95, 0xa5, + 0xa2, 0x3d, 0xd6, 0xf5, 0x3d, 0x72, 0x7e, 0xa3, 0xd4, 0x38, 0xdf, 0x5a, 0xd2, 0x6f, 0x87, 0xef, + 0xd7, 0xe7, 0x3b, 0xba, 0xdd, 0xde, 0x74, 0xe6, 0x41, 0x60, 0x7b, 0xf8, 0x5b, 0xb4, 0x90, 0x4c, + 0x93, 0xd6, 0xcf, 0x81, 0x7e, 0x25, 0xd6, 0x57, 0x93, 0x81, 0xdb, 0x9b, 0x4e, 0x35, 0x11, 0xda, + 0x1e, 0xfe, 0x1e, 0x2d, 0x08, 0x26, 0x07, 0x81, 0xea, 0xc2, 0xec, 0x91, 0x79, 0x18, 0xf8, 0xda, + 0x44, 0xc0, 0x92, 0xf5, 0x9b, 0x66, 0x76, 0x77, 0x9d, 0xaa, 0xd1, 0xef, 0xea, 0xbf, 0xf8, 0x0e, + 0x5a, 0x61, 0x52, 0xf9, 0x7d, 0xaa, 0x98, 0xd7, 0x15, 0xfc, 0xb0, 0xeb, 0xf2, 0x41, 0xa8, 0x48, + 0x79, 0xa3, 0xd4, 0xb0, 0xe2, 0xb1, 0x5c, 0x4c, 0x04, 0x0e, 0x3f, 0x6c, 0xeb, 0xd7, 0xf5, 0x7f, + 0x56, 0x11, 0x3e, 0x39, 0x15, 0xf8, 0x2e, 0xb2, 0x42, 0xce, 0x23, 0x52, 0x82, 0xa4, 0x7d, 0x36, + 0x3d, 0x69, 0x3b, 0x9c, 0x47, 0xda, 0xa6, 0x33, 0xe6, 0x80, 0x07, 0xff, 0x8c, 0xaa, 0x50, 0x9b, + 0x0e, 0xa3, 0x1e, 0x13, 0x71, 0x25, 0xe4, 0xe4, 0x7d, 0x37, 0x15, 0x03, 0x25, 0xeb, 0xc6, 0x5b, + 0x08, 0x3d, 0xe3, 0x7e, 0x18, 0xb3, 0xce, 0x01, 0xab, 0x31, 0x9d, 0xb5, 0x9d, 0x68, 0x01, 0x95, + 0xf1, 0xe2, 0xef, 0xd0, 0x9c, 0xe4, 0x42, 0x31, 0x11, 0x57, 0xc2, 0xd5, 0xe9, 0x94, 0x0e, 0xe8, + 0x80, 0x10, 0x7b, 0x74, 0x1c, 0xb4, 0xd7, 0x13, 0xac, 0x47, 0x15, 0x17, 0x50, 0x02, 0xb9, 0x71, + 0xdc, 0x4b, 0xb4, 0x26, 0x8e, 0xd4, 0x8b, 0x5b, 0xa8, 0xac, 0x85, 0x7e, 0xe8, 0x2a, 0x32, 0x5f, + 0x94, 0xde, 0xcd, 0x58, 0x09, 0x94, 0xc4, 0xa7, 0x53, 0xdc, 0x67, 0xa2, 0xc7, 0xf4, 0x70, 0x99, + 0x80, 0x39, 0xce, 0x4d, 0xf1, 0x2f, 0xa9, 0xd8, 0xa4, 0x38, 0xe3, 0xd6, 0x43, 0x3b, 0xa0, 0xf2, + 0x20, 0x66, 0x55, 0x8a, 0x86, 0xb6, 0x95, 0x68, 0xcd, 0xd0, 0x52, 0x2f, 0xfe, 0x11, 0xcd, 0xbd, + 0xa4, 0xc1, 0x80, 0x49, 0x82, 0x8a, 0x28, 0x8f, 0x41, 0x97, 0x54, 0x4e, 0xec, 0xd3, 0xb1, 0xec, + 0x51, 0xf7, 0xf9, 0xbe, 0x1f, 0x04, 0x4c, 0x90, 0x6a, 0x11, 0xa5, 0x95, 0x68, 0x4d, 0x2c, 0xa9, + 0x17, 0x3f, 0x40, 0x48, 0x30, 0xea, 0xd9, 0xfd, 0x88, 0x0b, 0x45, 0x16, 0x8b, 0xb6, 0x23, 0x27, + 0xd1, 0x6e, 0x52, 0x45, 0x0d, 0x2d, 0xf5, 0xe3, 0x1f, 0xd0, 0x7c, 0x87, 0xf6, 0x23, 0x1d, 0xd4, + 0x12, 0xa0, 0xae, 0xe5, 0x54, 0x8f, 0x11, 0x02, 0x63, 0xe4, 0xc2, 0x4f, 0xd1, 0xb2, 0x79, 0x4c, + 0x2b, 0x83, 0x2c, 0x03, 0xa9, 0x59, 0x44, 0x9a, 0xa8, 0xa5, 0x13, 0x1c, 0xfc, 0x3b, 0xc2, 0x7d, + 0xa6, 0xa8, 0x3e, 0xdc, 0x76, 0x99, 0x54, 0x1d, 0x16, 0xea, 0xb5, 0x82, 0x81, 0xfe, 0x55, 0x5e, + 0x51, 0x4c, 0x7a, 0x80, 0x7f, 0x0a, 0x0b, 0xef, 0xa3, 0xd5, 0x6c, 0xab, 0xc3, 0x5c, 0xe6, 0xbf, + 0x64, 0x82, 0xac, 0x40, 0x1f, 0xb7, 0xcf, 0xd6, 0xc7, 0xc8, 0x05, 0xbd, 0x9c, 0xca, 0xc3, 0x3f, + 0xa1, 0x4a, 0xbb, 0xf3, 0xf8, 0x89, 0xf0, 0xf5, 0x32, 0x5d, 0x05, 0x78, 0xce, 0x31, 0x94, 0x48, + 0x81, 0x98, 0x3a, 0xf1, 0x0e, 0x5a, 0x78, 0xe3, 0xf7, 0xde, 0xd0, 0x5e, 0x5c, 0xd3, 0x97, 0x80, + 0x94, 0x73, 0x84, 0x3c, 0xcd, 0xa8, 0x01, 0x36, 0xe6, 0xd7, 0x55, 0x19, 0x09, 0xfe, 0x8c, 0xb9, + 0xaa, 0xc3, 0x14, 0xb9, 0x5c, 0x54, 0x95, 0x8f, 0x12, 0xad, 0xa9, 0xa3, 0xd4, 0xab, 0x17, 0xff, + 0xa1, 0x1f, 0x7a, 0xfc, 0x90, 0x09, 0x72, 0xa5, 0x68, 0xf1, 0x3f, 0x89, 0x95, 0x66, 0xf1, 0x8f, + 0x7c, 0xf8, 0x57, 0xb4, 0x18, 0x70, 0x97, 0x06, 0x8f, 0x02, 0x1a, 0xee, 0x70, 0x8f, 0x11, 0x02, + 0xa0, 0x2f, 0xa7, 0x83, 0x1e, 0x64, 0xe5, 0x40, 0x1b, 0x27, 0xe8, 0xea, 0x34, 0x97, 0x8f, 0x4c, + 0x75, 0x7e, 0x50, 0x54, 0x9d, 0xed, 0x09, 0x87, 0xa9, 0xce, 0x49, 0x0e, 0xde, 0x45, 0x17, 0x4c, + 0xdb, 0x7d, 0xc1, 0x43, 0xe5, 0x33, 0x41, 0xd6, 0x8a, 0x16, 0x63, 0x7b, 0x4c, 0x0f, 0xdc, 0x09, + 0x86, 0x9e, 0x12, 0x2e, 0x3c, 0x3f, 0xa4, 0x81, 0xaf, 0x5e, 0x93, 0x0f, 0x8b, 0xa6, 0xe4, 0x61, + 0xa2, 0x35, 0x53, 0x92, 0x7a, 0x75, 0x3a, 0xf5, 0x15, 0xcd, 0xe1, 0x87, 0x71, 0xdd, 0x7d, 0x54, + 0x94, 0xce, 0x56, 0x56, 0x6e, 0xd2, 0x39, 0x46, 0xd0, 0xe9, 0xf4, 0xc3, 0x97, 0x4c, 0x28, 0xe6, + 0xdd, 0xf7, 0x03, 0xc5, 0x04, 0x13, 0xe4, 0xe3, 0xa2, 0x74, 0xda, 0x13, 0x0e, 0x93, 0xce, 0x49, + 0x8e, 0x4e, 0xe7, 0xa8, 0x2d, 0xae, 0xee, 0x5a, 0x51, 0x3a, 0xed, 0x31, 0xbd, 0x49, 0xe7, 0x38, + 0x63, 0xb4, 0xef, 0x0e, 0x22, 0xbd, 0xfb, 0x91, 0xf5, 0xb3, 0xec, 0xbb, 0x46, 0x9b, 0xee, 0xbb, + 0xe6, 0x3f, 0x7e, 0x82, 0x96, 0x64, 0x14, 0xf8, 0xea, 0x5e, 0xe8, 0x75, 0x5c, 0xaa, 0x74, 0x42, + 0x37, 0x00, 0x77, 0x33, 0x67, 0x9f, 0x1b, 0x37, 0x00, 0x73, 0x92, 0xa2, 0xcf, 0x3c, 0xc1, 0xa4, + 0xe2, 0x82, 0x41, 0x8c, 0x9f, 0x14, 0x9d, 0x79, 0x4e, 0x2a, 0x36, 0x67, 0x5e, 0xc6, 0xad, 0xd7, + 0xe1, 0xfe, 0x68, 0x66, 0xea, 0x45, 0xeb, 0x70, 0x6c, 0x46, 0x12, 0x1f, 0x76, 0xd1, 0x8a, 0x54, + 0x82, 0xd1, 0xbe, 0x1d, 0xf6, 0xf4, 0xc5, 0x8a, 0x87, 0x10, 0xd8, 0xa7, 0x80, 0xfb, 0x3a, 0x67, + 0xb4, 0x27, 0x4d, 0x40, 0x3e, 0x8d, 0x86, 0x39, 0xba, 0x32, 0xd1, 0x9c, 0x2c, 0xa3, 0xab, 0xd0, + 0xd1, 0x37, 0x67, 0xee, 0x68, 0x6c, 0x3d, 0x4d, 0xa3, 0xde, 0xb5, 0x8e, 0xfe, 0x5e, 0x2f, 0x6d, + 0x5b, 0xe5, 0x0b, 0xcb, 0x4b, 0xdb, 0x56, 0xf9, 0xe2, 0x32, 0xde, 0xb6, 0xca, 0x73, 0xcb, 0xf3, + 0xdb, 0x56, 0x79, 0x61, 0x79, 0xb1, 0x7e, 0x01, 0x2d, 0x64, 0x6f, 0x7c, 0xf5, 0x3f, 0xd0, 0xc5, + 0x13, 0x9b, 0x0b, 0x6e, 0xa0, 0x05, 0x87, 0x1f, 0x76, 0xf8, 0x40, 0xb8, 0xcc, 0xf6, 0x5e, 0xc1, + 0x25, 0x72, 0x31, 0xbe, 0x82, 0x8e, 0xbd, 0xc1, 0x75, 0x54, 0xd9, 0x19, 0xf4, 0xe1, 0x73, 0x42, + 0xc2, 0x45, 0x71, 0x24, 0x4b, 0x9b, 0x31, 0x41, 0xd6, 0x0e, 0xed, 0x33, 0xb8, 0xfb, 0x55, 0x46, + 0x97, 0x72, 0xdd, 0x52, 0xbf, 0x83, 0x2e, 0x9f, 0x7e, 0x86, 0xe1, 0x35, 0x34, 0xeb, 0x7b, 0xd0, + 0x6f, 0xa5, 0x85, 0xe2, 0x8b, 0xf7, 0xac, 0xbd, 0xe9, 0xcc, 0xfa, 0x5e, 0x7d, 0x0b, 0x91, 0x69, + 0xa7, 0x12, 0xbe, 0x81, 0x90, 0x04, 0x4a, 0xd7, 0xf7, 0x24, 0x7c, 0x07, 0x55, 0x5a, 0x8b, 0xc3, + 0xf7, 0xeb, 0x15, 0xc3, 0xb6, 0x37, 0xa5, 0x53, 0x31, 0x02, 0xdb, 0x93, 0xad, 0x9b, 0x47, 0xff, + 0xd5, 0x66, 0x8e, 0x86, 0xb5, 0xd2, 0xdb, 0x61, 0xad, 0xf4, 0x6e, 0x58, 0x2b, 0xfd, 0x3b, 0xac, + 0x95, 0xfe, 0x3a, 0xae, 0xcd, 0xbc, 0x3d, 0xae, 0xcd, 0xbc, 0x3b, 0xae, 0xcd, 0x3c, 0xad, 0x66, + 0xbe, 0xdd, 0xfe, 0x0f, 0x00, 0x00, 0xff, 0xff, 0xbe, 0x2a, 0x7a, 0x01, 0xca, 0x0e, 0x00, 0x00, } diff --git a/pkg/sql/execinfrapb/processors.proto b/pkg/sql/execinfrapb/processors.proto index f797f629fcf4..6cb1f5911a1e 100644 --- a/pkg/sql/execinfrapb/processors.proto +++ b/pkg/sql/execinfrapb/processors.proto @@ -80,6 +80,10 @@ message ProcessorSpec { // This can be aliased with InputSyncSpec.ColumnTypes, so it must not be // modified in-place during planning. repeated sql.sem.types.T result_types = 7; + + // estimated_row_count contains the number of rows that the optimizer expects + // will be emitted from this processor, or 0 if the estimate wasn't populated. + optional uint64 estimated_row_count = 8 [(gogoproto.nullable) = false]; } message ProcessorCoreUnion { diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 8da0346acd73..49cc8936c5a6 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -511,6 +511,7 @@ func (m execNodeTraceMetadata) annotateExplain( nodeStats.RowCount.MaybeAdd(stats.Output.NumTuples) nodeStats.KVBytesRead.MaybeAdd(stats.KV.BytesRead) nodeStats.KVRowsRead.MaybeAdd(stats.KV.TuplesRead) + nodeStats.VectorizedBatchCount.MaybeAdd(stats.Output.NumBatches) } // If we didn't get statistics for all processors, we don't show the // incomplete results. In the future, we may consider an incomplete flag diff --git a/pkg/sql/logictest/testdata/logic_test/explain_analyze b/pkg/sql/logictest/testdata/logic_test/explain_analyze index b2f9bdb5a6da..5b7b9862af1c 100644 --- a/pkg/sql/logictest/testdata/logic_test/explain_analyze +++ b/pkg/sql/logictest/testdata/logic_test/explain_analyze @@ -1,5 +1,8 @@ -# LogicTest: default-configs !local-spec-planning !fakedist-spec-planning +# LogicTest: local fakedist fakedist-disk !local-spec-planning !fakedist-spec-planning # +# TODO(yuzefovich): we run this file only with vectorize=on configs because +# EXPLAIN ANALYZE (PLAN, VERBOSE) output differs depending on the execution +# engine used. Clean this up. # TODO(yuzefovich): for some reason spec-planning configs are not happy. Figure # it out. @@ -67,6 +70,7 @@ network usage: │ columns: (k, v, a, b) │ cluster nodes: │ actual row count: 2 +│ vectorized batch count: 0 │ estimated row count: 990 (missing stats) │ equality: (v) = (a) │ right cols are key @@ -75,6 +79,7 @@ network usage: │ columns: (k, v) │ cluster nodes: │ actual row count: 4 +│ vectorized batch count: 0 │ KV rows read: 4 │ KV bytes read: 32 B │ estimated row count: 1,000 (missing stats) @@ -85,6 +90,7 @@ network usage: columns: (a, b) cluster nodes: actual row count: 3 + vectorized batch count: 0 KV rows read: 3 KV bytes read: 24 B estimated row count: 1,000 (missing stats) diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index fa0fc41b9cef..15ed6f123096 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -338,6 +338,12 @@ func (e *emitter) emitNodeAttributes(n *Node) error { if s.RowCount.HasValue() { e.ob.AddField("actual row count", humanizeutil.Count(s.RowCount.Value())) } + // Omit vectorized batches in non-verbose mode. + if e.ob.flags.Verbose { + if s.VectorizedBatchCount.HasValue() { + e.ob.AddField("vectorized batch count", humanizeutil.Count(s.VectorizedBatchCount.Value())) + } + } if s.KVRowsRead.HasValue() { e.ob.AddField("KV rows read", humanizeutil.Count(s.KVRowsRead.Value())) } diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index 215d0ef74a30..f1b3c735d5b1 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -311,6 +311,10 @@ type ExecutionStats struct { // RowCount is the number of rows produced by the operator. RowCount optional.Uint + // VectorizedBatchCount is the number of vectorized batches produced by the + // operator. + VectorizedBatchCount optional.Uint + KVBytesRead optional.Uint KVRowsRead optional.Uint diff --git a/pkg/sql/physicalplan/physical_plan.go b/pkg/sql/physicalplan/physical_plan.go index 7cadc13e9ad9..146023afff9d 100644 --- a/pkg/sql/physicalplan/physical_plan.go +++ b/pkg/sql/physicalplan/physical_plan.go @@ -225,6 +225,9 @@ func (p *PhysicalPlan) SetMergeOrdering(o execinfrapb.Ordering) { type ProcessorCorePlacement struct { NodeID roachpb.NodeID Core execinfrapb.ProcessorCoreUnion + // EstimatedRowCount, if set to non-zero, is the optimizer's guess of how + // many rows will be emitted from this processor. + EstimatedRowCount uint64 } // AddNoInputStage creates a stage of processors that don't have any input from @@ -248,8 +251,9 @@ func (p *PhysicalPlan) AddNoInputStage( Output: []execinfrapb.OutputRouterSpec{{ Type: execinfrapb.OutputRouterSpec_PASS_THROUGH, }}, - StageID: stageID, - ResultTypes: outputTypes, + StageID: stageID, + ResultTypes: outputTypes, + EstimatedRowCount: corePlacements[i].EstimatedRowCount, }, }