-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
util/parquet: copy bytes from fmtCtx when writing byte arrays #104626
Conversation
3a9084d
to
de4f80d
Compare
de4f80d
to
444770e
Compare
pkg/sql/sem/tree/format.go
Outdated
@@ -666,6 +666,14 @@ func (ctx *FmtCtx) CloseAndGetString() string { | |||
return s | |||
} | |||
|
|||
// CopyBytes returns the contents of the unread portion of the buffer by | |||
// creating a copy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need to add this function here -- just copy bytes at the call site.
pkg/util/parquet/write_functions.go
Outdated
func formatDatum(d tree.Datum, a *batchAlloc, fmtCtx *tree.FmtCtx) { | ||
fmtCtx.Reset() | ||
d.Format(fmtCtx) | ||
a.byteArrayBatch[0] = fmtCtx.Bytes() | ||
a.byteArrayBatch[0] = fmtCtx.CopyBytes() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you almost want to return fmtCtx.CloseAndGetString()
Initially, when I suggested passing in fmtCtx, I thought we'd reuse single fmtCtx for all columns; it doesn't look like we're doing this; instead we create new one every time we call writeDatumToChunk
I wonder if it'd be better if you got the context here, and did something like:
// tree.NewFmtCtx uses an underlying pool, so we can assume there is no
// allocation here.
fmtCtx := tree.NewFmtCtx(tree.FmtExport)
d.Format(fmtCtx)
a.byteArrayBatch[0] = unsafeGetBytes(fmtCtx.CloseAndGetString())
Get rid of fmtCtx argument to the write functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, i think you should do it this way (call tree.NewFmtCtx
rather than passing one in. it will be easier to use safely, and NewFmtCtx
already makes use of a sync.Pool
that contains FmtCtx
objects, so there aren't as many allocations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This all makes sense to me. fmtCtx.CloseAndGetString()
is the only allocation we incur here.
238faf1
to
48e819b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 4 files at r1, 1 of 3 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @rafiss)
pkg/util/parquet/write_functions.go
line 271 at r1 (raw file):
Previously, jayshrivastava (Jayant) wrote…
This all makes sense to me.
fmtCtx.CloseAndGetString()
is the only allocation we incur here.
Yup; I don't think we can get around that.
pkg/util/parquet/write_functions.go
line 260 at r2 (raw file):
// allocation here. fmtCtx := tree.NewFmtCtx(tree.FmtExport) defer fmtCtx.Close()
you don't need defer here since you unconditionally call closeandgetstring() below.
pkg/util/parquet/writer.go
line 210 at r2 (raw file):
func (w *Writer) writeDatumToColChunk(d tree.Datum, datumColIdx int) (err error) { if err = w.sch.cols[datumColIdx].colWriter.Write(d, w.columnChunkWriterCache[datumColIdx], w.ba); err != nil {
can be simplified to just return ...Write()
call
Alternatively, this function is called from 1 place only -- you can get rid of it altogether.
While running CI, it was discovered that there is a data race when writing bytes with the fmtCtx in this pkg. The reason for the data race is that when a byte array is written to the underlying library, it creates a shallow copy and buffers it to be flushed later. While its being buffered, the `Writer` in this package closes the fmtCtx and returns it to the pool. Then, any code running concurrently may access the buffer in the fmtCtx. The data race can be seen [here](https://teamcity.cockroachdb.com/buildConfiguration/Cockroach_BazelExtendedCi/10458378?showRootCauses=true&expandBuildChangesSection=true&expandBuildProblemsSection=true&expandBuildTestsSection=true). To fix this, we now create a copy of the byte array when using fmtCtx before passing it to the underlying library. In this particular failure, the problem is in page statistics. The underlying library will buffer the min and max byte array datums until it needs to flush stats to the output file. See [here](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/parquet/file/column_writer_types.gen.go#L1428). It's worth noting that some encoders will copy byte slices before buffering them and some will not. For example the plain encoder copies slices (see [UnsafeWrite](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/parquet/internal/encoding/byte_array_encoder.go#LL39C1-L46C2)) and the dict encoder does not (see [here](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/parquet/internal/encoding/byte_array_encoder.go#L98) and [appendVal](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/internal/hashing/xxh3_memo_table.go#L248)). If the problem with statistics above were to be solved, then we could consider using a non-dictionary encoding when writing byte slices. For now, we cannot. As expected, this change causes more alloctions when writing datums: Before BenchmarkParquetWriter/bytes-10 162480 7136 ns/op 1913 B/op 64 allocs/op After BenchmarkParquetWriter/bytes-10 181750 6422 ns/op 1148 B/op 32 allocs/op The affected types are bit, box2d, date, decimal, inet, json, interval, timestamp, timestamptz and timetz. Informs: cockroachdb#99028 Epic: CRDB-27372 Release note: None
48e819b
to
343ba8d
Compare
bors r=miretskiy |
Build succeeded: |
While running CI, it was discovered that there is a data race when writing bytes with the fmtCtx in this pkg. The reason for the data race is that when a byte array is written to the underlying library, it creates a shallow copy and buffers it to be flushed later. While its being buffered, the
Writer
in this package closes the fmtCtx and returns it to the pool. Then, any code running concurrently may access the buffer in the fmtCtx.This scenario can be seen here. To fix this, we now create a copy of the byte array when using fmtCtx before passing it to the underlying library.
In this particular failure, the problem is in page statistics.
The underlying library will buffer the min and max byte array datums
until it needs to flush stats to the output file. See here.
It's worth noting that some encoders will copy byte slices before
buffering them and some will not. For example the plain encoder
copies slices (see UnsafeWrite) and the dict encoder does not (see here and appendVal). If the problem with statistics
above were to be solved, then we could consider using a non-dictionary encoding when writing byte slices. For now, we cannot.
As expected, this change causes more alloctions when writing datums:
The affected types are bit, box2d, date, decimal, inet, json, interval, timestamp, timestamptz and timetz.
Informs: #99028
Epic: CRDB-27372
Release note: None