diff --git a/pkg/ccl/backupccl/restore.go b/pkg/ccl/backupccl/restore.go index 6bf7ccdf0bc5..d4f0690d286f 100644 --- a/pkg/ccl/backupccl/restore.go +++ b/pkg/ccl/backupccl/restore.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/types" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/stats" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" @@ -1219,6 +1220,17 @@ func restore( return mu.res, nil, nil, errors.Wrapf(err, "importing %d ranges", len(importSpans)) } + g = ctxgroup.WithContext(restoreCtx) + for _, table := range tables { + span := table.TableSpan() + g.GoCtx(func(ctx context.Context) error { + return storagebase.CheckIngestedStats(ctx, db, span) + }) + } + if err := g.Wait(); err != nil { + return mu.res, nil, nil, errors.Wrap(err, "checking imported table ranges") + } + return mu.res, databases, tables, nil } diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index fb93fd075ae4..5842c792e71b 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -505,7 +506,7 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error { defer adder.Close() // Drain the kvCh using the BulkAdder until it closes. - if err := ingestKvs(ctx, adder, kvCh); err != nil { + if err := ingestKvs(ctx, cp.flowCtx.ClientDB, adder, kvCh); err != nil { return err } @@ -619,7 +620,11 @@ func makeRowErr(file string, row int64, format string, args ...interface{}) erro // ingestKvs drains kvs from the channel until it closes, ingesting them using // the BulkAdder. It handles the required buffering/sorting/etc. -func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvBatch) error { +func ingestKvs( + ctx context.Context, db *client.DB, adder storagebase.BulkAdder, kvCh <-chan kvBatch, +) error { + var ingestSpan roachpb.Span + const sortBatchSize = 48 << 20 // 48MB // TODO(dt): buffer to disk instead of all in-mem. @@ -644,6 +649,17 @@ func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvB return nil } sort.Sort(buf) + + // Update the total span ingested. + if len(buf) > 0 { + if k, cur := buf[0].Key, ingestSpan.Key; cur == nil || k.Compare(cur) < 0 { + ingestSpan.Key = append([]byte(nil), k...) + } + if k, cur := buf[len(buf)-1].Key, ingestSpan.EndKey; cur == nil || k.Compare(cur) > 0 { + ingestSpan.EndKey = append([]byte(nil), k...) + } + } + for i := range buf { if err := adder.Add(ctx, buf[i].Key, buf[i].Value.RawBytes); err != nil { if i > 0 && bytes.Equal(buf[i].Key, buf[i-1].Key) { @@ -689,7 +705,12 @@ func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvB return err } } - return nil + + // If somehow we only ingested one key... + if ingestSpan.Key.Equal(ingestSpan.EndKey) { + ingestSpan.EndKey = ingestSpan.EndKey.Next() + } + return storagebase.CheckIngestedStats(ctx, db, ingestSpan) } func init() { diff --git a/pkg/ccl/importccl/sst_writer_proc.go b/pkg/ccl/importccl/sst_writer_proc.go index 646d863d7fb2..e5e7bf5db62b 100644 --- a/pkg/ccl/importccl/sst_writer_proc.go +++ b/pkg/ccl/importccl/sst_writer_proc.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/bulk" "github.com/cockroachdb/cockroach/pkg/storage/diskmap" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -158,6 +159,8 @@ func (sp *sstWriter) Run(ctx context.Context) { }) group.GoCtx(func(ctx context.Context) error { chunk := -1 + var ingestSpan roachpb.Span + for sst := range contentCh { chunk++ @@ -189,6 +192,12 @@ func (sp *sstWriter) Run(ctx context.Context) { // throughput. log.Errorf(ctx, "failed to scatter span %s: %s", roachpb.PrettyPrintKey(nil, end), pErr) } + if k, cur := sst.span.Key, ingestSpan.Key; cur == nil || k.Compare(cur) < 0 { + ingestSpan.Key = append([]byte(nil), k...) + } + if k, cur := sst.span.EndKey, ingestSpan.EndKey; cur == nil || k.Compare(cur) > 0 { + ingestSpan.EndKey = append([]byte(nil), k...) + } if err := bulk.AddSSTable(ctx, sp.db, sst.span.Key, sst.span.EndKey, sst.data); err != nil { return err } @@ -247,7 +256,7 @@ func (sp *sstWriter) Run(ctx context.Context) { return errors.New("unexpected closure of consumer") } } - return nil + return storagebase.CheckIngestedStats(ctx, sp.db, ingestSpan) }) if err := group.Wait(); err != nil { return err diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index ca6484c158b1..2553a489c763 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -588,6 +589,7 @@ func (sc *SchemaChanger) validateIndexes( return err } + var allIndexSpans []roachpb.Span var forwardIndexes []*sqlbase.IndexDescriptor var invertedIndexes []*sqlbase.IndexDescriptor @@ -599,6 +601,7 @@ func (sc *SchemaChanger) validateIndexes( if idx == nil || m.Direction == sqlbase.DescriptorMutation_DROP { continue } + allIndexSpans = append(allIndexSpans, tableDesc.IndexSpan(idx.ID)) switch idx.Type { case sqlbase.IndexDescriptor_FORWARD: forwardIndexes = append(forwardIndexes, idx) @@ -615,6 +618,15 @@ func (sc *SchemaChanger) validateIndexes( forwardIndexesDone := make(chan struct{}) invertedIndexesDone := make(chan struct{}) + grp.GoCtx(func(ctx context.Context) error { + for _, i := range allIndexSpans { + if err := storagebase.CheckIngestedStats(ctx, sc.db, i); err != nil { + return err + } + } + return nil + }) + grp.GoCtx(func(ctx context.Context) error { defer close(forwardIndexesDone) if len(forwardIndexes) > 0 { diff --git a/pkg/sql/logictest/testdata/planner_test/show_trace b/pkg/sql/logictest/testdata/planner_test/show_trace index cb9d07be7981..58f01c0cbcfa 100644 --- a/pkg/sql/logictest/testdata/planner_test/show_trace +++ b/pkg/sql/logictest/testdata/planner_test/show_trace @@ -103,6 +103,7 @@ dist sender send r6: sending batch 1 Put to (n1,s1):1 sql txn rows affected: 0 dist sender send r11: sending batch 3 QueryIntent to (n1,s1):1 dist sender send r6: sending batch 1 EndTxn to (n1,s1):1 +dist sender send r20: sending batch 1 ComputeChksum to (n1,s1):1 statement ok SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (1,2); SET tracing = off diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace b/pkg/sql/opt/exec/execbuilder/testdata/show_trace index 998e54df67c5..1f74def8ab4d 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace @@ -99,6 +99,7 @@ dist sender send r6: sending batch 1 Put to (n1,s1):1 sql txn rows affected: 0 dist sender send r11: sending batch 3 QueryIntent to (n1,s1):1 dist sender send r6: sending batch 1 EndTxn to (n1,s1):1 +dist sender send r20: sending batch 1 ComputeChksum to (n1,s1):1 statement ok SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (1,2); SET tracing = off diff --git a/pkg/storage/storagebase/bulk_adder.go b/pkg/storage/storagebase/bulk_adder.go index cadd69212b7d..51b4af42152d 100644 --- a/pkg/storage/storagebase/bulk_adder.go +++ b/pkg/storage/storagebase/bulk_adder.go @@ -40,3 +40,24 @@ type BulkAdder interface { // Reset resets the bulk-adder, returning it to its initial state. Reset() error } + +// CheckIngestedStats sends consistency checks for the ranges in the ingested +// span. This will verify and fixup any stats that were estimated during ingest. +func CheckIngestedStats(ctx context.Context, db *client.DB, ingested roachpb.Span) error { + if ingested.Key == nil { + return nil + } + if ingested.Key.Equal(ingested.EndKey) { + ingested.EndKey = ingested.EndKey.Next() + } + // TODO(dt): This will compute stats twice when there is a delta once to find + // the delta and then again to fix it. We could potentially just skip to the + // fixing but Recompute is a bit harder to use from a client on an entire + // table since it is a point request, while distsender will fan this one out. + req := &roachpb.CheckConsistencyRequest{RequestHeader: roachpb.RequestHeaderFromSpan(ingested)} + _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), req) + if pErr != nil { + return pErr.GoError() + } + return nil +}