From 7d6fd4cc664d8d3a944dd4396001f66def26e975 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Wed, 10 May 2023 13:43:02 -0400
Subject: [PATCH] util/parquet: add option to write kv metadata to files

This change adds an option to the writer which allows the caller to
write arbitrary kv metadata to parquet files. This is useful for
testing purposes.

Informs: https://github.com/cockroachdb/cockroach/issues/99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
---
 pkg/util/parquet/BUILD.bazel    |  1 +
 pkg/util/parquet/writer.go      | 28 +++++++++-
 pkg/util/parquet/writer_test.go | 98 ++++++++++++++++++---------------
 3 files changed, 80 insertions(+), 47 deletions(-)

diff --git a/pkg/util/parquet/BUILD.bazel b/pkg/util/parquet/BUILD.bazel
index 3c1c4eb98053..a2a457617f37 100644
--- a/pkg/util/parquet/BUILD.bazel
+++ b/pkg/util/parquet/BUILD.bazel
@@ -29,6 +29,7 @@ go_library(
         "@com_github_apache_arrow_go_v11//parquet",
         "@com_github_apache_arrow_go_v11//parquet/compress",
         "@com_github_apache_arrow_go_v11//parquet/file",
+        "@com_github_apache_arrow_go_v11//parquet/metadata",
         "@com_github_apache_arrow_go_v11//parquet/schema",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_lib_pq//oid",
diff --git a/pkg/util/parquet/writer.go b/pkg/util/parquet/writer.go
index 00fd5aa01b4c..3fef33e212b2 100644
--- a/pkg/util/parquet/writer.go
+++ b/pkg/util/parquet/writer.go
@@ -16,6 +16,7 @@ import (
     "github.com/apache/arrow/go/v11/parquet"
     "github.com/apache/arrow/go/v11/parquet/compress"
     "github.com/apache/arrow/go/v11/parquet/file"
+    "github.com/apache/arrow/go/v11/parquet/metadata"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
     "github.com/cockroachdb/errors"
 )
@@ -24,6 +25,9 @@ type config struct {
     maxRowGroupLength int64
     version           parquet.Version
     compression       compress.Compression
+
+    // Arbitrary kv metadata.
+    metadata metadata.KeyValueMetadata
 }
 
 // An Option is a configurable setting for the Writer.
@@ -72,6 +76,19 @@ func WithCompressionCodec(compression CompressionCodec) Option {
     }
 }
 
+// WithMetadata adds arbitrary kv metadata to the parquet file which can be
+// read by a reader.
+func WithMetadata(m map[string]string) Option {
+    return func(c *config) error {
+        for k, v := range m {
+            if err := c.metadata.Append(k, v); err != nil {
+                return err
+            }
+        }
+        return nil
+    }
+}
+
 var allowedVersions = map[string]parquet.Version{
     "v1.0": parquet.V1_0,
     "v2.4": parquet.V1_0,
@@ -117,6 +134,7 @@ type Writer struct {
 
     ba *batchAlloc
 
+    // The current number of rows written to the row group writer.
     currentRowGroupSize   int64
     currentRowGroupWriter file.BufferedRowGroupWriter
 }
@@ -131,6 +149,7 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer,
         maxRowGroupLength: parquet.DefaultMaxRowGroupLen,
         version:           parquet.V2_6,
         compression:       compress.Codecs.Uncompressed,
+        metadata:          metadata.KeyValueMetadata{},
     }
     for _, opt := range opts {
         err := opt.apply(&cfg)
@@ -139,10 +158,13 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer,
         }
     }
 
-    parquetOpts := []parquet.WriterProperty{parquet.WithCreatedBy("cockroachdb"),
-        parquet.WithVersion(cfg.version), parquet.WithCompression(cfg.compression)}
+    parquetOpts := []parquet.WriterProperty{
+        parquet.WithCreatedBy("cockroachdb"),
+        parquet.WithVersion(cfg.version),
+        parquet.WithCompression(cfg.compression),
+    }
     props := parquet.NewWriterProperties(parquetOpts...)
