storage/bulk: push buffer-sort down to BulkAdder #36192
Conversation
cdc86a4 to 094b358
@vivekmenezes I think this subsumes #35855: inspired by your change, I moved the adder out so it is reused across chunks, except it still has the option to flush as it goes if it needs to in order to maintain bounded memory.
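For context, the resulting usage pattern looks roughly like the sketch below. This is a hypothetical illustration, not code from the PR: the function name and the `[][]roachpb.KeyValue` chunk representation are made up, and only the `Add`/`Flush` calls mirror the `BufferingAdder` methods quoted later in the review.

```go
// ingestChunks is a hypothetical sketch: one adder is created per processor
// and reused across all chunks; Add is free to flush mid-stream to keep
// memory bounded, and the final Flush makes whatever is left durable.
func ingestChunks(ctx context.Context, adder *bulk.BufferingAdder, chunks [][]roachpb.KeyValue) error {
	for _, chunk := range chunks {
		for _, kv := range chunk {
			if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
				return err
			}
		}
	}
	// One explicit flush once every chunk has been added.
	return adder.Flush(ctx)
}
```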
vivekmenezes
left a comment
Reviewed 9 of 10 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt, @lucy-zhang, and @vivekmenezes)
pkg/ccl/importccl/read_import_proc.go, line 652 at r1 (raw file):
sizeByTableIDIndexID := make(map[string]int64)
flush := func(ctx context.Context, buf roachpb.KeyValueByKey) error {
s/flush/add
pkg/ccl/importccl/read_import_proc.go, line 659 at r1 (raw file):
if err := adder.Add(ctx, buf[i].Key, buf[i].Value.RawBytes); err != nil {
	if _, ok := err.(storagebase.DuplicateKeyError); ok {
		return pgerror.Wrap(err, pgerror.CodeDataExceptionError, "duplicate key")
"duplicate key" is already a part of the message
pkg/ccl/importccl/read_import_proc.go, line 701 at r1 (raw file):
if err := adder.Flush(ctx); err != nil {
	if err, ok := err.(storagebase.DuplicateKeyError); ok {
		return pgerror.Wrap(err, pgerror.CodeDataExceptionError, "duplicate key")
same
pkg/sql/schema_changer_test.go, line 1519 at r1 (raw file):
return nil
},
// a non-nil RunAfterBackfillChunk fn will trigger flushing every chunk.
hard to see why?
pkg/sql/distsqlrun/backfiller.go, line 48 at r1 (raw file):
) (roachpb.Key, error)
prepare(ctx context.Context) error
// Call prepare() before runChunk()
pkg/sql/distsqlrun/backfiller.go, line 49 at r1 (raw file):
prepare(ctx context.Context) error
flush(ctx context.Context) error
// perhaps we should call flush(), done()
pkg/sql/distsqlrun/indexbackfiller.go, line 93 at r1 (raw file):
}

func (ib *indexBackfiller) wrapDupError(ctx context.Context, orig error) error {
nice!
pkg/sql/distsqlrun/indexbackfiller.go, line 107 at r1 (raw file):
	return err
}
v := &roachpb.Value{RawBytes: typed.Value}
why do we need this?
pkg/sql/distsqlrun/indexbackfiller.go, line 176 at r1 (raw file):
	}
}
if ib.flowCtx.testingKnobs.RunAfterBackfillChunk != nil {
It probably would be better to move this code to where we call RunAfterBackfillChunk
pkg/storage/bulk/buffering_adder.go, line 35 at r1 (raw file):
type BufferingAdder struct {
	SSTBatcher
	timestamp hlc.Timestamp
// Ingest data with this timestamp
pkg/storage/bulk/buffering_adder.go, line 36 at r1 (raw file):
	SSTBatcher
	timestamp hlc.Timestamp
	skipDuplicates bool
ignore duplicates. This is useful when ingesting inverted indexes.
pkg/storage/bulk/buffering_adder.go, line 38 at r1 (raw file):
	skipDuplicates bool
	flushSize int64
// flush once this threshold is reached
pkg/storage/bulk/buffering_adder.go, line 46 at r1 (raw file):
}

const kvOverhead = 24 + 24 // 2 slice headers, each assuming each is 8 + 8 + 8.
can we derive this? also, can you move it to where it is being used?
pkg/storage/bulk/buffering_adder.go, line 71 at r1 (raw file):
// Add adds a key to the buffer and checks if it needs to flush.
func (b *BufferingAdder) Add(ctx context.Context, key roachpb.Key, value []byte) error {
	if len(b.curBuf) == 0 {
if b.curBufSize == 0
you can also set b.curBuf = b.curBuf[:0] here
pkg/storage/bulk/buffering_adder.go, line 87 at r1 (raw file):
// Flush flushes any buffered kvs to the batcher.
func (b *BufferingAdder) Flush(ctx context.Context) error {
	if len(b.curBuf) == 0 {
if b.curBufSize == 0
pkg/storage/bulk/buffering_adder.go, line 104 at r1 (raw file):
}
b.curBufSize = 0
b.curBuf = b.curBuf[:0]
reset it above when you call Add()
31f24f2 to 69b8ad0
dt
left a comment
added some flush reason counting/logging to this RFAL
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @dt, @lucy-zhang, and @vivekmenezes)
pkg/ccl/importccl/read_import_proc.go, line 652 at r1 (raw file):
Previously, vivekmenezes wrote…
s/flush/add
it is still "flushing" a bucket -- this is experimental import and it currently has per-table/index buffering and those buffers are "flushed" both before and after this change -- the main difference is now that they
pkg/ccl/importccl/read_import_proc.go, line 659 at r1 (raw file):
Previously, vivekmenezes wrote…
"duplicate key" is already a part of the message
Done.
pkg/ccl/importccl/read_import_proc.go, line 701 at r1 (raw file):
Previously, vivekmenezes wrote…
same
Done.
pkg/sql/schema_changer_test.go, line 1519 at r1 (raw file):
Previously, vivekmenezes wrote…
hard to see why?
Done.
pkg/sql/distsqlrun/backfiller.go, line 48 at r1 (raw file):
Previously, vivekmenezes wrote…
// Call prepare() before runChunk()
Done.
pkg/sql/distsqlrun/backfiller.go, line 49 at r1 (raw file):
Previously, vivekmenezes wrote…
// perhaps we should call flush(), done()
eh, we can call it any time so it isn't quite done -- i.e. we call it to make work visible to tests.
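Pulling the quoted lines together, the interface under discussion looks roughly like this; the `runChunk` parameters are not shown in the quoted diff, so they are reduced to a context here and should be treated as elided rather than the real signature:

```go
// Approximate shape only, assembled from the quoted fragments above.
type chunkBackfiller interface {
	prepare(ctx context.Context) error
	runChunk(ctx context.Context /* remaining args elided */) (roachpb.Key, error)
	flush(ctx context.Context) error
}
```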
pkg/sql/distsqlrun/indexbackfiller.go, line 107 at r1 (raw file):
Previously, vivekmenezes wrote…
why do we need this?
The uniqueness error generator wants the value for its error.
pkg/sql/distsqlrun/indexbackfiller.go, line 176 at r1 (raw file):
Previously, vivekmenezes wrote…
It probably would be better to move this code to where we call RunAfterBackfillChunk
we defer it, but we don't actually want to defer this code, i.e. it should not run on every early return.
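A hypothetical sketch of the distinction being made here (not the PR's exact code; it only reuses the field path and `flush` name from the quoted fragments, and assumes the knob takes no arguments): the knob should fire only on the success path, after the chunk's work is flushed, which a defer would not guarantee.

```go
// Run the testing knob only after the chunk completed and flushed; a defer
// would also fire it on every early error return, which we don't want.
if err := ib.flush(ctx); err != nil {
	return err
}
if knob := ib.flowCtx.testingKnobs.RunAfterBackfillChunk; knob != nil {
	knob() // assumed no-arg knob, for illustration
}
```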
pkg/storage/bulk/buffering_adder.go, line 46 at r1 (raw file):
Previously, vivekmenezes wrote…
can we derive this? you also move it to where it is being used.
not without importing unsafe -- I'm fine with it being a hardcoded estimate.
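For reference, deriving the figure would look something like the standalone sketch below (not part of the PR; `bufferedKV` is a stand-in for the buffered entry):

```go
package main

import (
	"fmt"
	"unsafe"
)

// bufferedKV stands in for a buffered entry: a key slice plus a raw-bytes
// value slice, i.e. the two slice headers behind the 24 + 24 estimate.
type bufferedKV struct {
	key      []byte
	rawBytes []byte
}

func main() {
	// Prints 48 on 64-bit platforms: two slice headers of 8 (ptr) + 8 (len) +
	// 8 (cap) bytes each, matching the hardcoded kvOverhead.
	fmt.Println(unsafe.Sizeof(bufferedKV{}))
}
```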
pkg/storage/bulk/buffering_adder.go, line 71 at r1 (raw file):
Previously, vivekmenezes wrote…
if b.curBufSize == 0
you can also set b.curBuf = b.curBuf[:0] here
eh, I'm not sure I prefer that: curBufSize is used to estimate the memory usage of curBuf so we can decide if we want to call Flush to keep it bounded. Decisions based on whether or not we actually have things buffered should be made by looking at whether the buffer is non-empty, imo.
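Spelled out, the shape being debated is roughly the following sketch, assuming `curBuf` is a sortable `roachpb.KeyValueByKey` slice as in the import code quoted earlier; details are elided and this is not the PR's exact implementation:

```go
// Sketch: buffer the KV, grow the memory estimate, and flush mid-stream if
// the estimate crosses the configured threshold.
func (b *BufferingAdder) Add(ctx context.Context, key roachpb.Key, value []byte) error {
	b.curBuf = append(b.curBuf, roachpb.KeyValue{
		Key:   key,
		Value: roachpb.Value{RawBytes: value},
	})
	b.curBufSize += int64(len(key)+len(value)) + kvOverhead
	if b.curBufSize > b.flushSize {
		return b.Flush(ctx)
	}
	return nil
}
```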
pkg/storage/bulk/buffering_adder.go, line 87 at r1 (raw file):
Previously, vivekmenezes wrote…
if b.curBufSize == 0
ditto above: I think looking at the actual buffer to decide if there are buffered things to flush is the right answer here.
pkg/storage/bulk/buffering_adder.go, line 104 at r1 (raw file):
Previously, vivekmenezes wrote…
reset it above when you call Add()
Eh? once we've flushed things seems like the right time to stop holding them in a buffer? why wait?
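And the corresponding Flush shape, with the reset happening right after the data is handed off; again a sketch, with the hand-off to the embedded SSTBatcher elided and the sort assuming `curBuf` satisfies `sort.Interface`:

```go
func (b *BufferingAdder) Flush(ctx context.Context) error {
	if len(b.curBuf) == 0 {
		return nil
	}
	// The buffer-sort this PR pushes down into the adder.
	sort.Sort(b.curBuf)

	// ... hand the now-sorted KVs to the embedded SSTBatcher (elided) ...

	// Reset eagerly once flushed rather than lazily on the next Add.
	b.curBuf = b.curBuf[:0]
	b.curBufSize = 0
	return nil
}
```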
dt
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @dt, @lucy-zhang, and @vivekmenezes)
pkg/storage/bulk/buffering_adder.go, line 35 at r1 (raw file):
Previously, vivekmenezes wrote…
// Ingest data with this timestamp
Done.
pkg/storage/bulk/buffering_adder.go, line 36 at r1 (raw file):
Previously, vivekmenezes wrote…
ignore duplicates. This is useful when ingesting inverted indexes.
Done.
pkg/storage/bulk/buffering_adder.go, line 38 at r1 (raw file):
Previously, vivekmenezes wrote…
// flush once this threshold is reached
Done.
vivekmenezes
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @lucy-zhang and @vivekmenezes)
This pushes the buffering and sorting of out-of-order keys down to the BulkAdder. This pattern is common everywhere we produce out-of-order KVs that we want to ingest in bulk -- we need to sort them, both for the ordering property of SSTs but also for grouping.

Evidence suggests that changes in buffering and splitting could lead to significant changes in overall ingest performance -- buffering more, or even just buffering/splitting smarter, affects the spans of produced SSTs, which in turn directly affects how expensive overall ingestion is -- due to both some cockroach-specific factors like stats recomputation as well as other factors like compaction-derived write-amplification.

While I'm here: embedding `backfiller` in the `chunkbackfiller` implementations, and in turn embedding `chunkbackfiller` in `backfiller`, created a loop whereby a struct with no methods could implement `chunkbackfiller` (leading to a NPE at runtime?).

Release note: none.
Release note (sql change): add settings to control buffering in index backfiller.
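To make the embedding loop concrete, here is a toy, self-contained illustration with simplified names and signatures (not the PR's actual types): the struct satisfies the interface purely through a nil embedded interface value, so the program compiles but panics when the promoted method is called.

```go
package main

import "context"

// chunkBackfiller is a stand-in for the real interface (signature simplified).
type chunkBackfiller interface {
	runChunk(ctx context.Context) error
}

// backfiller embeds the interface that its own embedders are meant to
// implement -- this is the loop.
type backfiller struct {
	chunkBackfiller
}

// indexBackfiller embeds backfiller but never defines runChunk itself.
type indexBackfiller struct {
	backfiller
}

func main() {
	// Compiles without complaint: the promoted runChunk comes from the nil
	// embedded interface, so calling it panics with a nil-pointer dereference.
	var cb chunkBackfiller = indexBackfiller{}
	_ = cb.runChunk(context.Background())
}
```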
@lucy-zhang I reproduced <1mb SSTs pretty trivially with this once my index started splitting into ranges. The 32mb buffer worked out to ~24mb of sst that, once the index split ~10 ways and then eventually 25 ways, easily explained the small chunks. Upping the default buffer size (with a setting to turn it back down if needed) should help with that I think.
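(Back-of-the-envelope: ~24mb of buffered SST data spread across ~25 post-split ranges works out to roughly 24/25 ≈ 1mb per SST, which lines up with the <1mb SSTs observed.)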
bors r+
36192: storage/bulk: push buffer-sort down to BulkAdder r=dt a=dt

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Build succeeded
nice work on this PR @dt