Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More appropriate error handling approach #21

Merged
merged 2 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
name: Test

on:
push:
branches:
- main
pull_request:
branches:
- main
#on:
# push:
# branches:
# - main
# pull_request:
# branches:
# - main

jobs:
milvus-cdc-test:
Expand Down
2 changes: 1 addition & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ coverage:
status:
project:
default:
threshold: 0% #Allow the coverage to drop by threshold%, and posting a success status.
threshold: 80% #Allow the coverage to drop by threshold%, and posting a success status.
branches:
- main
patch:
Expand Down
6 changes: 6 additions & 0 deletions core/api/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
// Reader is the interface a CDC reader exposes: it can be started, asked to
// quit, and provides a channel from which callers receive errors.
type Reader interface {
// StartRead begins reading; ctx is handed to the implementation.
StartRead(ctx context.Context)
// QuitRead asks the reader to stop; ctx is handed to the implementation.
QuitRead(ctx context.Context)
// ErrorChan returns a receive-only channel of errors.
// NOTE(review): presumably carries errors hit while reading; an
// implementation may return nil (DefaultReader does) — confirm per implementation.
ErrorChan() <-chan error
}

// DefaultReader is a base implementation that all CDC Reader implementations should embed.
Expand All @@ -25,3 +26,8 @@ func (d *DefaultReader) StartRead(ctx context.Context) {
func (d *DefaultReader) QuitRead(ctx context.Context) {
log.Warn("QuitRead is not implemented, please check it")
}

// ErrorChan is the placeholder implementation of Reader.ErrorChan.
// It logs a warning that the method is not implemented and returns a nil
// channel. Note that receiving from a nil channel blocks forever, so types
// embedding DefaultReader are expected to provide their own ErrorChan.
func (d *DefaultReader) ErrorChan() <-chan error {
	// The zero value of a channel type is nil, which is what callers get here.
	var unimplemented <-chan error
	log.Warn("ErrorChan is not implemented, please check it")
	return unimplemented
}
21 changes: 21 additions & 0 deletions core/api/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"reflect"
"testing"
)

Expand Down Expand Up @@ -46,3 +47,23 @@ func TestDefaultReader_StartRead(t *testing.T) {
})
}
}

// TestDefaultReader_ErrorChan checks that the default (unimplemented)
// ErrorChan returns a nil channel.
func TestDefaultReader_ErrorChan(t *testing.T) {
	type testCase struct {
		name string
		want <-chan error
	}
	cases := []testCase{
		{name: "TestDefaultReader_ErrorChan", want: nil},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			defaultReader := &DefaultReader{}
			got := defaultReader.ErrorChan()
			if reflect.DeepEqual(got, tc.want) {
				return
			}
			t.Errorf("ErrorChan() = %v, want %v", got, tc.want)
		})
	}
}
3 changes: 3 additions & 0 deletions core/api/replicate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ReplicateAPIEvent struct {
CollectionInfo *pb.CollectionInfo
PartitionInfo *pb.PartitionInfo
ReplicateInfo *commonpb.ReplicateInfo
Error error
}

type ReplicateAPIEventType int
Expand All @@ -42,6 +43,8 @@ const (
ReplicateDropCollection
ReplicateCreatePartition
ReplicateDropPartition

ReplicateError = 100
)

