From d35fdcb83950ebc69045e9633810181197437b79 Mon Sep 17 00:00:00 2001 From: SimFG Date: Tue, 12 Nov 2024 20:18:56 +0800 Subject: [PATCH] support the database/collection name mapping Signed-off-by: SimFG --- core/reader/target_client.go | 41 ++++++- core/util/string.go | 12 ++ core/writer/channel_writer.go | 187 ++++++++++++++++++++++------- core/writer/channel_writer_test.go | 2 +- server/cdc_impl.go | 111 +++++++++++++---- server/cdc_impl_test.go | 86 ++++++------- server/model/common.go | 6 + server/model/meta/task.go | 15 +++ server/model/request/create.go | 1 + server/tool/data_diff/main.go | 3 +- 10 files changed, 346 insertions(+), 118 deletions(-) diff --git a/core/reader/target_client.go b/core/reader/target_client.go index 66280cad..3310f98f 100644 --- a/core/reader/target_client.go +++ b/core/reader/target_client.go @@ -40,8 +40,9 @@ import ( var _ api.TargetAPI = (*TargetClient)(nil) type TargetClient struct { - client client.Client - config TargetConfig + client client.Client + config TargetConfig + nameMappings util.Map[string, string] } type TargetConfig struct { @@ -81,8 +82,9 @@ func (t *TargetClient) GetCollectionInfo(ctx context.Context, collectionName, da } collectionInfo := &model.CollectionInfo{} - err = t.milvusOp(ctx, databaseName, func(milvus client.Client) error { - collection, err := milvus.DescribeCollection(ctx, collectionName) + dbName, colName := t.mapDBAndCollectionName(databaseName, collectionName) + err = t.milvusOp(ctx, dbName, func(milvus client.Client) error { + collection, err := milvus.DescribeCollection(ctx, colName) if err != nil { return err } @@ -116,8 +118,9 @@ func (t *TargetClient) GetPartitionInfo(ctx context.Context, collectionName, dat } collectionInfo := &model.CollectionInfo{} var partition []*entity.Partition - err = t.milvusOp(ctx, databaseName, func(milvus client.Client) error { - partition, err = milvus.ShowPartitions(ctx, collectionName) + dbName, colName := t.mapDBAndCollectionName(databaseName, collectionName) + err = t.milvusOp(ctx, dbName, func(milvus client.Client) error { + partition, err = milvus.ShowPartitions(ctx, colName) if err != nil { return err } @@ -178,3 +181,29 @@ func (t *TargetClient) GetDatabaseName(ctx context.Context, collectionName, data dbLog.Warn("not found the database", zap.Any("databases", databaseNames)) return "", util.NotFoundDatabase } + +func (t *TargetClient) UpdateNameMappings(nameMappings map[string]string) { + for k, v := range nameMappings { + t.nameMappings.Store(k, v) + } +} + +func (t *TargetClient) mapDBAndCollectionName(db, collection string) (string, string) { + if db == "" { + db = util.DefaultDbName + } + returnDB, returnCollection := db, collection + t.nameMappings.Range(func(source, target string) bool { + sourceDB, sourceCollection := util.GetCollectionNameFromFull(source) + if sourceDB == db && sourceCollection == collection { + returnDB, returnCollection = util.GetCollectionNameFromFull(target) + return false + } + if sourceDB == db && (sourceCollection == "*" || collection == "") { + returnDB, _ = util.GetCollectionNameFromFull(target) + return false + } + return true + }) + return returnDB, returnCollection +} diff --git a/core/util/string.go b/core/util/string.go index ecc91b60..6a74d7df 100644 --- a/core/util/string.go +++ b/core/util/string.go @@ -188,3 +188,15 @@ func ParseVChannel(virtualName string) (ChannelInfo, error) { ShardIndex: int(shardIdx), }, nil } + +func GetFullCollectionName(collectionName string, databaseName string) string { + return fmt.Sprintf("%s.%s", 
databaseName, collectionName) +} + +func GetCollectionNameFromFull(fullName string) (string, string) { + names := strings.Split(fullName, ".") + if len(names) != 2 { + panic("invalid full collection name") + } + return names[0], names[1] +} diff --git a/core/writer/channel_writer.go b/core/writer/channel_writer.go index 9b9010bb..3dd15bf9 100644 --- a/core/writer/channel_writer.go +++ b/core/writer/channel_writer.go @@ -56,6 +56,7 @@ type ChannelWriter struct { dbInfos util.Map[string, uint64] collectionInfos util.Map[string, uint64] partitionInfos util.Map[string, uint64] + nameMappings util.Map[string, string] retryOptions []retry.Option downstream string @@ -168,6 +169,11 @@ func (c *ChannelWriter) HandleReplicateMessage(ctx context.Context, channelName zap.String("partition", insertMsg.GetPartitionName()), zap.Uint64("insert_data_len", insertMsg.GetNumRows()), ) + dbName := insertMsg.GetDbName() + colName := insertMsg.GetCollectionName() + dbName, colName = c.mapDBAndCollectionName(dbName, colName) + insertMsg.DbName = dbName + insertMsg.CollectionName = colName } if msg.Type() == commonpb.MsgType_Delete { deleteMsg := msg.(*msgstream.DeleteMsg) @@ -176,6 +182,27 @@ func (c *ChannelWriter) HandleReplicateMessage(ctx context.Context, channelName zap.String("partition", deleteMsg.GetPartitionName()), zap.Int64("delete_data_len", deleteMsg.GetNumRows()), ) + dbName := deleteMsg.GetDbName() + colName := deleteMsg.GetCollectionName() + dbName, colName = c.mapDBAndCollectionName(dbName, colName) + deleteMsg.DbName = dbName + deleteMsg.CollectionName = colName + } + if msg.Type() == commonpb.MsgType_DropPartition { + dropPartitionMsg := msg.(*msgstream.DropPartitionMsg) + dbName := dropPartitionMsg.GetDbName() + colName := dropPartitionMsg.GetCollectionName() + dbName, colName = c.mapDBAndCollectionName(dbName, colName) + dropPartitionMsg.DbName = dbName + dropPartitionMsg.CollectionName = colName + } + if msg.Type() == commonpb.MsgType_DropCollection { + dropCollectionMsg := msg.(*msgstream.DropCollectionMsg) + dbName := dropCollectionMsg.GetDbName() + colName := dropCollectionMsg.GetCollectionName() + dbName, colName = c.mapDBAndCollectionName(dbName, colName) + dropCollectionMsg.DbName = dbName + dropCollectionMsg.CollectionName = colName } log.Debug("replicate msg", logFields...) @@ -270,8 +297,8 @@ func (c *ChannelWriter) HandleOpMessagePack(ctx context.Context, msgPack *msgstr } // WaitDatabaseReady wait for database ready, return value: skip the op or not, wait timeout or not -func (c *ChannelWriter) WaitDatabaseReady(ctx context.Context, databaseName string, msgTs uint64) InfoState { - if databaseName == "" { +func (c *ChannelWriter) WaitDatabaseReady(ctx context.Context, databaseName string, msgTs uint64, collectionName string) InfoState { + if databaseName == "" || databaseName == util.DefaultDbName { return InfoStateCreated } createKey, dropKey := util.GetDBInfoKeys(databaseName) @@ -283,9 +310,10 @@ func (c *ChannelWriter) WaitDatabaseReady(ctx context.Context, databaseName stri return s } + dbName, _ := c.mapDBAndCollectionName(databaseName, collectionName) err := retry.Do(ctx, func() error { return c.dataHandler.DescribeDatabase(ctx, &api.DescribeDatabaseParam{ - Name: databaseName, + Name: dbName, }) }, c.retryOptions...) 
if err == nil { @@ -306,12 +334,13 @@ func (c *ChannelWriter) WaitCollectionReady(ctx context.Context, collectionName, return s } + dbName, colName := c.mapDBAndCollectionName(databaseName, collectionName) err := retry.Do(ctx, func() error { return c.dataHandler.DescribeCollection(ctx, &api.DescribeCollectionParam{ ReplicateParam: api.ReplicateParam{ - Database: databaseName, + Database: dbName, }, - Name: collectionName, + Name: colName, }) }, c.retryOptions...) if err == nil { @@ -331,12 +360,13 @@ func (c *ChannelWriter) WaitPartitionReady(ctx context.Context, collectionName, return s } + dbName, colName := c.mapDBAndCollectionName(databaseName, collectionName) err := retry.Do(ctx, func() error { return c.dataHandler.DescribePartition(ctx, &api.DescribePartitionParam{ ReplicateParam: api.ReplicateParam{ - Database: databaseName, + Database: dbName, }, - CollectionName: collectionName, + CollectionName: colName, PartitionName: partitionName, }) }, c.retryOptions...) @@ -371,7 +401,7 @@ func (c *ChannelWriter) WaitObjReady(ctx context.Context, db, collection, partit return false, nil } if db != "" { - state := c.WaitDatabaseReady(ctx, db, ts) + state := c.WaitDatabaseReady(ctx, db, ts, collection) if state == InfoStateUnknown { return false, errors.Newf("database[%s] is not ready", db) } else if state == InfoStateDropped { @@ -409,6 +439,9 @@ func (c *ChannelWriter) createCollection(ctx context.Context, apiEvent *api.Repl collectionInfo := apiEvent.CollectionInfo entitySchema := &entity.Schema{} entitySchema = entitySchema.ReadProto(collectionInfo.Schema) + dbName, colName := c.mapDBAndCollectionName(apiEvent.ReplicateParam.Database, entitySchema.CollectionName) + apiEvent.ReplicateParam.Database = dbName + entitySchema.CollectionName = colName createParam := &api.CreateCollectionParam{ MsgBaseParam: api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}}, ReplicateParam: apiEvent.ReplicateParam, @@ -436,10 +469,12 @@ func (c *ChannelWriter) dropCollection(ctx context.Context, apiEvent *api.Replic } collectionName := apiEvent.CollectionInfo.Schema.GetName() databaseName := apiEvent.ReplicateParam.Database + dbName, colName := c.mapDBAndCollectionName(databaseName, collectionName) + apiEvent.ReplicateParam.Database = dbName dropParam := &api.DropCollectionParam{ MsgBaseParam: api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}}, ReplicateParam: apiEvent.ReplicateParam, - CollectionName: collectionName, + CollectionName: colName, } err := c.dataHandler.DropCollection(ctx, dropParam) if err != nil { @@ -459,10 +494,12 @@ func (c *ChannelWriter) createPartition(ctx context.Context, apiEvent *api.Repli zap.String("collection", apiEvent.CollectionInfo.Schema.GetName()), zap.String("partition", util.Base64ProtoObj(apiEvent.PartitionInfo))) return nil } + dbName, colName := c.mapDBAndCollectionName(apiEvent.ReplicateParam.Database, apiEvent.CollectionInfo.Schema.GetName()) + apiEvent.ReplicateParam.Database = dbName createParam := &api.CreatePartitionParam{ MsgBaseParam: api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}}, ReplicateParam: apiEvent.ReplicateParam, - CollectionName: apiEvent.CollectionInfo.Schema.GetName(), + CollectionName: colName, PartitionName: apiEvent.PartitionInfo.PartitionName, } err := c.dataHandler.CreatePartition(ctx, createParam) @@ -489,10 +526,12 @@ func (c *ChannelWriter) dropPartition(ctx context.Context, apiEvent *api.Replica partitionName := apiEvent.PartitionInfo.PartitionName 
collectionName := apiEvent.CollectionInfo.Schema.GetName() databaseName := apiEvent.ReplicateParam.Database + dbName, colName := c.mapDBAndCollectionName(databaseName, collectionName) + apiEvent.ReplicateParam.Database = dbName dropParam := &api.DropPartitionParam{ MsgBaseParam: api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}}, ReplicateParam: apiEvent.ReplicateParam, - CollectionName: collectionName, + CollectionName: colName, PartitionName: partitionName, } err := c.dataHandler.DropPartition(ctx, dropParam) @@ -512,10 +551,11 @@ func (c *ChannelWriter) dropPartition(ctx context.Context, apiEvent *api.Replica func (c *ChannelWriter) createDatabase(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error { createDatabaseMsg := msg.(*msgstream.CreateDatabaseMsg) + dbName, _ := c.mapDBAndCollectionName(createDatabaseMsg.GetDbName(), "") err := c.dataHandler.CreateDatabase(ctx, &api.CreateDatabaseParam{ CreateDatabaseRequest: &milvuspb.CreateDatabaseRequest{ Base: msgBase, - DbName: createDatabaseMsg.GetDbName(), + DbName: dbName, }, }) if err != nil { @@ -528,10 +568,11 @@ func (c *ChannelWriter) createDatabase(ctx context.Context, msgBase *commonpb.Ms func (c *ChannelWriter) dropDatabase(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error { dropDatabaseMsg := msg.(*msgstream.DropDatabaseMsg) databaseName := dropDatabaseMsg.GetDbName() + dbName, _ := c.mapDBAndCollectionName(databaseName, "") err := c.dataHandler.DropDatabase(ctx, &api.DropDatabaseParam{ DropDatabaseRequest: &milvuspb.DropDatabaseRequest{ Base: msgBase, - DbName: databaseName, + DbName: dbName, }, }) if err != nil { @@ -546,6 +587,8 @@ func (c *ChannelWriter) dropDatabase(ctx context.Context, msgBase *commonpb.MsgB func (c *ChannelWriter) alterDatabase(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error { alterDatabaseMsg := msg.(*msgstream.AlterDatabaseMsg) UpdateMsgBase(alterDatabaseMsg.Base, msgBase) + dbName, _ := c.mapDBAndCollectionName(alterDatabaseMsg.GetDbName(), "") + alterDatabaseMsg.AlterDatabaseRequest.DbName = dbName err := c.dataHandler.AlterDatabase(ctx, &api.AlterDatabaseParam{ AlterDatabaseRequest: alterDatabaseMsg.AlterDatabaseRequest, }) @@ -559,6 +602,8 @@ func (c *ChannelWriter) alterDatabase(ctx context.Context, msgBase *commonpb.Msg func (c *ChannelWriter) flush(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error { flushMsg := msg.(*msgstream.FlushMsg) var collectionNames []string + var mapCollectionNames []string + var mapDBName string for _, s := range flushMsg.GetCollectionNames() { if skip, err := c.WaitObjReady(ctx, flushMsg.GetDbName(), s, "", flushMsg.EndTs()); err != nil { return err @@ -568,17 +613,25 @@ func (c *ChannelWriter) flush(ctx context.Context, msgBase *commonpb.MsgBase, ms continue } collectionNames = append(collectionNames, s) + dbName, colName := c.mapDBAndCollectionName(flushMsg.GetDbName(), s) + if mapDBName == "" { + mapDBName = dbName + } else if mapDBName != dbName { + log.Warn("flush msg has multiple databases", zap.String("db1", mapDBName), zap.String("db2", dbName)) + return errors.New("flush msg has multiple databases") + } + mapCollectionNames = append(mapCollectionNames, colName) } if len(collectionNames) == 0 { return nil } err := c.dataHandler.Flush(ctx, &api.FlushParam{ ReplicateParam: api.ReplicateParam{ - Database: flushMsg.GetDbName(), + Database: mapDBName, }, FlushRequest: &milvuspb.FlushRequest{ Base: msgBase, - CollectionNames: 
collectionNames, + CollectionNames: mapCollectionNames, }, }) if err != nil { @@ -605,19 +658,24 @@ func (c *ChannelWriter) createIndex(ctx context.Context, msgBase *commonpb.MsgBa return nil } UpdateMsgBase(createIndexMsg.Base, msgBase) + databaseName := createIndexMsg.GetDbName() + collectionName := createIndexMsg.GetCollectionName() + dbName, colName := c.mapDBAndCollectionName(databaseName, collectionName) + createIndexMsg.DbName = dbName + createIndexMsg.CollectionName = colName err := c.dataHandler.CreateIndex(ctx, &api.CreateIndexParam{ ReplicateParam: api.ReplicateParam{ - Database: createIndexMsg.GetDbName(), + Database: dbName, }, CreateIndexRequest: createIndexMsg.CreateIndexRequest, }) if err != nil { log.Warn("fail to create index", zap.Any("msg", createIndexMsg), zap.Error(err)) - skip, _ := c.WaitObjReady(ctx, createIndexMsg.GetDbName(), createIndexMsg.GetCollectionName(), "", createIndexMsg.EndTs()) + skip, _ := c.WaitObjReady(ctx, databaseName, collectionName, "", createIndexMsg.EndTs()) if !skip { return err } - log.Info("collection has been dropped", zap.String("database", createIndexMsg.GetDbName()), + log.Info("collection has been dropped", zap.String("database", databaseName), zap.String("collection", createIndexMsg.GetCollectionName()), zap.String("msg", util.Base64Msg(msg))) } return nil @@ -632,25 +690,28 @@ func (c *ChannelWriter) dropIndex(ctx context.Context, msgBase *commonpb.MsgBase zap.String("collection", dropIndexMsg.GetCollectionName()), zap.String("msg", util.Base64Msg(msg))) return nil } + databaseName := dropIndexMsg.GetDbName() + collectionName := dropIndexMsg.GetCollectionName() + dbName, colName := c.mapDBAndCollectionName(databaseName, collectionName) err := c.dataHandler.DropIndex(ctx, &api.DropIndexParam{ ReplicateParam: api.ReplicateParam{ - Database: dropIndexMsg.GetDbName(), + Database: dbName, }, DropIndexRequest: &milvuspb.DropIndexRequest{ Base: msgBase, - CollectionName: dropIndexMsg.GetCollectionName(), + CollectionName: colName, FieldName: dropIndexMsg.GetFieldName(), IndexName: dropIndexMsg.GetIndexName(), }, }) if err != nil { log.Warn("fail to drop index", zap.Any("msg", dropIndexMsg), zap.Error(err)) - skip, _ := c.WaitObjReady(ctx, dropIndexMsg.GetDbName(), dropIndexMsg.GetCollectionName(), "", dropIndexMsg.EndTs()) + skip, _ := c.WaitObjReady(ctx, databaseName, collectionName, "", dropIndexMsg.EndTs()) if !skip { return err } - log.Info("collection has been dropped", zap.String("database", dropIndexMsg.GetDbName()), - zap.String("collection", dropIndexMsg.GetCollectionName()), zap.String("msg", util.Base64Msg(msg))) + log.Info("collection has been dropped", zap.String("database", databaseName), + zap.String("collection", collectionName), zap.String("msg", util.Base64Msg(msg))) } return nil } @@ -665,6 +726,8 @@ func (c *ChannelWriter) alterIndex(ctx context.Context, msgBase *commonpb.MsgBas return nil } UpdateMsgBase(alterIndexMsg.Base, msgBase) + alterIndexMsg.DbName, alterIndexMsg.CollectionName = c.mapDBAndCollectionName( + alterIndexMsg.GetDbName(), alterIndexMsg.GetCollectionName()) err := c.dataHandler.AlterIndex(ctx, &api.AlterIndexParam{ AlterIndexRequest: alterIndexMsg.AlterIndexRequest, }) @@ -685,20 +748,25 @@ func (c *ChannelWriter) loadCollection(ctx context.Context, msgBase *commonpb.Ms return nil } UpdateMsgBase(loadCollectionMsg.Base, msgBase) + databaseName := loadCollectionMsg.GetDbName() + collectionName := loadCollectionMsg.GetCollectionName() + dbName, colName := c.mapDBAndCollectionName(databaseName, 
collectionName) + loadCollectionMsg.DbName = dbName + loadCollectionMsg.CollectionName = colName err := c.dataHandler.LoadCollection(ctx, &api.LoadCollectionParam{ ReplicateParam: api.ReplicateParam{ - Database: loadCollectionMsg.GetDbName(), + Database: dbName, }, LoadCollectionRequest: loadCollectionMsg.LoadCollectionRequest, }) if err != nil { log.Warn("fail to load collection", zap.Any("msg", loadCollectionMsg), zap.Error(err)) - skip, _ := c.WaitObjReady(ctx, loadCollectionMsg.GetDbName(), loadCollectionMsg.GetCollectionName(), "", loadCollectionMsg.EndTs()) + skip, _ := c.WaitObjReady(ctx, databaseName, collectionName, "", loadCollectionMsg.EndTs()) if !skip { return err } - log.Info("collection has been dropped", zap.String("database", loadCollectionMsg.GetDbName()), - zap.String("collection", loadCollectionMsg.GetCollectionName()), zap.String("msg", util.Base64Msg(msg))) + log.Info("collection has been dropped", zap.String("database", databaseName), + zap.String("collection", collectionName), zap.String("msg", util.Base64Msg(msg))) } return nil } @@ -712,23 +780,26 @@ func (c *ChannelWriter) releaseCollection(ctx context.Context, msgBase *commonpb zap.String("collection", releaseCollectionMsg.GetCollectionName()), zap.String("msg", util.Base64Msg(msg))) return nil } + databaseName := releaseCollectionMsg.GetDbName() + collectionName := releaseCollectionMsg.GetCollectionName() + dbName, colName := c.mapDBAndCollectionName(databaseName, collectionName) err := c.dataHandler.ReleaseCollection(ctx, &api.ReleaseCollectionParam{ ReplicateParam: api.ReplicateParam{ - Database: releaseCollectionMsg.GetDbName(), + Database: dbName, }, ReleaseCollectionRequest: &milvuspb.ReleaseCollectionRequest{ Base: msgBase, - CollectionName: releaseCollectionMsg.GetCollectionName(), + CollectionName: colName, }, }) if err != nil { log.Warn("fail to release collection", zap.Any("msg", releaseCollectionMsg), zap.Error(err)) - skip, _ := c.WaitObjReady(ctx, releaseCollectionMsg.GetDbName(), releaseCollectionMsg.GetCollectionName(), "", releaseCollectionMsg.EndTs()) + skip, _ := c.WaitObjReady(ctx, databaseName, collectionName, "", releaseCollectionMsg.EndTs()) if !skip { return err } - log.Info("collection has been dropped", zap.String("database", releaseCollectionMsg.GetDbName()), - zap.String("collection", releaseCollectionMsg.GetCollectionName()), zap.String("msg", util.Base64Msg(msg))) + log.Info("collection has been dropped", zap.String("database", databaseName), + zap.String("collection", collectionName), zap.String("msg", util.Base64Msg(msg))) } return nil } @@ -749,13 +820,16 @@ func (c *ChannelWriter) loadPartitions(ctx context.Context, msgBase *commonpb.Ms if len(partitions) == 0 { return nil } + databaseName := loadPartitionsMsg.GetDbName() + collectionName := loadPartitionsMsg.GetCollectionName() + dbName, colName := c.mapDBAndCollectionName(databaseName, collectionName) err := c.dataHandler.LoadPartitions(ctx, &api.LoadPartitionsParam{ ReplicateParam: api.ReplicateParam{ - Database: loadPartitionsMsg.GetDbName(), + Database: dbName, }, LoadPartitionsRequest: &milvuspb.LoadPartitionsRequest{ Base: msgBase, - CollectionName: loadPartitionsMsg.GetCollectionName(), + CollectionName: colName, PartitionNames: partitions, ReplicaNumber: loadPartitionsMsg.GetReplicaNumber(), }, @@ -763,13 +837,13 @@ func (c *ChannelWriter) loadPartitions(ctx context.Context, msgBase *commonpb.Ms if err != nil { log.Warn("fail to load partitions", zap.Any("msg", loadPartitionsMsg), zap.Error(err)) for _, p := range 
partitions { - skip, _ := c.WaitObjReady(ctx, loadPartitionsMsg.GetDbName(), loadPartitionsMsg.GetCollectionName(), p, loadPartitionsMsg.EndTs()) + skip, _ := c.WaitObjReady(ctx, databaseName, collectionName, p, loadPartitionsMsg.EndTs()) if !skip { return err } } - log.Info("partition has been dropped", zap.String("database", loadPartitionsMsg.GetDbName()), - zap.String("collection", loadPartitionsMsg.GetCollectionName()), zap.Strings("partitions", partitions), zap.String("msg", util.Base64Msg(msg))) + log.Info("partition has been dropped", zap.String("database", databaseName), + zap.String("collection", collectionName), zap.Strings("partitions", partitions), zap.String("msg", util.Base64Msg(msg))) } return nil } @@ -790,26 +864,29 @@ func (c *ChannelWriter) releasePartitions(ctx context.Context, msgBase *commonpb if len(partitions) == 0 { return nil } + databaseName := releasePartitionsMsg.GetDbName() + collectionName := releasePartitionsMsg.GetCollectionName() + dbName, colName := c.mapDBAndCollectionName(databaseName, collectionName) err := c.dataHandler.ReleasePartitions(ctx, &api.ReleasePartitionsParam{ ReplicateParam: api.ReplicateParam{ - Database: releasePartitionsMsg.GetDbName(), + Database: dbName, }, ReleasePartitionsRequest: &milvuspb.ReleasePartitionsRequest{ Base: msgBase, - CollectionName: releasePartitionsMsg.GetCollectionName(), + CollectionName: colName, PartitionNames: partitions, }, }) if err != nil { log.Warn("fail to release partitions", zap.Any("msg", releasePartitionsMsg), zap.Error(err)) for _, p := range partitions { - skip, _ := c.WaitObjReady(ctx, releasePartitionsMsg.GetDbName(), releasePartitionsMsg.GetCollectionName(), p, releasePartitionsMsg.EndTs()) + skip, _ := c.WaitObjReady(ctx, databaseName, collectionName, p, releasePartitionsMsg.EndTs()) if !skip { return err } } - log.Info("partition has been dropped", zap.String("database", releasePartitionsMsg.GetDbName()), - zap.String("collection", releasePartitionsMsg.GetCollectionName()), zap.Strings("partitions", partitions), zap.String("msg", util.Base64Msg(msg))) + log.Info("partition has been dropped", zap.String("database", databaseName), + zap.String("collection", collectionName), zap.Strings("partitions", partitions), zap.String("msg", util.Base64Msg(msg))) } return nil } @@ -905,6 +982,32 @@ func (c *ChannelWriter) operatePrivilege(ctx context.Context, msgBase *commonpb.
return nil } +func (c *ChannelWriter) mapDBAndCollectionName(db, collection string) (string, string) { + if db == "" { + db = util.DefaultDbName + } + returnDB, returnCollection := db, collection + c.nameMappings.Range(func(source, target string) bool { + sourceDB, sourceCollection := util.GetCollectionNameFromFull(source) + if sourceDB == db && sourceCollection == collection { + returnDB, returnCollection = util.GetCollectionNameFromFull(target) + return false + } + if sourceDB == db && (sourceCollection == "*" || collection == "") { + returnDB, _ = util.GetCollectionNameFromFull(target) + return false + } + return true + }) + return returnDB, returnCollection +} + +func (c *ChannelWriter) UpdateNameMappings(nameMappings map[string]string) { + for k, v := range nameMappings { + c.nameMappings.Store(k, v) + } +} + func UpdateMsgBase(msgBase *commonpb.MsgBase, withReplicateInfo *commonpb.MsgBase) { msgBase.ReplicateInfo = withReplicateInfo.ReplicateInfo } diff --git a/core/writer/channel_writer_test.go b/core/writer/channel_writer_test.go index d3c97801..6af18032 100644 --- a/core/writer/channel_writer_test.go +++ b/core/writer/channel_writer_test.go @@ -70,7 +70,7 @@ func TestChannelWriter(t *testing.T) { dataHandler.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).Return(errors.New("mock")).Once() dataHandler.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).Return(nil).Once() realWriter := w.(*ChannelWriter) - assert.True(t, realWriter.WaitDatabaseReady(context.Background(), "test", 0) == InfoStateCreated) + assert.True(t, realWriter.WaitDatabaseReady(context.Background(), "test", 0, "") == InfoStateCreated) }) t.Run("handler api event", func(t *testing.T) { diff --git a/server/cdc_impl.go b/server/cdc_impl.go index 0dddffc1..6e7688ea 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -71,6 +71,19 @@ type ReplicateEntity struct { refCnt atomic.Int32 } +func (r *ReplicateEntity) UpdateMapping(mapping map[string]string) { + w, ok := r.writerObj.(*cdcwriter.ChannelWriter) + if !ok { + return + } + w.UpdateNameMappings(mapping) + targetClient, ok := r.targetClient.(*cdcreader.TargetClient) + if !ok { + return + } + targetClient.UpdateNameMappings(mapping) +} + type MetaCDC struct { BaseCDC metaStoreFactory serverapi.MetaStoreFactory @@ -85,6 +98,7 @@ type MetaCDC struct { data map[string][]string excludeData map[string][]string extraInfos map[string]model.ExtraInfo + nameMapping map[string]map[string]string } cdcTasks struct { sync.RWMutex @@ -144,6 +158,7 @@ func NewMetaCDC(serverConfig *CDCServerConfig) *MetaCDC { cdc.collectionNames.data = make(map[string][]string) cdc.collectionNames.excludeData = make(map[string][]string) cdc.collectionNames.extraInfos = make(map[string]model.ExtraInfo) + cdc.collectionNames.nameMapping = make(map[string]map[string]string) cdc.cdcTasks.data = make(map[string]*meta.TaskInfo) cdc.replicateEntityMap.data = make(map[string]*ReplicateEntity) return cdc @@ -233,29 +248,17 @@ func getTaskUniqueIDFromReq(req *request.CreateRequest) string { panic("fail to get the task unique id") } -func getFullCollectionName(collectionName string, databaseName string) string { - return fmt.Sprintf("%s.%s", databaseName, collectionName) -} - -func getCollectionNameFromFull(fullName string) (string, string) { - names := strings.Split(fullName, ".") - if len(names) != 2 { - panic("invalid full collection name") - } - return names[0], names[1] -} - func GetCollectionNamesFromTaskInfo(info *meta.TaskInfo) []string { var newCollectionNames []string if 
len(info.CollectionInfos) > 0 { newCollectionNames = lo.Map(info.CollectionInfos, func(t model.CollectionInfo, _ int) string { - return getFullCollectionName(t.Name, cdcreader.DefaultDatabase) + return util.GetFullCollectionName(t.Name, cdcreader.DefaultDatabase) }) } if len(info.DBCollections) > 0 { for db, infos := range info.DBCollections { for _, t := range infos { - newCollectionNames = append(newCollectionNames, getFullCollectionName(t.Name, db)) + newCollectionNames = append(newCollectionNames, util.GetFullCollectionName(t.Name, db)) } } } @@ -266,28 +269,58 @@ func GetCollectionNamesFromReq(req *request.CreateRequest) []string { var newCollectionNames []string if len(req.CollectionInfos) > 0 { newCollectionNames = lo.Map(req.CollectionInfos, func(t model.CollectionInfo, _ int) string { - return getFullCollectionName(t.Name, cdcreader.DefaultDatabase) + return util.GetFullCollectionName(t.Name, cdcreader.DefaultDatabase) }) } if len(req.DBCollections) > 0 { for db, infos := range req.DBCollections { for _, t := range infos { - newCollectionNames = append(newCollectionNames, getFullCollectionName(t.Name, db)) + newCollectionNames = append(newCollectionNames, util.GetFullCollectionName(t.Name, db)) } } } return newCollectionNames } +func GetCollectionMappingFromReq(req *request.CreateRequest) map[string]string { + mapCollectionNames := make(map[string]string) + for _, mapping := range req.NameMapping { + for s, t := range mapping.CollectionMapping { + mapCollectionNames[util.GetFullCollectionName(s, mapping.SourceDB)] = util.GetFullCollectionName(t, mapping.TargetDB) + } + if len(mapping.CollectionMapping) == 0 { + mapCollectionNames[util.GetFullCollectionName(cdcreader.AllCollection, mapping.SourceDB)] = util.GetFullCollectionName(cdcreader.AllCollection, mapping.TargetDB) + } + } + return mapCollectionNames +} + +func GetCollectionMappingFromTaskInfo(info *meta.TaskInfo) map[string]string { + mapCollectionNames := make(map[string]string) + for _, mapping := range info.NameMapping { + for s, t := range mapping.CollectionMapping { + mapCollectionNames[util.GetFullCollectionName(s, mapping.SourceDB)] = util.GetFullCollectionName(t, mapping.TargetDB) + } + if len(mapping.CollectionMapping) == 0 { + mapCollectionNames[util.GetFullCollectionName(cdcreader.AllCollection, mapping.SourceDB)] = util.GetFullCollectionName(cdcreader.AllCollection, mapping.TargetDB) + } + } + return mapCollectionNames +} + func matchCollectionName(sampleCollection, targetCollection string) (bool, bool) { - db1, collection1 := getCollectionNameFromFull(sampleCollection) - db2, collection2 := getCollectionNameFromFull(targetCollection) + db1, collection1 := util.GetCollectionNameFromFull(sampleCollection) + db2, collection2 := util.GetCollectionNameFromFull(targetCollection) return (db1 == db2 || db1 == cdcreader.AllDatabase) && (collection1 == collection2 || collection1 == cdcreader.AllCollection), db1 == cdcreader.AllDatabase || collection1 == cdcreader.AllCollection } -func (e *MetaCDC) checkDuplicateCollection(uKey string, newCollectionNames []string, extraInfo model.ExtraInfo) ([]string, error) { +func (e *MetaCDC) checkDuplicateCollection(uKey string, + newCollectionNames []string, + extraInfo model.ExtraInfo, + mapCollectionNames map[string]string, +) ([]string, error) { e.collectionNames.Lock() defer e.collectionNames.Unlock() existExtraInfo := e.collectionNames.extraInfos[uKey] @@ -298,7 +331,7 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string, newCollectionNames []str var duplicateCollections 
[]string containsAny := false for _, name := range names { - d, c := getCollectionNameFromFull(name) + d, c := util.GetCollectionNameFromFull(name) if d == cdcreader.AllDatabase || c == cdcreader.AllCollection { containsAny = true } @@ -308,7 +341,7 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string, newCollectionNames []str duplicateCollections = append(duplicateCollections, newCollectionName) continue } - nd, nc := getCollectionNameFromFull(newCollectionName) + nd, nc := util.GetCollectionNameFromFull(newCollectionName) if nd == cdcreader.AllDatabase && nc == cdcreader.AllCollection { continue } @@ -326,7 +359,23 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string, newCollectionNames []str return nil, servererror.NewClientError(fmt.Sprintf("the collection name is duplicate with existing task, %v", duplicateCollections)) } } - // release lock early to accept other requests + if len(mapCollectionNames) > 0 { + for name := range mapCollectionNames { + var match bool + for _, newName := range newCollectionNames { + match, _ = matchCollectionName(newName, name) + if match { + break + } + } + if !match { + log.Warn("the collection name in the name mapping is not in the collection info", + zap.Strings("collection_names", newCollectionNames), + zap.String("mapping_name", name)) + return nil, servererror.NewClientError("the collection name in the name mapping is not in the collection info, checkout it") + } + } + } var excludeCollectionNames []string for _, newCollectionName := range newCollectionNames { for _, existCollectionName := range e.collectionNames.data[uKey] { @@ -340,6 +389,14 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string, newCollectionNames []str e.collectionNames.extraInfos[uKey] = model.ExtraInfo{ EnableUserRole: existExtraInfo.EnableUserRole || extraInfo.EnableUserRole, } + nameMappings := e.collectionNames.nameMapping[uKey] + if nameMappings == nil { + nameMappings = make(map[string]string) + e.collectionNames.nameMapping[uKey] = nameMappings + } + for s, t := range mapCollectionNames { + nameMappings[s] = t + } return excludeCollectionNames, nil } @@ -355,8 +412,9 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon } uKey := getTaskUniqueIDFromReq(req) newCollectionNames := GetCollectionNamesFromReq(req) + mapCollectionNames := GetCollectionMappingFromReq(req) - excludeCollectionNames, err := e.checkDuplicateCollection(uKey, newCollectionNames, req.ExtraInfo) + excludeCollectionNames, err := e.checkDuplicateCollection(uKey, newCollectionNames, req.ExtraInfo, mapCollectionNames) if err != nil { return nil, err } @@ -383,6 +441,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon KafkaConnectParam: req.KafkaConnectParam, CollectionInfos: req.CollectionInfos, DBCollections: req.DBCollections, + NameMapping: req.NameMapping, RPCRequestChannelInfo: req.RPCChannelInfo, ExtraInfo: req.ExtraInfo, ExcludeCollections: excludeCollectionNames, @@ -722,6 +781,7 @@ func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) err cancelReadFunc() }) replicateEntity.refCnt.Inc() + replicateEntity.UpdateMapping(GetCollectionMappingFromTaskInfo(info)) if !ignoreUpdateState { err = store.UpdateTaskState(e.metaStoreFactory.GetTaskInfoMetaStore(ctx), info.TaskID, meta.TaskStateRunning, []meta.TaskState{meta.TaskStateInitial, meta.TaskStatePaused}, "") @@ -841,6 +901,7 @@ func (e *MetaCDC) newReplicateEntity(info *meta.TaskInfo) (*ReplicateEntity, err }, metaOp.GetAllDroppedObj(), downstream) 
e.replicateEntityMap.Lock() defer e.replicateEntityMap.Unlock() + // TODO fubang should be fix entity, ok := e.replicateEntityMap.data[uKey] if !ok { replicateCtx, cancelReplicateFunc := context.WithCancel(ctx) @@ -1391,7 +1452,7 @@ func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string, collectionName s taskCollectionInfos = taskInfo.DBCollections[dbName] if taskCollectionInfos == nil { isExclude := lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool { - db, collection := getCollectionNameFromFull(s) + db, collection := util.GetCollectionNameFromFull(s) return db == dbName && collection == collectionName }) if isExclude { @@ -1410,7 +1471,7 @@ func MatchCollection(taskInfo *meta.TaskInfo, taskCollectionInfos []model.Collec return taskCollectionInfo.Name == currentCollectionName }) starContains := isAllCollection && !lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool { - match, _ := matchCollectionName(s, getFullCollectionName(currentCollectionName, currentDatabaseName)) + match, _ := matchCollectionName(s, util.GetFullCollectionName(currentCollectionName, currentDatabaseName)) return match }) return notStarContains || starContains diff --git a/server/cdc_impl_test.go b/server/cdc_impl_test.go index 48299adb..2dd580fa 100644 --- a/server/cdc_impl_test.go +++ b/server/cdc_impl_test.go @@ -1446,17 +1446,17 @@ func TestCheckDuplicateCollection(t *testing.T) { initMetaCDCMap(metaCDC) _, err := metaCDC.checkDuplicateCollection("foo", []string{}, model.ExtraInfo{ EnableUserRole: true, - }) + }, nil) assert.NoError(t, err) _, err = metaCDC.checkDuplicateCollection("foo", []string{}, model.ExtraInfo{ EnableUserRole: true, - }) + }, nil) assert.Error(t, err) _, err = metaCDC.checkDuplicateCollection("hoo", []string{}, model.ExtraInfo{ EnableUserRole: true, - }) + }, nil) assert.NoError(t, err) }) @@ -1465,37 +1465,37 @@ func TestCheckDuplicateCollection(t *testing.T) { initMetaCDCMap(metaCDC) excludeCollections, err := metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "default"), - getFullCollectionName("hoo", "default"), + util.GetFullCollectionName("foo", "default"), + util.GetFullCollectionName("hoo", "default"), }, model.ExtraInfo{ EnableUserRole: true, - }) + }, nil) assert.NoError(t, err) assert.Len(t, excludeCollections, 0) excludeCollections, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("*", "default"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("*", "default"), + }, model.ExtraInfo{}, nil) assert.NoError(t, err) assert.Len(t, excludeCollections, 2) _, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("doo", "default"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("doo", "default"), + }, model.ExtraInfo{}, nil) assert.Error(t, err) metaCDC.collectionNames.Lock() - metaCDC.collectionNames.data["foo"] = lo.Without(metaCDC.collectionNames.data["foo"], getFullCollectionName("foo", "default")) + metaCDC.collectionNames.data["foo"] = lo.Without(metaCDC.collectionNames.data["foo"], util.GetFullCollectionName("foo", "default")) metaCDC.collectionNames.Unlock() _, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("hoo", "default"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("hoo", "default"), + }, model.ExtraInfo{}, nil) assert.Error(t, err) excludeCollections, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "default"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("foo", 
"default"), + }, model.ExtraInfo{}, nil) assert.NoError(t, err) assert.Len(t, excludeCollections, 0) }) @@ -1505,37 +1505,37 @@ func TestCheckDuplicateCollection(t *testing.T) { initMetaCDCMap(metaCDC) excludeCollections, err := metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "db1"), - getFullCollectionName("foo", "db2"), + util.GetFullCollectionName("foo", "db1"), + util.GetFullCollectionName("foo", "db2"), }, model.ExtraInfo{ EnableUserRole: true, - }) + }, nil) assert.NoError(t, err) assert.Len(t, excludeCollections, 0) excludeCollections, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "*"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("foo", "*"), + }, model.ExtraInfo{}, nil) assert.NoError(t, err) assert.Len(t, excludeCollections, 2) _, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "db3"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("foo", "db3"), + }, model.ExtraInfo{}, nil) assert.Error(t, err) metaCDC.collectionNames.Lock() - metaCDC.collectionNames.data["foo"] = lo.Without(metaCDC.collectionNames.data["foo"], getFullCollectionName("foo", "db1")) + metaCDC.collectionNames.data["foo"] = lo.Without(metaCDC.collectionNames.data["foo"], util.GetFullCollectionName("foo", "db1")) metaCDC.collectionNames.Unlock() _, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "db2"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("foo", "db2"), + }, model.ExtraInfo{}, nil) assert.Error(t, err) excludeCollections, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "db1"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("foo", "db1"), + }, model.ExtraInfo{}, nil) assert.NoError(t, err) assert.Len(t, excludeCollections, 0) }) @@ -1545,47 +1545,47 @@ func TestCheckDuplicateCollection(t *testing.T) { initMetaCDCMap(metaCDC) excludeCollections, err := metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "*"), - getFullCollectionName("hoo", "db2"), + util.GetFullCollectionName("foo", "*"), + util.GetFullCollectionName("hoo", "db2"), }, model.ExtraInfo{ EnableUserRole: true, - }) + }, nil) assert.NoError(t, err) assert.Len(t, excludeCollections, 0) excludeCollections, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("*", "*"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("*", "*"), + }, model.ExtraInfo{}, nil) assert.NoError(t, err) assert.Len(t, excludeCollections, 2) _, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "db3"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("foo", "db3"), + }, model.ExtraInfo{}, nil) assert.Error(t, err) metaCDC.collectionNames.Lock() - metaCDC.collectionNames.data["foo"] = lo.Without(metaCDC.collectionNames.data["foo"], getFullCollectionName("foo", "*")) + metaCDC.collectionNames.data["foo"] = lo.Without(metaCDC.collectionNames.data["foo"], util.GetFullCollectionName("foo", "*")) metaCDC.collectionNames.Unlock() _, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "db2"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("foo", "db2"), + }, model.ExtraInfo{}, nil) assert.Error(t, err) _, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("hoo", "*"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("hoo", "*"), + }, model.ExtraInfo{}, nil) assert.Error(t, err) _, err = 
metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("hoo", "db2"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("hoo", "db2"), + }, model.ExtraInfo{}, nil) assert.Error(t, err) excludeCollections, err = metaCDC.checkDuplicateCollection("foo", []string{ - getFullCollectionName("foo", "*"), - }, model.ExtraInfo{}) + util.GetFullCollectionName("foo", "*"), + }, model.ExtraInfo{}, nil) assert.NoError(t, err) assert.Len(t, excludeCollections, 0) }) diff --git a/server/model/common.go b/server/model/common.go index 686d7e5d..41222040 100644 --- a/server/model/common.go +++ b/server/model/common.go @@ -79,6 +79,12 @@ type BufferConfig struct { Size int `json:"size" mapstructure:"size"` } +type NameMapping struct { + SourceDB string `json:"source_db" mapstructure:"source_db"` + TargetDB string `json:"target_db" mapstructure:"target_db"` + CollectionMapping map[string]string `json:"collection_mapping" mapstructure:"collection_mapping"` +} + const ( // TmpCollectionID which means it's the user custom collection position TmpCollectionID int64 = -1 diff --git a/server/model/meta/task.go b/server/model/meta/task.go index 5ee3d5d0..4f0490c6 100644 --- a/server/model/meta/task.go +++ b/server/model/meta/task.go @@ -65,6 +65,7 @@ type TaskInfo struct { WriterCacheConfig model.BufferConfig CollectionInfos []model.CollectionInfo DBCollections map[string][]model.CollectionInfo + NameMapping []model.NameMapping RPCRequestChannelInfo model.ChannelInfo ExtraInfo model.ExtraInfo ExcludeCollections []string // it's used for the `*` collection name @@ -80,6 +81,20 @@ func (t *TaskInfo) CollectionNames() []string { return names } +func (t *TaskInfo) MapNames(db, collection string) (string, string) { + for _, mapping := range t.NameMapping { + if mapping.SourceDB == db { + if len(mapping.CollectionMapping) == 0 { + return mapping.TargetDB, collection + } + if newCollection, ok := mapping.CollectionMapping[collection]; ok { + return mapping.TargetDB, newCollection + } + } + } + return db, collection +} + type PositionInfo struct { Time int64 DataPair *commonpb.KeyDataPair diff --git a/server/model/request/create.go b/server/model/request/create.go index c246c081..0303ae9d 100644 --- a/server/model/request/create.go +++ b/server/model/request/create.go @@ -29,6 +29,7 @@ type CreateRequest struct { RPCChannelInfo model.ChannelInfo `json:"rpc_channel_info" mapstructure:"rpc_channel_info"` ExtraInfo model.ExtraInfo `json:"extra_info" mapstructure:"extra_info"` BufferConfig model.BufferConfig `json:"buffer_config" mapstructure:"buffer_config"` + NameMapping []model.NameMapping `json:"name_mapping" mapstructure:"name_mapping"` // Deprecated Positions map[string]string `json:"positions" mapstructure:"positions"` } diff --git a/server/tool/data_diff/main.go b/server/tool/data_diff/main.go index 7e404a76..42d48754 100644 --- a/server/tool/data_diff/main.go +++ b/server/tool/data_diff/main.go @@ -196,7 +196,8 @@ func GetStreamData( readConfig DataReadConfig, p *commonpb.KeyDataPair, pkField *schemapb.FieldSchema, - timeout int) map[string]*PKData { + timeout int, +) map[string]*PKData { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second) defer cancel() pchannel := funcutil.ToPhysicalChannel(p.GetKey())
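Note: below is a minimal sketch of how a task-creation request could populate the new name_mapping field introduced by this patch, based on the model.NameMapping and request.CreateRequest structs added above. The import paths and the standalone main are assumptions for illustration only, not part of the patch.

package main

import (
	"encoding/json"
	"fmt"

	// assumed import paths; adjust to the actual milvus-cdc module layout
	"github.com/zilliztech/milvus-cdc/server/model"
	"github.com/zilliztech/milvus-cdc/server/model/request"
)

func main() {
	// Map source database "db1" to target database "db2", renaming collection
	// "orders" to "orders_replica". Leaving CollectionMapping empty maps the
	// whole database (db1.* -> db2.*) instead.
	req := request.CreateRequest{
		NameMapping: []model.NameMapping{
			{
				SourceDB: "db1",
				TargetDB: "db2",
				CollectionMapping: map[string]string{
					"orders": "orders_replica",
				},
			},
		},
	}
	body, _ := json.MarshalIndent(req, "", "  ")
	fmt.Println(string(body))
}

Marshalling the request shows the wire format: the struct's json tags produce the "name_mapping", "source_db", "target_db", and "collection_mapping" keys used by the HTTP create request.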
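Note: the following self-contained sketch illustrates the name-resolution rule that mapDBAndCollectionName implements in this patch: a db.collection entry remaps both the database and the collection, a db.* entry (or a lookup with an empty collection name) remaps only the database, and unmapped names pass through unchanged. It reimplements the logic over a plain map purely for illustration; the real code ranges over a util.Map and uses util.GetCollectionNameFromFull, which panics on malformed names.

package main

import (
	"fmt"
	"strings"
)

// splitFull is a simplified util.GetCollectionNameFromFull: "db.collection" -> ("db", "collection").
func splitFull(full string) (string, string) {
	parts := strings.SplitN(full, ".", 2)
	if len(parts) != 2 {
		return full, "" // the real helper panics on malformed names
	}
	return parts[0], parts[1]
}

// mapName mirrors the lookup used by ChannelWriter/TargetClient.mapDBAndCollectionName.
func mapName(mappings map[string]string, db, collection string) (string, string) {
	if db == "" {
		db = "default" // util.DefaultDbName in the real code
	}
	outDB, outCollection := db, collection
	for source, target := range mappings {
		srcDB, srcCollection := splitFull(source)
		if srcDB == db && srcCollection == collection {
			// collection-level mapping: remap both database and collection
			outDB, outCollection = splitFull(target)
			break
		}
		if srcDB == db && (srcCollection == "*" || collection == "") {
			// database-level mapping ("db.*"): remap only the database
			outDB, _ = splitFull(target)
			break
		}
	}
	return outDB, outCollection
}

func main() {
	mappings := map[string]string{
		"db1.orders": "db2.orders_replica", // collection-level rename
		"db3.*":      "db4.*",              // whole-database mapping
	}
	fmt.Println(mapName(mappings, "db1", "orders")) // db2 orders_replica
	fmt.Println(mapName(mappings, "db3", "users"))  // db4 users
	fmt.Println(mapName(mappings, "db5", "users"))  // db5 users (no mapping applies)
}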