From bc2dfbff7c79e2f24a8b4be9dab2be585d6fa7d9 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 23 Nov 2021 23:20:12 +0800 Subject: [PATCH 1/5] remove cdclog. --- cdc/sink/sink.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 6b89e334167..67069809460 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -19,7 +19,6 @@ import ( "strings" "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/sink/cdclog" "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/filter" @@ -92,18 +91,6 @@ func init() { return newPulsarSink(ctx, sinkURI, filter, config, opts, errCh) } sinkIniterMap["pulsar+ssl"] = sinkIniterMap["pulsar"] - - // register local sink - sinkIniterMap["local"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { - return cdclog.NewLocalFileSink(ctx, sinkURI, errCh) - } - - // register s3 sink - sinkIniterMap["s3"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { - return cdclog.NewS3Sink(ctx, sinkURI, errCh) - } } // New creates a new sink with the sink-uri From 0c3404f89cc209687c68fa7a81841d8dc5b2cce0 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 24 Nov 2021 11:00:21 +0800 Subject: [PATCH 2/5] remove cdclog integration test. --- tests/cdclog_file/run.sh | 83 --------------------------- tests/cdclog_s3/run.sh | 118 --------------------------------------- 2 files changed, 201 deletions(-) delete mode 100644 tests/cdclog_file/run.sh delete mode 100644 tests/cdclog_s3/run.sh diff --git a/tests/cdclog_file/run.sh b/tests/cdclog_file/run.sh deleted file mode 100644 index af73828c962..00000000000 --- a/tests/cdclog_file/run.sh +++ /dev/null @@ -1,83 +0,0 @@ -#!/bin/bash - -set -e - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test - -function prepare() { - rm -rf "$WORK_DIR" - mkdir -p "$WORK_DIR" - stop_tidb_cluster - start_tidb_cluster --workdir $WORK_DIR - - cd $WORK_DIR - - # record tso before we create tables to skip the system table DDLs - start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) - - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - - SINK_URI="local://$WORK_DIR/test?" - - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -} - -success=0 -function check_cdclog() { - # TODO test rotate file - DATA_DIR="$WORK_DIR/test" - # retrieve table id by log meta - if [ ! -f $DATA_DIR/log.meta ]; then - return - fi - table_id=$(cat $DATA_DIR/log.meta | jq | grep t1 | awk -F '"' '{print $2}') - if [ ! -d $DATA_DIR/t_$table_id ]; then - return - fi - file_count=$(ls -ahl $DATA_DIR/t_$table_id | grep cdclog | wc -l) - if [[ ! "$file_count" -eq "1" ]]; then - echo "$TEST_NAME failed, expect 1 row changed files, obtain $file_count" - return - fi - if [ ! -d $DATA_DIR/ddls ]; then - return - fi - ddl_file_count=$(ls -ahl $DATA_DIR/ddls | grep ddl | wc -l) - if [[ ! "$ddl_file_count" -eq "1" ]]; then - echo "$TEST_NAME failed, expect 1 ddl file, obtain $ddl_file_count" - return - fi - success=1 -} - -function cdclog_test() { - run_sql "drop database if exists $TEST_NAME" - run_sql "create database $TEST_NAME" - run_sql "create table $TEST_NAME.t1 (c0 int primary key, payload varchar(1024));" - run_sql "create table $TEST_NAME.t2 (c0 int primary key, payload varchar(1024));" - - run_sql "insert into $TEST_NAME.t1 values (1, 'a')" - run_sql "insert into $TEST_NAME.t1 values (2, 'b')" - - i=0 - while [ $i -lt 30 ]; do - check_cdclog - if [ "$success" == 1 ]; then - echo "check log successfully" - break - fi - i=$(($i + 1)) - echo "check log failed $i-th time, retry later" - sleep 2 - done - cleanup_process $CDC_BINARY -} - -trap stop_tidb_cluster EXIT -prepare $* -cdclog_test $* -check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/cdclog_s3/run.sh b/tests/cdclog_s3/run.sh deleted file mode 100644 index c72d1460705..00000000000 --- a/tests/cdclog_s3/run.sh +++ /dev/null @@ -1,118 +0,0 @@ -#!/bin/bash - -set -e - -CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -source $CUR/../_utils/test_prepare -WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test - -# start the s3 server -export MINIO_ACCESS_KEY=cdcs3accesskey -export MINIO_SECRET_KEY=cdcs3secretkey -export MINIO_BROWSER=off -export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY -export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY -export S3_ENDPOINT=127.0.0.1:24927 -rm -rf "$WORK_DIR" -mkdir -p "$WORK_DIR" -pkill -9 minio || true -bin/minio server --address $S3_ENDPOINT "$WORK_DIR/s3" & -MINIO_PID=$! -i=0 -while ! curl -o /dev/null -v -s "http://$S3_ENDPOINT/"; do - i=$(($i + 1)) - if [ $i -gt 7 ]; then - echo 'Failed to start minio' - exit 1 - fi - sleep 2 -done - -stop_minio() { - kill -2 $MINIO_PID -} - -stop() { - stop_minio - stop_tidb_cluster -} - -s3cmd --access_key=$MINIO_ACCESS_KEY --secret_key=$MINIO_SECRET_KEY --host=$S3_ENDPOINT --host-bucket=$S3_ENDPOINT --no-ssl mb s3://logbucket - -function prepare() { - stop_tidb_cluster - start_tidb_cluster --workdir $WORK_DIR - - cd $WORK_DIR - - # record tso before we create tables to skip the system table DDLs - start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) - - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - -} - -success=0 -function check_cdclog() { - DATA_DIR="$WORK_DIR/s3/logbucket/test" - # retrieve table id by log meta - if [ ! -f $DATA_DIR/log.meta ]; then - return - fi - table_id=$(cat $DATA_DIR/log.meta | jq | grep t1 | awk -F '"' '{print $2}') - if [ ! -d $DATA_DIR/t_$table_id ]; then - return - fi - file_count=$(ls -ahl $DATA_DIR/t_$table_id | grep cdclog | wc -l) - if [[ ! "$file_count" -eq "2" ]]; then - echo "$TEST_NAME failed, expect 2 row changed files, obtain $file_count" - return - fi - if [ ! -d $DATA_DIR/ddls ]; then - return - fi - ddl_file_count=$(ls -ahl $DATA_DIR/ddls | grep ddl | wc -l) - if [[ ! "$ddl_file_count" -eq "1" ]]; then - echo "$TEST_NAME failed, expect 1 ddl file, obtain $ddl_file_count" - return - fi - success=1 -} - -function cdclog_test() { - run_sql "drop database if exists $TEST_NAME" - run_sql "create database $TEST_NAME" - run_sql "create table $TEST_NAME.t1 (c0 int primary key, payload varchar(1024));" - - SINK_URI="s3://logbucket/test?endpoint=http://$S3_ENDPOINT/" - - run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" - - run_sql "create table $TEST_NAME.t2 (c0 int primary key, payload varchar(1024));" - - run_sql "insert into $TEST_NAME.t1 values (1, 'a')" - # because flush row changed events interval is 5 second - # so sleep 20 second will generate two files - sleep 20 - run_sql "insert into $TEST_NAME.t1 values (2, 'b')" - - i=0 - while [ $i -lt 30 ]; do - check_cdclog - if [ "$success" == 1 ]; then - echo "check log successfully" - break - fi - i=$(($i + 1)) - echo "check log failed $i-th time, retry later" - sleep 2 - done - cleanup_process $CDC_BINARY -} - -trap stop EXIT -prepare $* -cdclog_test $* -check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From 23fa27752a058a2d1b2baf126a5dfb1d155f3735 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 1 Dec 2021 11:17:29 +0800 Subject: [PATCH 3/5] remove cdclog --- cdc/sink/cdclog/file.go | 397 -------------------------------------- cdc/sink/cdclog/s3.go | 403 --------------------------------------- cdc/sink/cdclog/utils.go | 252 ------------------------ 3 files changed, 1052 deletions(-) delete mode 100644 cdc/sink/cdclog/file.go delete mode 100644 cdc/sink/cdclog/s3.go delete mode 100644 cdc/sink/cdclog/utils.go diff --git a/cdc/sink/cdclog/file.go b/cdc/sink/cdclog/file.go deleted file mode 100644 index 6913b015a0b..00000000000 --- a/cdc/sink/cdclog/file.go +++ /dev/null @@ -1,397 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cdclog - -import ( - "context" - "net/url" - "os" - "path/filepath" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/sink/codec" - cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/quotes" - parsemodel "github.com/pingcap/tidb/parser/model" - "github.com/uber-go/atomic" - "go.uber.org/zap" -) - -const ( - defaultDirMode = 0o755 - defaultFileMode = 0o644 - - defaultFileName = "cdclog" - - maxRowFileSize = 10 << 20 // TODO update -) - -type logPath struct { - root string - ddl string - meta string -} - -type tableStream struct { - dataCh chan *model.RowChangedEvent - rowFile *os.File - - encoder codec.EventBatchEncoder - - tableID int64 - sendEvents *atomic.Int64 - sendSize *atomic.Int64 -} - -func newTableStream(tableID int64) logUnit { - return &tableStream{ - tableID: tableID, - dataCh: make(chan *model.RowChangedEvent, defaultBufferChanSize), - - sendEvents: atomic.NewInt64(0), - sendSize: atomic.NewInt64(0), - } -} - -func (ts *tableStream) dataChan() chan *model.RowChangedEvent { - return ts.dataCh -} - -func (ts *tableStream) TableID() int64 { - return ts.tableID -} - -func (ts *tableStream) Events() *atomic.Int64 { - return ts.sendEvents -} - -func (ts *tableStream) Size() *atomic.Int64 { - return ts.sendSize -} - -func (ts *tableStream) isEmpty() bool { - return ts.sendEvents.Load() == 0 -} - -func (ts *tableStream) shouldFlush() bool { - // if sendSize > 5 MB or data chennal is full, flush it - return ts.sendSize.Load() > maxPartFlushSize || ts.sendEvents.Load() == defaultBufferChanSize -} - -func (ts *tableStream) flush(ctx context.Context, sink *logSink) error { - var fileName string - flushedEvents := ts.sendEvents.Load() - flushedSize := ts.sendSize.Load() - if flushedEvents == 0 { - log.Info("[flushTableStreams] no events to flush") - return nil - } - firstCreated := false - if ts.encoder == nil { - // create encoder for each file - ts.encoder = sink.encoder() - firstCreated = true - } - for event := int64(0); event < flushedEvents; event++ { - row := <-ts.dataCh - if event == flushedEvents-1 { - // the last event - fileName = makeTableFileName(row.CommitTs) - } - _, err := ts.encoder.AppendRowChangedEvent(row) - if err != nil { - return err - } - } - rowDatas := ts.encoder.MixedBuild(firstCreated) - defer func() { - if ts.encoder != nil { - ts.encoder.Reset() - } - }() - - log.Debug("[flushTableStreams] build cdc log data", - zap.Int64("table id", ts.tableID), - zap.Int64("flushed size", flushedSize), - zap.Int64("flushed event", flushedEvents), - zap.Int("encode size", len(rowDatas)), - zap.String("file name", fileName), - ) - - tableDir := filepath.Join(sink.root(), makeTableDirectoryName(ts.tableID)) - - if ts.rowFile == nil { - // create new file to append data - err := os.MkdirAll(tableDir, defaultDirMode) - if err != nil { - return err - } - file, err := os.OpenFile(filepath.Join(tableDir, defaultFileName), os.O_CREATE|os.O_WRONLY|os.O_APPEND, defaultFileMode) - if err != nil { - return err - } - ts.rowFile = file - } - - _, err := ts.rowFile.Write(rowDatas) - if err != nil { - return err - } - - stat, err := ts.rowFile.Stat() - if err != nil { - return err - } - - if stat.Size() > maxRowFileSize { - // rotate file - err := ts.rowFile.Close() - if err != nil { - return err - } - oldPath := filepath.Join(tableDir, defaultFileName) - newPath := filepath.Join(tableDir, fileName) - err = os.Rename(oldPath, newPath) - if err != nil { - return err - } - file, err := os.OpenFile(filepath.Join(tableDir, defaultFileName), os.O_CREATE|os.O_WRONLY|os.O_APPEND, defaultFileMode) - if err != nil { - return err - } - ts.rowFile = file - ts.encoder = nil - } - - ts.sendEvents.Sub(flushedEvents) - ts.sendSize.Sub(flushedSize) - return nil -} - -type fileSink struct { - *logSink - - logMeta *logMeta - logPath *logPath - - ddlFile *os.File - - ddlEncoder codec.EventBatchEncoder -} - -func (f *fileSink) flushLogMeta() error { - data, err := f.logMeta.Marshal() - if err != nil { - return cerror.WrapError(cerror.ErrMarshalFailed, err) - } - // FIXME: if initialize succeed, O_WRONLY is enough, but sometimes it will failed - tmpFileName := f.logPath.meta + ".tmp" - tmpFile, err := os.OpenFile(tmpFileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, defaultFileMode) - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkFileOp, err) - } - _, err = tmpFile.Write(data) - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkFileOp, err) - } - err = os.Rename(tmpFileName, f.logPath.meta) - return cerror.WrapError(cerror.ErrFileSinkFileOp, err) -} - -func (f *fileSink) createDDLFile(commitTs uint64) (*os.File, error) { - fileName := makeDDLFileName(commitTs) - file, err := os.OpenFile(filepath.Join(f.logPath.ddl, fileName), os.O_CREATE|os.O_WRONLY|os.O_APPEND, defaultFileMode) - if err != nil { - log.Error("[EmitDDLEvent] create ddl file failed", zap.Error(err)) - return nil, cerror.WrapError(cerror.ErrFileSinkFileOp, err) - } - return file, nil -} - -func (f *fileSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { - return f.emitRowChangedEvents(ctx, newTableStream, rows...) -} - -func (f *fileSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { - log.Debug("[FlushRowChangedEvents] enter", zap.Uint64("ts", resolvedTs)) - return f.flushRowChangedEvents(ctx, resolvedTs) -} - -func (f *fileSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { - log.Debug("[EmitCheckpointTs]", zap.Uint64("ts", ts)) - f.logMeta.GlobalResolvedTS = ts - return f.flushLogMeta() -} - -func (f *fileSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - switch ddl.Type { - case parsemodel.ActionCreateTable: - f.logMeta.Names[ddl.TableInfo.TableID] = quotes.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) - err := f.flushLogMeta() - if err != nil { - return err - } - case parsemodel.ActionRenameTable: - delete(f.logMeta.Names, ddl.PreTableInfo.TableID) - f.logMeta.Names[ddl.TableInfo.TableID] = quotes.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) - err := f.flushLogMeta() - if err != nil { - return err - } - } - firstCreated := false - if f.ddlEncoder == nil { - // create ddl encoder once for each ddl log file - f.ddlEncoder = f.encoder() - firstCreated = true - } - _, err := f.ddlEncoder.EncodeDDLEvent(ddl) - if err != nil { - return err - } - data := f.ddlEncoder.MixedBuild(firstCreated) - - defer func() { - if f.ddlEncoder != nil { - f.ddlEncoder.Reset() - } - }() - - if f.ddlFile == nil { - // create file stream - file, err := f.createDDLFile(ddl.CommitTs) - if err != nil { - return err - } - f.ddlFile = file - } - - stat, err := f.ddlFile.Stat() - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkFileOp, err) - } - - log.Debug("[EmitDDLEvent] current file stats", - zap.String("name", stat.Name()), - zap.Int64("size", stat.Size()), - zap.Int("data size", len(data)), - ) - - if stat.Size() > maxDDLFlushSize { - // rotate file - err = f.ddlFile.Close() - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkFileOp, err) - } - file, err := f.createDDLFile(ddl.CommitTs) - if err != nil { - return err - } - f.ddlFile = file - // reset ddl encoder for new file - f.ddlEncoder = nil - } - - _, err = f.ddlFile.Write(data) - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkFileOp, err) - } - return nil -} - -func (f *fileSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - if tableInfo != nil { - for _, table := range tableInfo { - if table != nil { - name := makeTableDirectoryName(table.TableID) - err := os.MkdirAll(filepath.Join(f.logPath.root, name), defaultDirMode) - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkCreateDir, err) - } - } - } - // update log meta to record the relationship about tableName and tableID - f.logMeta = makeLogMetaContent(tableInfo) - data, err := f.logMeta.Marshal() - if err != nil { - return cerror.WrapError(cerror.ErrMarshalFailed, err) - } - filePath := f.logPath.meta - if _, err := os.Stat(filePath); !os.IsNotExist(err) { - return cerror.WrapError(cerror.ErrFileSinkMetaAlreadyExists, err) - } - file, err := os.Create(filePath) - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkCreateDir, err) - } - _, err = file.Write(data) - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkFileOp, err) - } - } - return nil -} - -func (f *fileSink) Close(ctx context.Context) error { - return nil -} - -func (f *fileSink) Barrier(ctx context.Context, tableID model.TableID) error { - // Barrier does nothing because FlushRowChangedEvents in file sink has flushed - // all buffered events forcedlly. - return nil -} - -// NewLocalFileSink support log data to file. -func NewLocalFileSink(ctx context.Context, sinkURI *url.URL, errCh chan error) (*fileSink, error) { - log.Info("[NewLocalFileSink]", - zap.String("host", sinkURI.Host), - zap.String("path", sinkURI.Path), - ) - rootPath := sinkURI.Path + "/" - logPath := &logPath{ - root: rootPath, - meta: rootPath + logMetaFile, - ddl: rootPath + ddlEventsDir, - } - err := os.MkdirAll(logPath.ddl, defaultDirMode) - if err != nil { - log.Error("create ddl path failed", - zap.String("ddl path", logPath.ddl), - zap.Error(err)) - return nil, cerror.WrapError(cerror.ErrFileSinkCreateDir, err) - } - - f := &fileSink{ - logMeta: newLogMeta(), - logPath: logPath, - logSink: newLogSink(logPath.root, nil), - } - - // important! we should flush asynchronously in another goroutine - go func() { - if err := f.startFlush(ctx); err != nil && errors.Cause(err) != context.Canceled { - select { - case <-ctx.Done(): - return - case errCh <- err: - default: - log.Error("error channel is full", zap.Error(err)) - } - } - }() - return f, nil -} diff --git a/cdc/sink/cdclog/s3.go b/cdc/sink/cdclog/s3.go deleted file mode 100644 index 52726b60c9b..00000000000 --- a/cdc/sink/cdclog/s3.go +++ /dev/null @@ -1,403 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cdclog - -import ( - "context" - "net/url" - "strings" - "time" - - "github.com/pingcap/errors" - backup "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/log" - "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/sink/codec" - cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/quotes" - "github.com/pingcap/tidb/br/pkg/storage" - parsemodel "github.com/pingcap/tidb/parser/model" - "github.com/uber-go/atomic" - "go.uber.org/zap" -) - -const ( - maxPartFlushSize = 5 << 20 // The minimal multipart upload size is 5Mb. - maxCompletePartSize = 100 << 20 // rotate row changed event file if one complete file larger than 100Mb - maxDDLFlushSize = 10 << 20 // rotate ddl event file if one complete file larger than 10Mb - - defaultBufferChanSize = 20480 - defaultFlushRowChangedEventDuration = 5 * time.Second // TODO make it as a config -) - -type tableBuffer struct { - // for log - tableID int64 - dataCh chan *model.RowChangedEvent - sendSize *atomic.Int64 - sendEvents *atomic.Int64 - - encoder codec.EventBatchEncoder - - uploadParts struct { - writer storage.ExternalFileWriter - uploadNum int - byteSize int64 - } -} - -func (tb *tableBuffer) dataChan() chan *model.RowChangedEvent { - return tb.dataCh -} - -func (tb *tableBuffer) TableID() int64 { - return tb.tableID -} - -func (tb *tableBuffer) Events() *atomic.Int64 { - return tb.sendEvents -} - -func (tb *tableBuffer) Size() *atomic.Int64 { - return tb.sendSize -} - -func (tb *tableBuffer) isEmpty() bool { - return tb.sendEvents.Load() == 0 && tb.uploadParts.uploadNum == 0 -} - -func (tb *tableBuffer) shouldFlush() bool { - // if sendSize > 5 MB or data chennal is full, flush it - return tb.sendSize.Load() > maxPartFlushSize || tb.sendEvents.Load() == defaultBufferChanSize -} - -func (tb *tableBuffer) flush(ctx context.Context, sink *logSink) error { - hashPart := tb.uploadParts - sendEvents := tb.sendEvents.Load() - if sendEvents == 0 && hashPart.uploadNum == 0 { - log.Info("nothing to flush", zap.Int64("tableID", tb.tableID)) - return nil - } - - firstCreated := false - if tb.encoder == nil { - // create encoder for each file - tb.encoder = sink.encoder() - firstCreated = true - } - - var newFileName string - flushedSize := int64(0) - for event := int64(0); event < sendEvents; event++ { - row := <-tb.dataCh - flushedSize += row.ApproximateDataSize - if event == sendEvents-1 { - // if last event, we record ts as new rotate file name - newFileName = makeTableFileObject(row.Table.TableID, row.CommitTs) - } - _, err := tb.encoder.AppendRowChangedEvent(row) - if err != nil { - return err - } - } - rowDatas := tb.encoder.MixedBuild(firstCreated) - // reset encoder buf for next round append - defer func() { - if tb.encoder != nil { - tb.encoder.Reset() - } - }() - - log.Debug("[FlushRowChangedEvents[Debug]] flush table buffer", - zap.Int64("table", tb.tableID), - zap.Int64("event size", sendEvents), - zap.Int("row data size", len(rowDatas)), - zap.Int("upload num", hashPart.uploadNum), - zap.Int64("upload byte size", hashPart.byteSize), - // zap.ByteString("rowDatas", rowDatas), - ) - - if len(rowDatas) > maxPartFlushSize || hashPart.uploadNum > 0 { - // S3 multi-upload need every chunk(except the last one) is greater than 5Mb - // so, if this batch data size is greater than 5Mb or it has uploadPart already - // we will use multi-upload this batch data - if len(rowDatas) > 0 { - if hashPart.writer == nil { - fileWriter, err := sink.storage().Create(ctx, newFileName) - if err != nil { - return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err) - } - hashPart.writer = fileWriter - } - - _, err := hashPart.writer.Write(ctx, rowDatas) - if err != nil { - return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err) - } - - hashPart.byteSize += int64(len(rowDatas)) - hashPart.uploadNum++ - } - - if hashPart.byteSize > maxCompletePartSize || len(rowDatas) <= maxPartFlushSize { - // we need do complete when total upload size is greater than 100Mb - // or this part data is less than 5Mb to avoid meet EntityTooSmall error - log.Info("[FlushRowChangedEvents] complete file", zap.Int64("tableID", tb.tableID)) - err := hashPart.writer.Close(ctx) - if err != nil { - return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err) - } - hashPart.byteSize = 0 - hashPart.uploadNum = 0 - hashPart.writer = nil - tb.encoder = nil - } - } else { - // generate normal file because S3 multi-upload need every part at least 5Mb. - log.Info("[FlushRowChangedEvents] normal upload file", zap.Int64("tableID", tb.tableID)) - err := sink.storage().WriteFile(ctx, newFileName, rowDatas) - if err != nil { - return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err) - } - tb.encoder = nil - } - - tb.sendEvents.Sub(sendEvents) - tb.sendSize.Sub(flushedSize) - tb.uploadParts = hashPart - return nil -} - -func newTableBuffer(tableID int64) logUnit { - return &tableBuffer{ - tableID: tableID, - dataCh: make(chan *model.RowChangedEvent, defaultBufferChanSize), - sendSize: atomic.NewInt64(0), - sendEvents: atomic.NewInt64(0), - uploadParts: struct { - writer storage.ExternalFileWriter - uploadNum int - byteSize int64 - }{ - writer: nil, - uploadNum: 0, - byteSize: 0, - }, - } -} - -type s3Sink struct { - *logSink - - prefix string - - storage storage.ExternalStorage - - logMeta *logMeta - - // hold encoder for ddl event log - ddlEncoder codec.EventBatchEncoder -} - -func (s *s3Sink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { - return s.emitRowChangedEvents(ctx, newTableBuffer, rows...) -} - -func (s *s3Sink) flushLogMeta(ctx context.Context) error { - data, err := s.logMeta.Marshal() - if err != nil { - return cerror.WrapError(cerror.ErrMarshalFailed, err) - } - return cerror.WrapError(cerror.ErrS3SinkWriteStorage, s.storage.WriteFile(ctx, logMetaFile, data)) -} - -func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { - // we should flush all events before resolvedTs, there are two kind of flush policy - // 1. flush row events to a s3 chunk: if the event size is not enough, - // TODO: when cdc crashed, we should repair these chunks to a complete file - // 2. flush row events to a complete s3 file: if the event size is enough - return s.flushRowChangedEvents(ctx, resolvedTs) -} - -// EmitCheckpointTs update the global resolved ts in log meta -// sleep 5 seconds to avoid update too frequently -func (s *s3Sink) EmitCheckpointTs(ctx context.Context, ts uint64) error { - s.logMeta.GlobalResolvedTS = ts - return s.flushLogMeta(ctx) -} - -// EmitDDLEvent write ddl event to S3 directory, all events split by '\n' -// Because S3 doesn't support append-like write. -// we choose a hack way to read origin file then write in place. -func (s *s3Sink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - switch ddl.Type { - case parsemodel.ActionCreateTable: - s.logMeta.Names[ddl.TableInfo.TableID] = quotes.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) - err := s.flushLogMeta(ctx) - if err != nil { - return err - } - case parsemodel.ActionRenameTable: - delete(s.logMeta.Names, ddl.PreTableInfo.TableID) - s.logMeta.Names[ddl.TableInfo.TableID] = quotes.QuoteSchema(ddl.TableInfo.Schema, ddl.TableInfo.Table) - err := s.flushLogMeta(ctx) - if err != nil { - return err - } - } - firstCreated := false - if s.ddlEncoder == nil { - s.ddlEncoder = s.encoder() - firstCreated = true - } - // reset encoder buf for next round append - defer s.ddlEncoder.Reset() - - var ( - name string - size int64 - fileData []byte - ) - opt := &storage.WalkOption{ - SubDir: ddlEventsDir, - ListCount: 1, - } - err := s.storage.WalkDir(ctx, opt, func(key string, fileSize int64) error { - log.Debug("[EmitDDLEvent] list content from s3", - zap.String("key", key), - zap.Int64("size", size), - zap.Any("ddl", ddl)) - name = strings.ReplaceAll(key, s.prefix, "") - size = fileSize - return nil - }) - if err != nil { - return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err) - } - - // only reboot and (size = 0 or size >= maxRowFileSize) should we add version to s3 - withVersion := firstCreated && (size == 0 || size >= maxDDLFlushSize) - - // clean ddlEncoder version part - // if we reboot cdc and size between (0, maxDDLFlushSize), we should skip version part in - // JSONEventBatchEncoder.keyBuf, JSONEventBatchEncoder consturctor func has - // alreay filled with version part, see NewJSONEventBatchEncoder and - // JSONEventBatchEncoder.MixedBuild - if firstCreated && size > 0 && size < maxDDLFlushSize { - s.ddlEncoder.Reset() - } - - _, er := s.ddlEncoder.EncodeDDLEvent(ddl) - if er != nil { - return er - } - - data := s.ddlEncoder.MixedBuild(withVersion) - - if size == 0 || size >= maxDDLFlushSize { - // no ddl file exists or - // exists file is oversized. we should generate a new file - fileData = data - name = makeDDLFileObject(ddl.CommitTs) - log.Debug("[EmitDDLEvent] create first or rotate ddl log", - zap.String("name", name), zap.Any("ddl", ddl)) - if size > maxDDLFlushSize { - // reset ddl encoder for new file - s.ddlEncoder = nil - } - } else { - // hack way: append data to old file - log.Debug("[EmitDDLEvent] append ddl to origin log", - zap.String("name", name), zap.Any("ddl", ddl)) - fileData, err = s.storage.ReadFile(ctx, name) - if err != nil { - return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err) - } - fileData = append(fileData, data...) - } - return s.storage.WriteFile(ctx, name, fileData) -} - -func (s *s3Sink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - if tableInfo != nil { - // update log meta to record the relationship about tableName and tableID - s.logMeta = makeLogMetaContent(tableInfo) - - data, err := s.logMeta.Marshal() - if err != nil { - return cerror.WrapError(cerror.ErrMarshalFailed, err) - } - return s.storage.WriteFile(ctx, logMetaFile, data) - } - return nil -} - -func (s *s3Sink) Close(ctx context.Context) error { - return nil -} - -func (s *s3Sink) Barrier(ctx context.Context, tableID model.TableID) error { - // Barrier does nothing because FlushRowChangedEvents in s3 sink has flushed - // all buffered events forcedlly. - return nil -} - -// NewS3Sink creates new sink support log data to s3 directly -func NewS3Sink(ctx context.Context, sinkURI *url.URL, errCh chan error) (*s3Sink, error) { - if len(sinkURI.Host) == 0 { - return nil, errors.Errorf("please specify the bucket for s3 in %s", sinkURI) - } - prefix := strings.Trim(sinkURI.Path, "/") - s3 := &backup.S3{Bucket: sinkURI.Host, Prefix: prefix} - options := &storage.BackendOptions{} - storage.ExtractQueryParameters(sinkURI, &options.S3) - if err := options.S3.Apply(s3); err != nil { - return nil, cerror.WrapError(cerror.ErrS3SinkInitialize, err) - } - // we should set this to true, since br set it by default in parseBackend - s3.ForcePathStyle = true - backend := &backup.StorageBackend{ - Backend: &backup.StorageBackend_S3{S3: s3}, - } - s3storage, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{ - SendCredentials: false, - SkipCheckPath: true, - HTTPClient: nil, - }) - if err != nil { - return nil, cerror.WrapError(cerror.ErrS3SinkInitialize, err) - } - - s := &s3Sink{ - prefix: prefix, - storage: s3storage, - logMeta: newLogMeta(), - logSink: newLogSink("", s3storage), - } - - // important! we should flush asynchronously in another goroutine - go func() { - if err := s.startFlush(ctx); err != nil && errors.Cause(err) != context.Canceled { - select { - case <-ctx.Done(): - return - case errCh <- err: - default: - log.Error("error channel is full", zap.Error(err)) - } - } - }() - - return s, nil -} diff --git a/cdc/sink/cdclog/utils.go b/cdc/sink/cdclog/utils.go deleted file mode 100644 index 9dd3621b0af..00000000000 --- a/cdc/sink/cdclog/utils.go +++ /dev/null @@ -1,252 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cdclog - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "time" - - "github.com/pingcap/log" - "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/sink/codec" - "github.com/pingcap/ticdc/pkg/quotes" - "github.com/pingcap/tidb/br/pkg/storage" - "github.com/uber-go/atomic" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -const ( - tablePrefix = "t_" - logMetaFile = "log.meta" - - ddlEventsDir = "ddls" - ddlEventsPrefix = "ddl" - - maxUint64 = ^uint64(0) -) - -type logUnit interface { - TableID() int64 - Events() *atomic.Int64 - Size() *atomic.Int64 - - dataChan() chan *model.RowChangedEvent - - isEmpty() bool - shouldFlush() bool - // flush data to storage. - flush(ctx context.Context, sink *logSink) error -} - -type logSink struct { - notifyChan chan []logUnit - notifyWaitChan chan struct{} - - encoder func() codec.EventBatchEncoder - units []logUnit - - // file sink use - rootPath string - // s3 sink use - storagePath storage.ExternalStorage - - hashMap sync.Map -} - -func newLogSink(root string, storage storage.ExternalStorage) *logSink { - return &logSink{ - notifyChan: make(chan []logUnit), - notifyWaitChan: make(chan struct{}), - encoder: func() codec.EventBatchEncoder { - ret := codec.NewJSONEventBatchEncoder() - ret.(*codec.JSONEventBatchEncoder).SetMixedBuildSupport(true) - return ret - }, - units: make([]logUnit, 0), - rootPath: root, - storagePath: storage, - } -} - -// s3Sink need this -func (l *logSink) storage() storage.ExternalStorage { - return l.storagePath -} - -// fileSink need this -func (l *logSink) root() string { - return l.rootPath -} - -func (l *logSink) startFlush(ctx context.Context) error { - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - log.Info("[startFlush] log sink stopped") - return ctx.Err() - case needFlushedUnits := <-l.notifyChan: - // try specify buffers - eg, ectx := errgroup.WithContext(ctx) - for _, u := range needFlushedUnits { - uReplica := u - eg.Go(func() error { - log.Info("start Flush asynchronously to storage by caller", - zap.Int64("table id", uReplica.TableID()), - zap.Int64("size", uReplica.Size().Load()), - zap.Int64("event count", uReplica.Events().Load()), - ) - return uReplica.flush(ectx, l) - }) - } - if err := eg.Wait(); err != nil { - return err - } - // tell flush goroutine this time flush finished - l.notifyWaitChan <- struct{}{} - - case <-ticker.C: - // try all tableBuffers - eg, ectx := errgroup.WithContext(ctx) - for _, u := range l.units { - uReplica := u - if u.shouldFlush() { - eg.Go(func() error { - log.Info("start Flush asynchronously to storage", - zap.Int64("table id", uReplica.TableID()), - zap.Int64("size", uReplica.Size().Load()), - zap.Int64("event count", uReplica.Events().Load()), - ) - return uReplica.flush(ectx, l) - }) - } - } - if err := eg.Wait(); err != nil { - return err - } - } - } -} - -func (l *logSink) emitRowChangedEvents(ctx context.Context, newUnit func(int64) logUnit, rows ...*model.RowChangedEvent) error { - for _, row := range rows { - // dispatch row event by tableID - tableID := row.Table.GetTableID() - var ( - ok bool - item interface{} - hash int - ) - if item, ok = l.hashMap.Load(tableID); !ok { - // found new tableID - l.units = append(l.units, newUnit(tableID)) - hash = len(l.units) - 1 - l.hashMap.Store(tableID, hash) - } else { - hash = item.(int) - } - select { - case <-ctx.Done(): - return ctx.Err() - case l.units[hash].dataChan() <- row: - l.units[hash].Size().Add(row.ApproximateDataSize) - l.units[hash].Events().Inc() - } - } - return nil -} - -func (l *logSink) flushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { - // TODO update flush policy with size - select { - case <-ctx.Done(): - return 0, ctx.Err() - - default: - needFlushedUnits := make([]logUnit, 0, len(l.units)) - for _, u := range l.units { - if !u.isEmpty() { - needFlushedUnits = append(needFlushedUnits, u) - } - } - if len(needFlushedUnits) > 0 { - select { - case <-ctx.Done(): - return 0, ctx.Err() - - case <-time.After(defaultFlushRowChangedEventDuration): - // cannot accumulate enough row events in 5 second - // call flushed worker to flush - l.notifyChan <- needFlushedUnits - // wait flush worker finished - <-l.notifyWaitChan - } - } - } - return resolvedTs, nil -} - -type logMeta struct { - Names map[int64]string `json:"names"` - GlobalResolvedTS uint64 `json:"global_resolved_ts"` -} - -func newLogMeta() *logMeta { - return &logMeta{ - Names: make(map[int64]string), - } -} - -// Marshal saves logMeta -func (l *logMeta) Marshal() ([]byte, error) { - return json.Marshal(l) -} - -func makeTableDirectoryName(tableID int64) string { - return fmt.Sprintf("%s%d", tablePrefix, tableID) -} - -func makeTableFileObject(tableID int64, commitTS uint64) string { - return fmt.Sprintf("%s%d/%s", tablePrefix, tableID, makeTableFileName(commitTS)) -} - -func makeTableFileName(commitTS uint64) string { - return fmt.Sprintf("cdclog.%d", commitTS) -} - -func makeLogMetaContent(tableInfos []*model.SimpleTableInfo) *logMeta { - meta := new(logMeta) - names := make(map[int64]string) - for _, table := range tableInfos { - if table != nil { - log.Info("[makeLogMetaContent]", zap.Reflect("table", table)) - names[table.TableID] = quotes.QuoteSchema(table.Schema, table.Table) - } - } - meta.Names = names - return meta -} - -func makeDDLFileObject(commitTS uint64) string { - return fmt.Sprintf("%s/%s", ddlEventsDir, makeDDLFileName(commitTS)) -} - -func makeDDLFileName(commitTS uint64) string { - return fmt.Sprintf("%s.%d", ddlEventsPrefix, maxUint64-commitTS) -} From 84257f3c669f48ef9fdb890c5fca5e4f69c40bcc Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 1 Dec 2021 11:21:50 +0800 Subject: [PATCH 4/5] remove cdclog related code. --- pkg/errors/errors.go | 97 +++++++++++++++++++---------------------- proto/generate-proto.sh | 1 - 2 files changed, 45 insertions(+), 53 deletions(-) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 2e57c7930bb..e5aecb304a4 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -68,58 +68,51 @@ var ( ErrCreateMarkTableFailed = errors.Normalize("create mark table failed", errors.RFCCodeText("CDC:ErrCreateMarkTableFailed")) // sink related errors - ErrExecDDLFailed = errors.Normalize("exec DDL failed", errors.RFCCodeText("CDC:ErrExecDDLFailed")) - ErrEmitCheckpointTsFailed = errors.Normalize("emit checkpoint ts failed", errors.RFCCodeText("CDC:ErrEmitCheckpointTsFailed")) - ErrDDLEventIgnored = errors.Normalize("ddl event is ignored", errors.RFCCodeText("CDC:ErrDDLEventIgnored")) - ErrKafkaSendMessage = errors.Normalize("kafka send message failed", errors.RFCCodeText("CDC:ErrKafkaSendMessage")) - ErrKafkaAsyncSendMessage = errors.Normalize("kafka async send message failed", errors.RFCCodeText("CDC:ErrKafkaAsyncSendMessage")) - ErrKafkaFlushUnfinished = errors.Normalize("flush not finished before producer close", errors.RFCCodeText("CDC:ErrKafkaFlushUnfinished")) - ErrKafkaInvalidPartitionNum = errors.Normalize("invalid partition num %d", errors.RFCCodeText("CDC:ErrKafkaInvalidPartitionNum")) - ErrKafkaNewSaramaProducer = errors.Normalize("new sarama producer", errors.RFCCodeText("CDC:ErrKafkaNewSaramaProducer")) - ErrKafkaInvalidClientID = errors.Normalize("invalid kafka client ID '%s'", errors.RFCCodeText("CDC:ErrKafkaInvalidClientID")) - ErrKafkaInvalidVersion = errors.Normalize("invalid kafka version", errors.RFCCodeText("CDC:ErrKafkaInvalidVersion")) - ErrPulsarNewProducer = errors.Normalize("new pulsar producer", errors.RFCCodeText("CDC:ErrPulsarNewProducer")) - ErrPulsarSendMessage = errors.Normalize("pulsar send message failed", errors.RFCCodeText("CDC:ErrPulsarSendMessage")) - ErrFileSinkCreateDir = errors.Normalize("file sink create dir", errors.RFCCodeText("CDC:ErrFileSinkCreateDir")) - ErrFileSinkFileOp = errors.Normalize("file sink file operation", errors.RFCCodeText("CDC:ErrFileSinkFileOp")) - ErrRedoConfigInvalid = errors.Normalize("redo log config invalid", errors.RFCCodeText("CDC:ErrRedoConfigInvalid")) - ErrRedoDownloadFailed = errors.Normalize("redo log down load to local failed", errors.RFCCodeText("CDC:ErrRedoDownloadFailed")) - ErrRedoWriterStopped = errors.Normalize("redo log writer stopped", errors.RFCCodeText("CDC:ErrRedoWriterStopped")) - ErrRedoFileOp = errors.Normalize("redo file operation", errors.RFCCodeText("CDC:ErrRedoFileOp")) - ErrRedoMetaFileNotFound = errors.Normalize("no redo meta file found in dir: %s", errors.RFCCodeText("CDC:ErrRedoMetaFileNotFound")) - ErrRedoMetaInitialize = errors.Normalize("initialize meta for redo log", errors.RFCCodeText("CDC:ErrRedoMetaInitialize")) - ErrFileSizeExceed = errors.Normalize("rawData size %d exceeds maximum file size %d", errors.RFCCodeText("CDC:ErrFileSizeExceed")) - ErrFileSinkMetaAlreadyExists = errors.Normalize("file sink meta file already exists", errors.RFCCodeText("CDC:ErrFileSinkMetaAlreadyExists")) - ErrS3SinkWriteStorage = errors.Normalize("write to storage", errors.RFCCodeText("CDC:ErrS3SinkWriteStorage")) - ErrS3SinkInitialize = errors.Normalize("new s3 sink", errors.RFCCodeText("CDC:ErrS3SinkInitialize")) - ErrS3SinkStorageAPI = errors.Normalize("s3 sink storage api", errors.RFCCodeText("CDC:ErrS3SinkStorageAPI")) - ErrS3StorageAPI = errors.Normalize("s3 storage api", errors.RFCCodeText("CDC:ErrS3StorageAPI")) - ErrS3StorageInitialize = errors.Normalize("new s3 storage for redo log", errors.RFCCodeText("CDC:ErrS3StorageInitialize")) - ErrPrepareAvroFailed = errors.Normalize("prepare avro failed", errors.RFCCodeText("CDC:ErrPrepareAvroFailed")) - ErrAsyncBroadcastNotSupport = errors.Normalize("Async broadcasts not supported", errors.RFCCodeText("CDC:ErrAsyncBroadcastNotSupport")) - ErrKafkaInvalidConfig = errors.Normalize("kafka config invalid", errors.RFCCodeText("CDC:ErrKafkaInvalidConfig")) - ErrSinkURIInvalid = errors.Normalize("sink uri invalid", errors.RFCCodeText("CDC:ErrSinkURIInvalid")) - ErrMySQLTxnError = errors.Normalize("MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError")) - ErrMySQLQueryError = errors.Normalize("MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError")) - ErrMySQLConnectionError = errors.Normalize("MySQL connection error", errors.RFCCodeText("CDC:ErrMySQLConnectionError")) - ErrMySQLInvalidConfig = errors.Normalize("MySQL config invalid", errors.RFCCodeText("CDC:ErrMySQLInvalidConfig")) - ErrMySQLWorkerPanic = errors.Normalize("MySQL worker panic", errors.RFCCodeText("CDC:ErrMySQLWorkerPanic")) - ErrAvroToEnvelopeError = errors.Normalize("to envelope failed", errors.RFCCodeText("CDC:ErrAvroToEnvelopeError")) - ErrAvroUnknownType = errors.Normalize("unknown type for Avro: %v", errors.RFCCodeText("CDC:ErrAvroUnknownType")) - ErrAvroMarshalFailed = errors.Normalize("json marshal failed", errors.RFCCodeText("CDC:ErrAvroMarshalFailed")) - ErrAvroEncodeFailed = errors.Normalize("encode to avro native data", errors.RFCCodeText("CDC:ErrAvroEncodeFailed")) - ErrAvroEncodeToBinary = errors.Normalize("encode to binray from native", errors.RFCCodeText("CDC:ErrAvroEncodeToBinary")) - ErrAvroSchemaAPIError = errors.Normalize("schema manager API error", errors.RFCCodeText("CDC:ErrAvroSchemaAPIError")) - ErrMaxwellEncodeFailed = errors.Normalize("maxwell encode failed", errors.RFCCodeText("CDC:ErrMaxwellEncodeFailed")) - ErrMaxwellDecodeFailed = errors.Normalize("maxwell decode failed", errors.RFCCodeText("CDC:ErrMaxwellDecodeFailed")) - ErrMaxwellInvalidData = errors.Normalize("maxwell invalid data", errors.RFCCodeText("CDC:ErrMaxwellInvalidData")) - ErrJSONCodecInvalidData = errors.Normalize("json codec invalid data", errors.RFCCodeText("CDC:ErrJSONCodecInvalidData")) - ErrJSONCodecRowTooLarge = errors.Normalize("json codec single row too large", errors.RFCCodeText("CDC:ErrJSONCodecRowTooLarge")) - ErrCanalDecodeFailed = errors.Normalize("canal decode failed", errors.RFCCodeText("CDC:ErrCanalDecodeFailed")) - ErrCanalEncodeFailed = errors.Normalize("canal encode failed", errors.RFCCodeText("CDC:ErrCanalEncodeFailed")) - ErrOldValueNotEnabled = errors.Normalize("old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled")) - ErrSinkInvalidConfig = errors.Normalize("sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig")) - ErrCraftCodecInvalidData = errors.Normalize("craft codec invalid data", errors.RFCCodeText("CDC:ErrCraftCodecInvalidData")) + ErrExecDDLFailed = errors.Normalize("exec DDL failed", errors.RFCCodeText("CDC:ErrExecDDLFailed")) + ErrDDLEventIgnored = errors.Normalize("ddl event is ignored", errors.RFCCodeText("CDC:ErrDDLEventIgnored")) + ErrKafkaSendMessage = errors.Normalize("kafka send message failed", errors.RFCCodeText("CDC:ErrKafkaSendMessage")) + ErrKafkaAsyncSendMessage = errors.Normalize("kafka async send message failed", errors.RFCCodeText("CDC:ErrKafkaAsyncSendMessage")) + ErrKafkaFlushUnfinished = errors.Normalize("flush not finished before producer close", errors.RFCCodeText("CDC:ErrKafkaFlushUnfinished")) + ErrKafkaInvalidPartitionNum = errors.Normalize("invalid partition num %d", errors.RFCCodeText("CDC:ErrKafkaInvalidPartitionNum")) + ErrKafkaNewSaramaProducer = errors.Normalize("new sarama producer", errors.RFCCodeText("CDC:ErrKafkaNewSaramaProducer")) + ErrKafkaInvalidClientID = errors.Normalize("invalid kafka client ID '%s'", errors.RFCCodeText("CDC:ErrKafkaInvalidClientID")) + ErrKafkaInvalidVersion = errors.Normalize("invalid kafka version", errors.RFCCodeText("CDC:ErrKafkaInvalidVersion")) + ErrPulsarNewProducer = errors.Normalize("new pulsar producer", errors.RFCCodeText("CDC:ErrPulsarNewProducer")) + ErrPulsarSendMessage = errors.Normalize("pulsar send message failed", errors.RFCCodeText("CDC:ErrPulsarSendMessage")) + ErrRedoConfigInvalid = errors.Normalize("redo log config invalid", errors.RFCCodeText("CDC:ErrRedoConfigInvalid")) + ErrRedoDownloadFailed = errors.Normalize("redo log down load to local failed", errors.RFCCodeText("CDC:ErrRedoDownloadFailed")) + ErrRedoWriterStopped = errors.Normalize("redo log writer stopped", errors.RFCCodeText("CDC:ErrRedoWriterStopped")) + ErrRedoFileOp = errors.Normalize("redo file operation", errors.RFCCodeText("CDC:ErrRedoFileOp")) + ErrRedoMetaFileNotFound = errors.Normalize("no redo meta file found in dir: %s", errors.RFCCodeText("CDC:ErrRedoMetaFileNotFound")) + ErrRedoMetaInitialize = errors.Normalize("initialize meta for redo log", errors.RFCCodeText("CDC:ErrRedoMetaInitialize")) + ErrFileSizeExceed = errors.Normalize("rawData size %d exceeds maximum file size %d", errors.RFCCodeText("CDC:ErrFileSizeExceed")) + ErrS3StorageAPI = errors.Normalize("s3 storage api", errors.RFCCodeText("CDC:ErrS3StorageAPI")) + ErrS3StorageInitialize = errors.Normalize("new s3 storage for redo log", errors.RFCCodeText("CDC:ErrS3StorageInitialize")) + ErrPrepareAvroFailed = errors.Normalize("prepare avro failed", errors.RFCCodeText("CDC:ErrPrepareAvroFailed")) + ErrAsyncBroadcastNotSupport = errors.Normalize("Async broadcasts not supported", errors.RFCCodeText("CDC:ErrAsyncBroadcastNotSupport")) + ErrKafkaInvalidConfig = errors.Normalize("kafka config invalid", errors.RFCCodeText("CDC:ErrKafkaInvalidConfig")) + ErrSinkURIInvalid = errors.Normalize("sink uri invalid", errors.RFCCodeText("CDC:ErrSinkURIInvalid")) + ErrMySQLTxnError = errors.Normalize("MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError")) + ErrMySQLQueryError = errors.Normalize("MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError")) + ErrMySQLConnectionError = errors.Normalize("MySQL connection error", errors.RFCCodeText("CDC:ErrMySQLConnectionError")) + ErrMySQLInvalidConfig = errors.Normalize("MySQL config invalid", errors.RFCCodeText("CDC:ErrMySQLInvalidConfig")) + ErrMySQLWorkerPanic = errors.Normalize("MySQL worker panic", errors.RFCCodeText("CDC:ErrMySQLWorkerPanic")) + ErrAvroToEnvelopeError = errors.Normalize("to envelope failed", errors.RFCCodeText("CDC:ErrAvroToEnvelopeError")) + ErrAvroUnknownType = errors.Normalize("unknown type for Avro: %v", errors.RFCCodeText("CDC:ErrAvroUnknownType")) + ErrAvroMarshalFailed = errors.Normalize("json marshal failed", errors.RFCCodeText("CDC:ErrAvroMarshalFailed")) + ErrAvroEncodeFailed = errors.Normalize("encode to avro native data", errors.RFCCodeText("CDC:ErrAvroEncodeFailed")) + ErrAvroEncodeToBinary = errors.Normalize("encode to binray from native", errors.RFCCodeText("CDC:ErrAvroEncodeToBinary")) + ErrAvroSchemaAPIError = errors.Normalize("schema manager API error", errors.RFCCodeText("CDC:ErrAvroSchemaAPIError")) + ErrMaxwellEncodeFailed = errors.Normalize("maxwell encode failed", errors.RFCCodeText("CDC:ErrMaxwellEncodeFailed")) + ErrMaxwellDecodeFailed = errors.Normalize("maxwell decode failed", errors.RFCCodeText("CDC:ErrMaxwellDecodeFailed")) + ErrMaxwellInvalidData = errors.Normalize("maxwell invalid data", errors.RFCCodeText("CDC:ErrMaxwellInvalidData")) + ErrJSONCodecInvalidData = errors.Normalize("json codec invalid data", errors.RFCCodeText("CDC:ErrJSONCodecInvalidData")) + ErrJSONCodecRowTooLarge = errors.Normalize("json codec single row too large", errors.RFCCodeText("CDC:ErrJSONCodecRowTooLarge")) + ErrCanalDecodeFailed = errors.Normalize("canal decode failed", errors.RFCCodeText("CDC:ErrCanalDecodeFailed")) + ErrCanalEncodeFailed = errors.Normalize("canal encode failed", errors.RFCCodeText("CDC:ErrCanalEncodeFailed")) + ErrOldValueNotEnabled = errors.Normalize("old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled")) + ErrSinkInvalidConfig = errors.Normalize("sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig")) + ErrCraftCodecInvalidData = errors.Normalize("craft codec invalid data", errors.RFCCodeText("CDC:ErrCraftCodecInvalidData")) // utilities related errors ErrToTLSConfigFailed = errors.Normalize("generate tls config failed", errors.RFCCodeText("CDC:ErrToTLSConfigFailed")) diff --git a/proto/generate-proto.sh b/proto/generate-proto.sh index 80a438b083f..d7bc9d8b2df 100755 --- a/proto/generate-proto.sh +++ b/proto/generate-proto.sh @@ -3,7 +3,6 @@ echo "generate canal & craft benchmark protocol code..." [ ! -d ./canal ] && mkdir ./canal -[ ! -d ./cdclog ] && mkdir ./cdclog [ ! -d ./benchmark ] && mkdir ./benchmark protoc --gofast_out=./canal EntryProtocol.proto From d060014eff26760aa9679c3bbc50eb56ec5aaaa8 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 1 Dec 2021 13:41:13 +0800 Subject: [PATCH 5/5] fix make check. --- errors.toml | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/errors.toml b/errors.toml index fc5f5ed16b1..8d9626d4428 100755 --- a/errors.toml +++ b/errors.toml @@ -216,11 +216,6 @@ error = ''' decode row data to datum failed ''' -["CDC:ErrEmitCheckpointTsFailed"] -error = ''' -emit checkpoint ts failed -''' - ["CDC:ErrEncodeFailed"] error = ''' encode failed: %s @@ -266,21 +261,6 @@ error = ''' can't find handle column, please check if the pk is handle ''' -["CDC:ErrFileSinkCreateDir"] -error = ''' -file sink create dir -''' - -["CDC:ErrFileSinkFileOp"] -error = ''' -file sink file operation -''' - -["CDC:ErrFileSinkMetaAlreadyExists"] -error = ''' -file sink meta file already exists -''' - ["CDC:ErrFileSizeExceed"] error = ''' rawData size %d exceeds maximum file size %d @@ -756,21 +736,6 @@ error = ''' resolve locks failed ''' -["CDC:ErrS3SinkInitialize"] -error = ''' -new s3 sink -''' - -["CDC:ErrS3SinkStorageAPI"] -error = ''' -s3 sink storage api -''' - -["CDC:ErrS3SinkWriteStorage"] -error = ''' -write to storage -''' - ["CDC:ErrS3StorageAPI"] error = ''' s3 storage api