Skip to content

Commit

Permalink
Support the db and improve the method of stopping tasks
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Nov 9, 2023
1 parent 57ac245 commit 5590e28
Show file tree
Hide file tree
Showing 30 changed files with 1,472 additions and 188 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
run: make test-go
- name: Upload coverage to Codecov
if: github.repository == 'zilliztech/milvus-cdc'
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v3.1.1
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage.project.out
Expand Down
32 changes: 32 additions & 0 deletions core/api/data_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type DataHandler interface {
ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error

DescribeCollection(ctx context.Context, param *DescribeCollectionParam) error
DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error
}

type DefaultDataHandler struct{}
Expand Down Expand Up @@ -110,12 +111,22 @@ func (d *DefaultDataHandler) DescribeCollection(ctx context.Context, param *Desc
return nil
}

func (d *DefaultDataHandler) DescribeDatabase(ctx context.Context, param *DescribeDatabaseParam) error {
log.Warn("DescribeDatabase is not implemented, please check it")
return nil
}

type MsgBaseParam struct {
Base *commonpb.MsgBase
}

type ReplicateParam struct {
Database string
}

type CreateCollectionParam struct {
MsgBaseParam
ReplicateParam
Schema *entity.Schema
ShardsNum int32
ConsistencyLevel commonpb.ConsistencyLevel
Expand All @@ -124,63 +135,78 @@ type CreateCollectionParam struct {

type DropCollectionParam struct {
MsgBaseParam
ReplicateParam
CollectionName string
}

type InsertParam struct {
MsgBaseParam
ReplicateParam
CollectionName string
PartitionName string
Columns []entity.Column
}

type DeleteParam struct {
MsgBaseParam
ReplicateParam
CollectionName string
PartitionName string
Column entity.Column
}

type CreatePartitionParam struct {
MsgBaseParam
ReplicateParam
CollectionName string
PartitionName string
}

type DropPartitionParam struct {
MsgBaseParam
ReplicateParam
CollectionName string
PartitionName string
}

type CreateIndexParam struct {
ReplicateParam
milvuspb.CreateIndexRequest
}

type DropIndexParam struct {
ReplicateParam
milvuspb.DropIndexRequest
}

type LoadCollectionParam struct {
ReplicateParam
milvuspb.LoadCollectionRequest
}

type ReleaseCollectionParam struct {
ReplicateParam
milvuspb.ReleaseCollectionRequest
}

type CreateDatabaseParam struct {
ReplicateParam
milvuspb.CreateDatabaseRequest
}

type DropDatabaseParam struct {
ReplicateParam
milvuspb.DropDatabaseRequest
}

type FlushParam struct {
ReplicateParam
milvuspb.FlushRequest
}

type ReplicateMessageParam struct {
MsgBaseParam
ReplicateParam
ChannelName string
BeginTs, EndTs uint64
MsgsBytes [][]byte
Expand All @@ -190,5 +216,11 @@ type ReplicateMessageParam struct {
}

type DescribeCollectionParam struct {
ReplicateParam
Name string
}

type DescribeDatabaseParam struct {
ReplicateParam
Name string
}
26 changes: 26 additions & 0 deletions core/api/data_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,29 @@ func TestDefaultDataHandler_ReplicateMessage(t *testing.T) {
})
}
}

func TestDefaultDataHandler_DescribeDatabase(t *testing.T) {
type args struct {
ctx context.Context
param *DescribeDatabaseParam
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "TestDefaultDataHandler_DescribeDatabase",
args: args{},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &DefaultDataHandler{}
if err := d.DescribeDatabase(tt.args.ctx, tt.args.param); (err != nil) != tt.wantErr {
t.Errorf("DescribeDatabase() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
7 changes: 6 additions & 1 deletion core/api/message_manager.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package api

import "github.com/milvus-io/milvus/pkg/log"
import (
"context"

"github.com/milvus-io/milvus/pkg/log"
)

type MessageManager interface {
ReplicateMessage(message *ReplicateMessage)
Close(channelName string)
}

type ReplicateMessage struct {
Ctx context.Context
Param *ReplicateMessageParam
SuccessFunc func(param *ReplicateMessageParam)
FailFunc func(param *ReplicateMessageParam, err error)
Expand Down
6 changes: 6 additions & 0 deletions core/api/meta_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/milvus-io/milvus/pkg/log"

"github.com/zilliztech/milvus-cdc/core/model"
"github.com/zilliztech/milvus-cdc/core/pb"
)

Expand All @@ -23,6 +24,7 @@ type MetaOp interface {
GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error)
GetAllPartition(ctx context.Context, filter PartitionFilter) ([]*pb.PartitionInfo, error)
GetCollectionNameByID(ctx context.Context, id int64) string
GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo
}

// CollectionFilter the filter will be used before the collection is filled the schema info
Expand Down Expand Up @@ -79,3 +81,7 @@ func (d *DefaultMetaOp) GetCollectionNameByID(ctx context.Context, id int64) str
log.Warn("GetCollectionNameByID is not implemented, please check it")
return ""
}

func (d *DefaultMetaOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo {
return model.DatabaseInfo{}
}
27 changes: 27 additions & 0 deletions core/api/meta_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"
"testing"

"github.com/zilliztech/milvus-cdc/core/model"
"github.com/zilliztech/milvus-cdc/core/pb"
)

Expand Down Expand Up @@ -209,3 +210,29 @@ func TestDefaultMetaOp_WatchPartition(t *testing.T) {
})
}
}

