Skip to content

Commit

Permalink
support the database/collection name mapping
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Nov 13, 2024
1 parent 94c6114 commit 247b045
Show file tree
Hide file tree
Showing 10 changed files with 376 additions and 118 deletions.
41 changes: 35 additions & 6 deletions core/reader/target_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ import (
var _ api.TargetAPI = (*TargetClient)(nil)

type TargetClient struct {
client client.Client
config TargetConfig
client client.Client
config TargetConfig
nameMappings util.Map[string, string]
}

type TargetConfig struct {
Expand Down Expand Up @@ -81,8 +82,9 @@ func (t *TargetClient) GetCollectionInfo(ctx context.Context, collectionName, da
}

collectionInfo := &model.CollectionInfo{}
err = t.milvusOp(ctx, databaseName, func(milvus client.Client) error {
collection, err := milvus.DescribeCollection(ctx, collectionName)
dbName, colName := t.mapDBAndCollectionName(databaseName, collectionName)
err = t.milvusOp(ctx, dbName, func(milvus client.Client) error {
collection, err := milvus.DescribeCollection(ctx, colName)
if err != nil {
return err
}
Expand Down Expand Up @@ -116,8 +118,9 @@ func (t *TargetClient) GetPartitionInfo(ctx context.Context, collectionName, dat
}
collectionInfo := &model.CollectionInfo{}
var partition []*entity.Partition
err = t.milvusOp(ctx, databaseName, func(milvus client.Client) error {
partition, err = milvus.ShowPartitions(ctx, collectionName)
dbName, colName := t.mapDBAndCollectionName(databaseName, collectionName)
err = t.milvusOp(ctx, dbName, func(milvus client.Client) error {
partition, err = milvus.ShowPartitions(ctx, colName)
if err != nil {
return err
}
Expand Down Expand Up @@ -178,3 +181,29 @@ func (t *TargetClient) GetDatabaseName(ctx context.Context, collectionName, data
dbLog.Warn("not found the database", zap.Any("databases", databaseNames))
return "", util.NotFoundDatabase
}

func (t *TargetClient) UpdateNameMappings(nameMappings map[string]string) {
for k, v := range nameMappings {
t.nameMappings.Store(k, v)
}
}

func (t *TargetClient) mapDBAndCollectionName(db, collection string) (string, string) {
if db == "" {
db = util.DefaultDbName
}
returnDB, returnCollection := db, collection
t.nameMappings.Range(func(source, target string) bool {
sourceDB, sourceCollection := util.GetCollectionNameFromFull(source)
if sourceDB == db && sourceCollection == collection {
returnDB, returnCollection = util.GetCollectionNameFromFull(target)
return false
}
if sourceDB == db && (sourceCollection == "*" || collection == "") {
returnDB, _ = util.GetCollectionNameFromFull(target)
return false
}
return true
})
return returnDB, returnCollection
}
12 changes: 12 additions & 0 deletions core/util/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,15 @@ func ParseVChannel(virtualName string) (ChannelInfo, error) {
ShardIndex: int(shardIdx),
}, nil
}

func GetFullCollectionName(collectionName string, databaseName string) string {
return fmt.Sprintf("%s.%s", databaseName, collectionName)
}

func GetCollectionNameFromFull(fullName string) (string, string) {
names := strings.Split(fullName, ".")
if len(names) != 2 {
panic("invalid full collection name")
}
return names[0], names[1]
}
Loading

0 comments on commit 247b045

Please sign in to comment.