Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: add new parquet library #99028

Closed
13 tasks done
jayshrivastava opened this issue Mar 20, 2023 · 2 comments · Fixed by #104528
Closed
13 tasks done

cdc: add new parquet library #99028

jayshrivastava opened this issue Mar 20, 2023 · 2 comments · Fixed by #104528
Assignees
Labels
A-cdc Change Data Capture C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) T-cdc

Comments

@jayshrivastava
Copy link
Contributor

jayshrivastava commented Mar 20, 2023

This issue tracks progress for adding the apache arrow parquet library and integrating it into changefeeds.

https://github.com/apache/arrow/

This decision was based on the investigation done in this doc. In summary

  • The existing implementation of changefeed initial scans with parquet is 15x slower than JSON
  • CPU flamegraphs show that 53% of time is spent in the current library and an additional ~20% is doing GC
  • An initial investigation of the old and new libraries shows that the new one would be more efficient with handling memory and writing files. We plan on integrating the new library with the smallest amount of code changes to enable the cdc TPCC roachtest, which we can use to benchmark the new library and confirm our observations.

I'll be working on the following items roughly in order

Jira issue: CRDB-25669

Epic CRDB-27372

@jayshrivastava jayshrivastava added C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) T-cdc labels Mar 20, 2023
@jayshrivastava jayshrivastava self-assigned this Mar 20, 2023
@blathers-crl
Copy link

blathers-crl bot commented Mar 20, 2023

cc @cockroachdb/cdc

@blathers-crl blathers-crl bot added the A-cdc Change Data Capture label Mar 20, 2023
@jayshrivastava
Copy link
Contributor Author

After adding the new library and integrating it with CDC, I recorded new benchmark results here

jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Mar 23, 2023
This commit installs the apache arrow parquet library for Go
at version 11. The release can be found here:
https://github.com/apache/arrow/releases/tag/go%2Fv11.0.0

This library is licensed under the Apache License 2.0.

Informs: cockroachdb#99028
Epic: None
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Mar 23, 2023
This change implements a `ParquetWriter` struct in the
changefeedccl package with the following public APIs:
```
    NewCDCParquetWriterFromRow(row cdcevent.Row, sink io.Writer) (*ParquetWriter, error)
    (w *ParquetWriter) AddData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error
    (w *ParquetWriter) Close() error
    (w *ParquetWriter) CurrentSize() int64
```
This parquet writer takes rows in the form of `cdcevent.Row` and writes them to the
`io.Writer` sink using parquet version v2.6.

The writer implements several features internally required to write in the parquet format:
- schema creation
- row group / column page management
- encoding/decoding of CRDB datums to parquet datums
Currently, the writer only supports types found in the TPCC workload, namely INT, DECIMAL, STRING
UUID, TIMESTAMP and BOOL.

This change also adds tests for the `ParquetWriter`. These tests write datums from CRDB tables to
parquet files and read back these datums using an internal parquet reader. The tests verify that
the parquet writer is correct by asserting that the datums match.

Informs: cockroachdb#99028
Epic: None
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
This change extends and refactors the util/parquet library to
be able to read and write arrays.

Release note: None

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
This change adds support for the following types families to the `util/parquet`
library:
types.INetFamily, types.JsonFamily, types.FloatFamily, types.BytesFamily,
types.BitFamily, types.EnumFamily, types.Box2DFamily, types.GeographyFamily,
types.GeometryFamily, types.DateFamily, types.TimeFamily, types.TimeTZFamily,
case types.IntervalFamily, types.TimestampTZFamily.

Release note: None

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
This change updates the parquet writer to be able to use
GZIP, ZSTD, SNAPPY, and BROTLI compression codecs. By
default, no compression is used. LZO and LZ4 are unsupported
by the library.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Informs: cockroachdb#99028
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
This change adds an option to the writer which allows the caller
to write arbitrary kv metadata to parquet files. This is useful
for testing purposes.

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
Previously, the test utils used to read parquet files
would require the writer as an argument. The main reason
the writer was required is that the writer contained
crdb-specific type information which could be used to
decode raw data until crdb datums.

With this change, the writer is updated to write this
crdb-specific type information to the parquet file in
its metadata. The reader is updated to the read type
information from the file metadata. There is a new
test utility function `ReadFile(parquetFile string)`
which can be used to read all datums from a parquet
file without providing any additional type information.
The function also returns metadata since it is possible
for users of the `Writer` to write arbitrary metadata
and such users may need this metadata in testing.

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
This change updates changefeeds to use the new parquet library
added in `pkg/util/parquet` when using `format=parquet`.

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
This change refactors randomized testing to use `randgen.RandType`.
`randgen.RandType` is better as it takes into account all allowable
types which can appear in CRDB (ex. array of tuple). The previous
code only generated random types which are supported by the writer
which leaves a gap when new types are added. Now, the code defaults
to all types and filters out unsupported ones.

The previous code also unnessarily duplicates code from `randgen`.
For example, generating a random tuple can be done by calling one
method in `randgen`. Generating a random tuple using the
previous code would require more complex work.

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
This change adds support for writing tuples. Implementation details below.