func TestDefaultMetaOp_GetDatabaseInfoForCollection(t *testing.T) {
type args struct {
ctx context.Context
id int64
}
tests := []struct {
name string
args args
want model.DatabaseInfo
}{
{
name: "TestDefaultMetaOp_GetDatabaseInfoForCollection",
args: args{},
want: model.DatabaseInfo{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &DefaultMetaOp{}
if got := d.GetDatabaseInfoForCollection(tt.args.ctx, tt.args.id); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetDatabaseInfoForCollection() = %v, want %v", got, tt.want)
}
})
}
}
15 changes: 11 additions & 4 deletions core/api/replicate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

// ChannelManager a target must promise a manager
type ChannelManager interface {
SetCtx(ctx context.Context)

StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error
StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error
AddPartition(ctx context.Context, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error
Expand All @@ -24,15 +26,16 @@ type ChannelManager interface {
}

type TargetAPI interface {
GetCollectionInfo(ctx context.Context, collectionName string) (*model.CollectionInfo, error)
GetPartitionInfo(ctx context.Context, collectionName string) (*model.CollectionInfo, error)
GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)
}

type ReplicateAPIEvent struct {
EventType ReplicateAPIEventType
CollectionInfo *pb.CollectionInfo
PartitionInfo *pb.PartitionInfo
ReplicateInfo *commonpb.ReplicateInfo
ReplicateParam ReplicateParam
Error error
}

Expand All @@ -51,6 +54,10 @@ type DefaultChannelManager struct{}

var _ ChannelManager = (*DefaultChannelManager)(nil)

func (d *DefaultChannelManager) SetCtx(ctx context.Context) {
log.Warn("SetCtx is not implemented, please check it")
}

func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error {
log.Warn("StartReadCollection is not implemented, please check it")
return nil
Expand Down Expand Up @@ -85,12 +92,12 @@ type DefaultTargetAPI struct{}

var _ TargetAPI = (*DefaultTargetAPI)(nil)

func (d *DefaultTargetAPI) GetCollectionInfo(ctx context.Context, collectionName string) (*model.CollectionInfo, error) {
func (d *DefaultTargetAPI) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error) {
log.Warn("GetCollectionInfo is not implemented, please check it")
return nil, nil
}

func (d *DefaultTargetAPI) GetPartitionInfo(ctx context.Context, collectionName string) (*model.CollectionInfo, error) {
func (d *DefaultTargetAPI) GetPartitionInfo(ctx context.Context, collectionName string, databaseName string) (*model.CollectionInfo, error) {
log.Warn("GetPartitionInfo is not implemented, please check it")
return nil, nil
}
25 changes: 23 additions & 2 deletions core/api/replicate_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestDefaultTargetAPI_GetCollectionInfo(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &DefaultTargetAPI{}
got, err := d.GetCollectionInfo(tt.args.ctx, tt.args.collectionName)
got, err := d.GetCollectionInfo(tt.args.ctx, tt.args.collectionName, "")
if (err != nil) != tt.wantErr {
t.Errorf("GetCollectionInfo() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestDefaultTargetAPI_GetPartitionInfo(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &DefaultTargetAPI{}
got, err := d.GetPartitionInfo(tt.args.ctx, tt.args.collectionName)
got, err := d.GetPartitionInfo(tt.args.ctx, tt.args.collectionName, "")
if (err != nil) != tt.wantErr {
t.Errorf("GetPartitionInfo() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -222,3 +222,24 @@ func TestDefaultTargetAPI_GetPartitionInfo(t *testing.T) {
})
}
}

func TestDefaultChannelManager_SetCtx(t *testing.T) {
type args struct {
ctx context.Context
}
tests := []struct {
name string
args args
}{
{
name: "TestDefaultChannelManager_SetCtx",
args: args{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &DefaultChannelManager{}
d.SetCtx(tt.args.ctx)
})
}
}
33 changes: 33 additions & 0 deletions core/mocks/channel_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5590e28

Please sign in to comment.