Skip to content

Commit

Permalink
Update the grafana json and Add some tool scripts
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Jan 12, 2024
1 parent 88dfac1 commit 2750b22
Show file tree
Hide file tree
Showing 16 changed files with 721 additions and 50 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ run:
skip-dirs:
- pb
- mocks
- tool

linters:
disable-all: true
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ lint-fix:
@echo "Running gofumpt fix"
@gofumpt -l -w ./
@echo "Running gci fix"
@gci write ./ --skip-generated -s standard -s default -s "prefix(github.com/zilliztech)" --custom-order
@gci write ./ -s standard -s default -s "prefix(github.com/zilliztech)" --custom-order --skip-generated

static-check:
@echo "Running go-lint check:"
Expand Down
5 changes: 4 additions & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ ignore:
- "examples/*"
- "tests/*"
- "**/mocks/*"
- "*_gen.go"
- "*_gen.go"
- "server/main/*"
- "server/tool/*"
- "server/configs/*"
3 changes: 2 additions & 1 deletion core/reader/channel_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ var _ api.Reader = (*ChannelReader)(nil)
func NewChannelReader(channelName, seekPosition string,
mqConfig config.MQConfig,
dataHandler func(context.Context, *msgstream.MsgPack) bool,
creator FactoryCreator) (api.Reader, error) {
creator FactoryCreator,
) (api.Reader, error) {
channelReader := &ChannelReader{
factoryCreator: creator,
channelName: channelName,
Expand Down
1 change: 0 additions & 1 deletion core/reader/etcd_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,6 @@ func (e *EtcdOp) internalGetAllPartition(ctx context.Context, filters []api.Part
existedPartitionInfos = append(existedPartitionInfos, info)
}
return existedPartitionInfos, nil

}

func (e *EtcdOp) GetAllPartition(ctx context.Context, filter api.PartitionFilter) ([]*pb.PartitionInfo, error) {
Expand Down
7 changes: 3 additions & 4 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,9 +493,9 @@ func (r *replicateChannelManager) stopReadChannel(pChannelName string, collectio
return
}
channelHandler.RemoveCollection(collectionID)
//if channelHandler.IsEmpty() {
// if channelHandler.IsEmpty() {
// channelHandler.Close()
//}
// }
}

type replicateChannelHandler struct {
Expand Down Expand Up @@ -747,15 +747,14 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
Msgs: make([]msgstream.TsMsg, 0),
}
beginTS := pack.BeginTs
endTS := pack.EndTs

minTS := GetTSManager().GetMinTS(r.pChannelName)
if minTS == 0 {
r.sendErrEvent(errors.Newf("fail to get channel ts, channel: %s", r.pChannelName))
log.Warn("fail to get channel ts", zap.String("channel", r.pChannelName))
return nil
}
endTS = minTS
endTS := minTS
if beginTS > endTS {
beginTS = endTS
}
Expand Down
51 changes: 41 additions & 10 deletions core/reader/replicate_channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import (

"github.com/zilliztech/milvus-cdc/core/api"
"github.com/zilliztech/milvus-cdc/core/config"
"github.com/zilliztech/milvus-cdc/core/log"
"github.com/zilliztech/milvus-cdc/core/mocks"
"github.com/zilliztech/milvus-cdc/core/model"
"github.com/zilliztech/milvus-cdc/core/pb"
"github.com/zilliztech/milvus-cdc/core/util"
)

func TestNewReplicateChannelManager(t *testing.T) {
Expand Down Expand Up @@ -255,6 +257,9 @@ func TestStartReadCollection(t *testing.T) {
}
err := realManager.AddPartition(context.Background(), &pb.CollectionInfo{
ID: 41,
Schema: &schemapb.CollectionSchema{
Name: "test",
},
}, &pb.PartitionInfo{})
assert.Error(t, err)
}
Expand Down Expand Up @@ -297,6 +302,14 @@ func TestStartReadCollection(t *testing.T) {
})
}

func noRetry(handler *replicateChannelHandler) {
handler.retryOptions = util.GetRetryOptions(config.RetrySettings{
RetryTimes: 1,
MaxBackOff: 1,
InitBackOff: 1,
})
}