type DefaultChannelManager struct{}
Expand Down
43 changes: 43 additions & 0 deletions core/mocks/reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 38 additions & 21 deletions core/reader/collection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package reader
import (
"context"
"sync"
"time"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/samber/lo"
"go.uber.org/zap"

Expand Down Expand Up @@ -57,7 +55,7 @@ type CollectionReader struct {
channelSeekPositions map[string]*msgpb.MsgPosition
replicateCollectionMap util.Map[int64, *pb.CollectionInfo]
replicateChannelMap util.Map[string, struct{}]
replicateChannelChan chan string
errChan chan error
shouldReadFunc ShouldReadFunc
startOnce sync.Once
quitOnce sync.Once
Expand All @@ -70,16 +68,18 @@ func NewCollectionReader(id string, channelManager api.ChannelManager, metaOp ap
metaOp: metaOp,
channelSeekPositions: seekPosition,
shouldReadFunc: shouldReadFunc,
replicateChannelChan: make(chan string, 10),
errChan: make(chan error),
}
return reader, nil
}

func (reader *CollectionReader) StartRead(ctx context.Context) {
reader.startOnce.Do(func() {
reader.metaOp.SubscribeCollectionEvent(reader.id, func(info *pb.CollectionInfo) bool {
log.Info("has watched to read collection", zap.String("name", info.Schema.Name))
collectionLog := log.With(zap.String("collection_name", info.Schema.Name), zap.Int64("collection_id", info.ID))
collectionLog.Info("has watched to read collection")
if !reader.shouldReadFunc(info) {
collectionLog.Info("the collection should not be read")
return false
}
startPositions := make([]*msgpb.MsgPosition, 0)
Expand All @@ -90,16 +90,19 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
})
}
if err := reader.channelManager.StartReadCollection(ctx, info, startPositions); err != nil {
log.Warn("fail to start to replicate the collection data in the watch process", zap.Int64("id", info.ID), zap.Error(err))
collectionLog.Warn("fail to start to replicate the collection data in the watch process", zap.Any("info", info), zap.Error(err))
reader.sendError(err)
}
reader.replicateCollectionMap.Store(info.ID, info)
log.Info("has started to read collection", zap.String("name", info.Schema.Name))
collectionLog.Info("has started to read collection")
return true
})
reader.metaOp.SubscribePartitionEvent(reader.id, func(info *pb.PartitionInfo) bool {
partitionLog := log.With(zap.Int64("collection_id", info.CollectionID), zap.Int64("partition_id", info.PartitionID), zap.String("partition_name", info.PartitionName))
partitionLog.Info("has watched to read partition")
collectionName := reader.metaOp.GetCollectionNameByID(ctx, info.CollectionID)
if collectionName == "" {
log.Info("the collection name is empty", zap.Int64("collection_id", info.CollectionID), zap.String("partition_name", info.PartitionName))
partitionLog.Info("the collection name is empty")
return true
}
tmpCollectionInfo := &pb.CollectionInfo{
Expand All @@ -109,17 +112,16 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
},
}
if !reader.shouldReadFunc(tmpCollectionInfo) {
partitionLog.Info("the partition should not be read", zap.String("name", collectionName))
return true
}

var err error
err = retry.Do(ctx, func() error {
err = reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
return err
}, retry.Sleep(time.Second))
err := reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
if err != nil {
log.Panic("fail to add partition", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName), zap.Error(err))
partitionLog.Warn("fail to add partition", zap.String("collection_name", collectionName), zap.Any("partition", info), zap.Error(err))
reader.sendError(err)
}
partitionLog.Info("has started to add partition")
return false
})
reader.metaOp.WatchCollection(ctx, nil)
Expand All @@ -130,12 +132,15 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
})
if err != nil {
log.Warn("get all collection failed", zap.Error(err))
reader.sendError(err)
return
}
seekPositions := lo.Values(reader.channelSeekPositions)
for _, info := range existedCollectionInfos {
log.Info("exist collection", zap.String("name", info.Schema.Name))
if err := reader.channelManager.StartReadCollection(ctx, info, seekPositions); err != nil {
log.Warn("fail to start to replicate the collection data", zap.Int64("id", info.ID), zap.Error(err))
log.Warn("fail to start to replicate the collection data", zap.Any("collection", info), zap.Error(err))
reader.sendError(err)
}
reader.replicateCollectionMap.Store(info.ID, info)
}
Expand All @@ -155,22 +160,29 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
log.Info("the collection is not in the watch list", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName))
return true
}
var err error
err = retry.Do(ctx, func() error {
err = reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
return err
}, retry.Sleep(time.Second))
err := reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
if err != nil {
log.Panic("fail to add partition", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName), zap.Error(err))
log.Warn("fail to add partition", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName), zap.Error(err))
reader.sendError(err)
}
return false
})
if err != nil {
log.Warn("get all partition failed", zap.Error(err))
reader.sendError(err)
}
})
}

