Skip to content

Commit

Permalink
cdcevent: add updated and mvcc timestamp option columns
Browse files Browse the repository at this point in the history
  • Loading branch information
jayshrivastava committed May 24, 2023
1 parent 1c7bedf commit 74a2387
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 54 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
Expand Down Expand Up @@ -488,7 +487,7 @@ func TestAvroSchema(t *testing.T) {
tableDesc, err := parseTableDesc(`CREATE TABLE foo (pk INT PRIMARY KEY, a ` + colType + `)`)
require.NoError(t, err)
field, err := columnToAvroSchema(
cdcevent.ResultColumn{ResultColumn: colinfo.ResultColumn{Typ: tableDesc.PublicColumns()[1].GetType()}},
cdcevent.ResultColumn{Typ: tableDesc.PublicColumns()[1].GetType()},
)
require.NoError(t, err)
schema, err := json.Marshal(field.SchemaType)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ $$`)
defer e.Close()

ctx := context.Background()
decoder, err := cdcevent.NewEventDecoder(ctx, &execCfg, targets, false, false)
decoder, err := cdcevent.NewEventDecoder(ctx, &execCfg, targets, false, false, false, false)
require.NoError(t, err)

for _, action := range tc.setupActions {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func newEventDescriptorForTarget(
if err != nil {
return nil, err
}
return cdcevent.NewEventDescriptor(desc, family, includeVirtual, keyOnly, schemaTS)
return cdcevent.NewEventDescriptor(desc, family, includeVirtual, keyOnly, schemaTS, false, false)
}

func getTargetFamilyDescriptor(
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/lease",
"//pkg/sql/distsql",
Expand Down
126 changes: 109 additions & 17 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type Row struct {
// datums is the new value of a changed table row.
datums rowenc.EncDatumRow

// metadatums
metaDatums tree.Datums

// deleted is true if row is a deletion. In this case, only the primary
// key columns are guaranteed to be set in `datums`.
deleted bool
Expand Down Expand Up @@ -110,6 +113,11 @@ func (r Row) ForEachUDTColumn() Iterator {
return iter{r: r, cols: r.udtCols}
}

// ForAllMeta returns Iterator for all meta columns.
func (r Row) ForAllMeta() Iterator {
return iter{r: r, cols: r.metaCols}
}

// DatumNamed returns the datum with the specified column name, in the form of an Iterator.
func (r Row) DatumNamed(n string) (Iterator, error) {
idx, ok := r.EventDescriptor.colsByName[n]
Expand Down Expand Up @@ -180,6 +188,13 @@ func (r Row) forEachDatum(fn DatumFn, colIndexes []int) error {
numVirtualCols := 0
for _, colIdx := range colIndexes {
col := r.cols[colIdx]
if col.isMetadataColumn {
if err := fn(r.metaDatums[col.ord], col); err != nil {
return err
}
continue
}

// A datum row will never contain virtual columns. If we encounter a column that is virtual,
// then we need to offset each subsequent col.ord by 1. This offset is tracked by numVirtualCols.
physicalOrd := col.ord - numVirtualCols
Expand Down Expand Up @@ -218,9 +233,24 @@ func (r Row) forEachColumn(fn ColumnFn, colIndexes []int) error {
// ResultColumn associates ResultColumn with an ordinal position where
// such column expected to be found.
type ResultColumn struct {
colinfo.ResultColumn
ord int
sqlString string
Name string
Typ *types.T

// isMetadataColumn is true if this column does not correspond to a physical column
// stored in the table. These columns are considered value columns, tacked on
// at the end.
isMetadataColumn bool

// ord is the position of the column in the datums array of a Row.
// For metadata columns, this is the position of the column in
// the meta datums array in a Row.
ord int

// The below fields are uninitialized for metadata columns.

sqlString string
tableID descpb.ID // OID of column's source table (pg_attribute.attrelid).
pgAttributeNum uint32 // Column's number in source table (pg_attribute.attnum).
}

// SQLStringNotHumanReadable returns the SQL statement describing the column.
Expand Down Expand Up @@ -248,17 +278,24 @@ type EventDescriptor struct {
keyCols []int // Primary key columns.
valueCols []int // All column family columns.
udtCols []int // Columns containing UDTs.
metaCols []int // Metadata cols.
allCols []int // Contains all the columns
colsByName map[string]int // All columns, map[col.GetName()]idx in cols
}

const MetaUpdatedColName = "updated"

const MetaMVCCTimestampColName = "mvcc_timestamp"

// NewEventDescriptor returns EventDescriptor for specified table and family descriptors.
func NewEventDescriptor(
desc catalog.TableDescriptor,
family *descpb.ColumnFamilyDescriptor,
includeVirtualColumns bool,
keyOnly bool,
schemaTS hlc.Timestamp,
withMetaUpdated bool,
withMetaMVCCTimestamp bool,
) (*EventDescriptor, error) {
sd := EventDescriptor{
Metadata: Metadata{
Expand All @@ -277,12 +314,11 @@ func NewEventDescriptor(
// addColumn is a helper to add a column to this descriptor.
addColumn := func(col catalog.Column, ord int) int {
resultColumn := ResultColumn{
ResultColumn: colinfo.ResultColumn{
Name: col.GetName(),
Typ: col.GetType(),
TableID: desc.GetID(),
PGAttributeNum: uint32(col.GetPGAttributeNum()),
},
Name: col.GetName(),
Typ: col.GetType(),
tableID: desc.GetID(),
pgAttributeNum: uint32(col.GetPGAttributeNum()),

ord: ord,
sqlString: col.ColumnDesc().SQLStringNotHumanReadable(),
}
Expand All @@ -297,6 +333,23 @@ func NewEventDescriptor(
return colIdx
}

addMetaColumn := func(name string, typ *types.T, ord int) (int, error) {
resultColumn := ResultColumn{
Name: name,
Typ: typ,
isMetadataColumn: true,
}

colIdx := len(sd.cols)
sd.cols = append(sd.cols, resultColumn)
if _, exists := sd.colsByName[name]; exists {
return 0, errors.AssertionFailedf("duplicate name %s", name)
}
sd.colsByName[name] = colIdx

return colIdx, nil
}

// Primary key columns must be added in the same order they
// appear in the primary key index.
primaryIdx := desc.GetPrimaryIndex()
Expand Down Expand Up @@ -348,6 +401,23 @@ func NewEventDescriptor(
}
}
}
metaOrd := 0
if withMetaUpdated {
colIdx, err := addMetaColumn(MetaUpdatedColName, types.String, metaOrd)
if err != nil {
return nil, err
}
sd.metaCols = append(sd.metaCols, colIdx)
metaOrd += 1
}
if withMetaMVCCTimestamp {
colIdx, err := addMetaColumn(MetaMVCCTimestampColName, types.String, metaOrd)
if err != nil {
return nil, err
}
sd.metaCols = append(sd.metaCols, colIdx)
metaOrd += 1
}

allCols := make([]int, len(sd.cols))
for i := 0; i < len(sd.cols); i++ {
Expand Down Expand Up @@ -432,6 +502,8 @@ func getEventDescriptorCached(
keyOnly bool,
schemaTS hlc.Timestamp,
cache *cache.UnorderedCache,
withUpdated bool,
withMVCC bool,
) (*EventDescriptor, error) {
idVer := CacheKey{ID: desc.GetID(), Version: desc.GetVersion(), FamilyID: family.ID}

Expand All @@ -442,10 +514,11 @@ func getEventDescriptorCached(
}
}

ed, err := NewEventDescriptor(desc, family, includeVirtual, keyOnly, schemaTS)
ed, err := NewEventDescriptor(desc, family, includeVirtual, keyOnly, schemaTS, withUpdated, withMVCC)
if err != nil {
return nil, err
}

cache.Add(idVer, ed)
return ed, nil
}
Expand All @@ -457,6 +530,8 @@ func NewEventDecoder(
targets changefeedbase.Targets,
includeVirtual bool,
keyOnly bool,
withUpdated bool,
withMvccTimestamps bool,
) (Decoder, error) {
rfCache, err := newRowFetcherCache(
ctx,
Expand All @@ -476,7 +551,7 @@ func NewEventDecoder(
family *descpb.ColumnFamilyDescriptor,
schemaTS hlc.Timestamp,
) (*EventDescriptor, error) {
return getEventDescriptorCached(desc, family, includeVirtual, keyOnly, schemaTS, eventDescriptorCache)
return getEventDescriptorCached(desc, family, includeVirtual, keyOnly, schemaTS, eventDescriptorCache, withUpdated, withMvccTimestamps)
}

return &eventDecoder{
Expand All @@ -493,6 +568,13 @@ const (
PrevRow
)

// timestampToString converts an internal timestamp to the string form used in
// all encoders. This could be made more efficient. And/or it could be configurable
// to include the Synthetic flag when present, but that's unlikely to be needed.
func timestampToString(t hlc.Timestamp) string {
return t.WithSynthetic(false).AsOfSystemTime()
}

// DecodeKV decodes key value at specified schema timestamp.
func (d *eventDecoder) DecodeKV(
ctx context.Context, kv roachpb.KeyValue, rt RowType, schemaTS hlc.Timestamp, keyOnly bool,
Expand All @@ -516,11 +598,22 @@ func (d *eventDecoder) DecodeKV(
if err != nil {
return Row{}, err
}
var metaDatums tree.Datums
if len(ed.metaCols) > 0 {
for _, colIdx := range ed.metaCols {
if ed.cols[colIdx].Name == MetaUpdatedColName {
metaDatums = append(metaDatums, tree.NewDString(timestampToString(schemaTS)))
} else if ed.cols[colIdx].Name == MetaMVCCTimestampColName {
metaDatums = append(metaDatums, tree.NewDString(timestampToString(kv.Value.Timestamp)))
}
}
}

return Row{
EventDescriptor: ed,
MvccTimestamp: kv.Value.Timestamp,
datums: datums,
metaDatums: metaDatums,
deleted: isDeleted,
alloc: &d.alloc,
}, nil
Expand Down Expand Up @@ -628,7 +721,7 @@ func TestingMakeEventRow(
panic(err) // primary column family always exists.
}
const includeVirtual = false
ed, err := NewEventDescriptor(desc, family, includeVirtual, false, hlc.Timestamp{})
ed, err := NewEventDescriptor(desc, family, includeVirtual, false, hlc.Timestamp{}, false, false)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -726,11 +819,10 @@ func TestingMakeEventRowFromEncDatums(
colName += fmt.Sprintf("_%d", names[colName]-1)
}
cols = append(cols, ResultColumn{
ResultColumn: colinfo.ResultColumn{
Name: colName,
Typ: typ,
TableID: 42,
},
Name: colName,
Typ: typ,
tableID: 42,

ord: i,
})
}
Expand Down
Loading

0 comments on commit 74a2387

Please sign in to comment.