-    writer := file.NewParquetWriter(sink, sch.schema.Root(), file.WithWriterProps(props))
+    writer := file.NewParquetWriter(sink, sch.schema.Root(), file.WithWriterProps(props), file.WithWriteMetadata(cfg.metadata))
 
     return &Writer{
         sch: sch,
diff --git a/pkg/util/parquet/writer_test.go b/pkg/util/parquet/writer_test.go
index 5a977c31716e..a745700bda29 100644
--- a/pkg/util/parquet/writer_test.go
+++ b/pkg/util/parquet/writer_test.go
@@ -516,67 +516,77 @@ func TestInvalidWriterUsage(t *testing.T) {
 }
 
 func TestVersions(t *testing.T) {
-    schemaDef, err := NewSchema([]string{}, []*types.T{})
-    require.NoError(t, err)
     for version := range allowedVersions {
-        fileName := "TestVersions.parquet"
-        f, err := os.CreateTemp("", fileName)
-        require.NoError(t, err)
-
-        writer, err := NewWriter(schemaDef, f, WithVersion(version))
-        require.NoError(t, err)
-
-        err = writer.Close()
-        require.NoError(t, err)
-
-        f, err = os.Open(f.Name())
-        require.NoError(t, err)
-
-        reader, err := file.NewParquetReader(f)
-        require.NoError(t, err)
-
-        require.Equal(t, reader.MetaData().Version(), writer.cfg.version)
-
-        err = reader.Close()
-        require.NoError(t, err)
+        opt := WithVersion(version)
+        optionsTest(t, opt, func(t *testing.T, reader *file.Reader) {
+            require.Equal(t, reader.MetaData().Version(), allowedVersions[version])
+        })
     }
 
+    schemaDef, err := NewSchema([]string{}, []*types.T{})
+    require.NoError(t, err)
     buf := bytes.Buffer{}
     _, err = NewWriter(schemaDef, &buf, WithVersion("invalid"))
     require.Error(t, err)
 }
 
 func TestCompressionCodecs(t *testing.T) {
+    for compression := range compressionCodecToParquet {
+        opt := WithCompressionCodec(compression)
+        optionsTest(t, opt, func(t *testing.T, reader *file.Reader) {
+            colChunk, err := reader.MetaData().RowGroup(0).ColumnChunk(0)
+            require.NoError(t, err)
+            require.Equal(t, colChunk.Compression(), compressionCodecToParquet[compression])
+        })
+    }
+}
+
+// TestMetadata tests writing arbitrary kv metadata to parquet files.
+func TestMetadata(t *testing.T) {
+    meta := map[string]string{}
+    meta["testKey1"] = "testValue1"
+    meta["testKey2"] = "testValue2"
+    opt := WithMetadata(meta)
+    optionsTest(t, opt, func(t *testing.T, reader *file.Reader) {
+        val := reader.MetaData().KeyValueMetadata().FindValue("testKey1")
+        require.NotNil(t, reader.MetaData().KeyValueMetadata().FindValue("testKey1"))
+        require.Equal(t, *val, "testValue1")
+
+        val = reader.MetaData().KeyValueMetadata().FindValue("testKey2")
+        require.NotNil(t, reader.MetaData().KeyValueMetadata().FindValue("testKey2"))
+        require.Equal(t, *val, "testValue2")
+    })
+}
+
+// optionsTest can be used to assert the behavior of an Option. It creates a
+// writer using the supplied Option and writes a parquet file with sample data.
+// Then it calls the provided test function with the reader and subsequently
+// closes it.
+func optionsTest(t *testing.T, opt Option, testFn func(t *testing.T, reader *file.Reader)) {
     schemaDef, err := NewSchema([]string{"a"}, []*types.T{types.Int})
     require.NoError(t, err)
 
-    for compression := range compressionCodecToParquet {
-        fileName := "TestCompressionCodecs.parquet"
-        f, err := os.CreateTemp("", fileName)
-        require.NoError(t, err)
-
-        writer, err := NewWriter(schemaDef, f, WithCompressionCodec(compression))
-        require.NoError(t, err)
+    fileName := "OptionsTest.parquet"
+    f, err := os.CreateTemp("", fileName)
+    require.NoError(t, err)
 
-        err = writer.AddRow([]tree.Datum{tree.NewDInt(0)})
-        require.NoError(t, err)
+    writer, err := NewWriter(schemaDef, f, opt)
+    require.NoError(t, err)
 
-        err = writer.Close()
-        require.NoError(t, err)
+    err = writer.AddRow([]tree.Datum{tree.NewDInt(0)})
+    require.NoError(t, err)
 
-        f, err = os.Open(f.Name())
-        require.NoError(t, err)
+    err = writer.Close()
+    require.NoError(t, err)
 
-        reader, err := file.NewParquetReader(f)
-        require.NoError(t, err)
+    f, err = os.Open(f.Name())
+    require.NoError(t, err)
 
-        colChunk, err := reader.MetaData().RowGroup(0).ColumnChunk(0)
-        require.NoError(t, err)
+    reader, err := file.NewParquetReader(f)
+    require.NoError(t, err)
 
-        require.Equal(t, colChunk.Compression(), compressionCodecToParquet[compression])
+    testFn(t, reader)
 
-        err = reader.Close()
-        require.NoError(t, err)
-    }
+    err = reader.Close()
+    require.NoError(t, err)
 }