changefeedccl: implement parquet writer library and tests
This change implements a `ParquetWriter` struct in the
changefeedccl package with the following public APIs:
```
    NewCDCParquetWriterFromRow(row cdcevent.Row, sink io.Writer) (*ParquetWriter, error)
    (w *ParquetWriter) AddData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error
    (w *ParquetWriter) Close() error
    (w *ParquetWriter) CurrentSize() int64
```
This parquet writer takes rows in the form of `cdcevent.Row` and writes them to the
`io.Writer` sink using version 2.6 of the parquet format.
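
For orientation, a minimal usage sketch of this API (not code from the commit;
`writeOne` is a hypothetical helper, and `updatedRow`/`prevRow`/`sink` are assumed
to be in hand):
```
import (
	"io"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
)

// writeOne is a hypothetical helper showing the call sequence.
func writeOne(updatedRow, prevRow cdcevent.Row, sink io.Writer) error {
	// The writer derives its parquet schema from the row's columns.
	w, err := NewCDCParquetWriterFromRow(updatedRow, sink)
	if err != nil {
		return err
	}
	// Buffer the updated row (and the previous version of the row);
	// the writer manages row groups and column pages internally.
	if err := w.AddData(updatedRow, prevRow); err != nil {
		return err
	}
	// Close flushes buffered data and writes the parquet footer.
	return w.Close()
}
```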

The writer internally implements several features required to write the parquet format:
- schema creation
- row group / column page management
- encoding/decoding of CRDB datums to and from parquet datums
Currently, the writer supports only the types found in the TPCC workload, namely INT,
DECIMAL, STRING, UUID, TIMESTAMP, and BOOL.
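
To illustrate the encoding step, a hedged sketch of the kind of physical-type
mapping involved; the function name and the exact mapping are assumptions, not
the commit's code:
```
import (
	"github.com/apache/arrow/go/v11/parquet"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/errors"
)

// parquetPhysicalTypeFor is a hypothetical helper mapping the supported
// CRDB type families to parquet physical types. Logical-type annotations
// (e.g. DECIMAL precision/scale, STRING, UUID) would be layered on separately.
func parquetPhysicalTypeFor(fam types.Family) (parquet.Type, error) {
	switch fam {
	case types.BoolFamily:
		return parquet.Types.Boolean, nil
	case types.IntFamily:
		return parquet.Types.Int64, nil
	case types.DecimalFamily, types.StringFamily, types.UuidFamily, types.TimestampFamily:
		// Variable-length values are written as byte arrays with an
		// appropriate logical-type annotation.
		return parquet.Types.ByteArray, nil
	default:
		return parquet.Types.Undefined, errors.Newf("unsupported type family %s", fam)
	}
}
```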

This change also adds tests for the `ParquetWriter`. These tests write datums from CRDB
tables to parquet files, read the datums back using an internal parquet reader, and verify
the writer's correctness by asserting that the written and read datums match.
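
The shape of such a round-trip test, sketched under assumptions: obtaining a
`cdcevent.Row` is elided, the arrow-go reader stands in for the internal one,
and the real tests compare every datum rather than just the row count.
```
import (
	"bytes"
	"testing"

	"github.com/apache/arrow/go/v11/parquet/file"
	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
	"github.com/stretchr/testify/require"
)

func roundTripOneRow(t *testing.T, row cdcevent.Row) {
	var buf bytes.Buffer
	w, err := NewCDCParquetWriterFromRow(row, &buf)
	require.NoError(t, err)
	// Passing a zero-value prevRow is an assumption for this sketch.
	require.NoError(t, w.AddData(row, cdcevent.Row{}))
	require.NoError(t, w.Close())

	// Read the file back and check the row count.
	r, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)
	defer func() { require.NoError(t, r.Close()) }()
	require.EqualValues(t, 1, r.NumRows())
}
```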

Informs: cockroachdb#99028
Epic: None
Release note: None
jayshrivastava committed Mar 23, 2023
1 parent 6d51df7 commit cffa798
Showing 5 changed files with 807 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -15,6 +15,7 @@ changefeed.balance_range_distribution.enable boolean false if enabled, the range
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.format.parquet.max_row_group_size integer 67108864 is the maximal number of rows which can be written out to a single row group in a parquet file when running a changefeed using the parquet format
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
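
The new `changefeed.format.parquet.max_row_group_size` setting shown above is
registered in Go code not included in this excerpt; a sketch of what such a
registration typically looks like in CockroachDB, with the variable name and
setting class as assumptions:
```
import "github.com/cockroachdb/cockroach/pkg/settings"

// Hypothetical variable name; the TenantWritable class is an assumption
// based on the setting appearing in the tenant-visible docs.
var maxRowGroupSize = settings.RegisterIntSetting(
	settings.TenantWritable,
	"changefeed.format.parquet.max_row_group_size",
	"the maximal number of rows which can be written out to a single "+
		"row group in a parquet file when running a changefeed using the parquet format",
	64<<20, // default: 67108864
)
```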
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -21,6 +21,7 @@
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td></tr>
<tr><td><div id="setting-changefeed-format-parquet-max-row-group-size" class="anchored"><code>changefeed.format.parquet.max_row_group_size</code></div></td><td>integer</td><td><code>67108864</code></td><td>is the maximal number of rows which can be written out to a single row group in a parquet file when running a changefeed using the parquet format</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td></tr>
<tr><td><div id="setting-changefeed-schema-feed-read-with-priority-after" class="anchored"><code>changefeed.schema_feed.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td></tr>
<tr><td><div id="setting-cloudstorage-azure-concurrent-upload-buffers" class="anchored"><code>cloudstorage.azure.concurrent_upload_buffers</code></div></td><td>integer</td><td><code>1</code></td><td>controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload</td></tr>
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"event_processing.go",
"metrics.go",
"name.go",
"parquet.go",
"parquet_sink_cloudstorage.go",
"retry.go",
"scheduled_changefeed.go",
@@ -142,6 +143,8 @@
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_apache_arrow_go_v11//parquet",
"@com_github_apache_arrow_go_v11//parquet/file",
"@com_github_apache_arrow_go_v11//parquet/schema",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
@@ -154,6 +157,7 @@
"@com_github_google_btree//:btree",
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_lib_pq//oid",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_shopify_sarama//:sarama",
"@com_github_xdg_go_scram//:scram",
@@ -183,6 +187,7 @@
"main_test.go",
"name_test.go",
"nemeses_test.go",
"parquet_test.go",
"scheduled_changefeed_test.go",
"schema_registry_test.go",
"show_changefeed_jobs_test.go",
@@ -304,6 +309,9 @@
"//pkg/workload/bank",
"//pkg/workload/ledger",
"//pkg/workload/workloadsql",
"@com_github_apache_arrow_go_v11//parquet",
"@com_github_apache_arrow_go_v11//parquet/file",
"@com_github_apache_arrow_go_v11//parquet/schema",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
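
The arrow-go `parquet/file` and `parquet/schema` packages added to the
dependency lists above are the low-level building blocks for schema creation
and row-group management. A standalone sketch of writing one row group with
them, illustrative only and not the commit's code:
```
package main

import (
	"os"

	"github.com/apache/arrow/go/v11/parquet"
	"github.com/apache/arrow/go/v11/parquet/file"
	"github.com/apache/arrow/go/v11/parquet/schema"
)

func main() {
	// Schema creation: a root group with one required INT64 column.
	// -1 lets the library assign field IDs.
	root, err := schema.NewGroupNode("schema", parquet.Repetitions.Required,
		schema.FieldList{schema.NewInt64Node("id", parquet.Repetitions.Required, -1)}, -1)
	if err != nil {
		panic(err)
	}

	f, err := os.Create("example.parquet")
	if err != nil {
		panic(err)
	}
	w := file.NewParquetWriter(f, root)

	// Row group / column page management: append one row group and
	// write a batch of values into its only column chunk.
	rgw := w.AppendRowGroup()
	cw, err := rgw.NextColumn()
	if err != nil {
		panic(err)
	}
	if _, err := cw.(*file.Int64ColumnChunkWriter).WriteBatch(
		[]int64{1, 2, 3}, nil, nil); err != nil {
		panic(err)
	}
	// Close flushes the row group and writes the parquet footer.
	if err := w.Close(); err != nil {
		panic(err)
	}
}
```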