// sendError tries to hand err to whoever is receiving on reader.errChan
// without ever blocking the caller.
//
// The send is attempted inside a select with a default branch, so if the
// send cannot proceed immediately the error is dropped. Either outcome is
// logged together with the reader id and the error.
func (reader *CollectionReader) sendError(err error) {
	delivered := false
	select {
	case reader.errChan <- err:
		delivered = true
	default:
		// Nobody could take the error right now; fall through and log the skip.
	}

	if delivered {
		log.Info("send the error", zap.String("id", reader.id), zap.Error(err))
		return
	}
	log.Info("skip the error, because it will quit soon", zap.String("id", reader.id), zap.Error(err))
}

func (reader *CollectionReader) QuitRead(ctx context.Context) {
reader.quitOnce.Do(func() {
reader.replicateCollectionMap.Range(func(_ int64, value *pb.CollectionInfo) bool {
Expand All @@ -182,5 +194,10 @@ func (reader *CollectionReader) QuitRead(ctx context.Context) {
})
reader.metaOp.UnsubscribeEvent(reader.id, api.CollectionEventType)
reader.metaOp.UnsubscribeEvent(reader.id, api.PartitionEventType)
reader.sendError(nil)
})
}

// ErrorChan exposes the reader's error channel as a receive-only channel.
// Values arrive on it through sendError, which uses a non-blocking send.
func (reader *CollectionReader) ErrorChan() <-chan error {
	// Narrow the bidirectional channel to receive-only for callers.
	var receiveOnly <-chan error = reader.errChan
	return receiveOnly
}
18 changes: 16 additions & 2 deletions core/reader/collection_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/zilliztech/milvus-cdc/core/pb"
)

// Start the etcd server before running this test case.
func TestCollectionReader(t *testing.T) {
etcdOp, err := NewEtcdOp(nil, "", "", "")
assert.NoError(t, err)
Expand Down Expand Up @@ -86,11 +87,20 @@ func TestCollectionReader(t *testing.T) {
return !strings.Contains(ci.Schema.Name, "test")
})
assert.NoError(t, err)
go func() {
select {
case <-time.After(time.Second):
t.Fail()
case err := <-reader.ErrorChan():
assert.Error(t, err)
}
}()
reader.StartRead(context.Background())
channelManager.EXPECT().StartReadCollection(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
channelManager.EXPECT().AddPartition(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
// add collection

{
// filter collection
field3 := &schemapb.FieldSchema{
FieldID: 100,
Name: "age",
Expand All @@ -106,6 +116,7 @@ func TestCollectionReader(t *testing.T) {
collectionBytes, _ := proto.Marshal(collectionInfo)
_, _ = realOp.etcdClient.Put(context.Background(), realOp.collectionPrefix()+"/1/100003", string(collectionBytes))

// filter partition
{
info := &pb.PartitionInfo{
State: pb.PartitionState_PartitionCreated,
Expand All @@ -116,8 +127,9 @@ func TestCollectionReader(t *testing.T) {
_, _ = realOp.etcdClient.Put(context.Background(), realOp.partitionPrefix()+"/100003/300047", getStringForMessage(info))
}
}
// add partition

{
// put collection
field3 := &schemapb.FieldSchema{
FieldID: 100,
Name: "age",
Expand All @@ -138,6 +150,8 @@ func TestCollectionReader(t *testing.T) {
}
collectionBytes, _ := proto.Marshal(collectionInfo)
_, _ = realOp.etcdClient.Put(context.Background(), realOp.collectionPrefix()+"/1/100004", string(collectionBytes))

// put partition
info := &pb.PartitionInfo{
State: pb.PartitionState_PartitionCreated,
PartitionName: "foo",
Expand Down
Loading
Loading