Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
Expand Down Expand Up @@ -1219,6 +1220,17 @@ func restore(
return mu.res, nil, nil, errors.Wrapf(err, "importing %d ranges", len(importSpans))
}

g = ctxgroup.WithContext(restoreCtx)
for _, table := range tables {
span := table.TableSpan()
g.GoCtx(func(ctx context.Context) error {
return storagebase.CheckIngestedStats(ctx, db, span)
})
}
if err := g.Wait(); err != nil {
return mu.res, nil, nil, errors.Wrap(err, "checking imported table ranges")
}

return mu.res, databases, tables, nil
}

Expand Down
27 changes: 24 additions & 3 deletions pkg/ccl/importccl/read_import_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -505,7 +506,7 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {
defer adder.Close()

// Drain the kvCh using the BulkAdder until it closes.
if err := ingestKvs(ctx, adder, kvCh); err != nil {
if err := ingestKvs(ctx, cp.flowCtx.ClientDB, adder, kvCh); err != nil {
return err
}

Expand Down Expand Up @@ -619,7 +620,11 @@ func makeRowErr(file string, row int64, format string, args ...interface{}) erro

// ingestKvs drains kvs from the channel until it closes, ingesting them using
// the BulkAdder. It handles the required buffering/sorting/etc.
func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvBatch) error {
func ingestKvs(
ctx context.Context, db *client.DB, adder storagebase.BulkAdder, kvCh <-chan kvBatch,
) error {
var ingestSpan roachpb.Span

const sortBatchSize = 48 << 20 // 48MB

// TODO(dt): buffer to disk instead of all in-mem.
Expand All @@ -644,6 +649,17 @@ func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvB
return nil
}
sort.Sort(buf)

// Update the total span ingested.
if len(buf) > 0 {
if k, cur := buf[0].Key, ingestSpan.Key; cur == nil || k.Compare(cur) < 0 {
ingestSpan.Key = append([]byte(nil), k...)
}
if k, cur := buf[len(buf)-1].Key, ingestSpan.EndKey; cur == nil || k.Compare(cur) > 0 {
ingestSpan.EndKey = append([]byte(nil), k...)
}
}

for i := range buf {
if err := adder.Add(ctx, buf[i].Key, buf[i].Value.RawBytes); err != nil {
if i > 0 && bytes.Equal(buf[i].Key, buf[i-1].Key) {
Expand Down Expand Up @@ -689,7 +705,12 @@ func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvB
return err
}
}
return nil

// If somehow we only ingested one key...
if ingestSpan.Key.Equal(ingestSpan.EndKey) {
ingestSpan.EndKey = ingestSpan.EndKey.Next()
}
return storagebase.CheckIngestedStats(ctx, db, ingestSpan)
}

func init() {
Expand Down
11 changes: 10 additions & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/bulk"
"github.com/cockroachdb/cockroach/pkg/storage/diskmap"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -158,6 +159,8 @@ func (sp *sstWriter) Run(ctx context.Context) {
})
group.GoCtx(func(ctx context.Context) error {
chunk := -1
var ingestSpan roachpb.Span

for sst := range contentCh {
chunk++

Expand Down Expand Up @@ -189,6 +192,12 @@ func (sp *sstWriter) Run(ctx context.Context) {
// throughput.
log.Errorf(ctx, "failed to scatter span %s: %s", roachpb.PrettyPrintKey(nil, end), pErr)
}
if k, cur := sst.span.Key, ingestSpan.Key; cur == nil || k.Compare(cur) < 0 {
ingestSpan.Key = append([]byte(nil), k...)
}
if k, cur := sst.span.EndKey, ingestSpan.EndKey; cur == nil || k.Compare(cur) > 0 {
ingestSpan.EndKey = append([]byte(nil), k...)
}
if err := bulk.AddSSTable(ctx, sp.db, sst.span.Key, sst.span.EndKey, sst.data); err != nil {
return err
}
Expand Down Expand Up @@ -247,7 +256,7 @@ func (sp *sstWriter) Run(ctx context.Context) {
return errors.New("unexpected closure of consumer")
}
}
return nil
return storagebase.CheckIngestedStats(ctx, sp.db, ingestSpan)
})
if err := group.Wait(); err != nil {
return err
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -588,6 +589,7 @@ func (sc *SchemaChanger) validateIndexes(
return err
}

var allIndexSpans []roachpb.Span
var forwardIndexes []*sqlbase.IndexDescriptor
var invertedIndexes []*sqlbase.IndexDescriptor

Expand All @@ -599,6 +601,7 @@ func (sc *SchemaChanger) validateIndexes(
if idx == nil || m.Direction == sqlbase.DescriptorMutation_DROP {
continue
}
allIndexSpans = append(allIndexSpans, tableDesc.IndexSpan(idx.ID))
switch idx.Type {
case sqlbase.IndexDescriptor_FORWARD:
forwardIndexes = append(forwardIndexes, idx)
Expand All @@ -615,6 +618,15 @@ func (sc *SchemaChanger) validateIndexes(
forwardIndexesDone := make(chan struct{})
invertedIndexesDone := make(chan struct{})

grp.GoCtx(func(ctx context.Context) error {
for _, i := range allIndexSpans {
if err := storagebase.CheckIngestedStats(ctx, sc.db, i); err != nil {
return err
}
}
return nil
})

grp.GoCtx(func(ctx context.Context) error {
defer close(forwardIndexesDone)
if len(forwardIndexes) > 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/planner_test/show_trace
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ dist sender send r6: sending batch 1 Put to (n1,s1):1
sql txn rows affected: 0
dist sender send r11: sending batch 3 QueryIntent to (n1,s1):1
dist sender send r6: sending batch 1 EndTxn to (n1,s1):1
dist sender send r20: sending batch 1 ComputeChksum to (n1,s1):1

statement ok
SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (1,2); SET tracing = off
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/show_trace
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ dist sender send r6: sending batch 1 Put to (n1,s1):1
sql txn rows affected: 0
dist sender send r11: sending batch 3 QueryIntent to (n1,s1):1
dist sender send r6: sending batch 1 EndTxn to (n1,s1):1
dist sender send r20: sending batch 1 ComputeChksum to (n1,s1):1

statement ok
SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (1,2); SET tracing = off
Expand Down
21 changes: 21 additions & 0 deletions pkg/storage/storagebase/bulk_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,24 @@ type BulkAdder interface {
// Reset resets the bulk-adder, returning it to its initial state.
Reset() error
}

// CheckIngestedStats sends consistency checks for the ranges in the ingested
// span. This will verify and fixup any stats that were estimated during ingest.
func CheckIngestedStats(ctx context.Context, db *client.DB, ingested roachpb.Span) error {
if ingested.Key == nil {
return nil
}
if ingested.Key.Equal(ingested.EndKey) {
ingested.EndKey = ingested.EndKey.Next()
}
// TODO(dt): This will compute stats twice when there is a delta once to find
// the delta and then again to fix it. We could potentially just skip to the
// fixing but Recompute is a bit harder to use from a client on an entire
// table since it is a point request, while distsender will fan this one out.
req := &roachpb.CheckConsistencyRequest{RequestHeader: roachpb.RequestHeaderFromSpan(ingested)}
_, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), req)
if pErr != nil {
return pErr.GoError()
}
return nil
}