diff --git a/pkg/util/parquet/BUILD.bazel b/pkg/util/parquet/BUILD.bazel
index 3c1c4eb98053..a2a457617f37 100644
--- a/pkg/util/parquet/BUILD.bazel
+++ b/pkg/util/parquet/BUILD.bazel
@@ -29,6 +29,7 @@ go_library(
         "@com_github_apache_arrow_go_v11//parquet",
         "@com_github_apache_arrow_go_v11//parquet/compress",
         "@com_github_apache_arrow_go_v11//parquet/file",
+        "@com_github_apache_arrow_go_v11//parquet/metadata",
         "@com_github_apache_arrow_go_v11//parquet/schema",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_lib_pq//oid",
diff --git a/pkg/util/parquet/writer.go b/pkg/util/parquet/writer.go
index 00fd5aa01b4c..3fef33e212b2 100644
--- a/pkg/util/parquet/writer.go
+++ b/pkg/util/parquet/writer.go
@@ -16,6 +16,7 @@ import (
     "github.com/apache/arrow/go/v11/parquet"
     "github.com/apache/arrow/go/v11/parquet/compress"
     "github.com/apache/arrow/go/v11/parquet/file"
+    "github.com/apache/arrow/go/v11/parquet/metadata"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
     "github.com/cockroachdb/errors"
 )
@@ -24,6 +25,9 @@ type config struct {
     maxRowGroupLength int64
     version           parquet.Version
     compression       compress.Compression
+
+    // Arbitrary kv metadata written to the parquet file.
+    metadata metadata.KeyValueMetadata
 }
 
 // An Option is a configurable setting for the Writer.
@@ -72,6 +76,19 @@ func WithCompressionCodec(compression CompressionCodec) Option {
     }
 }
 
+// WithMetadata adds arbitrary kv metadata to the parquet file, which can be
+// read back by readers.
+func WithMetadata(m map[string]string) Option {
+    return func(c *config) error {
+        for k, v := range m {
+            if err := c.metadata.Append(k, v); err != nil {
+                return err
+            }
+        }
+        return nil
+    }
+}
+
 var allowedVersions = map[string]parquet.Version{
     "v1.0": parquet.V1_0,
     "v2.4": parquet.V2_4,
@@ -117,6 +134,7 @@ type Writer struct {
 
     ba *batchAlloc
 
+    // The current number of rows written to the row group writer.
     currentRowGroupSize   int64
     currentRowGroupWriter file.BufferedRowGroupWriter
 }
@@ -131,6 +149,7 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer,
         maxRowGroupLength: parquet.DefaultMaxRowGroupLen,
         version:           parquet.V2_6,
         compression:       compress.Codecs.Uncompressed,
+        metadata:          metadata.KeyValueMetadata{},
     }
     for _, opt := range opts {
         err := opt.apply(&cfg)
@@ -139,10 +158,13 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer,
         }
     }
 
-    parquetOpts := []parquet.WriterProperty{parquet.WithCreatedBy("cockroachdb"),
-        parquet.WithVersion(cfg.version), parquet.WithCompression(cfg.compression)}
+    parquetOpts := []parquet.WriterProperty{
+        parquet.WithCreatedBy("cockroachdb"),
+        parquet.WithVersion(cfg.version),
+        parquet.WithCompression(cfg.compression),
+    }
     props := parquet.NewWriterProperties(parquetOpts...)
-    writer := file.NewParquetWriter(sink, sch.schema.Root(), file.WithWriterProps(props))
+    writer := file.NewParquetWriter(sink, sch.schema.Root(), file.WithWriterProps(props), file.WithWriteMetadata(cfg.metadata))
 
     return &Writer{
         sch: sch,
diff --git a/pkg/util/parquet/writer_test.go b/pkg/util/parquet/writer_test.go
index 5a977c31716e..a745700bda29 100644
--- a/pkg/util/parquet/writer_test.go
+++ b/pkg/util/parquet/writer_test.go
@@ -516,67 +516,77 @@ func TestInvalidWriterUsage(t *testing.T) {
 }
 
 func TestVersions(t *testing.T) {
-    schemaDef, err := NewSchema([]string{}, []*types.T{})
-    require.NoError(t, err)
-
     for version := range allowedVersions {
-        fileName := "TestVersions.parquet"
-        f, err := os.CreateTemp("", fileName)
-        require.NoError(t, err)
-
-        writer, err := NewWriter(schemaDef, f, WithVersion(version))
-        require.NoError(t, err)
-
-        err = writer.Close()
-        require.NoError(t, err)
-
-        f, err = os.Open(f.Name())
-        require.NoError(t, err)
-
-        reader, err := file.NewParquetReader(f)
-        require.NoError(t, err)
-
-        require.Equal(t, reader.MetaData().Version(), writer.cfg.version)
-
-        err = reader.Close()
-        require.NoError(t, err)
+        opt := WithVersion(version)
+        optionsTest(t, opt, func(t *testing.T, reader *file.Reader) {
+            require.Equal(t, allowedVersions[version], reader.MetaData().Version())
+        })
     }
 
+    schemaDef, err := NewSchema([]string{}, []*types.T{})
+    require.NoError(t, err)
     buf := bytes.Buffer{}
     _, err = NewWriter(schemaDef, &buf, WithVersion("invalid"))
     require.Error(t, err)
 }
 
 func TestCompressionCodecs(t *testing.T) {
+    for compression := range compressionCodecToParquet {
+        opt := WithCompressionCodec(compression)
+        optionsTest(t, opt, func(t *testing.T, reader *file.Reader) {
+            colChunk, err := reader.MetaData().RowGroup(0).ColumnChunk(0)
+            require.NoError(t, err)
+            require.Equal(t, compressionCodecToParquet[compression], colChunk.Compression())
+        })
+    }
+}
+
+// TestMetadata tests writing arbitrary kv metadata to parquet files.
+func TestMetadata(t *testing.T) {
+    meta := map[string]string{}
+    meta["testKey1"] = "testValue1"
+    meta["testKey2"] = "testValue2"
+    opt := WithMetadata(meta)
+    optionsTest(t, opt, func(t *testing.T, reader *file.Reader) {
+        val := reader.MetaData().KeyValueMetadata().FindValue("testKey1")
+        require.NotNil(t, val)
+        require.Equal(t, "testValue1", *val)
+
+        val = reader.MetaData().KeyValueMetadata().FindValue("testKey2")
+        require.NotNil(t, val)
+        require.Equal(t, "testValue2", *val)
+    })
+}
+
+// optionsTest can be used to assert the behavior of an Option. It creates a
+// writer using the supplied Option and writes a parquet file with sample data.
+// Then it calls the provided test function with the reader and subsequently
+// closes it.
+func optionsTest(t *testing.T, opt Option, testFn func(t *testing.T, reader *file.Reader)) {
     schemaDef, err := NewSchema([]string{"a"}, []*types.T{types.Int})
     require.NoError(t, err)
 
-    for compression := range compressionCodecToParquet {
-        fileName := "TestCompressionCodecs.parquet"
-        f, err := os.CreateTemp("", fileName)
-        require.NoError(t, err)
-
-        writer, err := NewWriter(schemaDef, f, WithCompressionCodec(compression))
-        require.NoError(t, err)
+    fileName := "OptionsTest.parquet"
+    f, err := os.CreateTemp("", fileName)
+    require.NoError(t, err)
 
-        err = writer.AddRow([]tree.Datum{tree.NewDInt(0)})
-        require.NoError(t, err)
+    writer, err := NewWriter(schemaDef, f, opt)
+    require.NoError(t, err)
 
-        err = writer.Close()
-        require.NoError(t, err)
+    err = writer.AddRow([]tree.Datum{tree.NewDInt(0)})
+    require.NoError(t, err)
 
-        f, err = os.Open(f.Name())
-        require.NoError(t, err)
+    err = writer.Close()
+    require.NoError(t, err)
 
-        reader, err := file.NewParquetReader(f)
-        require.NoError(t, err)
+    f, err = os.Open(f.Name())
+    require.NoError(t, err)
 
-        colChunk, err := reader.MetaData().RowGroup(0).ColumnChunk(0)
-        require.NoError(t, err)
+    reader, err := file.NewParquetReader(f)
+    require.NoError(t, err)
 
-        require.Equal(t, colChunk.Compression(), compressionCodecToParquet[compression])
+    testFn(t, reader)
 
-        err = reader.Close()
-        require.NoError(t, err)
-    }
+    err = reader.Close()
+    require.NoError(t, err)
 }
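Usage sketch (not part of the diff above): the helper below mirrors TestMetadata and shows the round trip this change enables: attach kv metadata at write time via WithMetadata, then recover it from the file footer with the arrow parquet reader. It assumes it compiles inside the util/parquet package itself, like the tests do; the helper name writeAndReadMetadata and the key "jobID" are hypothetical, chosen only for illustration.

// writeAndReadMetadata is a hypothetical helper: it writes a one-row parquet
// file carrying kv metadata, then reads the value for "jobID" back out of the
// file footer. It returns the recovered value, or "" if the key is absent.
func writeAndReadMetadata(path string) (string, error) {
    schemaDef, err := NewSchema([]string{"a"}, []*types.T{types.Int})
    if err != nil {
        return "", err
    }

    f, err := os.Create(path)
    if err != nil {
        return "", err
    }

    // Attach arbitrary kv metadata; it is written to the file footer on Close.
    writer, err := NewWriter(schemaDef, f, WithMetadata(map[string]string{"jobID": "1234"}))
    if err != nil {
        return "", err
    }
    if err := writer.AddRow([]tree.Datum{tree.NewDInt(0)}); err != nil {
        return "", err
    }
    if err := writer.Close(); err != nil {
        return "", err
    }

    rf, err := os.Open(path)
    if err != nil {
        return "", err
    }
    reader, err := file.NewParquetReader(rf)
    if err != nil {
        return "", err
    }
    defer func() { _ = reader.Close() }()

    // FindValue returns a *string, or nil if the key was never written.
    val := reader.MetaData().KeyValueMetadata().FindValue("jobID")
    if val == nil {
        return "", nil
    }
    return *val, nil
}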