diff --git a/core/reader/factory_api.go b/core/reader/factory_api.go index 10f322e..494236a 100644 --- a/core/reader/factory_api.go +++ b/core/reader/factory_api.go @@ -112,7 +112,7 @@ func GetMsgDispatcherClient(creator FactoryCreator, mqConfig config.MQConfig, tt if err != nil { return nil, err } - return msgdispatcher.NewClient(warpFactory, "cdc", 8444), nil + return msgdispatcher.NewClient(warpFactory, "cdc-"+util.GetUUID(), 8444), nil } func GetStreamFactory(creator FactoryCreator, mqConfig config.MQConfig, ttMsgStream bool) (msgstream.Factory, error) { diff --git a/core/util/string.go b/core/util/string.go index 91d1647..8ba0d5b 100644 --- a/core/util/string.go +++ b/core/util/string.go @@ -31,6 +31,7 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/proto" + "github.com/google/uuid" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/merr" @@ -217,3 +218,8 @@ func GetTaskIDFromCtx(ctx context.Context) string { } return taskID } + +func GetUUID() string { + uid := uuid.Must(uuid.NewRandom()) + return strings.ReplaceAll(uid.String(), "-", "") +} diff --git a/server/cdc_impl.go b/server/cdc_impl.go index bd739ab..9910b5b 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -22,12 +22,10 @@ import ( "encoding/base64" "fmt" "strconv" - "strings" "sync" "time" "github.com/cockroachdb/errors" - "github.com/google/uuid" "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" @@ -436,7 +434,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon } info := &meta.TaskInfo{ - TaskID: e.getUUID(), + TaskID: util.GetUUID(), MilvusConnectParam: req.MilvusConnectParam, KafkaConnectParam: req.KafkaConnectParam, CollectionInfos: req.CollectionInfos, @@ -698,11 +696,6 @@ func (e *MetaCDC) checkCollectionInfos(infos []model.CollectionInfo) error { return servererror.NewClientError(errMsg) } -func (e *MetaCDC) getUUID() string { - uid := uuid.Must(uuid.NewRandom()) - return strings.ReplaceAll(uid.String(), "-", "") -} - func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) error { taskLog := log.With(zap.String("task_id", info.TaskID)) uKey := getTaskUniqueIDFromInfo(info)