Skip to content

Commit

Permalink
Improve the wait object method and retry the op for a long time
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Jan 2, 2024
1 parent dcd3efe commit b2a188f
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 189 deletions.
2 changes: 1 addition & 1 deletion core/reader/etcd_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewEtcdOp(endpoints []string,
rootPath: rootPath,
metaSubPath: metaPath,
defaultPartitionName: defaultPartitionName,
retryOptions: util.GetRetryOptionsFor25s(),
retryOptions: util.GetRetryDefaultOptions(),
}

// set default value
Expand Down
31 changes: 19 additions & 12 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ import (
var _ api.ChannelManager = (*replicateChannelManager)(nil)

type replicateChannelManager struct {
replicateCtx context.Context
factory msgstream.Factory
targetClient api.TargetAPI
metaOp api.MetaOp
retryOptions []retry.Option
messageBufferSize int
replicateCtx context.Context
factory msgstream.Factory
targetClient api.TargetAPI
metaOp api.MetaOp

retryOptions []retry.Option
startReadRetryOptions []retry.Option
messageBufferSize int

channelLock sync.RWMutex
channelHandlerMap map[string]*replicateChannelHandler
Expand Down Expand Up @@ -88,10 +90,15 @@ func NewReplicateChannelManager(mqConfig config.MQConfig,
}

return &replicateChannelManager{
factory: factory,
targetClient: client,
metaOp: metaOp,
retryOptions: util.GetRetryOptions(readConfig.Retry),
factory: factory,
targetClient: client,
metaOp: metaOp,
retryOptions: util.GetRetryOptions(readConfig.Retry),
startReadRetryOptions: util.GetRetryOptions(config.RetrySettings{
RetryTimes: readConfig.Retry.RetryTimes,
InitBackOff: readConfig.Retry.InitBackOff,
MaxBackOff: readConfig.Retry.InitBackOff,
}),
messageBufferSize: readConfig.MessageBufferSize,
channelHandlerMap: make(map[string]*replicateChannelHandler),
replicateCollections: make(map[int64]chan struct{}),
Expand Down Expand Up @@ -140,7 +147,7 @@ func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info
err = retry.Do(ctx, func() error {
targetInfo, err = r.targetClient.GetCollectionInfo(ctx, info.Schema.Name, sourceDBInfo.Name)
return err
}, r.retryOptions...)
}, r.startReadRetryOptions...)
if err != nil {
log.Warn("failed to get target collection info", zap.Error(err))
return err
Expand Down Expand Up @@ -470,7 +477,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection
return errors.Newf("not found the partition [%s]", partitionName)
}
return nil
}, util.GetRetryOptionsFor25s()...)
}, util.GetRetryDefaultOptions()...)
}()
return nil
}
Expand Down
9 changes: 5 additions & 4 deletions core/util/retry_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ import (
"github.com/zilliztech/milvus-cdc/core/config"
)

// GetRetryOptionsFor25s 1/2/4/8/10
func GetRetryOptionsFor25s() []retry.Option {
// GetRetryDefaultOptions 1/2/4/8/10
func GetRetryDefaultOptions() []retry.Option {
return []retry.Option{
retry.Attempts(5),
// about 2 days
retry.Attempts(17280),
retry.Sleep(time.Second),
retry.MaxSleepTime(10 * time.Second),
}
}

func GetRetryOptions(c config.RetrySettings) []retry.Option {
if c.RetryTimes == 0 || c.InitBackOff == 0 || c.MaxBackOff == 0 {
return GetRetryOptionsFor25s()
return GetRetryDefaultOptions()
}
return []retry.Option{
retry.Attempts(uint(c.RetryTimes)),
Expand Down
3 changes: 1 addition & 2 deletions core/util/retry_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,5 @@ import (
)

func TestRetry(t *testing.T) {
assert.Len(t, GetRetryOptionsFor25s(), 3)
assert.Len(t, GetRetryOptionsFor9s(), 1)
assert.Len(t, GetRetryDefaultOptions(), 3)
}
4 changes: 2 additions & 2 deletions core/writer/channel_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,15 @@ func (c *ChannelWriter) WaitObjReady(ctx context.Context, db, collection, partit
return true, nil
}
}
if db != "" && collection != "" {
if collection != "" {
state := c.WaitCollectionReady(ctx, collection, db, ts)
if state == InfoStateUnknown {
return false, errors.Newf("collection[%s] is not ready, db: %s", collection, db)
} else if state == InfoStateDropped {
return true, nil
}
}
if db != "" && collection != "" && partition != "" {
if collection != "" && partition != "" {
state := c.WaitPartitionReady(ctx, collection, partition, db, ts)
if state == InfoStateUnknown {
return false, errors.Newf("partition[%s] is not ready, collection: %s, db: %s", partition, collection, db)
Expand Down
Loading

0 comments on commit b2a188f

Please sign in to comment.