Skip to content

Commit

Permalink
use the uuid as the mq subscription name to avoid duplication
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Nov 27, 2024
1 parent 3c43e37 commit 7e1ad2f
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/reader/factory_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func GetMsgDispatcherClient(creator FactoryCreator, mqConfig config.MQConfig, tt
if err != nil {
return nil, err
}
return msgdispatcher.NewClient(warpFactory, "cdc", 8444), nil
return msgdispatcher.NewClient(warpFactory, "cdc-"+util.GetUUID(), 8444), nil
}

func GetStreamFactory(creator FactoryCreator, mqConfig config.MQConfig, ttMsgStream bool) (msgstream.Factory, error) {
Expand Down
6 changes: 6 additions & 0 deletions core/util/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/google/uuid"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"

Expand Down Expand Up @@ -217,3 +218,8 @@ func GetTaskIDFromCtx(ctx context.Context) string {
}
return taskID
}

func GetUUID() string {
uid := uuid.Must(uuid.NewRandom())
return strings.ReplaceAll(uid.String(), "-", "")
}
9 changes: 1 addition & 8 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import (
"encoding/base64"
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/google/uuid"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -436,7 +434,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
}

info := &meta.TaskInfo{
TaskID: e.getUUID(),
TaskID: util.GetUUID(),
MilvusConnectParam: req.MilvusConnectParam,
KafkaConnectParam: req.KafkaConnectParam,
CollectionInfos: req.CollectionInfos,
Expand Down Expand Up @@ -698,11 +696,6 @@ func (e *MetaCDC) checkCollectionInfos(infos []model.CollectionInfo) error {
return servererror.NewClientError(errMsg)
}

func (e *MetaCDC) getUUID() string {
uid := uuid.Must(uuid.NewRandom())
return strings.ReplaceAll(uid.String(), "-", "")
}

func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) error {
taskLog := log.With(zap.String("task_id", info.TaskID))
uKey := getTaskUniqueIDFromInfo(info)
Expand Down

0 comments on commit 7e1ad2f

Please sign in to comment.