Skip to content

Commit

Permalink
Add cdc metrics and standardized some codes (#27)
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG authored Nov 13, 2023
1 parent 003bc1f commit aad9092
Show file tree
Hide file tree
Showing 14 changed files with 729 additions and 533 deletions.
15 changes: 15 additions & 0 deletions core/api/replicate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ const (
ReplicateError = 100
)

func (r ReplicateAPIEventType) String() string {
switch r {
case ReplicateCreateCollection:
return "CreateCollection"
case ReplicateDropCollection:
return "DropCollection"
case ReplicateCreatePartition:
return "CreatePartition"
case ReplicateDropPartition:
return "DropPartition"
default:
return "Unknown"
}
}

type DefaultChannelManager struct{}

var _ ChannelManager = (*DefaultChannelManager)(nil)
Expand Down
17 changes: 16 additions & 1 deletion core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,17 @@ type replicateChannelManager struct {

channelChan chan string
apiEventChan chan *api.ReplicateAPIEvent

msgPackCallback func(string, *msgstream.MsgPack)
}

func NewReplicateChannelManager(mqConfig config.MQConfig, factoryCreator FactoryCreator, client api.TargetAPI, messageBufferSize int, metaOp api.MetaOp) (api.ChannelManager, error) {
func NewReplicateChannelManager(mqConfig config.MQConfig,
factoryCreator FactoryCreator,
client api.TargetAPI,
messageBufferSize int,
metaOp api.MetaOp,
msgPackCallback func(string, *msgstream.MsgPack),
) (api.ChannelManager, error) {
var factory msgstream.Factory
switch {
case mqConfig.Pulsar.Address != "":
Expand All @@ -72,6 +80,7 @@ func NewReplicateChannelManager(mqConfig config.MQConfig, factoryCreator Factory
replicatePartitions: make(map[int64]map[int64]chan struct{}),
channelChan: make(chan string, 10),
apiEventChan: make(chan *api.ReplicateAPIEvent, 10),
msgPackCallback: msgPackCallback,
}, nil
}

Expand Down Expand Up @@ -353,6 +362,7 @@ func (r *replicateChannelManager) startReadChannel(sourceInfo *model.SourceColle
zap.String("channel_name", sourceInfo.PChannelName), zap.Int64("collection_id", sourceInfo.CollectionID), zap.Error(err))
return err
}
channelHandler.msgPackCallback = r.msgPackCallback
r.channelHandlerMap[sourceInfo.PChannelName] = channelHandler
r.channelChan <- sourceInfo.PChannelName
return nil
Expand Down Expand Up @@ -387,6 +397,7 @@ type replicateChannelHandler struct {
collectionNames map[string]int64
msgPackChan chan *msgstream.MsgPack
apiEventChan chan *api.ReplicateAPIEvent
msgPackCallback func(string, *msgstream.MsgPack)
}

func (r *replicateChannelHandler) AddCollection(collectionID int64, targetInfo *model.TargetCollectionInfo) {
Expand Down Expand Up @@ -423,6 +434,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection
return nil
}
targetInfo.PartitionBarrierChan[partitionID] = barrierChan
// TODO use goroutine pool
go func() {
_ = retry.Do(r.replicateCtx, func() error {
id := r.updateTargetPartitionInfo(collectionID, collectionName, partitionName)
Expand Down Expand Up @@ -656,6 +668,9 @@ func (r *replicateChannelHandler) handlePack(pack *msgstream.MsgPack) *msgstream
for _, position := range newPack.EndPositions {
position.ChannelName = pChannel
}
if r.msgPackCallback != nil {
r.msgPackCallback(r.pChannelName, newPack)
}
needTsMsg = needTsMsg || len(newPack.Msgs) == 0
if needTsMsg {
timeTickResult := msgpb.TimeTickMsg{
Expand Down
9 changes: 6 additions & 3 deletions core/reader/replicate_channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (

func TestNewReplicateChannelManager(t *testing.T) {
t.Run("empty config", func(t *testing.T) {
_, err := NewReplicateChannelManager(config.MQConfig{}, NewDefaultFactoryCreator(), nil, 10, &api.DefaultMetaOp{})
_, err := NewReplicateChannelManager(config.MQConfig{}, NewDefaultFactoryCreator(), nil, 10, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
})
assert.Error(t, err)
})

Expand All @@ -35,7 +36,8 @@ func TestNewReplicateChannelManager(t *testing.T) {
Pulsar: config.PulsarConfig{
Address: "pulsar://localhost:6650",
},
}, factoryCreator, nil, 10, &api.DefaultMetaOp{})
}, factoryCreator, nil, 10, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
})
assert.NoError(t, err)
})
}
Expand Down Expand Up @@ -95,7 +97,8 @@ func TestStartReadCollection(t *testing.T) {
Pulsar: config.PulsarConfig{
Address: "pulsar://localhost:6650",
},
}, factoryCreator, targetClient, 10, &api.DefaultMetaOp{})
}, factoryCreator, targetClient, 10, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
})
assert.NoError(t, err)
manager.SetCtx(context.Background())

Expand Down
Loading

0 comments on commit aad9092

Please sign in to comment.