From f0151560c30ac4d931bbbc4f246b0c259fb4ea7d Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 28 Mar 2023 14:01:56 -0400 Subject: [PATCH] changefeedccl: add parquet writer This change adds the file `parquet.go` which contains helper functions to help create parquet writers and export data via `cdcevent.Row` structs. This change also adds tests to ensure rows are written to parquet files correctly. Epic: None Release note: None --- pkg/ccl/changefeedccl/BUILD.bazel | 4 + .../changefeedccl/alter_changefeed_test.go | 2 +- pkg/ccl/changefeedccl/parquet.go | 65 ++++++++ .../parquet_sink_cloudstorage.go | 36 +++-- pkg/ccl/changefeedccl/parquet_test.go | 149 ++++++++++++++++++ pkg/ccl/changefeedccl/testfeed_test.go | 2 +- 6 files changed, 246 insertions(+), 12 deletions(-) create mode 100644 pkg/ccl/changefeedccl/parquet.go create mode 100644 pkg/ccl/changefeedccl/parquet_test.go diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 480f7c7cdd91..b349691915e3 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "metrics.go", "name.go", "parallel_io.go", + "parquet.go", "parquet_sink_cloudstorage.go", "retry.go", "scheduled_changefeed.go", @@ -137,6 +138,7 @@ go_library( "//pkg/util/metric", "//pkg/util/metric/aggmetric", "//pkg/util/mon", + "//pkg/util/parquet", "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/retry", @@ -188,6 +190,7 @@ go_test( "main_test.go", "name_test.go", "nemeses_test.go", + "parquet_test.go", "scheduled_changefeed_test.go", "schema_registry_test.go", "show_changefeed_jobs_test.go", @@ -297,6 +300,7 @@ go_test( "//pkg/util/log", "//pkg/util/log/eventpb", "//pkg/util/mon", + "//pkg/util/parquet", "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/retry", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 181b70d8bb30..ba9d7b2e4222 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -1253,7 +1253,7 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { sqlDB.ExpectErr(t, errMsg, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar WITH initial_scan`, jobFeed.JobID())) } - cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection) + cdcTestWithSystem(t, testFn, feedTestForceSink("pubsub"), feedTestNoExternalConnection) } func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go new file mode 100644 index 000000000000..45cda6341221 --- /dev/null +++ b/pkg/ccl/changefeedccl/parquet.go @@ -0,0 +1,65 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "io" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/parquet" +) + +// NewCDCParquetWriterFromRow constructs a new parquet writer which outputs to +// the given sink. This function interprets the schema from the supplied row. +func NewCDCParquetWriterFromRow( + row cdcevent.Row, sink io.Writer, maxRowGroupSize int64, +) (*parquet.Writer, error) { + columnNames := make([]string, len(row.ResultColumns())+1) + columnTypes := make([]*types.T, len(row.ResultColumns())+1) + + idx := 0 + if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error { + columnNames[idx] = col.Name + columnTypes[idx] = col.Typ + idx += 1 + return nil + }); err != nil { + return nil, err + } + + columnNames[idx] = parquetCrdbEventTypeColName + columnTypes[idx] = types.String + + schemaDef, err := parquet.NewSchema(columnNames, columnTypes) + if err != nil { + return nil, err + } + + return parquet.NewWriter(schemaDef, sink, parquet.WithMaxRowGroupLength(maxRowGroupSize)) +} + +// AddData writes the updatedRow to the writer, adding the row's event type. +func AddData( + writer *parquet.Writer, updatedRow cdcevent.Row, prevRow cdcevent.Row, +) ([]tree.Datum, error) { + eventType := getEventTypeDatum(updatedRow, prevRow) + datumRow := make([]tree.Datum, len(updatedRow.ResultColumns())+1) + idx := 0 + if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error { + datumRow[idx] = d + idx += 1 + return nil + }); err != nil { + return nil, err + } + datumRow[idx] = eventType.DString() + return datumRow, writer.AddData(datumRow) +} diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go index 72875f09a4e6..82e63e671aa9 100644 --- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go @@ -35,12 +35,24 @@ var includeParquetTestMetadata = false // defined below. const parquetCrdbEventTypeColName string = "__crdb_event_type__" +type parquetEventType int + const ( - parquetEventInsert string = "c" - parquetEventUpdate string = "u" - parquetEventDelete string = "d" + parquetEventInsert parquetEventType = iota + parquetEventUpdate + parquetEventDelete ) +var parquetEventTypeDatumStringMap = map[parquetEventType]*tree.DString{ + parquetEventInsert: tree.NewDString("c"), + parquetEventUpdate: tree.NewDString("u"), + parquetEventDelete: tree.NewDString("d"), +} + +func (e parquetEventType) DString() *tree.DString { + return parquetEventTypeDatumStringMap[e] +} + // We need a separate sink for parquet format because the parquet encoder has to // write metadata to the parquet file (buffer) after each flush. This means that the // parquet encoder should have access to the buffer object inside @@ -178,13 +190,8 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow( return err } - if updatedRow.IsDeleted() { - parquetRow[parquetCrdbEventTypeColName] = []byte(parquetEventDelete) - } else if prevRow.IsInitialized() && !prevRow.IsDeleted() { - parquetRow[parquetCrdbEventTypeColName] = []byte(parquetEventUpdate) - } else { - parquetRow[parquetCrdbEventTypeColName] = []byte(parquetEventInsert) - } + et := getEventTypeDatum(updatedRow, prevRow) + parquetRow[parquetCrdbEventTypeColName] = []byte(et.DString().String()) if err = file.parquetCodec.parquetWriter.AddData(parquetRow); err != nil { return err @@ -204,6 +211,15 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow( return nil } +func getEventTypeDatum(updatedRow cdcevent.Row, prevRow cdcevent.Row) parquetEventType { + if updatedRow.IsDeleted() { + return parquetEventDelete + } else if prevRow.IsInitialized() && !prevRow.IsDeleted() { + return parquetEventUpdate + } + return parquetEventInsert +} + func makeParquetWriterWrapper( ctx context.Context, row cdcevent.Row, buf *bytes.Buffer, compression parquet.CompressionCodec, ) (*parquetFileWriter, error) { diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go new file mode 100644 index 000000000000..9223ca2f6591 --- /dev/null +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -0,0 +1,149 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/parquet" + "github.com/stretchr/testify/require" +) + +func TestParquetRows(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Rangefeed reader can time out under stress. + skip.UnderStress(t) + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + // TODO(#98816): cdctest.GetHydratedTableDescriptor does not work with tenant dbs. + // Once it is fixed, this flag can be removed. + DefaultTestTenant: base.TestTenantDisabled, + }) + defer s.Stopper().Stop(ctx) + + maxRowGroupSize := int64(2) + + sqlDB := sqlutils.MakeSQLRunner(db) + + for _, tc := range []struct { + testName string + createTable string + inserts []string + }{ + { + testName: "mixed", + createTable: `CREATE TABLE foo ( + int32Col INT4 PRIMARY KEY, + varCharCol VARCHAR(16) , + charCol CHAR(2), + tsCol TIMESTAMP , + stringCol STRING , + decimalCOl DECIMAL(12,2), + uuidCol UUID + )`, + inserts: []string{ + `INSERT INTO foo values (0, 'zero', 'CA', now(), 'oiwjfoijsdjif', 'inf', gen_random_uuid())`, + `INSERT INTO foo values (1, 'one', 'NY', now(), 'sdi9fu90d', '-1.90', gen_random_uuid())`, + `INSERT INTO foo values (2, 'two', 'WA', now(), 'sd9fid9fuj', '0.01', gen_random_uuid())`, + `INSERT INTO foo values (3, 'three', 'ON', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`, + `INSERT INTO foo values (4, 'four', 'NS', now(), '123123', '-11222221.2', gen_random_uuid())`, + `INSERT INTO foo values (5, 'five', 'BC', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`, + `INSERT INTO foo values (6, 'siz', 'AB', now(), '123123', '-11222221.2', gen_random_uuid())`, + }, + }, + } { + t.Run(tc.testName, func(t *testing.T) { + sqlDB.Exec(t, tc.createTable) + defer func() { + sqlDB.Exec(t, "DROP TABLE foo") + }() + + popRow, cleanup, decoder := makeRangefeedReaderAndDecoder(t, s) + defer cleanup() + + fileName := "TestParquetRows" + var writer *parquet.Writer + var numCols int + f, err := os.CreateTemp(os.TempDir(), fileName) + require.NoError(t, err) + + numRows := len(tc.inserts) + for _, insertStmt := range tc.inserts { + sqlDB.Exec(t, insertStmt) + } + + datums := make([][]tree.Datum, numRows) + for i := 0; i < numRows; i++ { + v := popRow(t) + + updatedRow, err := decoder.DecodeKV( + ctx, roachpb.KeyValue{Key: v.Key, Value: v.Value}, cdcevent.CurrentRow, v.Timestamp(), false) + require.NoError(t, err) + + prevRow, err := decoder.DecodeKV( + ctx, roachpb.KeyValue{Key: v.Key, Value: v.PrevValue}, cdcevent.PrevRow, v.Timestamp(), false) + require.NoError(t, err) + + if writer == nil { + writer, err = NewCDCParquetWriterFromRow(updatedRow, f, maxRowGroupSize) + if err != nil { + t.Fatalf(err.Error()) + } + numCols = len(updatedRow.ResultColumns()) + 1 + } + + datumRow, err := AddData(writer, updatedRow, prevRow) + datums[i] = datumRow + require.NoError(t, err) + } + + err = writer.Close() + require.NoError(t, err) + + parquet.ReadFileAndVerifyDatums(t, f.Name(), numRows, numCols, writer.Config(), writer.SchemaDefinition(), datums) + }) + } +} + +func makeRangefeedReaderAndDecoder( + t *testing.T, s serverutils.TestServerInterface, +) (func(t *testing.T) *kvpb.RangeFeedValue, func(), cdcevent.Decoder) { + tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") + popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc) + targets := changefeedbase.Targets{} + targets.Add(changefeedbase.Target{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: tableDesc.GetID(), + FamilyName: "primary", + }) + sqlExecCfg := s.ExecutorConfig().(sql.ExecutorConfig) + ctx := context.Background() + decoder, err := cdcevent.NewEventDecoder(ctx, &sqlExecCfg, targets, false, false) + require.NoError(t, err) + return popRow, cleanup, decoder +} diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 3581857ad2c7..0d0e11040c7d 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1344,7 +1344,7 @@ func (c *cloudFeed) appendParquetTestFeedMessages( for k, v := range row { if k == parquetCrdbEventTypeColName { - if string(v.([]byte)) == parquetEventDelete { + if string(v.([]byte)) == parquetEventDelete.DString().String() { isDeleted = true } continue