diff --git a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
index 081098b6ae9..e86ccf2a008 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
@@ -80,7 +80,7 @@ type dmlSink struct {
 	alive struct {
 		sync.RWMutex
 		// msgCh is a channel to hold eventFragment.
-		msgCh  chan eventFragment
+		msgCh  *chann.DrainableChann[eventFragment]
 		isDead bool
 	}

@@ -140,7 +140,7 @@ func NewCloudStorageSink(
 		cancel: wgCancel,
 		dead:   make(chan struct{}),
 	}
-	s.alive.msgCh = make(chan eventFragment, defaultChannelSize)
+	s.alive.msgCh = chann.NewDrainableChann[eventFragment]()

 	encodedCh := make(chan eventFragment, defaultChannelSize)
 	workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount)
@@ -148,7 +148,7 @@ func NewCloudStorageSink(
 	// create a group of encoding workers.
 	for i := 0; i < defaultEncodingConcurrency; i++ {
 		encoder := encoderBuilder.Build()
-		s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh, encodedCh)
+		s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh.Out(), encodedCh)
 	}
 	// create defragmenter.
 	s.defragmenter = newDefragmenter(encodedCh, workerChannels)
@@ -168,7 +168,7 @@ func NewCloudStorageSink(

 		s.alive.Lock()
 		s.alive.isDead = true
-		close(s.alive.msgCh)
+		s.alive.msgCh.CloseAndDrain()
 		s.alive.Unlock()
 		close(s.dead)

@@ -234,7 +234,7 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single

 		s.statistics.ObserveRows(txn.Event.Rows...)
 		// emit a TxnCallbackableEvent encoupled with a sequence number starting from one.
-		s.alive.msgCh <- eventFragment{
+		s.alive.msgCh.In() <- eventFragment{
 			seqNumber:      seq,
 			versionedTable: tbl,
 			event:          txn,
diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go
index 448a6217dfc..ffed62d0d1f 100644
--- a/pkg/sink/cloudstorage/path.go
+++ b/pkg/sink/cloudstorage/path.go
@@ -192,27 +192,28 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
 	_, checksum := mustParseSchemaName(tblSchemaFile)
 	schemaFileCnt := 0
 	lastVersion := uint64(0)
-	prefix := fmt.Sprintf(tableSchemaPrefix+"schema_", def.Schema, def.Table)
+	subDir := fmt.Sprintf(tableSchemaPrefix, def.Schema, def.Table)
 	checksumSuffix := fmt.Sprintf("%010d.json", checksum)
-	err = f.storage.WalkDir(ctx, &storage.WalkOption{ObjPrefix: prefix},
-		func(path string, _ int64) error {
-			schemaFileCnt++
-			if !strings.HasSuffix(path, checksumSuffix) {
-				return nil
-			}
-			version, parsedChecksum := mustParseSchemaName(path)
-			if parsedChecksum != checksum {
-				// TODO: parsedChecksum should be ignored, remove this panic
-				// after the new path protocol is verified.
-				log.Panic("invalid schema file name",
-					zap.String("path", path), zap.Any("checksum", checksum))
-			}
-			if version > lastVersion {
-				lastVersion = version
-			}
+	err = f.storage.WalkDir(ctx, &storage.WalkOption{
+		SubDir:    subDir, /* use subDir to prevent walk the whole storage */
+		ObjPrefix: subDir + "schema_",
+	}, func(path string, _ int64) error {
+		schemaFileCnt++
+		if !strings.HasSuffix(path, checksumSuffix) {
 			return nil
-		},
-	)
+		}
+		version, parsedChecksum := mustParseSchemaName(path)
+		if parsedChecksum != checksum {
+			// TODO: parsedChecksum should be ignored, remove this panic
+			// after the new path protocol is verified.
+			log.Panic("invalid schema file name",
+				zap.String("path", path), zap.Any("checksum", checksum))
+		}
+		if version > lastVersion {
+			lastVersion = version
+		}
+		return nil
+	})
 	if err != nil {
 		return err
 	}
diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go
index 6a0dbe56c4a..ead5254fff4 100644
--- a/pkg/sink/cloudstorage/path_test.go
+++ b/pkg/sink/cloudstorage/path_test.go
@@ -17,9 +17,14 @@ import (
 	"context"
 	"fmt"
 	"net/url"
+	"os"
+	"path/filepath"
 	"testing"
 	"time"

+	timodel "github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/parser/types"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/engine/pkg/clock"
 	"github.com/pingcap/tiflow/pkg/config"
@@ -275,3 +280,51 @@ func TestIsSchemaFile(t *testing.T) {
 			"testCase: %s, path: %v", tt.name, tt.path)
 	}
 }
+
+func TestCheckOrWriteSchema(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	dir := t.TempDir()
+	f := testFilePathGenerator(ctx, t, dir)
+
+	var columns []*timodel.ColumnInfo
+	ft := types.NewFieldType(mysql.TypeLong)
+	ft.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag)
+	col := &timodel.ColumnInfo{
+		Name:         timodel.NewCIStr("Id"),
+		FieldType:    *ft,
+		DefaultValue: 10,
+	}
+	columns = append(columns, col)
+	tableInfo := &model.TableInfo{
+		TableInfo: &timodel.TableInfo{Columns: columns},
+		Version:   100,
+		TableName: model.TableName{
+			Schema:  "test",
+			Table:   "table1",
+			TableID: 20,
+		},
+	}
+
+	table := VersionedTableName{
+		TableNameWithPhysicTableID: tableInfo.TableName,
+		TableInfoVersion:           tableInfo.Version,
+	}
+
+	err := f.CheckOrWriteSchema(ctx, table, tableInfo)
+	require.NoError(t, err)
+	require.Equal(t, tableInfo.Version, f.versionMap[table])
+
+	// test only table version changed, schema file should be reused
+	table.TableInfoVersion = 101
+	err = f.CheckOrWriteSchema(ctx, table, tableInfo)
+	require.NoError(t, err)
+	require.Equal(t, tableInfo.Version, f.versionMap[table])
+
+	dir = filepath.Join(dir, "test/table1/meta")
+	cnt, err := os.ReadDir(dir)
+	require.NoError(t, err)
+	require.Equal(t, 1, len(cnt))
+}
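
Note for reviewers: the `cloud_storage_dml_sink.go` change is the usual cure for a `send on closed channel` panic. With a plain buffered channel, `close(s.alive.msgCh)` can race with a concurrent `msgCh <- fragment`, and a full buffer can also block shutdown. `chann.DrainableChann` splits the channel into an `In()`/`Out()` pair, and `CloseAndDrain()`, as the name suggests, closes the input side and drains whatever is still queued so no sender stays blocked. Below is a minimal, self-contained sketch of the pattern, not code from this PR: `sketchSink`, `write`, and `shutdown` are hypothetical names; only `NewDrainableChann`, `In`, `Out`, and `CloseAndDrain` are taken from the patch, and the `isDead`-under-`RWMutex` guard mirrors the `alive` struct above, which is what keeps sends and the close mutually exclusive.

```go
package main

import (
	"errors"
	"fmt"
	"sync"

	"github.com/pingcap/tiflow/pkg/chann"
)

// sketchSink is a hypothetical stand-in for dmlSink, keeping only the
// alive{RWMutex, msgCh, isDead} shape that the patch introduces.
type sketchSink struct {
	alive struct {
		sync.RWMutex
		msgCh  *chann.DrainableChann[int]
		isDead bool
	}
}

func newSketchSink() *sketchSink {
	s := &sketchSink{}
	s.alive.msgCh = chann.NewDrainableChann[int]()
	return s
}

// write models WriteEvents: check isDead under the read lock before
// sending, so a send can never race with the close on the shutdown path.
func (s *sketchSink) write(v int) error {
	s.alive.RLock()
	defer s.alive.RUnlock()
	if s.alive.isDead {
		return errors.New("sink is dead")
	}
	s.alive.msgCh.In() <- v
	return nil
}

// shutdown models the patched close path: flip isDead under the write
// lock, then CloseAndDrain so buffered fragments cannot block shutdown.
func (s *sketchSink) shutdown() {
	s.alive.Lock()
	s.alive.isDead = true
	s.alive.msgCh.CloseAndDrain()
	s.alive.Unlock()
}

func main() {
	s := newSketchSink()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // consumer, analogous to an encoding worker reading Out()
		defer wg.Done()
		for v := range s.alive.msgCh.Out() {
			fmt.Println("consumed", v)
		}
	}()

	for i := 0; i < 3; i++ {
		_ = s.write(i)
	}
	s.shutdown()
	fmt.Println("write after shutdown:", s.write(99)) // error, not a panic
	wg.Wait()
}
```

The property that matters is the last line: a late `write` returns an error instead of panicking or blocking forever, which is presumably the failure mode this PR is closing off.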
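On the `path.go` hunk: the old code passed only `ObjPrefix`, so at least the local backend had to walk the whole storage tree and filter afterwards (the in-code comment says as much); adding `SubDir` scopes the listing to the one `<schema>/<table>/meta/` directory before the prefix filter applies. Here is a minimal sketch against the local backend of `github.com/pingcap/tidb/br/pkg/storage`, the same `WalkDir`/`WalkOption` API the patch calls. The directory layout and the `schema_100_0123456789.json` file name are made up to match the prefixes in the hunk, and it assumes `ObjPrefix` is matched against paths relative to the storage root, which is what `subDir + "schema_"` implies; backends may differ on this, so treat it as an illustration rather than a spec.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"

	"github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
	base, err := os.MkdirTemp("", "walkdir-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(base)

	// Hypothetical layout mirroring the prefixes in the hunk: one schema
	// file under <schema>/<table>/meta/ plus an unrelated data file.
	meta := filepath.Join(base, "test", "table1", "meta")
	if err := os.MkdirAll(meta, 0o755); err != nil {
		panic(err)
	}
	schemaFile := filepath.Join(meta, "schema_100_0123456789.json")
	if err := os.WriteFile(schemaFile, []byte("{}"), 0o644); err != nil {
		panic(err)
	}
	dataFile := filepath.Join(base, "test", "table1", "data.csv")
	if err := os.WriteFile(dataFile, []byte("1"), 0o644); err != nil {
		panic(err)
	}

	extStorage, err := storage.NewLocalStorage(base)
	if err != nil {
		panic(err)
	}
	// SubDir confines the walk to the table's meta directory, so the
	// backend never lists data files; ObjPrefix then narrows the result
	// to schema files, as in the patched CheckOrWriteSchema.
	opt := &storage.WalkOption{
		SubDir:    "test/table1/meta",
		ObjPrefix: "test/table1/meta/schema_",
	}
	err = extStorage.WalkDir(context.Background(), opt, func(path string, size int64) error {
		fmt.Println("visited:", path, size)
		return nil
	})
	if err != nil {
		panic(err)
	}
}
```

If those assumptions hold, only `test/table1/meta/schema_100_0123456789.json` is visited. The new `TestCheckOrWriteSchema` exercises the rewritten listing end to end and also asserts that bumping only the table version reuses the existing schema file; it can be run with `go test ./pkg/sink/cloudstorage -run TestCheckOrWriteSchema`.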