Skip to content

Commit

Permalink
sink/(ticdc): Simplify and optimize some logic in cloud storage sink (#7547)
Browse files Browse the repository at this point in the history

ref #6797, close #7724
  • Loading branch information
zhaoxinyu authored Nov 28, 2022
1 parent 07657c5 commit 7f27730
Show file tree
Hide file tree
Showing 30 changed files with 422 additions and 379 deletions.
17 changes: 4 additions & 13 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,30 +365,21 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
intRowID = row.RecordID.IntValue()
}

var tableInfoVersion uint64
// Align with the old format if old value disabled.
if row.Delete && !m.enableOldValue {
tableInfoVersion = 0
} else {
tableInfoVersion = tableInfo.TableInfoVersion
}

_, _, colInfos := tableInfo.GetRowColInfos()

rawRow.PreRowDatums = preRawCols
rawRow.RowDatums = rawCols
return &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: intRowID,
TableInfoVersion: tableInfoVersion,
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: intRowID,
Table: &model.TableName{
Schema: schemaName,
Table: tableName,
TableID: row.PhysicalTableID,
IsPartition: tableInfo.GetPartitionInfo() != nil,
},
ColInfos: colInfos,
TableInfo: tableInfo,
Columns: cols,
PreColumns: preCols,
IndexColumns: tableInfo.IndexColumnsOffset,
Expand Down
14 changes: 7 additions & 7 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ type TableInfo struct {
*model.TableInfo
SchemaID int64
TableName TableName
// TableInfoVersion record the tso of create the table info.
TableInfoVersion uint64
columnsOffset map[int64]int
indicesOffset map[int64]int
uniqueColumns map[int64]struct{}
// Version record the tso of create the table info.
Version uint64
columnsOffset map[int64]int
indicesOffset map[int64]int
uniqueColumns map[int64]struct{}

// It's a mapping from ColumnID to the offset of the columns in row changed events.
RowColumnsOffset map[int64]int
Expand Down Expand Up @@ -70,7 +70,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
Table: info.Name.O,
TableID: info.ID,
},
TableInfoVersion: version,
Version: version,
columnsOffset: make(map[int64]int, len(info.Columns)),
indicesOffset: make(map[int64]int, len(info.Indices)),
uniqueColumns: make(map[int64]struct{}),
Expand Down Expand Up @@ -301,5 +301,5 @@ func (ti *TableInfo) IsIndexUnique(indexInfo *model.IndexInfo) bool {

// Clone clones the TableInfo
func (ti *TableInfo) Clone() *TableInfo {
return WrapTableInfo(ti.SchemaID, ti.TableName.Schema, ti.TableInfoVersion, ti.TableInfo.Clone())
return WrapTableInfo(ti.SchemaID, ti.TableName.Schema, ti.Version, ti.TableInfo.Clone())
}
17 changes: 8 additions & 9 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,9 @@ type RowChangedEvent struct {

RowID int64 `json:"row-id" msg:"-"` // Deprecated. It is empty when the RowID comes from clustered index table.

Table *TableName `json:"table" msg:"table"`
ColInfos []rowcodec.ColInfo `json:"column-infos" msg:"-"`

TableInfoVersion uint64 `json:"table-info-version,omitempty" msg:"table-info-version"`
Table *TableName `json:"table" msg:"table"`
ColInfos []rowcodec.ColInfo `json:"column-infos" msg:"-"`
TableInfo *TableInfo `json:"-" msg:"-"`

Columns []*Column `json:"columns" msg:"-"`
PreColumns []*Column `json:"pre-columns" msg:"-"`
Expand Down Expand Up @@ -653,11 +652,11 @@ func (d *DDLEvent) FromRenameTablesJob(job *model.Job,
//msgp:ignore SingleTableTxn
type SingleTableTxn struct {
// data fields of SingleTableTxn
Table *TableName
TableVersion uint64
StartTs uint64
CommitTs uint64
Rows []*RowChangedEvent
Table *TableName
TableInfo *TableInfo
StartTs uint64
CommitTs uint64
Rows []*RowChangedEvent

// control fields of SingleTableTxn
// FinishWg is a barrier txn, after this txn is received, the worker must
Expand Down
35 changes: 5 additions & 30 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ func SplitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv
deleteEvent.Row.PreColumns[i] = nil
}
}
// Align with the old format if old value disabled.
deleteEvent.Row.TableInfoVersion = 0

insertEvent := *updateEvent
insertEventRow := *updateEvent.Row
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (a *BatchEncoder) avroEncode(
avroCodec, registryID, err := schemaManager.GetCachedOrRegister(
ctx,
topic,
e.TableInfoVersion,
e.TableInfo.Version,
schemaGen,
)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions cdc/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,12 @@ func TestAvroEncode(t *testing.T) {
Schema: "testdb",
Table: "avroencode",
},
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "testdb",
Table: "avroencode",
},
},
Columns: cols,
ColInfos: colInfos,
}
Expand Down Expand Up @@ -910,8 +916,9 @@ func TestArvoAppendRowChangedEventWithCallback(t *testing.T) {
require.Len(t, msgs, 0, "no message should be built and no panic")

row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
TableInfo: &model.TableInfo{TableName: model.TableName{Schema: "a", Table: "b"}},
Columns: []*model.Column{{
Name: "col1",
Type: mysql.TypeVarchar,
Expand Down
7 changes: 6 additions & 1 deletion cdc/sink/codec/csv/csv_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (c *csvMessage) encode() []byte {
}

func (c *csvMessage) decode(datums []types.Datum) error {
var dataColIdx int
if len(datums) < minimumColsCnt {
return cerror.WrapError(cerror.ErrCSVDecodeFailed,
errors.New("the csv row should have at least four columns"+
Expand All @@ -123,21 +124,25 @@ func (c *csvMessage) decode(datums []types.Datum) error {
if err := c.opType.FromString(datums[0].GetString()); err != nil {
return cerror.WrapError(cerror.ErrCSVDecodeFailed, err)
}
dataColIdx++
c.tableName = datums[1].GetString()
dataColIdx++
c.schemaName = datums[2].GetString()
dataColIdx++
if c.config.IncludeCommitTs {
commitTs, err := strconv.ParseUint(datums[3].GetString(), 10, 64)
if err != nil {
return cerror.WrapError(cerror.ErrCSVDecodeFailed,
fmt.Errorf("the 4th column(%s) of csv row should be a valid commit-ts", datums[3].GetString()))
}
c.commitTs = commitTs
dataColIdx++
} else {
c.commitTs = 0
}
c.columns = c.columns[:0]

for i := 4; i < len(datums); i++ {
for i := dataColIdx; i < len(datums); i++ {
if datums[i].IsNull() {
c.columns = append(c.columns, nil)
} else {
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/mysql/txn_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) {
var txn *model.SingleTableTxn
if len(t.txns) == 0 || row.SplitTxn || t.txns[len(t.txns)-1].StartTs < row.StartTs {
txn = &model.SingleTableTxn{
StartTs: row.StartTs,
CommitTs: row.CommitTs,
Table: row.Table,
StartTs: row.StartTs,
CommitTs: row.CommitTs,
Table: row.Table,
TableInfo: row.TableInfo,
}
t.txns = append(t.txns, txn)
} else if t.txns[len(t.txns)-1].StartTs == row.StartTs {
Expand Down
25 changes: 0 additions & 25 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type ddlSink struct {
// statistic is used to record the DDL metrics
statistics *metrics.Statistics
storage storage.ExternalStorage
tables *model.TableSet
}

// NewCloudStorageDDLSink creates a ddl sink for cloud storage.
Expand All @@ -63,7 +62,6 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er
d := &ddlSink{
id: changefeedID,
storage: storage,
tables: model.NewTableSet(),
statistics: metrics.NewStatistics(ctx, sink.TxnSink),
}

Expand Down Expand Up @@ -94,7 +92,6 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return err1
}

d.tables.Add(ddl.TableInfo.ID)
return nil
})

Expand All @@ -104,28 +101,6 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
func (d *ddlSink) WriteCheckpointTs(ctx context.Context,
ts uint64, tables []*model.TableInfo,
) error {
for _, table := range tables {
ok := d.tables.Contain(table.ID)
// if table is not cached before, then create the corresponding
// schema.json file anyway.
if !ok {
var def cloudstorage.TableDetail
def.FromTableInfo(table)

encodedDef, err := json.MarshalIndent(def, "", " ")
if err != nil {
return errors.Trace(err)
}

path := d.generateSchemaPath(def)
err = d.storage.WriteFile(ctx, path, encodedDef)
if err != nil {
return errors.Trace(err)
}
d.tables.Add(table.ID)
}
}

ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts})
if err != nil {
return errors.Trace(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestWriteDDLEvent(t *testing.T) {

ddlEvent := &model.DDLEvent{
TableInfo: &model.TableInfo{
TableInfoVersion: 100,
Version: 100,
TableName: model.TableName{
Schema: "test",
Table: "table1",
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestWriteCheckpointTs(t *testing.T) {
require.Nil(t, err)
tables := []*model.TableInfo{
{
TableInfoVersion: 100,
Version: 100,
TableName: model.TableName{
Schema: "test",
Table: "table1",
Expand All @@ -123,8 +123,6 @@ func TestWriteCheckpointTs(t *testing.T) {

err = sink.WriteCheckpointTs(ctx, 100, tables)
require.Nil(t, err)
_, err = os.Stat(path.Join(table1Dir, "schema.json"))
require.Nil(t, err)
metadata, err := os.ReadFile(path.Join(parentDir, "metadata"))
require.Nil(t, err)
require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata))
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/ddlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func New(
return blackhole.New(), nil
case sink.MySQLSSLScheme, sink.MySQLScheme, sink.TiDBScheme, sink.TiDBSSLScheme:
return mysql.NewMySQLDDLSink(ctx, sinkURI, cfg, pmysql.CreateMySQLDBConn)
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.AzblobScheme:
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
return cloudstorage.NewCloudStorageDDLSink(ctx, sinkURI)
default:
return nil,
Expand Down
Loading

0 comments on commit 7f27730

Please sign in to comment.