func TestReplicateChannelHandler(t *testing.T) {
t.Run("fail to new msg stream", func(t *testing.T) {
factory := msgstream.NewMockFactory(t)
Expand Down Expand Up @@ -337,11 +350,12 @@ func TestReplicateChannelHandler(t *testing.T) {
streamChan := make(chan *msgstream.MsgPack)
close(streamChan)
stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Close().Return().Once()
stream.EXPECT().Close().Return().Maybe()
stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Chan().Return(streamChan).Once()
stream.EXPECT().Chan().Return(streamChan)
handler, err := newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{PChannelName: "test_p", SeekPosition: &msgstream.MsgPosition{ChannelName: "test_p", MsgID: []byte("test")}}, &model.TargetCollectionInfo{PChannel: "test_p"}, api.TargetAPI(nil), &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
assert.NoError(t, err)
noRetry(handler)
time.Sleep(100 * time.Microsecond)
handler.Close()
})
Expand All @@ -355,9 +369,9 @@ func TestReplicateChannelHandler(t *testing.T) {

factory.EXPECT().NewTtMsgStream(mock.Anything).Return(stream, nil)
stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Close().Return().Once()
stream.EXPECT().Close().Return().Maybe()
stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Chan().Return(streamChan).Once().Maybe()
stream.EXPECT().Chan().Return(streamChan)

handler, err := newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{
PChannelName: "test_p",
Expand All @@ -370,6 +384,7 @@ func TestReplicateChannelHandler(t *testing.T) {
CollectionName: "foo",
}, targetClient, &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
assert.NoError(t, err)
noRetry(handler)
time.Sleep(100 * time.Millisecond)
assert.True(t, handler.containCollection("foo"))
handler.Close()
Expand All @@ -389,10 +404,10 @@ func TestReplicateChannelHandler(t *testing.T) {

factory.EXPECT().NewTtMsgStream(mock.Anything).Return(stream, nil)
stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Close().Return().Once()
stream.EXPECT().Close().Return().Maybe()
stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Chan().Return(streamChan).Once()
targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once()
stream.EXPECT().Chan().Return(streamChan)
targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error 1")).Once()
targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(&model.CollectionInfo{
Partitions: map[string]int64{
"p1": 10001,
Expand All @@ -406,6 +421,11 @@ func TestReplicateChannelHandler(t *testing.T) {
return newReplicateChannelHandler(context.Background(), &model.SourceCollectionInfo{CollectionID: 1, PChannelName: "test_p", SeekPosition: &msgstream.MsgPosition{ChannelName: "test_p", MsgID: []byte("test")}}, &model.TargetCollectionInfo{CollectionID: 100, CollectionName: "test", PChannel: "test_p"}, api.TargetAPI(targetClient), &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
}()
assert.NoError(t, err)
handler.retryOptions = util.GetRetryOptions(config.RetrySettings{
RetryTimes: 3,
MaxBackOff: 1,
InitBackOff: 1,
})
time.Sleep(100 * time.Millisecond)
handler.Close()

Expand All @@ -418,7 +438,7 @@ func TestReplicateChannelHandler(t *testing.T) {
},
})
}()
_ = handler.AddPartitionInfo(&pb.CollectionInfo{
err = handler.AddPartitionInfo(&pb.CollectionInfo{
ID: 2,
Schema: &schemapb.CollectionSchema{
Name: "test2",
Expand All @@ -427,14 +447,15 @@ func TestReplicateChannelHandler(t *testing.T) {
PartitionID: 2001,
PartitionName: "p2",
}, make(chan<- uint64))
assert.NoError(t, err)
time.Sleep(1500 * time.Millisecond)
handler.RemovePartitionInfo(2, "p2", 10002)

assert.False(t, handler.IsEmpty())
assert.NotNil(t, handler.msgPackChan)

// test updateTargetPartitionInfo
targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once()
targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error 2")).Once()
targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(&model.CollectionInfo{
Partitions: map[string]int64{
"p1": 30001,
Expand All @@ -456,14 +477,16 @@ func TestReplicateChannelHandler(t *testing.T) {
})

t.Run("handle msg pack", func(t *testing.T) {
GetTSManager().EmptyTS()

factory := msgstream.NewMockFactory(t)
stream := msgstream.NewMockMsgStream(t)
targetClient := mocks.NewTargetAPI(t)
streamChan := make(chan *msgstream.MsgPack)

factory.EXPECT().NewTtMsgStream(mock.Anything).Return(stream, nil)
stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Close().Return().Once()
stream.EXPECT().Close().Return().Maybe()
stream.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil).Once()
stream.EXPECT().Chan().Return(streamChan)

Expand All @@ -487,6 +510,7 @@ func TestReplicateChannelHandler(t *testing.T) {
},
}, targetClient, &api.DefaultMetaOp{}, nil, &model.HandlerOpts{Factory: factory})
assert.NoError(t, err)
noRetry(handler)
done := make(chan struct{})

go func() {
Expand Down Expand Up @@ -551,6 +575,7 @@ func TestReplicateChannelHandler(t *testing.T) {
}()

// create collection msg / create partition msg / timetick msg
log.Info("create collection msg / create partition msg / timetick msg")
streamChan <- &msgstream.MsgPack{
BeginTs: 1,
EndTs: 2,
Expand Down Expand Up @@ -617,6 +642,7 @@ func TestReplicateChannelHandler(t *testing.T) {
}

// insert msg
log.Info("insert msg")
streamChan <- &msgstream.MsgPack{
BeginTs: 1,
EndTs: 2,
Expand Down Expand Up @@ -651,6 +677,7 @@ func TestReplicateChannelHandler(t *testing.T) {
}

// delete msg
log.Info("delete msg")
streamChan <- &msgstream.MsgPack{
BeginTs: 1,
EndTs: 2,
Expand Down Expand Up @@ -699,6 +726,7 @@ func TestReplicateChannelHandler(t *testing.T) {
}

// drop collection msg
log.Info("drop collection msg")
streamChan <- &msgstream.MsgPack{
BeginTs: 1,
EndTs: 2,
Expand Down Expand Up @@ -730,6 +758,7 @@ func TestReplicateChannelHandler(t *testing.T) {
}

// drop partition msg
log.Info("drop partition msg")
streamChan <- &msgstream.MsgPack{
BeginTs: 1,
EndTs: 2,
Expand Down Expand Up @@ -762,6 +791,8 @@ func TestReplicateChannelHandler(t *testing.T) {
},
}

// close
log.Info("close")
handler.Close()
close(streamChan)
<-done
Expand Down
11 changes: 11 additions & 0 deletions core/reader/ts_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,14 @@ func (m *tsManager) GetMinTS(channelName string) uint64 {
TSMetricVec.WithLabelValues(channelName).Set(float64(msgTime))
return minTS
}

// EmptyTS Only for test
func (m *tsManager) EmptyTS() {
for k := range m.channelTS.GetUnsafeMap() {
m.channelTS.Delete(k)
}
for k := range m.channelRef.GetUnsafeMap() {
m.channelRef.Delete(k)
}
m.retryOptions = util.NoRetryOption()
}
8 changes: 8 additions & 0 deletions core/util/retry_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,11 @@ func GetRetryOptions(c config.RetrySettings) []retry.Option {
retry.MaxSleepTime(time.Duration(c.MaxBackOff) * time.Second),
}
}

func NoRetryOption() []retry.Option {
return []retry.Option{
retry.Attempts(1),
retry.Sleep(time.Second),
retry.MaxSleepTime(time.Second),
}
}
3 changes: 2 additions & 1 deletion core/writer/channel_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type ChannelWriter struct {

func NewChannelWriter(dataHandler api.DataHandler,
writerConfig config.WriterConfig,
droppedObjs map[string]map[string]uint64) api.Writer {
droppedObjs map[string]map[string]uint64,
) api.Writer {
w := &ChannelWriter{
dataHandler: dataHandler,
messageManager: NewReplicateMessageManager(dataHandler, writerConfig.MessageBufferSize),
Expand Down
26 changes: 19 additions & 7 deletions core/writer/milvus_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"google.golang.org/grpc"

"github.com/zilliztech/milvus-cdc/core/api"
"github.com/zilliztech/milvus-cdc/core/config"
"github.com/zilliztech/milvus-cdc/core/util"
)

func TestDataHandler(t *testing.T) {
Expand Down Expand Up @@ -68,7 +70,13 @@ func TestDataHandler(t *testing.T) {
dataHandler, err := NewMilvusDataHandler(AddressOption("localhost:50051"))
assert.NoError(t, err)
dataHandler.ignorePartition = true
ctx := context.Background()
dataHandler.retryOptions = util.GetRetryOptions(config.RetrySettings{
RetryTimes: 1,
MaxBackOff: 1,
InitBackOff: 1,
})
ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancelFunc()

// create collection
t.Run("create collection", func(t *testing.T) {
Expand Down Expand Up @@ -384,31 +392,35 @@ func TestDataHandler(t *testing.T) {

t.Run("flush", func(t *testing.T) {
{
milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{
call := milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{
Status: &commonpb.Status{},
Value: true,
}, nil).Once()
milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{}}, nil).Once()
}, nil).Maybe()
flushCall := milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{}}, nil).Once()
err := dataHandler.Flush(ctx, &api.FlushParam{
FlushRequest: milvuspb.FlushRequest{
CollectionNames: []string{"foo"},
},
})
assert.NoError(t, err)
call.Unset()
flushCall.Unset()
}

{
milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{
call := milvusService.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{
Status: &commonpb.Status{},
Value: true,
}, nil).Once()
milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{Code: 500, ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil).Once()
}, nil).Maybe()
flushCall := milvusService.EXPECT().Flush(mock.Anything, mock.Anything).Return(&milvuspb.FlushResponse{Status: &commonpb.Status{Code: 500, ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil).Once()
err := dataHandler.Flush(ctx, &api.FlushParam{
FlushRequest: milvuspb.FlushRequest{
CollectionNames: []string{"foo"},
},
})
assert.Error(t, err)
call.Unset()
flushCall.Unset()
}
})

Expand Down
Loading

0 comments on commit 2750b22

Please sign in to comment.