changefeedccl: add parquet writer
This change adds the file `parquet.go`, which contains
helper functions for creating parquet writers and
exporting data from `cdcevent.Row` structs.

This change also adds tests to ensure rows are written
to parquet files correctly.

Epic: None
Release note: None
jayshrivastava committed Apr 3, 2023
1 parent 6b9b940 commit f015156
Showing 6 changed files with 246 additions and 12 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
         "metrics.go",
         "name.go",
         "parallel_io.go",
+        "parquet.go",
         "parquet_sink_cloudstorage.go",
         "retry.go",
         "scheduled_changefeed.go",
@@ -137,6 +138,7 @@ go_library(
         "//pkg/util/metric",
         "//pkg/util/metric/aggmetric",
         "//pkg/util/mon",
+        "//pkg/util/parquet",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
         "//pkg/util/retry",
@@ -188,6 +190,7 @@ go_test(
         "main_test.go",
         "name_test.go",
         "nemeses_test.go",
+        "parquet_test.go",
         "scheduled_changefeed_test.go",
         "schema_registry_test.go",
         "show_changefeed_jobs_test.go",
@@ -297,6 +300,7 @@ go_test(
         "//pkg/util/log",
         "//pkg/util/log/eventpb",
         "//pkg/util/mon",
+        "//pkg/util/parquet",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
         "//pkg/util/retry",
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -1253,7 +1253,7 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
 		sqlDB.ExpectErr(t, errMsg, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar WITH initial_scan`, jobFeed.JobID()))
 	}
 
-	cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
+	cdcTestWithSystem(t, testFn, feedTestForceSink("pubsub"), feedTestNoExternalConnection)
 }
 
 func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
65 changes: 65 additions & 0 deletions pkg/ccl/changefeedccl/parquet.go
@@ -0,0 +1,65 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
	"io"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/parquet"
)

// NewCDCParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func NewCDCParquetWriterFromRow(
	row cdcevent.Row, sink io.Writer, maxRowGroupSize int64,
) (*parquet.Writer, error) {
	columnNames := make([]string, len(row.ResultColumns())+1)
	columnTypes := make([]*types.T, len(row.ResultColumns())+1)

	idx := 0
	if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
		columnNames[idx] = col.Name
		columnTypes[idx] = col.Typ
		idx += 1
		return nil
	}); err != nil {
		return nil, err
	}

	columnNames[idx] = parquetCrdbEventTypeColName
	columnTypes[idx] = types.String

	schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
	if err != nil {
		return nil, err
	}

	return parquet.NewWriter(schemaDef, sink, parquet.WithMaxRowGroupLength(maxRowGroupSize))
}

// AddData writes the updatedRow to the writer, adding the row's event type.
func AddData(
	writer *parquet.Writer, updatedRow cdcevent.Row, prevRow cdcevent.Row,
) ([]tree.Datum, error) {
	eventType := getEventTypeDatum(updatedRow, prevRow)
	datumRow := make([]tree.Datum, len(updatedRow.ResultColumns())+1)
	idx := 0
	if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
		datumRow[idx] = d
		idx += 1
		return nil
	}); err != nil {
		return nil, err
	}
	datumRow[idx] = eventType.DString()
	return datumRow, writer.AddData(datumRow)
}
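
Taken together, the two helpers are meant to be called in sequence: build a writer from the first decoded row (its columns plus the trailing event-type column), stream each event through `AddData`, and close the writer to flush row groups and file metadata. A minimal sketch of that call pattern, mirroring the test added below; `writeEventsToParquet` and its arguments are hypothetical stand-ins, and error handling is abbreviated:

	// Hypothetical sketch (not part of the commit): write a stream of decoded
	// events to a parquet sink. sink is any io.Writer; each event pairs an
	// updated row with its previous version.
	func writeEventsToParquet(sink io.Writer, events []struct{ updated, prev cdcevent.Row }) error {
		var writer *parquet.Writer
		for _, ev := range events {
			if writer == nil {
				var err error
				// Derive the schema from the first row; 2 is an arbitrary max row group size.
				writer, err = NewCDCParquetWriterFromRow(ev.updated, sink, 2)
				if err != nil {
					return err
				}
			}
			// Appends the row's datums plus the trailing __crdb_event_type__ column.
			if _, err := AddData(writer, ev.updated, ev.prev); err != nil {
				return err
			}
		}
		if writer == nil {
			return nil // no events were produced
		}
		// Close flushes buffered row groups and writes the parquet file metadata.
		return writer.Close()
	}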
36 changes: 26 additions & 10 deletions pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
@@ -35,12 +35,24 @@ var includeParquetTestMetadata = false
 // defined below.
 const parquetCrdbEventTypeColName string = "__crdb_event_type__"
 
+type parquetEventType int
+
 const (
-	parquetEventInsert string = "c"
-	parquetEventUpdate string = "u"
-	parquetEventDelete string = "d"
+	parquetEventInsert parquetEventType = iota
+	parquetEventUpdate
+	parquetEventDelete
 )
 
+var parquetEventTypeDatumStringMap = map[parquetEventType]*tree.DString{
+	parquetEventInsert: tree.NewDString("c"),
+	parquetEventUpdate: tree.NewDString("u"),
+	parquetEventDelete: tree.NewDString("d"),
+}
+
+func (e parquetEventType) DString() *tree.DString {
+	return parquetEventTypeDatumStringMap[e]
+}
+
 // We need a separate sink for parquet format because the parquet encoder has to
 // write metadata to the parquet file (buffer) after each flush. This means that the
 // parquet encoder should have access to the buffer object inside
@@ -178,13 +190,8 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
 		return err
 	}
 
-	if updatedRow.IsDeleted() {
-		parquetRow[parquetCrdbEventTypeColName] = []byte(parquetEventDelete)
-	} else if prevRow.IsInitialized() && !prevRow.IsDeleted() {
-		parquetRow[parquetCrdbEventTypeColName] = []byte(parquetEventUpdate)
-	} else {
-		parquetRow[parquetCrdbEventTypeColName] = []byte(parquetEventInsert)
-	}
+	et := getEventTypeDatum(updatedRow, prevRow)
+	parquetRow[parquetCrdbEventTypeColName] = []byte(et.DString().String())
 
 	if err = file.parquetCodec.parquetWriter.AddData(parquetRow); err != nil {
 		return err
@@ -204,6 +211,15 @@
 	return nil
 }
 
+func getEventTypeDatum(updatedRow cdcevent.Row, prevRow cdcevent.Row) parquetEventType {
+	if updatedRow.IsDeleted() {
+		return parquetEventDelete
+	} else if prevRow.IsInitialized() && !prevRow.IsDeleted() {
+		return parquetEventUpdate
+	}
+	return parquetEventInsert
+}
+
 func makeParquetWriterWrapper(
 	ctx context.Context, row cdcevent.Row, buf *bytes.Buffer, compression parquet.CompressionCodec,
 ) (*parquetFileWriter, error) {
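
The refactor preserves the sink's existing event-type semantics; `getEventTypeDatum` just centralizes the classification so the new `parquet.go` writer and the cloud storage sink share it. In summary (restating the logic above, not new behavior):

	// updatedRow.IsDeleted()                          -> parquetEventDelete ("d")
	// prevRow.IsInitialized() && !prevRow.IsDeleted() -> parquetEventUpdate ("u")
	// otherwise (no live previous version)            -> parquetEventInsert ("c")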
149 changes: 149 additions & 0 deletions pkg/ccl/changefeedccl/parquet_test.go
@@ -0,0 +1,149 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
	"context"
	"os"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/parquet"
	"github.com/stretchr/testify/require"
)

func TestParquetRows(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Rangefeed reader can time out under stress.
	skip.UnderStress(t)

	ctx := context.Background()
	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
		// TODO(#98816): cdctest.GetHydratedTableDescriptor does not work with tenant dbs.
		// Once it is fixed, this flag can be removed.
		DefaultTestTenant: base.TestTenantDisabled,
	})
	defer s.Stopper().Stop(ctx)

	maxRowGroupSize := int64(2)

	sqlDB := sqlutils.MakeSQLRunner(db)

	for _, tc := range []struct {
		testName    string
		createTable string
		inserts     []string
	}{
		{
			testName: "mixed",
			createTable: `CREATE TABLE foo (
				int32Col INT4 PRIMARY KEY,
				varCharCol VARCHAR(16) ,
				charCol CHAR(2),
				tsCol TIMESTAMP ,
				stringCol STRING ,
				decimalCOl DECIMAL(12,2),
				uuidCol UUID
			)`,
			inserts: []string{
				`INSERT INTO foo values (0, 'zero', 'CA', now(), 'oiwjfoijsdjif', 'inf', gen_random_uuid())`,
				`INSERT INTO foo values (1, 'one', 'NY', now(), 'sdi9fu90d', '-1.90', gen_random_uuid())`,
				`INSERT INTO foo values (2, 'two', 'WA', now(), 'sd9fid9fuj', '0.01', gen_random_uuid())`,
				`INSERT INTO foo values (3, 'three', 'ON', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
				`INSERT INTO foo values (4, 'four', 'NS', now(), '123123', '-11222221.2', gen_random_uuid())`,
				`INSERT INTO foo values (5, 'five', 'BC', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
				`INSERT INTO foo values (6, 'siz', 'AB', now(), '123123', '-11222221.2', gen_random_uuid())`,
			},
		},
	} {
		t.Run(tc.testName, func(t *testing.T) {
			sqlDB.Exec(t, tc.createTable)
			defer func() {
				sqlDB.Exec(t, "DROP TABLE foo")
			}()

			popRow, cleanup, decoder := makeRangefeedReaderAndDecoder(t, s)
			defer cleanup()

			fileName := "TestParquetRows"
			var writer *parquet.Writer
			var numCols int
			f, err := os.CreateTemp(os.TempDir(), fileName)
			require.NoError(t, err)

			numRows := len(tc.inserts)
			for _, insertStmt := range tc.inserts {
				sqlDB.Exec(t, insertStmt)
			}

			datums := make([][]tree.Datum, numRows)
			for i := 0; i < numRows; i++ {
				v := popRow(t)

				updatedRow, err := decoder.DecodeKV(
					ctx, roachpb.KeyValue{Key: v.Key, Value: v.Value}, cdcevent.CurrentRow, v.Timestamp(), false)
				require.NoError(t, err)

				prevRow, err := decoder.DecodeKV(
					ctx, roachpb.KeyValue{Key: v.Key, Value: v.PrevValue}, cdcevent.PrevRow, v.Timestamp(), false)
				require.NoError(t, err)

				if writer == nil {
					writer, err = NewCDCParquetWriterFromRow(updatedRow, f, maxRowGroupSize)
					if err != nil {
						t.Fatalf(err.Error())
					}
					numCols = len(updatedRow.ResultColumns()) + 1
				}

				datumRow, err := AddData(writer, updatedRow, prevRow)
				datums[i] = datumRow
				require.NoError(t, err)
			}

			err = writer.Close()
			require.NoError(t, err)

			parquet.ReadFileAndVerifyDatums(t, f.Name(), numRows, numCols, writer.Config(), writer.SchemaDefinition(), datums)
		})
	}
}

func makeRangefeedReaderAndDecoder(
	t *testing.T, s serverutils.TestServerInterface,
) (func(t *testing.T) *kvpb.RangeFeedValue, func(), cdcevent.Decoder) {
	tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo")
	popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc)
	targets := changefeedbase.Targets{}
	targets.Add(changefeedbase.Target{
		Type:       jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
		TableID:    tableDesc.GetID(),
		FamilyName: "primary",
	})
	sqlExecCfg := s.ExecutorConfig().(sql.ExecutorConfig)
	ctx := context.Background()
	decoder, err := cdcevent.NewEventDecoder(ctx, &sqlExecCfg, targets, false, false)
	require.NoError(t, err)
	return popRow, cleanup, decoder
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
@@ -1344,7 +1344,7 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
 
 	for k, v := range row {
 		if k == parquetCrdbEventTypeColName {
-			if string(v.([]byte)) == parquetEventDelete {
+			if string(v.([]byte)) == parquetEventDelete.DString().String() {
 				isDeleted = true
 			}
 			continue
