From dcaf6705b418f6b0bd34762b786988ad93a01746 Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 27 Nov 2024 21:03:48 +0800 Subject: [PATCH] use the uuid as the mq subscription name to avoid duplication Signed-off-by: SimFG --- core/reader/factory_api.go | 2 +- core/util/string.go | 6 ++++++ server/cdc_impl.go | 9 +-------- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/reader/factory_api.go b/core/reader/factory_api.go index 10f322e..494236a 100644 --- a/core/reader/factory_api.go +++ b/core/reader/factory_api.go @@ -112,7 +112,7 @@ func GetMsgDispatcherClient(creator FactoryCreator, mqConfig config.MQConfig, tt if err != nil { return nil, err } - return msgdispatcher.NewClient(warpFactory, "cdc", 8444), nil + return msgdispatcher.NewClient(warpFactory, "cdc-"+util.GetUUID(), 8444), nil } func GetStreamFactory(creator FactoryCreator, mqConfig config.MQConfig, ttMsgStream bool) (msgstream.Factory, error) { diff --git a/core/util/string.go b/core/util/string.go index 91d1647..40bb0d6 100644 --- a/core/util/string.go +++ b/core/util/string.go @@ -28,6 +28,7 @@ import ( "strings" "unsafe" + "github.com/google/uuid" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -217,3 +218,8 @@ func GetTaskIDFromCtx(ctx context.Context) string { } return taskID } + +func GetUUID() string { + uid := uuid.Must(uuid.NewRandom()) + return strings.ReplaceAll(uid.String(), "-", "") +} diff --git a/server/cdc_impl.go b/server/cdc_impl.go index bd739ab..9910b5b 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -22,12 +22,10 @@ import ( "encoding/base64" "fmt" "strconv" - "strings" "sync" "time" "github.com/cockroachdb/errors" - "github.com/google/uuid" "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" @@ -436,7 +434,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon } info := &meta.TaskInfo{ - TaskID: e.getUUID(), + TaskID: util.GetUUID(), MilvusConnectParam: req.MilvusConnectParam, KafkaConnectParam: req.KafkaConnectParam, CollectionInfos: req.CollectionInfos, @@ -698,11 +696,6 @@ func (e *MetaCDC) checkCollectionInfos(infos []model.CollectionInfo) error { return servererror.NewClientError(errMsg) } -func (e *MetaCDC) getUUID() string { - uid := uuid.Must(uuid.NewRandom()) - return strings.ReplaceAll(uid.String(), "-", "") -} - func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) error { taskLog := log.With(zap.String("task_id", info.TaskID)) uKey := getTaskUniqueIDFromInfo(info)