diff --git a/server/cdc_api.go b/server/cdc_api.go index 910f0538..04d367c4 100644 --- a/server/cdc_api.go +++ b/server/cdc_api.go @@ -16,7 +16,10 @@ package server -import "github.com/zilliztech/milvus-cdc/server/model/request" +import ( + "github.com/milvus-io/milvus/pkg/log" + "github.com/zilliztech/milvus-cdc/server/model/request" +) type CDCService interface { ReloadTask() @@ -36,33 +39,41 @@ func NewBaseCDC() *BaseCDC { } func (b *BaseCDC) ReloadTask() { + log.Warn("ReloadTask is not implemented, please check it") } func (b *BaseCDC) Create(request *request.CreateRequest) (*request.CreateResponse, error) { + log.Warn("Create is not implemented, please check it") return nil, nil } func (b *BaseCDC) Delete(request *request.DeleteRequest) (*request.DeleteResponse, error) { + log.Warn("Delete is not implemented, please check it") return nil, nil } func (b *BaseCDC) Pause(request *request.PauseRequest) (*request.PauseResponse, error) { + log.Warn("Pause is not implemented, please check it") return nil, nil } func (b *BaseCDC) Resume(request *request.ResumeRequest) (*request.ResumeResponse, error) { + log.Warn("Resume is not implemented, please check it") return nil, nil } func (b *BaseCDC) Get(request *request.GetRequest) (*request.GetResponse, error) { + log.Warn("Get is not implemented, please check it") return nil, nil } func (b *BaseCDC) GetPosition(req *request.GetPositionRequest) (*request.GetPositionResponse, error) { + log.Warn("GetPosition is not implemented, please check it") return nil, nil } func (b *BaseCDC) List(request *request.ListRequest) (*request.ListResponse, error) { + log.Warn("List is not implemented, please check it") return nil, nil } diff --git a/server/cdc_api_test.go b/server/cdc_api_test.go new file mode 100644 index 00000000..c94d879d --- /dev/null +++ b/server/cdc_api_test.go @@ -0,0 +1,40 @@ +package server + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultCDCServer(t *testing.T) { + baseCDC := NewBaseCDC() + baseCDC.ReloadTask() + { + _, err := baseCDC.Create(nil) + assert.NoError(t, err) + } + { + _, err := baseCDC.Delete(nil) + assert.NoError(t, err) + } + { + _, err := baseCDC.Pause(nil) + assert.NoError(t, err) + } + { + _, err := baseCDC.Resume(nil) + assert.NoError(t, err) + } + { + _, err := baseCDC.Get(nil) + assert.NoError(t, err) + } + { + _, err := baseCDC.GetPosition(nil) + assert.NoError(t, err) + } + { + _, err := baseCDC.List(nil) + assert.NoError(t, err) + } +} diff --git a/server/cdc_impl.go b/server/cdc_impl.go index db6d626a..7c833e06 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -65,6 +65,7 @@ type ReplicateEntity struct { type MetaCDC struct { BaseCDC metaStoreFactory serverapi.MetaStoreFactory + mqFactoryCreator cdcreader.FactoryCreator rootPath string config *CDCServerConfig @@ -118,6 +119,7 @@ func NewMetaCDC(serverConfig *CDCServerConfig) *MetaCDC { cdc := &MetaCDC{ metaStoreFactory: factory, config: serverConfig, + mqFactoryCreator: cdcreader.NewDefaultFactoryCreator(), } cdc.collectionNames.data = make(map[string][]string) cdc.collectionNames.excludeData = make(map[string][]string) @@ -143,33 +145,6 @@ func (e *MetaCDC) ReloadTask() { log.Panic("fail to get all task info", zap.Error(err)) } - // if reverse { - // var err error - // reverseTxn, commitFunc, err := e.metaStoreFactory.Txn(ctx) - // if err != nil { - // log.Panic("fail to new the reverse txn", zap.Error(err)) - // } - // for _, taskInfo := range taskInfos { - // if taskInfo.MilvusConnectParam.Host == currentConfig.Host && taskInfo.MilvusConnectParam.Port == currentConfig.Port { - // taskInfo.MilvusConnectParam.Host = reverseConfig.Host - // taskInfo.MilvusConnectParam.Port = reverseConfig.Port - // taskInfo.MilvusConnectParam.Username = reverseConfig.Username - // taskInfo.MilvusConnectParam.Password = reverseConfig.Password - // taskInfo.MilvusConnectParam.EnableTLS = reverseConfig.EnableTLS - // if err = e.metaStoreFactory.GetTaskInfoMetaStore(ctx).Put(ctx, taskInfo, reverseTxn); err != nil { - // log.Panic("fail to put the task info to metastore when reversing", zap.Error(err)) - // } - // // TODO need to use new target position in the future, not delete and receive the msg from the latest position - // if err = e.metaStoreFactory.GetTaskCollectionPositionMetaStore(ctx).Delete(ctx, &meta.TaskCollectionPosition{TaskID: taskInfo.TaskID}, reverseTxn); err != nil { - // log.Panic("fail to delete the task collection position to metastore when reversing", zap.Error(err)) - // } - // } - // } - // if err = commitFunc(err); err != nil { - // log.Panic("fail to commit the reverse txn", zap.Error(err)) - // } - // } - for _, taskInfo := range taskInfos { milvusAddress := fmt.Sprintf("%s:%d", taskInfo.MilvusConnectParam.Host, taskInfo.MilvusConnectParam.Port) newCollectionNames := lo.Map(taskInfo.CollectionInfos, func(t model.CollectionInfo, _ int) string { @@ -234,9 +209,6 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon existCollectionNames := e.collectionNames.data[milvusAddress] excludeCollectionNames = make([]string, len(existCollectionNames)) copy(excludeCollectionNames, existCollectionNames) - // if !lo.Contains(excludeCollectionNames, util.RPCRequestCollectionName) { - // excludeCollectionNames = append(excludeCollectionNames, util.RPCRequestCollectionName) - // } e.collectionNames.excludeData[milvusAddress] = excludeCollectionNames } e.collectionNames.data[milvusAddress] = append(e.collectionNames.data[milvusAddress], newCollectionNames...) @@ -335,17 +307,6 @@ func (e *MetaCDC) validCreateRequest(req *request.CreateRequest) error { return servererror.NewClientError("the cache size is less zero") } - // if req.RPCChannelInfo.Name == "" { - // if err := e.checkCollectionInfos(req.CollectionInfos); err != nil { - // return err - // } - // } else { - // if len(req.CollectionInfos) > 0 { - // return servererror.NewClientError("the collection info should be empty when the rpc channel is not empty") - // } - // req.CollectionInfos = []model.CollectionInfo{{Name: util.RPCRequestCollectionName}} - // } - if err := e.checkCollectionInfos(req.CollectionInfos); err != nil { return err } @@ -418,6 +379,7 @@ func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) err milvusAddress := fmt.Sprintf("%s:%d", milvusConnectParam.Host, milvusConnectParam.Port) e.replicateEntityMap.RLock() replicateEntity, ok := e.replicateEntityMap.data[milvusAddress] + log.Info("ok", zap.Any("ok", ok)) e.replicateEntityMap.RUnlock() newReplicateEntity := func() (*ReplicateEntity, error) { @@ -439,7 +401,7 @@ func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) err channelManager, err := cdcreader.NewReplicateChannelManager(config.MQConfig{ Pulsar: e.config.SourceConfig.Pulsar, Kafka: e.config.SourceConfig.Kafka, - }, cdcreader.NewDefaultFactoryCreator(), milvusClient, bufferSize) + }, e.mqFactoryCreator, milvusClient, bufferSize) if err != nil { log.Warn("fail to create replicate channel manager", zap.Error(err)) return nil, servererror.NewClientError("fail to create replicate channel manager") @@ -600,7 +562,7 @@ func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) err writeCallback.UpdateTaskCollectionPosition(TmpCollectionID, TmpCollectionName, channelName, metaPosition, metaPosition, nil) return true - }, cdcreader.NewDefaultFactoryCreator()) + }, e.mqFactoryCreator) readCtx, cancelReadFunc := context.WithCancel(context.Background()) e.replicateEntityMap.Lock() // replicateEntity.readerObj = collectionReader @@ -726,7 +688,7 @@ func (e *MetaCDC) GetPosition(req *request.GetPositionRequest) (*request.GetPosi ctx := context.Background() positions, err := e.metaStoreFactory.GetTaskCollectionPositionMetaStore(ctx).Get(ctx, &meta.TaskCollectionPosition{TaskID: req.TaskID}, nil) if err != nil { - return nil, err + return nil, servererror.NewServerError(err) } resp := &request.GetPositionResponse{} if len(positions) > 0 { diff --git a/server/cdc_impl_test.go b/server/cdc_impl_test.go index 1d9d1d8e..32754e60 100644 --- a/server/cdc_impl_test.go +++ b/server/cdc_impl_test.go @@ -17,16 +17,30 @@ package server import ( + "context" + "encoding/base64" + "log" + "net" "testing" + "time" "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" - "github.com/zilliztech/milvus-cdc/core/mocks" - "github.com/zilliztech/milvus-cdc/core/util" + sdkmocks "github.com/milvus-io/milvus-sdk-go/v2/mocks" + "github.com/zilliztech/milvus-cdc/core/config" + coremocks "github.com/zilliztech/milvus-cdc/core/mocks" + "github.com/zilliztech/milvus-cdc/core/pb" + "github.com/zilliztech/milvus-cdc/server/mocks" "github.com/zilliztech/milvus-cdc/server/model" + "github.com/zilliztech/milvus-cdc/server/model/meta" "github.com/zilliztech/milvus-cdc/server/model/request" ) @@ -79,573 +93,971 @@ func TestNewMetaCDC(t *testing.T) { }) }) - mockEtcdCli := mocks.NewKVApi(t) - mockEtcdCli.On("Endpoints").Return(endpoints) - mockEtcdCli.On("Status", mock.Anything, endpoints[0]).Return(&clientv3.StatusResponse{}, nil) - t.Run("etcd meta store", func(t *testing.T) { - etcdServerConfig := &CDCServerConfig{ - MetaStoreConfig: CDCMetaStoreConfig{ - StoreType: "etcd", - EtcdEndpoints: endpoints, - RootPath: rootPath, - }, - SourceConfig: MilvusSourceConfig{ - EtcdAddress: endpoints, - EtcdRootPath: "by-dev", - EtcdMetaSubPath: "meta", - }, - } - - util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { - return mockEtcdCli, nil - }, func() { - assert.NotPanics(t, func() { - NewMetaCDC(etcdServerConfig) + // invalid address + assert.Panics(t, func() { + NewMetaCDC(&CDCServerConfig{ + MetaStoreConfig: CDCMetaStoreConfig{ + StoreType: "etcd", + EtcdEndpoints: []string{"unknown"}, + RootPath: "cdc-test", + }, }) }) - util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { - return nil, errors.New("foo") - }, func() { - assert.Panics(t, func() { - NewMetaCDC(etcdServerConfig) + // invalid source etcd + assert.Panics(t, func() { + NewMetaCDC(&CDCServerConfig{ + MetaStoreConfig: CDCMetaStoreConfig{ + StoreType: "etcd", + EtcdEndpoints: endpoints, + RootPath: "cdc-test", + }, + SourceConfig: MilvusSourceConfig{ + EtcdAddress: []string{"unknown"}, + }, }) }) - util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { - return mockEtcdCli, nil - }, func() { - etcdServerConfig.MetaStoreConfig.EtcdEndpoints = []string{"127.0.0.2:2377"} + // success + { + NewMetaCDC(&CDCServerConfig{ + MetaStoreConfig: CDCMetaStoreConfig{ + StoreType: "etcd", + EtcdEndpoints: endpoints, + RootPath: "cdc-test", + }, + SourceConfig: MilvusSourceConfig{ + EtcdAddress: endpoints, + }, + }) + } + }) + + t.Run("mysql meta store", func(t *testing.T) { + // invalid address + { assert.Panics(t, func() { - NewMetaCDC(etcdServerConfig) + NewMetaCDC(&CDCServerConfig{ + MetaStoreConfig: CDCMetaStoreConfig{ + StoreType: "mysql", + MysqlSourceUrl: "unknown", + RootPath: "cdc-test", + }, + }) + }) + } + + // success + { + NewMetaCDC(&CDCServerConfig{ + MetaStoreConfig: CDCMetaStoreConfig{ + StoreType: "mysql", + MysqlSourceUrl: mysqlUrl, + RootPath: "cdc-test", + }, + SourceConfig: MilvusSourceConfig{ + EtcdAddress: endpoints, + }, }) + } + }) +} + +func TestReload(t *testing.T) { + t.Run("reverser invalid config", func(t *testing.T) { + metaCDC := &MetaCDC{} + metaCDC.config = &CDCServerConfig{ + EnableReverse: true, + } + assert.Panics(t, func() { + metaCDC.ReloadTask() }) }) - t.Run("mysql meta store", func(t *testing.T) { - mysqlServerConfig := &CDCServerConfig{ - MetaStoreConfig: CDCMetaStoreConfig{ - StoreType: "mysql", - MysqlSourceUrl: mysqlUrl, + t.Run("fail to get task meta", func(t *testing.T) { + metaCDC := &MetaCDC{} + initMetaCDCMap(metaCDC) + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + + metaCDC.config = &CDCServerConfig{ + EnableReverse: false, + } + metaCDC.metaStoreFactory = factory + assert.Panics(t, func() { + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("test")).Once() + metaCDC.ReloadTask() + }) + }) + + t.Run("success", func(t *testing.T) { + metaCDC := &MetaCDC{} + initMetaCDCMap(metaCDC) + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + positionStore := mocks.NewMetaStore[*meta.TaskCollectionPosition](t) + + metaCDC.config = &CDCServerConfig{ + EnableReverse: false, + } + metaCDC.metaStoreFactory = factory + assert.Panics(t, func() { + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Once() + factory.EXPECT().GetTaskCollectionPositionMetaStore(mock.Anything).Return(positionStore).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{ + { + TaskID: "1234", + State: meta.TaskStateRunning, + MilvusConnectParam: model.MilvusConnectParam{ + Host: "127.0.0.1", + Port: 19530, + }, + CollectionInfos: []model.CollectionInfo{ + { + Name: "foo", + }, + }, + }, + }, nil).Once() + positionStore.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("test")).Once() + + metaCDC.replicateEntityMap.Lock() + metaCDC.replicateEntityMap.data = map[string]*ReplicateEntity{ + "127.0.0.1:19530": {}, + } + metaCDC.replicateEntityMap.Unlock() + + metaCDC.ReloadTask() + }) + }) +} + +func TestValidCreateRequest(t *testing.T) { + metaCDC := &MetaCDC{} + t.Run("empty host", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{}) + assert.Error(t, err) + }) + t.Run("empty port", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + }, + }) + assert.Error(t, err) + }) + t.Run("empty username and default password", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 19530, + Password: "xxx", + }, + }) + assert.Error(t, err) + }) + t.Run("invalid connect time", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 19530, + ConnectTimeout: -1, + }, + }) + assert.Error(t, err) + }) + t.Run("invalid buffer period", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 19530, + ConnectTimeout: 10, + }, + BufferConfig: model.BufferConfig{ + Period: -1, + }, + }) + assert.Error(t, err) + }) + t.Run("invalid buffer size", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 19530, + ConnectTimeout: 10, + }, + BufferConfig: model.BufferConfig{ + Period: 10, + Size: -1, + }, + }) + assert.Error(t, err) + }) + t.Run("empty collection info", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 19530, + ConnectTimeout: 10, + }, + BufferConfig: model.BufferConfig{ + Period: 10, + Size: 10, + }, + CollectionInfos: []model.CollectionInfo{}, + }) + assert.Error(t, err) + }) + t.Run("not star collection", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 19530, + ConnectTimeout: 10, + }, + BufferConfig: model.BufferConfig{ + Period: 10, + Size: 10, + }, + CollectionInfos: []model.CollectionInfo{ + { + Name: "foo", + }, + }, + }) + assert.Error(t, err) + }) + t.Run("empty rpc channel", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 19530, + ConnectTimeout: 10, + }, + BufferConfig: model.BufferConfig{ + Period: 10, + Size: 10, + }, + CollectionInfos: []model.CollectionInfo{ + { + Name: "*", + }, }, - SourceConfig: MilvusSourceConfig{ - EtcdAddress: endpoints, - EtcdRootPath: "by-dev", - EtcdMetaSubPath: "meta", + }) + assert.Error(t, err) + }) + t.Run("fail to connect target", func(t *testing.T) { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 19530, + ConnectTimeout: 3, + }, + BufferConfig: model.BufferConfig{ + Period: 10, + Size: 10, + }, + CollectionInfos: []model.CollectionInfo{ + { + Name: "*", + }, + }, + RPCChannelInfo: model.ChannelInfo{ + Name: "foo", + }, + }) + assert.Error(t, err) + }) + t.Run("success", func(t *testing.T) { + _, closeFunc := NewMockMilvus(t) + defer closeFunc() + + err := metaCDC.validCreateRequest(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 50051, + ConnectTimeout: 5, + }, + BufferConfig: model.BufferConfig{ + Period: 10, + Size: 10, + }, + CollectionInfos: []model.CollectionInfo{ + { + Name: "*", + }, + }, + RPCChannelInfo: model.ChannelInfo{ + Name: "foo", + }, + }) + assert.NoError(t, err) + }) +} + +func TestCreateRequest(t *testing.T) { + t.Run("success", func(t *testing.T) { + metaCDC := &MetaCDC{ + config: &CDCServerConfig{ + MaxTaskNum: 10, + SourceConfig: MilvusSourceConfig{ + ReadChanLen: 10, + Pulsar: config.PulsarConfig{ + Address: "localhost:6650", + }, + EtcdAddress: []string{"localhost:2379"}, + EtcdRootPath: "source-cdc-test", + EtcdMetaSubPath: "meta", + DefaultPartitionName: "_default", + }, }, } - util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { - return mockEtcdCli, nil - }, func() { - assert.NotPanics(t, func() { - NewMetaCDC(mysqlServerConfig) + initMetaCDCMap(metaCDC) + defer ClearEtcdData("source-cdc-test") + + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + positionStore := mocks.NewMetaStore[*meta.TaskCollectionPosition](t) + mqFactoryCreator := coremocks.NewFactoryCreator(t) + mqFactory := msgstream.NewMockFactory(t) + mq := msgstream.NewMockMsgStream(t) + + metaCDC.mqFactoryCreator = mqFactoryCreator + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store) + factory.EXPECT().GetTaskCollectionPositionMetaStore(mock.Anything).Return(positionStore) + // check the task num + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{}, nil).Once() + // save position + positionStore.EXPECT().Put(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // save task meta + store.EXPECT().Put(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // new channel manager + mqFactoryCreator.EXPECT().NewPmsFactory(mock.Anything).Return(mqFactory) + // get task position + positionStore.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskCollectionPosition{ + { + Positions: map[string]*meta.PositionInfo{ + "ch1": { + Time: 1, + DataPair: &commonpb.KeyDataPair{ + Key: "ch1", + Data: []byte("ch1-position"), + }, + }, + }, + }, + }, nil).Once() + // new channel reader + mqFactory.EXPECT().NewMsgStream(mock.Anything).Return(mq, nil).Once() + mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // start read channel + streamChan := make(chan *msgstream.MsgPack) + mq.EXPECT().Chan().Return(streamChan) + // update state + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{ + { + TaskID: "1", + State: meta.TaskStateInitial, + }, + }, nil).Once() + store.EXPECT().Put(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + + metaCDC.metaStoreFactory = factory + + _, closeFunc := NewMockMilvus(t) + defer closeFunc() + + { + _, err := metaCDC.Create(&request.CreateRequest{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 50051, + ConnectTimeout: 5, + }, + BufferConfig: model.BufferConfig{ + Period: 10, + Size: 10, + }, + CollectionInfos: []model.CollectionInfo{ + { + Name: "*", + }, + }, + RPCChannelInfo: model.ChannelInfo{ + Name: "foo", + }, + Positions: map[string]string{ + "ch1": base64.StdEncoding.EncodeToString([]byte("ch1-position")), + }, }) + assert.NoError(t, err) + } + }) +} + +func initMetaCDCMap(cdc *MetaCDC) { + cdc.replicateEntityMap.Lock() + cdc.replicateEntityMap.data = map[string]*ReplicateEntity{} + cdc.replicateEntityMap.Unlock() + + cdc.collectionNames.Lock() + cdc.collectionNames.data = map[string][]string{} + cdc.collectionNames.excludeData = map[string][]string{} + cdc.collectionNames.Unlock() + + cdc.cdcTasks.Lock() + cdc.cdcTasks.data = map[string]*meta.TaskInfo{} + cdc.cdcTasks.Unlock() +} + +func NewMockMilvus(t *testing.T) (*sdkmocks.MilvusServiceServer, func() error) { + listen, err := net.Listen("tcp", ":50051") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + server := grpc.NewServer() + milvusService := sdkmocks.NewMilvusServiceServer(t) + milvusService.EXPECT().Connect(mock.Anything, mock.Anything).Return(&milvuspb.ConnectResponse{ + Status: &commonpb.Status{}, + }, nil) + milvuspb.RegisterMilvusServiceServer(server, milvusService) + + go func() { + log.Println("Server started on port 50051") + if err := server.Serve(listen); err != nil { + log.Println("server error", err) + } + }() + time.Sleep(time.Second) + return milvusService, func() error { + return listen.Close() + } +} + +func ClearEtcdData(rootPath string) { + etcdCli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{"localhost:2379"}, + DialTimeout: 5 * time.Second, + }) + etcdCli.Delete(context.Background(), rootPath, clientv3.WithPrefix()) + etcdCli.Close() +} + +func TestShouldReadCollection(t *testing.T) { + t.Run("all collection", func(t *testing.T) { + f := GetShouldReadFunc(&meta.TaskInfo{ + CollectionInfos: []model.CollectionInfo{ + { + Name: "*", + }, + }, + ExcludeCollections: []string{"foo"}, }) + assert.True(t, f(&pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "hoo", + }, + })) + assert.False(t, f(&pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "foo", + }, + })) + }) - util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { - return mockEtcdCli, nil - }, func() { - assert.Panics(t, func() { - mysqlServerConfig.MetaStoreConfig.MysqlSourceUrl = "root:123456@tcp(127.0.0.2:3307)/milvuscdc?charset=utf8" - NewMetaCDC(mysqlServerConfig) - }) + t.Run("some collection", func(t *testing.T) { + f := GetShouldReadFunc(&meta.TaskInfo{ + CollectionInfos: []model.CollectionInfo{ + { + Name: "a", + }, + { + Name: "b", + }, + }, + ExcludeCollections: []string{"foo"}, }) + assert.True(t, f(&pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "a", + }, + })) + assert.False(t, f(&pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "c", + }, + })) + assert.False(t, f(&pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "foo", + }, + })) }) } -//func TestReloadTask(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// mockEtcdCli.On("EtcdEndpoints").Return(endpoints) -// mockEtcdCli.On("Status", mock.Anything, endpoints[0]).Return(&clientv3.StatusResponse{}, nil) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// key := getTaskInfoPrefix(rootPath) -// taskID1 := "123" -// info1 := &meta.TaskInfo{ -// TaskID: taskID1, -// State: meta.TaskStateRunning, -// } -// value1, _ := json.Marshal(info1) -// -// util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { -// return mockEtcdCli, nil -// }, func() { -// cdc := NewMetaCDC(serverConfig) -// t.Run("etcd get error", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key, mock.Anything). -// Return(nil, errors.New("etcd error")) -// defer call.Unset() -// -// assert.Panics(t, func() { -// cdc.ReloadTask() -// }) -// }) -// -// t.Run("unmarshal error", func(t *testing.T) { -// taskID2 := "456" -// invalidValue := []byte(`"task_id": 123`) // task_id should be a string -// call := mockEtcdCli.On("Get", mock.Anything, key, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(getTaskInfoKey(rootPath, taskID1)), -// Value: value1, -// }, -// { -// Key: []byte(getTaskInfoKey(rootPath, taskID2)), -// Value: invalidValue, -// }, -// }, -// }, nil) -// defer call.Unset() -// assert.Panics(t, func() { -// cdc.ReloadTask() -// }) -// }) -// -// t.Run("success", func(t *testing.T) { -// factoryMock := server_mocks.NewCDCFactory(t) -// cdc.factoryCreator = func(_ NewReaderFunc, _ NewWriterFunc) CDCFactory { -// return factoryMock -// } -// call := mockEtcdCli.On("Get", mock.Anything, key, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(getTaskInfoKey(rootPath, taskID1)), -// Value: value1, -// }, -// }, -// }, nil) -// defer call.Unset() -// factoryMock.On("NewReader").Return(&reader.DefaultReader{}, nil) -// factoryMock.On("NewWriter").Return(&writer.DefaultWriter{}, nil) -// -// cdc.ReloadTask() -// cdc.cdcTasks.RLock() -// defer cdc.cdcTasks.RUnlock() -// assert.NotNil(t, cdc.cdcTasks.data[taskID1]) -// }) -// }) -//} -// -//func TestValidCreateRequest(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// mockEtcdCli.On("EtcdEndpoints").Return(endpoints) -// mockEtcdCli.On("Status", mock.Anything, endpoints[0]).Return(&clientv3.StatusResponse{}, nil) -// -// util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { -// return mockEtcdCli, nil -// }, func() { -// cdc := NewMetaCDC(serverConfig) -// assertion := assert.New(t) -// -// createRequest := &request.CreateRequest{} -// -// // check connect host -// assertion.Error(cdc.validCreateRequest(createRequest)) -// -// // check connect port -// createRequest.MilvusConnectParam.Host = "localhost" -// assertion.Error(cdc.validCreateRequest(createRequest)) -// -// // check connect username and password -// createRequest.MilvusConnectParam.Port = 19530 -// createRequest.MilvusConnectParam.Username = "foo" -// assertion.Error(cdc.validCreateRequest(createRequest)) -// createRequest.MilvusConnectParam.Username = "" -// createRequest.MilvusConnectParam.Password = "123" -// assertion.Error(cdc.validCreateRequest(createRequest)) -// -// // assert connect timeout -// createRequest.MilvusConnectParam.Username = "foo" -// createRequest.MilvusConnectParam.ConnectTimeout = -1 -// assertion.Error(cdc.validCreateRequest(createRequest)) -// -// // check buffer period -// createRequest.MilvusConnectParam.ConnectTimeout = 10 -// createRequest.BufferConfig.Period = -1 -// assertion.Error(cdc.validCreateRequest(createRequest)) -// -// // check buffer size -// createRequest.BufferConfig.Period = 10 -// createRequest.BufferConfig.Size = -1 -// assertion.Error(cdc.validCreateRequest(createRequest)) -// -// // check collection info -// createRequest.BufferConfig.Size = 10 -// assertion.Error(cdc.validCreateRequest(createRequest)) -// cdc.config.MaxNameLength = 5 -// createRequest.CollectionInfos = []model.CollectionInfo{ -// {}, -// {Name: "foooooo"}, -// } -// assertion.Error(cdc.validCreateRequest(createRequest)) -// -// createRequest.CollectionInfos = []model.CollectionInfo{ -// {Name: "*"}, -// {Name: "foooooo"}, -// } -// assertion.Error(cdc.validCreateRequest(createRequest)) -// -// // fail connect milvus -// createRequest.CollectionInfos = []model.CollectionInfo{ -// {Name: "fo"}, -// {Name: "ao"}, -// } -// assertion.Error(cdc.validCreateRequest(createRequest)) -// -// // mock server -// createRequest.MilvusConnectParam.Username = "" -// createRequest.MilvusConnectParam.Password = "" -// lis, err := net.Listen("tcp", -// fmt.Sprintf("%s:%d", createRequest.MilvusConnectParam.Host, createRequest.MilvusConnectParam.Port)) -// if err != nil { -// assert.FailNow(t, err.Error()) -// } -// s := grpc.NewServer() -// milvuspb.RegisterMilvusServiceServer(s, &milvuspb.UnimplementedMilvusServiceServer{}) -// go func() { -// if err := s.Serve(lis); err != nil { -// assert.FailNow(t, err.Error()) -// } -// }() -// defer s.Stop() -// assertion.NoError(cdc.validCreateRequest(createRequest)) -// }) -//} -// -//func MockMilvusServer(t *testing.T) func() { -// lis, err := net.Listen("tcp", -// fmt.Sprintf("%s:%d", "localhost", 19530)) -// if err != nil { -// assert.FailNow(t, err.Error()) -// } -// s := grpc.NewServer() -// milvuspb.RegisterMilvusServiceServer(s, &milvuspb.UnimplementedMilvusServiceServer{}) -// go func() { -// if err := s.Serve(lis); err != nil { -// assert.FailNow(t, err.Error()) -// } -// }() -// return func() { -// s.Stop() -// } -//} -// -//func TestCreateRequest(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// mockEtcdCli.On("EtcdEndpoints").Return(endpoints) -// mockEtcdCli.On("Status", mock.Anything, endpoints[0]).Return(&clientv3.StatusResponse{}, nil) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { -// return mockEtcdCli, nil -// }, func() { -// cdc := NewMetaCDC(serverConfig) -// assertion := assert.New(t) -// var ( -// resp *request.CreateResponse -// err error -// ) -// stopFunc := MockMilvusServer(t) -// defer stopFunc() -// -// t.Run("check error", func(t *testing.T) { -// resp, err = cdc.Create(&request.CreateRequest{}) -// assertion.Nil(resp) -// assertion.Error(err) -// }) -// -// t.Run("success", func(t *testing.T) { -// factoryMock := server_mocks.NewCDCFactory(t) -// cdc.factoryCreator = func(_ NewReaderFunc, _ NewWriterFunc) CDCFactory { -// return factoryMock -// } -// info := &meta.TaskInfo{ -// State: meta.TaskStateInitial, -// } -// infoByte, _ := json.Marshal(info) -// call1 := mockEtcdCli.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Value: infoByte, -// }, -// }, -// }, nil) -// defer call1.Unset() -// factoryMock.On("NewReader").Return(&reader.DefaultReader{}, nil) -// factoryMock.On("NewWriter").Return(&writer.DefaultWriter{}, nil) -// -// call2 := mockEtcdCli.On("Put", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("put error")) -// _, err = cdc.Create(createRequest) -// assertion.Error(err) -// call2.Unset() -// -// call2 = mockEtcdCli.On("Put", mock.Anything, mock.Anything, mock.Anything).Return(&clientv3.PutResponse{}, nil) -// defer call2.Unset() -// resp, err = cdc.Create(createRequest) -// assertion.NotEmpty(resp.TaskID) -// assertion.NoError(err) -// cdc.cdcTasks.RLock() -// assert.NotNil(t, cdc.cdcTasks.data[resp.TaskID]) -// cdc.cdcTasks.RUnlock() -// -// // duplicate collection -// _, err = cdc.Create(createRequest) -// assertion.Error(err) -// -// // star collection -// _, err = cdc.Create(starRequest) -// assertion.NoError(err) -// -// _, err = cdc.Create(&request.CreateRequest{ -// MilvusConnectParam: model.MilvusConnectParam{ -// Host: "localhost", -// Port: 19530, -// }, -// CollectionInfos: []model.CollectionInfo{ -// { -// Name: "col2", -// }, -// }, -// }) -// assertion.Error(err) -// }) -// }) -//} -// -//func TestDeleteRequest(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// mockEtcdCli.On("EtcdEndpoints").Return(endpoints) -// mockEtcdCli.On("Status", mock.Anything, endpoints[0]).Return(&clientv3.StatusResponse{}, nil) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { -// return mockEtcdCli, nil -// }, func() { -// cdc := NewMetaCDC(serverConfig) -// assertion := assert.New(t) -// var ( -// resp *request.DeleteResponse -// err error -// ) -// -// t.Run("no task", func(t *testing.T) { -// resp, err = cdc.Delete(&request.DeleteRequest{TaskID: "foo"}) -// assertion.Nil(resp) -// assertion.Error(err) -// }) -// -// stopFunc := MockMilvusServer(t) -// defer stopFunc() -// -// t.Run("success", func(t *testing.T) { -// factoryMock := server_mocks.NewCDCFactory(t) -// cdc.factoryCreator = func(_ NewReaderFunc, _ NewWriterFunc) CDCFactory { -// return factoryMock -// } -// info := &meta.TaskInfo{ -// CollectionInfos: []model.CollectionInfo{ -// {Name: collectionName}, -// }, -// MilvusConnectParam: model.MilvusConnectParam{ -// Host: "localhost", -// Port: 19530, -// }, -// State: meta.TaskStateInitial, -// } -// infoByte, _ := json.Marshal(info) -// call1 := mockEtcdCli.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Value: infoByte, -// }, -// }, -// }, nil) -// defer call1.Unset() -// call2 := mockEtcdCli.On("Put", mock.Anything, mock.Anything, mock.Anything).Return(&clientv3.PutResponse{}, nil) -// defer call2.Unset() -// factoryMock.On("NewReader").Return(&reader.DefaultReader{}, nil) -// factoryMock.On("NewWriter").Return(&writer.DefaultWriter{}, nil) -// -// createResp, err := cdc.Create(createRequest) -// assertion.NotEmpty(createResp.TaskID) -// assertion.NoError(err) -// -// _, err = cdc.Create(starRequest) -// assertion.NoError(err) -// -// call3 := mockEtcdCli.On("Txn", mock.Anything).Return(&MockTxn{err: errors.New("txn error")}) -// _, err = cdc.Delete(&request.DeleteRequest{TaskID: createResp.TaskID}) -// assertion.Error(err) -// call3.Unset() -// -// call3 = mockEtcdCli.On("Txn", mock.Anything).Return(&MockTxn{}) -// defer call3.Unset() -// resp, err = cdc.Delete(&request.DeleteRequest{TaskID: createResp.TaskID}) -// assertion.NotNil(resp) -// assertion.NoError(err) -// -// _, err = cdc.Create(&request.CreateRequest{ -// MilvusConnectParam: model.MilvusConnectParam{ -// Host: "localhost", -// Port: 19530, -// }, -// CollectionInfos: []model.CollectionInfo{ -// { -// Name: "col2", -// }, -// }, -// }) -// assertion.Error(err) -// -// _, err = cdc.Create(createRequest) -// assertion.NoError(err) -// }) -// }) -//} -// -//type MockEmptyReader struct { -// reader.DefaultReader -//} -// -//func (m MockEmptyReader) StartRead(_ context.Context) <-chan *coremodel.CDCData { -// return make(<-chan *coremodel.CDCData) -//} -// -//func TestPauseResumeRequest(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// mockEtcdCli.On("EtcdEndpoints").Return(endpoints) -// mockEtcdCli.On("Status", mock.Anything, endpoints[0]).Return(&clientv3.StatusResponse{}, nil) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// util.MockEtcdClient(func(cfg clientv3.Config) (util.KVApi, error) { -// return mockEtcdCli, nil -// }, func() { -// cdc := NewMetaCDC(serverConfig) -// assertion := assert.New(t) -// -// t.Run("pause no task", func(t *testing.T) { -// resp, err := cdc.Pause(&request.PauseRequest{TaskID: "foo"}) -// assertion.Nil(resp) -// assertion.Error(err) -// }) -// -// t.Run("resume no task", func(t *testing.T) { -// resp, err := cdc.Resume(&request.ResumeRequest{TaskID: "foo"}) -// assertion.Nil(resp) -// assertion.Error(err) -// }) -// -// stopFunc := MockMilvusServer(t) -// defer stopFunc() -// -// t.Run("success", func(t *testing.T) { -// factoryMock := server_mocks.NewCDCFactory(t) -// cdc.factoryCreator = func(_ NewReaderFunc, _ NewWriterFunc) CDCFactory { -// return factoryMock -// } -// info := &meta.TaskInfo{ -// State: meta.TaskStateInitial, -// } -// infoByte, _ := json.Marshal(info) -// call2 := mockEtcdCli.On("Put", mock.Anything, mock.Anything, mock.Anything).Return(&clientv3.PutResponse{}, nil) -// defer call2.Unset() -// factoryMock.On("NewReader").Return(&MockEmptyReader{}, nil) -// factoryMock.On("NewWriter").Return(&writer.DefaultWriter{}, nil) -// call1 := mockEtcdCli.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Value: infoByte, -// }, -// }, -// }, nil) -// createResp, err := cdc.Create(createRequest) -// call1.Unset() -// assertion.NotEmpty(createResp.TaskID) -// assertion.NoError(err) -// -// info = &meta.TaskInfo{ -// State: meta.TaskStateRunning, -// } -// infoByte, _ = json.Marshal(info) -// call1 = mockEtcdCli.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Value: infoByte, -// }, -// }, -// }, nil) -// pauseResp, err := cdc.Pause(&request.PauseRequest{TaskID: createResp.TaskID}) -// call1.Unset() -// assertion.NotNil(pauseResp) -// assertion.NoError(err) -// -// info = &meta.TaskInfo{ -// State: meta.TaskStatePaused, -// } -// infoByte, _ = json.Marshal(info) -// call1 = mockEtcdCli.On("Get", mock.Anything, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Value: infoByte, -// }, -// }, -// }, nil) -// resumeResp, err := cdc.Resume(&request.ResumeRequest{TaskID: createResp.TaskID}) -// call1.Unset() -// assertion.NotNil(resumeResp) -// assertion.NoError(err) -// }) -// -// }) -//} -// -//func TestGetShouldReadFunc(t *testing.T) { -// t.Run("base", func(t *testing.T) { -// f := GetShouldReadFunc(&meta.TaskInfo{ -// CollectionInfos: []model.CollectionInfo{ -// {Name: "foo1"}, -// {Name: "foo2"}, -// }, -// }) -// assert.True(t, f(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "foo1"}})) -// assert.True(t, f(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "foo2"}})) -// assert.False(t, f(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "foo"}})) -// }) -// -// t.Run("star", func(t *testing.T) { -// f := GetShouldReadFunc(&meta.TaskInfo{ -// CollectionInfos: []model.CollectionInfo{ -// {Name: "*"}, -// }, -// }) -// assert.True(t, f(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "foo1"}})) -// assert.True(t, f(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "foo2"}})) -// assert.True(t, f(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "foo"}})) -// }) -// -// t.Run("mix star", func(t *testing.T) { -// f := GetShouldReadFunc(&meta.TaskInfo{ -// CollectionInfos: []model.CollectionInfo{ -// {Name: "*"}, -// }, -// ExcludeCollections: []string{"foo1", "foo2"}, -// }) -// assert.False(t, f(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "foo1"}})) -// assert.False(t, f(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "foo2"}})) -// assert.True(t, f(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{Name: "foo"}})) -// }) -//} +func TestList(t *testing.T) { + t.Run("err", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + metaCDC.metaStoreFactory = factory + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("test")).Once() + + _, err := metaCDC.List(&request.ListRequest{}) + assert.Error(t, err) + }) + + t.Run("ok", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + metaCDC.metaStoreFactory = factory + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{ + { + TaskID: "1", + State: meta.TaskStateInitial, + }, + }, nil) + + resp, err := metaCDC.List(&request.ListRequest{}) + assert.NoError(t, err) + assert.Len(t, resp.Tasks, 1) + assert.Equal(t, "1", resp.Tasks[0].TaskID) + }) +} + +func TestTaskPosition(t *testing.T) { + t.Run("err", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskCollectionPosition](t) + metaCDC.metaStoreFactory = factory + + factory.EXPECT().GetTaskCollectionPositionMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("test")).Once() + + _, err := metaCDC.GetPosition(&request.GetPositionRequest{}) + assert.Error(t, err) + }) + + t.Run("ok", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskCollectionPosition](t) + metaCDC.metaStoreFactory = factory + + factory.EXPECT().GetTaskCollectionPositionMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskCollectionPosition{ + { + TaskID: "1", + Positions: map[string]*meta.PositionInfo{ + "ch1": { + Time: 1, + DataPair: &commonpb.KeyDataPair{ + Key: "ch1-position", + Data: []byte("ch1-position"), + }, + }, + }, + OpPositions: map[string]*meta.PositionInfo{ + "ch1": { + Time: 1, + DataPair: &commonpb.KeyDataPair{ + Key: "ch1", + Data: []byte("ch2-position"), + }, + }, + }, + TargetPositions: map[string]*meta.PositionInfo{ + "ch1-tar": { + Time: 1, + DataPair: &commonpb.KeyDataPair{ + Key: "ch1-tar", + Data: []byte("ch3-position"), + }, + }, + }, + }, + }, nil) + + resp, err := metaCDC.GetPosition(&request.GetPositionRequest{}) + assert.NoError(t, err) + assert.Len(t, resp.Positions, 1) + assert.Equal(t, "ch1", resp.Positions[0].ChannelName) + assert.EqualValues(t, 1, resp.Positions[0].Time) + assert.Equal(t, base64.StdEncoding.EncodeToString([]byte("ch1-position")), resp.Positions[0].MsgID) + + assert.Len(t, resp.OpPositions, 1) + assert.Equal(t, "ch1", resp.OpPositions[0].ChannelName) + assert.EqualValues(t, 1, resp.OpPositions[0].Time) + assert.Equal(t, base64.StdEncoding.EncodeToString([]byte("ch2-position")), resp.OpPositions[0].MsgID) + + assert.Len(t, resp.TargetPositions, 1) + assert.Equal(t, "ch1-tar", resp.TargetPositions[0].ChannelName) + assert.EqualValues(t, 1, resp.TargetPositions[0].Time) + assert.Equal(t, base64.StdEncoding.EncodeToString([]byte("ch3-position")), resp.TargetPositions[0].MsgID) + }) +} + +func TestGet(t *testing.T) { + t.Run("err", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + metaCDC.metaStoreFactory = factory + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("test")).Once() + + _, err := metaCDC.Get(&request.GetRequest{}) + assert.Error(t, err) + }) + + t.Run("not found", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + metaCDC.metaStoreFactory = factory + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{}, nil).Once() + + _, err := metaCDC.Get(&request.GetRequest{}) + assert.Error(t, err) + }) + + t.Run("ok", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + metaCDC.metaStoreFactory = factory + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{ + { + TaskID: "1", + State: meta.TaskStateInitial, + }, + }, nil) + + resp, err := metaCDC.Get(&request.GetRequest{}) + assert.NoError(t, err) + assert.Equal(t, "1", resp.TaskID) + }) +} + +func TestResume(t *testing.T) { + t.Run("not found task", func(t *testing.T) { + metaCDC := &MetaCDC{} + initMetaCDCMap(metaCDC) + + _, err := metaCDC.Resume(&request.ResumeRequest{TaskID: "1"}) + assert.Error(t, err) + }) + + t.Run("fail no mq config", func(t *testing.T) { + metaCDC := &MetaCDC{ + config: &CDCServerConfig{ + MaxTaskNum: 10, + SourceConfig: MilvusSourceConfig{ + ReadChanLen: 10, + EtcdAddress: []string{"localhost:2379"}, + EtcdRootPath: "source-cdc-test", + EtcdMetaSubPath: "meta", + DefaultPartitionName: "_default", + }, + }, + } + initMetaCDCMap(metaCDC) + defer ClearEtcdData("source-cdc-test") + + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + positionStore := mocks.NewMetaStore[*meta.TaskCollectionPosition](t) + mqFactoryCreator := coremocks.NewFactoryCreator(t) + + metaCDC.mqFactoryCreator = mqFactoryCreator + metaCDC.metaStoreFactory = factory + _, closeFunc := NewMockMilvus(t) + defer closeFunc() + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Maybe() + factory.EXPECT().GetTaskCollectionPositionMetaStore(mock.Anything).Return(positionStore).Maybe() + { + metaCDC.cdcTasks.Lock() + metaCDC.cdcTasks.data["1"] = &meta.TaskInfo{ + TaskID: "1", + State: meta.TaskStatePaused, + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 50051, + ConnectTimeout: 5, + }, + CollectionInfos: []model.CollectionInfo{ + { + Name: "*", + }, + }, + RPCRequestChannelInfo: model.ChannelInfo{ + Name: "foo", + }, + WriterCacheConfig: model.BufferConfig{ + Period: 10, + Size: 10, + }, + } + metaCDC.cdcTasks.Unlock() + _, err := metaCDC.Resume(&request.ResumeRequest{TaskID: "1"}) + assert.Error(t, err) + } + }) + + t.Run("success", func(t *testing.T) { + metaCDC := &MetaCDC{ + config: &CDCServerConfig{ + MaxTaskNum: 10, + SourceConfig: MilvusSourceConfig{ + ReadChanLen: 10, + Pulsar: config.PulsarConfig{ + Address: "localhost:6650", + }, + EtcdAddress: []string{"localhost:2379"}, + EtcdRootPath: "source-cdc-test", + EtcdMetaSubPath: "meta", + DefaultPartitionName: "_default", + }, + }, + } + initMetaCDCMap(metaCDC) + defer ClearEtcdData("source-cdc-test") + + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + positionStore := mocks.NewMetaStore[*meta.TaskCollectionPosition](t) + mqFactoryCreator := coremocks.NewFactoryCreator(t) + mqFactory := msgstream.NewMockFactory(t) + mq := msgstream.NewMockMsgStream(t) + + metaCDC.mqFactoryCreator = mqFactoryCreator + metaCDC.metaStoreFactory = factory + _, closeFunc := NewMockMilvus(t) + defer closeFunc() + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Maybe() + factory.EXPECT().GetTaskCollectionPositionMetaStore(mock.Anything).Return(positionStore).Maybe() + // // check the task num + // store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{}, nil).Once() + // // save position + // positionStore.EXPECT().Put(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // // save task meta + // store.EXPECT().Put(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // new channel manager + mqFactoryCreator.EXPECT().NewPmsFactory(mock.Anything).Return(mqFactory) + // get task position + positionStore.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskCollectionPosition{ + { + Positions: map[string]*meta.PositionInfo{ + "ch1": { + Time: 1, + DataPair: &commonpb.KeyDataPair{ + Key: "ch1", + Data: []byte("ch1-position"), + }, + }, + }, + }, + }, nil).Once() + // new channel reader + mqFactory.EXPECT().NewMsgStream(mock.Anything).Return(mq, nil).Once() + mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // start read channel + streamChan := make(chan *msgstream.MsgPack) + mq.EXPECT().Chan().Return(streamChan) + // update state + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{ + { + TaskID: "1", + State: meta.TaskStateInitial, + }, + }, nil).Once() + store.EXPECT().Put(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + + { + metaCDC.cdcTasks.Lock() + metaCDC.cdcTasks.data["1"] = &meta.TaskInfo{ + TaskID: "1", + State: meta.TaskStatePaused, + MilvusConnectParam: model.MilvusConnectParam{ + Host: "localhost", + Port: 50051, + ConnectTimeout: 5, + }, + CollectionInfos: []model.CollectionInfo{ + { + Name: "*", + }, + }, + RPCRequestChannelInfo: model.ChannelInfo{ + Name: "foo", + }, + WriterCacheConfig: model.BufferConfig{ + Period: 10, + Size: 10, + }, + } + metaCDC.cdcTasks.Unlock() + _, err := metaCDC.Resume(&request.ResumeRequest{TaskID: "1"}) + assert.NoError(t, err) + } + }) +} + +func TestPause(t *testing.T) { + t.Run("not found task", func(t *testing.T) { + metaCDC := &MetaCDC{} + initMetaCDCMap(metaCDC) + + _, err := metaCDC.Pause(&request.PauseRequest{TaskID: "1"}) + assert.Error(t, err) + }) + + t.Run("fail to update state", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + metaCDC.metaStoreFactory = factory + + initMetaCDCMap(metaCDC) + metaCDC.cdcTasks.Lock() + metaCDC.cdcTasks.data["1"] = &meta.TaskInfo{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "127.0.0.1", + Port: 6666, + }, + } + metaCDC.cdcTasks.Unlock() + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{}, nil) + + _, err := metaCDC.Pause(&request.PauseRequest{TaskID: "1"}) + assert.Error(t, err) + }) + + t.Run("ok", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + store := mocks.NewMetaStore[*meta.TaskInfo](t) + metaCDC.metaStoreFactory = factory + + initMetaCDCMap(metaCDC) + metaCDC.cdcTasks.Lock() + metaCDC.cdcTasks.data["1"] = &meta.TaskInfo{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "127.0.0.1", + Port: 6666, + }, + } + metaCDC.cdcTasks.Unlock() + + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(store).Once() + store.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{ + { + TaskID: "1", + State: meta.TaskStateRunning, + }, + }, nil) + store.EXPECT().Put(mock.Anything, mock.Anything, mock.Anything).Return(nil) + + _, err := metaCDC.Pause(&request.PauseRequest{TaskID: "1"}) + assert.NoError(t, err) + }) +} + +func TestDelete(t *testing.T) { + t.Run("not found task", func(t *testing.T) { + metaCDC := &MetaCDC{} + initMetaCDCMap(metaCDC) + + _, err := metaCDC.Delete(&request.DeleteRequest{TaskID: "1"}) + assert.Error(t, err) + }) + + t.Run("fail to delete task", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + metaStore := mocks.NewMetaStore[*meta.TaskInfo](t) + positionStore := mocks.NewMetaStore[*meta.TaskCollectionPosition](t) + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(metaStore).Maybe() + factory.EXPECT().GetTaskCollectionPositionMetaStore(mock.Anything).Return(positionStore).Maybe() + metaCDC.metaStoreFactory = factory + + initMetaCDCMap(metaCDC) + metaCDC.cdcTasks.Lock() + metaCDC.cdcTasks.data["1"] = &meta.TaskInfo{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "127.0.0.1", + Port: 6666, + }, + } + metaCDC.cdcTasks.Unlock() + + metaStore.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Once() + _, err := metaCDC.Delete(&request.DeleteRequest{TaskID: "1"}) + assert.Error(t, err) + }) + + t.Run("ok", func(t *testing.T) { + metaCDC := &MetaCDC{} + factory := mocks.NewMetaStoreFactory(t) + metaStore := mocks.NewMetaStore[*meta.TaskInfo](t) + positionStore := mocks.NewMetaStore[*meta.TaskCollectionPosition](t) + factory.EXPECT().GetTaskInfoMetaStore(mock.Anything).Return(metaStore).Maybe() + factory.EXPECT().GetTaskCollectionPositionMetaStore(mock.Anything).Return(positionStore).Maybe() + metaCDC.metaStoreFactory = factory + + initMetaCDCMap(metaCDC) + metaCDC.cdcTasks.Lock() + metaCDC.cdcTasks.data["1"] = &meta.TaskInfo{ + MilvusConnectParam: model.MilvusConnectParam{ + Host: "127.0.0.1", + Port: 6666, + }, + } + metaCDC.cdcTasks.Unlock() + metaCDC.replicateEntityMap.Lock() + metaCDC.replicateEntityMap.data["127.0.0.1:6666"] = &ReplicateEntity{ + quitFunc: func() {}, + } + metaCDC.replicateEntityMap.Unlock() + + metaStore.EXPECT().Get(mock.Anything, mock.Anything, mock.Anything).Return([]*meta.TaskInfo{ + { + TaskID: "1", + MilvusConnectParam: model.MilvusConnectParam{ + Host: "127.0.0.1", + Port: 6666, + }, + CollectionInfos: []model.CollectionInfo{ + {Name: "*"}, + }, + }, + }, nil).Once() + factory.EXPECT().Txn(mock.Anything).Return(nil, func(err error) error { + return nil + }, nil).Once() + metaStore.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + positionStore.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + + _, err := metaCDC.Delete(&request.DeleteRequest{TaskID: "1"}) + assert.NoError(t, err) + }) +} diff --git a/server/core_test.go b/server/core_test.go deleted file mode 100644 index de7a0f32..00000000 --- a/server/core_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package server - -import ( - "os" - "sync" - "testing" - "time" - - "github.com/milvus-io/milvus/pkg/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - "sigs.k8s.io/yaml" -) - -func TestID(t *testing.T) { - a := make(map[int]int) - for i := 0; i < 10; i++ { - a[i] = i - } - wg := sync.WaitGroup{} - for j := range a { - wg.Add(1) - j := j - go func() { - defer wg.Done() - time.Sleep(time.Millisecond * 100) - println(j) - }() - } - wg.Wait() -} - -func TestYaml(t *testing.T) { - //file, err := os.Open(filepath.Clean("/Users/derek/fubang/milvus-cdc/server/configs/cdc.yaml")) - //if err != nil { - // t.Fail() - //} - //scanner := bufio.NewScanner(file) - //fileContent := "" - //for scanner.Scan() { - // fileContent += scanner.Text() + "\n" - //} - //if err := scanner.Err(); err != nil { - // t.Fail() - //} - - fileContent, _ := os.ReadFile("/Users/derek/fubang/milvus-cdc/server/configs/cdc2.yaml") - println(string(fileContent)) - var p2 CDCServerConfig - err := yaml.Unmarshal(fileContent, &p2) - assert.NoError(t, err) - log.Info("cdc config", zap.Any("config", p2)) -} diff --git a/server/meta_test.go b/server/meta_test.go deleted file mode 100644 index 9585c567..00000000 --- a/server/meta_test.go +++ /dev/null @@ -1,521 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -// -//import ( -// "testing" -// -// "github.com/cockroachdb/errors" -// "github.com/goccy/go-json" -// "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" -// "github.com/stretchr/testify/assert" -// "github.com/stretchr/testify/mock" -// "github.com/zilliztech/milvus-cdc/core/mocks" -// "github.com/zilliztech/milvus-cdc/core/util" -// "github.com/zilliztech/milvus-cdc/server/model/meta" -// "go.etcd.io/etcd/api/v3/mvccpb" -// clientv3 "go.etcd.io/etcd/client/v3" -//) -// -//func TestGetTaskInfo(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// rootPath := "/tasks" -// taskID := "123" -// key := getTaskInfoKey(rootPath, taskID) -// info := &meta.TaskInfo{ -// TaskID: taskID, -// } -// value, _ := json.Marshal(info) -// t.Run("success", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(key), -// Value: value, -// }, -// }, -// }, nil) -// defer call.Unset() -// -// got, err := getTaskInfo(mockEtcdCli, rootPath, taskID) -// -// assert.NoError(t, err) -// assert.Equal(t, info, got) -// }) -// -// t.Run("etcd error", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key).Return(nil, errors.New("etcd error")) -// defer call.Unset() -// -// got, err := getTaskInfo(mockEtcdCli, rootPath, taskID) -// -// assert.Nil(t, got) -// assert.Error(t, err) -// }) -// -// t.Run("not found error", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{}, -// }, nil) -// defer call.Unset() -// -// got, err := getTaskInfo(mockEtcdCli, rootPath, taskID) -// -// assert.Nil(t, got) -// assert.Error(t, err) -// assert.True(t, errors.Is(err, NotFoundErr)) -// }) -// -// t.Run("json unmarshal error", func(t *testing.T) { -// invalidValue := []byte(`"task_id": 123`) // task_id should be a string -// call := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(key), -// Value: invalidValue, -// }, -// }, -// }, nil) -// defer call.Unset() -// -// got, err := getTaskInfo(mockEtcdCli, rootPath, taskID) -// -// assert.Nil(t, got) -// assert.Error(t, err) -// }) -//} -// -//func TestGetAllTaskInfo(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// rootPath := "/tasks" -// key := getTaskInfoPrefix(rootPath) -// taskID1 := "123" -// info1 := &meta.TaskInfo{ -// TaskID: taskID1, -// } -// value1, _ := json.Marshal(info1) -// -// taskID2 := "456" -// info2 := &meta.TaskInfo{ -// TaskID: taskID2, -// } -// value2, _ := json.Marshal(info2) -// t.Run("success", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(getTaskInfoKey(rootPath, taskID1)), -// Value: value1, -// }, -// { -// Key: []byte(getTaskInfoKey(rootPath, taskID2)), -// Value: value2, -// }, -// }, -// }, nil) -// defer call.Unset() -// -// got, err := getAllTaskInfo(mockEtcdCli, rootPath) -// -// assert.NoError(t, err) -// assert.Equal(t, info1, got[0]) -// assert.Equal(t, info2, got[1]) -// }) -// -// t.Run("etcd error", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key, mock.Anything).Return(nil, errors.New("etcd error")) -// defer call.Unset() -// -// got, err := getAllTaskInfo(mockEtcdCli, rootPath) -// -// assert.Nil(t, got) -// assert.Error(t, err) -// }) -// -// t.Run("not found error", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{}, -// }, nil) -// defer call.Unset() -// -// got, err := getAllTaskInfo(mockEtcdCli, rootPath) -// -// assert.Nil(t, got) -// assert.Error(t, err) -// assert.True(t, errors.Is(err, NotFoundErr)) -// }) -// -// t.Run("json unmarshal error", func(t *testing.T) { -// invalidValue := []byte(`"task_id": 123`) // task_id should be a string -// call := mockEtcdCli.On("Get", mock.Anything, key, mock.Anything).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(getTaskInfoKey(rootPath, taskID1)), -// Value: value1, -// }, -// { -// Key: []byte(getTaskInfoKey(rootPath, taskID2)), -// Value: invalidValue, -// }, -// }, -// }, nil) -// defer call.Unset() -// -// got, err := getAllTaskInfo(mockEtcdCli, rootPath) -// -// assert.Nil(t, got) -// assert.Error(t, err) -// }) -//} -// -//func TestUpdateTaskState(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// rootPath := "/tasks" -// taskID := "123" -// key := getTaskInfoKey(rootPath, taskID) -// info := &meta.TaskInfo{ -// TaskID: taskID, -// State: meta.TaskStateInitial, -// } -// value, _ := json.Marshal(info) -// t.Run("success", func(t *testing.T) { -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(key), -// Value: value, -// }, -// }, -// }, nil) -// defer call1.Unset() -// call2 := mockEtcdCli.On("Put", mock.Anything, key, mock.Anything).Return(&clientv3.PutResponse{}, nil) -// defer call2.Unset() -// -// err := updateTaskState(mockEtcdCli, rootPath, taskID, meta.TaskStateRunning, []meta.TaskState{meta.TaskStateInitial}) -// assert.NoError(t, err) -// -// err = updateTaskState(mockEtcdCli, rootPath, taskID, meta.TaskStateRunning, []meta.TaskState{meta.TaskStatePaused}) -// assert.Error(t, err) -// }) -// -// t.Run("get error", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key).Return(nil, errors.New("etcd error")) -// defer call.Unset() -// err := updateTaskState(mockEtcdCli, rootPath, taskID, meta.TaskStateRunning, []meta.TaskState{meta.TaskStateInitial}) -// assert.Error(t, err) -// }) -// -// t.Run("etcd error", func(t *testing.T) { -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(key), -// Value: value, -// }, -// }, -// }, nil) -// defer call1.Unset() -// call2 := mockEtcdCli.On("Put", mock.Anything, key, mock.Anything).Return(nil, errors.New("etcd error")) -// defer call2.Unset() -// -// err := updateTaskState(mockEtcdCli, rootPath, taskID, meta.TaskStateRunning, []meta.TaskState{meta.TaskStateInitial}) -// assert.Error(t, err) -// }) -//} -// -//func TestUpdateTaskFailedReason(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// rootPath := "/tasks" -// taskID := "123" -// key := getTaskInfoKey(rootPath, taskID) -// info := &meta.TaskInfo{ -// TaskID: taskID, -// } -// value, _ := json.Marshal(info) -// t.Run("success", func(t *testing.T) { -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(key), -// Value: value, -// }, -// }, -// }, nil) -// defer call1.Unset() -// call2 := mockEtcdCli.On("Put", mock.Anything, key, mock.Anything).Return(&clientv3.PutResponse{}, nil) -// defer call2.Unset() -// -// err := updateTaskFailedReason(mockEtcdCli, rootPath, taskID, "fail reason") -// assert.NoError(t, err) -// }) -// -// t.Run("get error", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key).Return(nil, errors.New("etcd error")) -// defer call.Unset() -// err := updateTaskState(mockEtcdCli, rootPath, taskID, meta.TaskStateRunning, []meta.TaskState{meta.TaskStateInitial}) -// assert.Error(t, err) -// }) -// -// t.Run("etcd error", func(t *testing.T) { -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(key), -// Value: value, -// }, -// }, -// }, nil) -// defer call1.Unset() -// call2 := mockEtcdCli.On("Put", mock.Anything, key, mock.Anything).Return(nil, errors.New("etcd error")) -// defer call2.Unset() -// -// err := updateTaskFailedReason(mockEtcdCli, rootPath, taskID, "fail reason") -// assert.Error(t, err) -// }) -//} -// -//func TestUpdateTaskCollectionPosition(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// rootPath := "/tasks" -// taskID := "123" -// collectionName := "col1" -// var collectionID int64 = 1000 -// key := getTaskCollectionPositionKey(rootPath, taskID, collectionID) -// info := &meta.TaskCollectionPosition{ -// TaskID: taskID, -// CollectionName: collectionName, -// } -// value, _ := json.Marshal(info) -// t.Run("success-no-data", func(t *testing.T) { -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{}, -// }, nil) -// defer call1.Unset() -// call2 := mockEtcdCli.On("Put", mock.Anything, key, mock.Anything).Return(&clientv3.PutResponse{}, nil) -// defer call2.Unset() -// -// err := updateTaskCollectionPosition(mockEtcdCli, rootPath, taskID, collectionID, collectionName, "ch1", &commonpb.KeyDataPair{}) -// assert.NoError(t, err) -// }) -// -// t.Run("success-empty-data", func(t *testing.T) { -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(key), -// Value: value, -// }, -// }, -// }, nil) -// defer call1.Unset() -// pName := "ch1" -// kd := &commonpb.KeyDataPair{Key: "xxx", Data: []byte(`"task_id": 123`)} -// infoTest := &meta.TaskCollectionPosition{ -// TaskID: taskID, -// CollectionName: collectionName, -// Positions: map[string]*commonpb.KeyDataPair{ -// pName: kd, -// }, -// } -// infoByte, _ := json.Marshal(infoTest) -// call2 := mockEtcdCli.On("Put", mock.Anything, key, util.ToString(infoByte)).Return(&clientv3.PutResponse{}, nil).Once() -// defer call2.Unset() -// -// err := updateTaskCollectionPosition(mockEtcdCli, rootPath, taskID, collectionID, collectionName, pName, kd) -// assert.NoError(t, err) -// }) -// -// t.Run("success-a-data", func(t *testing.T) { -// pName := "ch1" -// kd := &commonpb.KeyDataPair{Key: "xxx", Data: []byte(`"task_id": 123`)} -// info := &meta.TaskCollectionPosition{ -// TaskID: taskID, -// CollectionName: collectionName, -// Positions: map[string]*commonpb.KeyDataPair{ -// pName: kd, -// }, -// } -// value, _ := json.Marshal(info) -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(key), -// Value: value, -// }, -// }, -// }, nil) -// defer call1.Unset() -// -// kd2 := &commonpb.KeyDataPair{Key: "xxx", Data: []byte(`"task_id": 345`)} -// infoTest := &meta.TaskCollectionPosition{ -// TaskID: taskID, -// CollectionName: collectionName, -// Positions: map[string]*commonpb.KeyDataPair{ -// pName: kd2, -// }, -// } -// infoByte, _ := json.Marshal(infoTest) -// call2 := mockEtcdCli.On("Put", mock.Anything, key, util.ToString(infoByte)).Return(&clientv3.PutResponse{}, nil) -// defer call2.Unset() -// -// err := updateTaskCollectionPosition(mockEtcdCli, rootPath, taskID, collectionID, collectionName, pName, kd2) -// assert.NoError(t, err) -// }) -// -// t.Run("get error", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key).Return(nil, errors.New("etcd error")) -// defer call.Unset() -// pName := "ch1" -// kd := &commonpb.KeyDataPair{Key: "xxx", Data: []byte(`"task_id": 123`)} -// -// err := updateTaskCollectionPosition(mockEtcdCli, rootPath, taskID, collectionID, collectionName, pName, kd) -// assert.Error(t, err) -// }) -// -// t.Run("etcd error", func(t *testing.T) { -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{}, -// }, nil) -// defer call1.Unset() -// call2 := mockEtcdCli.On("Put", mock.Anything, key, mock.Anything).Return(nil, errors.New("etcd error")) -// defer call2.Unset() -// -// err := updateTaskCollectionPosition(mockEtcdCli, rootPath, taskID, collectionID, collectionName, "ch1", &commonpb.KeyDataPair{}) -// assert.Error(t, err) -// }) -//} -// -//func TestDeleteTaskCollectionPosition(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// rootPath := "/tasks" -// taskID := "123" -// var collectionID int64 = 1000 -// key := getTaskCollectionPositionKey(rootPath, taskID, collectionID) -// -// t.Run("success", func(t *testing.T) { -// call := mockEtcdCli.On("Delete", mock.Anything, key).Return(&clientv3.DeleteResponse{}, nil) -// defer call.Unset() -// -// err := deleteTaskCollectionPosition(mockEtcdCli, rootPath, taskID, collectionID) -// assert.NoError(t, err) -// }) -// -// t.Run("etcd error", func(t *testing.T) { -// call := mockEtcdCli.On("Delete", mock.Anything, key).Return(nil, errors.New("etcd error")) -// defer call.Unset() -// -// err := deleteTaskCollectionPosition(mockEtcdCli, rootPath, taskID, collectionID) -// assert.Error(t, err) -// }) -//} -// -//type MockTxn struct { -// err error -//} -// -//func (m *MockTxn) If(cs ...clientv3.Cmp) clientv3.Txn { -// return m -//} -// -//func (m *MockTxn) Then(ops ...clientv3.Op) clientv3.Txn { -// return m -//} -// -//func (m *MockTxn) Else(ops ...clientv3.Op) clientv3.Txn { -// return m -//} -// -//func (m *MockTxn) Commit() (*clientv3.TxnResponse, error) { -// return &clientv3.TxnResponse{}, m.err -//} -// -//func TestDeleteTask(t *testing.T) { -// mockEtcdCli := mocks.NewKVApi(t) -// util.EtcdOpRetryTime = 1 -// defer func() { -// util.EtcdOpRetryTime = 5 -// }() -// -// rootPath := "/tasks" -// taskID := "123" -// key := getTaskInfoKey(rootPath, taskID) -// t.Run("success", func(t *testing.T) { -// info := &meta.TaskInfo{ -// TaskID: taskID, -// } -// value, _ := json.Marshal(info) -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(&clientv3.GetResponse{ -// Kvs: []*mvccpb.KeyValue{ -// { -// Key: []byte(key), -// Value: value, -// }, -// }, -// }, nil) -// defer call1.Unset() -// call2 := mockEtcdCli.On("Txn", mock.Anything).Return(&MockTxn{}) -// defer call2.Unset() -// -// returnInfo, err := deleteTask(mockEtcdCli, rootPath, taskID) -// assert.Equal(t, taskID, returnInfo.TaskID) -// assert.NoError(t, err) -// }) -// -// t.Run("get error", func(t *testing.T) { -// call := mockEtcdCli.On("Get", mock.Anything, key).Return(nil, errors.New("etcd error")) -// defer call.Unset() -// -// _, err := deleteTask(mockEtcdCli, rootPath, taskID) -// assert.Error(t, err) -// }) -// -// t.Run("etcd error", func(t *testing.T) { -// call1 := mockEtcdCli.On("Get", mock.Anything, key).Return(nil, errors.New("etcd error")) -// defer call1.Unset() -// call2 := mockEtcdCli.On("Txn", mock.Anything).Return(&MockTxn{}) -// defer call2.Unset() -// -// _, err := deleteTask(mockEtcdCli, rootPath, taskID) -// assert.Error(t, err) -// }) -//} diff --git a/server/var.go b/server/var.go deleted file mode 100644 index 4c782792..00000000 --- a/server/var.go +++ /dev/null @@ -1,25 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -// import ( -// "github.com/zilliztech/milvus-cdc/core/util" -// ) -// -// var ( -// log = util.Log -// )