The standard way to write a tuple in parquet is to use a group:
```
message schema {                 -- toplevel schema
   optional group a (LIST) {
       optional T1 element;       -- physical column for the first field
       ...
       optional Tn element;       -- physical column for the nth field
   }
}
```

Because parquet has a very strict format, it does not write such groups
as one column with all the fields adjacent to each other. Instead, it
writes each field in the tuple as its own column. This 1:N mapping
from CRDB datum to physical column in parquet violates the assumption
used in this library that the mapping is 1:1.

This change aims to update the library to break that assumption. Firstly,
there is now a clear distiction between a "datum column" and a "physical
column". Also, the `Writer` is updated to be able to write to multiple
physical columns for a given datum, and the reader is updated
to "squash" physical columns into single tuple datums if needed. Finally,
randomized testing and benchmarking is extended to cover tuples.

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
While running CI, it was discovered that there is a data race when
writing bytes with the fmtCtx in this pkg. The reason for the
data race is that when a byte array is written to the underlying
library, it creates a shallow copy and buffers it to be flushed
later. While its being buffered, the `Writer` in this package closes
the fmtCtx and returns it to the pool. Then, any code running
concurrently may access the buffer in the fmtCtx.
The data race can be seen [here](https://teamcity.cockroachdb.com/buildConfiguration/Cockroach_BazelExtendedCi/10458378?showRootCauses=true&expandBuildChangesSection=true&expandBuildProblemsSection=true&expandBuildTestsSection=true). To fix this, we now create a copy of the byte array when using fmtCtx before passing it to the underlying library.

In this particular failure, the problem is in page statistics.
The underlying library will buffer the min and max byte array datums
until it needs to flush stats to the output file. See [here](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/parquet/file/column_writer_types.gen.go#L1428).

It's worth noting that some encoders will copy byte slices before
buffering them and some will not. For example the plain encoder
copies slices (see [UnsafeWrite](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/parquet/internal/encoding/byte_array_encoder.go#LL39C1-L46C2)) and the dict encoder does not (see [here](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/parquet/internal/encoding/byte_array_encoder.go#L98) and [appendVal](https://github.com/apache/arrow/blob/6808bfe3cdf5623212e575c1ec1083e194ed580c/go/internal/hashing/xxh3_memo_table.go#L248)). If the problem with statistics
above were to be solved, then we could consider using a non-dictionary encoding when writing byte slices. For now, we cannot.

As expected, this change causes more alloctions when writing datums:
Before
BenchmarkParquetWriter/bytes-10         	  162480	      7136 ns/op	    1913 B/op	      64 allocs/op
After
BenchmarkParquetWriter/bytes-10         	  181750	      6422 ns/op	    1148 B/op	      32 allocs/op

The affected types are bit, box2d, date, decimal, inet, json, interval, timestamp, timestamptz and timetz.

Informs: cockroachdb#99028
Epic: CRDB-27372
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
Previously, the option `key_in_value` was disallowed with
`format=parquet`. This change allows these settings to be
used together. Note that `key_in_value` is enabled by
default with `cloudstorage` sinks and `format=parquet` is
only allowed with cloudstorage sinks, so `key_in_value` is
enabled for parquet by default.

Informs: cockroachdb#103129
Informs: cockroachdb#99028
Epic: CRDB-27372
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
When using `format=parquet`, an additional column is produced to
indicate the type of operation corresponding to the row: create,
update, or delete. This change adds coverage for this in unit
testing.

Additionally, the test modified in this change is made more simple
by reducing the number of rows and different types because this
complexity is unnecessary as all types are tested within the
util/parquet package already.

Informs: cockroachdb#99028
Epic: CRDB-27372
Release note: None
Epic: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
Previously, the test utilities in `util/parquet` would not reconstruct
tuples read from files with their labels. This change updates the
package to do so. This is required for testing in users of this
package such as CDC.

Informs: cockroachdb#99028
Epic: CRDB-27372
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
This change adds support for the `diff` changefeed
options when using `format=parquet`. Enabling `diff` also adds
support for CDC Transformations with parquet.

Informs: cockroachdb#103129
Informs: cockroachdb#99028
Epic: CRDB-27372
Release note: None
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
This change adds support for the `end_time` changefeed
options when using `format=parquet`. No significant code
changes were needed to enable this feature.

Closes: cockroachdb#103129
Closes: cockroachdb#99028
Epic: CRDB-27372
Release note (enterprise change): Changefeeds now officially
support the parquet format at specificiation version 2.6.
It is only usable with the cloudstorage sink.

The syntax to use parquet is like the following:
`CREATE CHANGEFEED FOR foo INTO `...` WITH format=parquet`

It supports all standard changefeed options and features
including CDC transformations, except it does not support the
`topic_in_value` option.
jayshrivastava added a commit to jayshrivastava/cockroach that referenced this issue Jun 21, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-cdc Change Data Capture C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) T-cdc
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant