From aad9092d81d262cacd437270c4b17147d0c5f9ae Mon Sep 17 00:00:00 2001
From: SimFG
Date: Mon, 13 Nov 2023 18:02:09 +0800
Subject: [PATCH] Add cdc metrics and standardize some code (#27)

Signed-off-by: SimFG
---
 core/api/replicate_manager.go                 |  15 +
 core/reader/replicate_channel_manager.go      |  17 +-
 core/reader/replicate_channel_manager_test.go |   9 +-
 core/writer/channel_writer.go                 | 485 ++++++++++--------
 server/cdc_impl.go                            | 475 +++++++++--------
 server/metrics/metrics.go                     |  87 ++--
 server/metrics/metrics_task_num.go            |   8 +-
 server/metrics/metrics_task_num_test.go       |  36 ++
 server/model/meta/task.go                     |   4 -
 server/model/meta/task_test.go                |   4 -
 server/store/etcd.go                          |  58 ++-
 server/store/meta_op.go                       |   2 +
 server/store/mysql.go                         |  57 +-
 server/writer_callback.go                     |   5 +-
 14 files changed, 729 insertions(+), 533 deletions(-)
 create mode 100644 server/metrics/metrics_task_num_test.go

diff --git a/core/api/replicate_manager.go b/core/api/replicate_manager.go
index 52a9853f..970b32dd 100644
--- a/core/api/replicate_manager.go
+++ b/core/api/replicate_manager.go
@@ -50,6 +50,21 @@ const (
 	ReplicateError = 100
 )
 
+func (r ReplicateAPIEventType) String() string {
+	switch r {
+	case ReplicateCreateCollection:
+		return "CreateCollection"
+	case ReplicateDropCollection:
+		return "DropCollection"
+	case ReplicateCreatePartition:
+		return "CreatePartition"
+	case ReplicateDropPartition:
+		return "DropPartition"
+	default:
+		return "Unknown"
+	}
+}
+
 type DefaultChannelManager struct{}
 
 var _ ChannelManager = (*DefaultChannelManager)(nil)
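The String method above makes ReplicateAPIEventType satisfy fmt.Stringer, so event types render as names in logs and as readable metric label values (server/cdc_impl.go below uses EventType.String() for APIExecuteCountVec). A tiny sketch of the effect, assuming only the constants in this file:

    fmt.Println(api.ReplicateCreateCollection)  // CreateCollection (fmt consults the Stringer)
    fmt.Println(api.ReplicateAPIEventType(42))  // Unknown (the default arm)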
diff --git a/core/reader/replicate_channel_manager.go b/core/reader/replicate_channel_manager.go
index 0e6fcb3b..93cc27b0 100644
--- a/core/reader/replicate_channel_manager.go
+++ b/core/reader/replicate_channel_manager.go
@@ -47,9 +47,17 @@ type replicateChannelManager struct {
 
 	channelChan  chan string
 	apiEventChan chan *api.ReplicateAPIEvent
+
+	msgPackCallback func(string, *msgstream.MsgPack)
 }
 
-func NewReplicateChannelManager(mqConfig config.MQConfig, factoryCreator FactoryCreator, client api.TargetAPI, messageBufferSize int, metaOp api.MetaOp) (api.ChannelManager, error) {
+func NewReplicateChannelManager(mqConfig config.MQConfig,
+	factoryCreator FactoryCreator,
+	client api.TargetAPI,
+	messageBufferSize int,
+	metaOp api.MetaOp,
+	msgPackCallback func(string, *msgstream.MsgPack),
+) (api.ChannelManager, error) {
 	var factory msgstream.Factory
 	switch {
 	case mqConfig.Pulsar.Address != "":
@@ -72,6 +80,7 @@ func NewReplicateChannelManager(mqConfig config.MQConfig, factoryCreator Factory
 		replicatePartitions: make(map[int64]map[int64]chan struct{}),
 		channelChan:         make(chan string, 10),
 		apiEventChan:        make(chan *api.ReplicateAPIEvent, 10),
+		msgPackCallback:     msgPackCallback,
 	}, nil
 }
 
@@ -353,6 +362,7 @@ func (r *replicateChannelManager) startReadChannel(sourceInfo *model.SourceColle
 			zap.String("channel_name", sourceInfo.PChannelName), zap.Int64("collection_id", sourceInfo.CollectionID), zap.Error(err))
 		return err
 	}
+	channelHandler.msgPackCallback = r.msgPackCallback
 	r.channelHandlerMap[sourceInfo.PChannelName] = channelHandler
 	r.channelChan <- sourceInfo.PChannelName
 	return nil
@@ -387,6 +397,7 @@ type replicateChannelHandler struct {
 	collectionNames map[string]int64
 	msgPackChan     chan *msgstream.MsgPack
 	apiEventChan    chan *api.ReplicateAPIEvent
+	msgPackCallback func(string, *msgstream.MsgPack)
 }
 
 func (r *replicateChannelHandler) AddCollection(collectionID int64, targetInfo *model.TargetCollectionInfo) {
@@ -423,6 +434,7 @@ func (r *replicateChannelHandler) AddPartitionInfo(collectionInfo *pb.Collection
 		return nil
 	}
 	targetInfo.PartitionBarrierChan[partitionID] = barrierChan
+	// TODO use goroutine pool
 	go func() {
 		_ = retry.Do(r.replicateCtx, func() error {
 			id := r.updateTargetPartitionInfo(collectionID, collectionName, partitionName)
@@ -656,6 +668,9 @@ func (r *replicateChannelHandler) handlePack(pack *msgstream.MsgPack) *msgstream
 	for _, position := range newPack.EndPositions {
 		position.ChannelName = pChannel
 	}
+	if r.msgPackCallback != nil {
+		r.msgPackCallback(r.pChannelName, newPack)
+	}
 	needTsMsg = needTsMsg || len(newPack.Msgs) == 0
 	if needTsMsg {
 		timeTickResult := msgpb.TimeTickMsg{
diff --git a/core/reader/replicate_channel_manager_test.go b/core/reader/replicate_channel_manager_test.go
index 29e7eb50..52ec2bec 100644
--- a/core/reader/replicate_channel_manager_test.go
+++ b/core/reader/replicate_channel_manager_test.go
@@ -23,7 +23,8 @@ import (
 
 func TestNewReplicateChannelManager(t *testing.T) {
 	t.Run("empty config", func(t *testing.T) {
-		_, err := NewReplicateChannelManager(config.MQConfig{}, NewDefaultFactoryCreator(), nil, 10, &api.DefaultMetaOp{})
+		_, err := NewReplicateChannelManager(config.MQConfig{}, NewDefaultFactoryCreator(), nil, 10, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
+		})
 		assert.Error(t, err)
 	})
 
@@ -35,7 +36,8 @@ func TestNewReplicateChannelManager(t *testing.T) {
 			Pulsar: config.PulsarConfig{
 				Address: "pulsar://localhost:6650",
 			},
-		}, factoryCreator, nil, 10, &api.DefaultMetaOp{})
+		}, factoryCreator, nil, 10, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
+		})
 		assert.NoError(t, err)
 	})
 }
@@ -95,7 +97,8 @@ func TestStartReadCollection(t *testing.T) {
 			Pulsar: config.PulsarConfig{
 				Address: "pulsar://localhost:6650",
 			},
-		}, factoryCreator, targetClient, 10, &api.DefaultMetaOp{})
+		}, factoryCreator, targetClient, 10, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
+		})
 		assert.NoError(t, err)
 
 		manager.SetCtx(context.Background())
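The msgPackCallback parameter threaded through NewReplicateChannelManager above is invoked from handlePack for every forwarded pack, so implementations should be fast and non-blocking. A minimal caller-side sketch, assuming mqConfig, targetClient, and metaOp are already in scope (the counter is illustrative, not part of this patch):

    var (
        mu       sync.Mutex
        msgCount = map[string]int{}
    )
    // The hook runs inline on the replicate path; keep it to cheap bookkeeping.
    callback := func(channelName string, pack *msgstream.MsgPack) {
        mu.Lock()
        defer mu.Unlock()
        msgCount[channelName] += len(pack.Msgs)
    }
    manager, err := NewReplicateChannelManager(mqConfig, NewDefaultFactoryCreator(),
        targetClient, 10, metaOp, callback)

The server wires exactly this shape of hook in newReplicateEntity below to record read-side replication metrics.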
diff --git a/core/writer/channel_writer.go b/core/writer/channel_writer.go
index ddcaa9bd..349f1a6e 100644
--- a/core/writer/channel_writer.go
+++ b/core/writer/channel_writer.go
@@ -19,16 +19,48 @@ import (
 
 var _ api.Writer = (*ChannelWriter)(nil)
 
+type (
+	opMessageFunc func(ctx context.Context, msgBase *commonpb.MsgBase, msgPack msgstream.TsMsg) error
+	apiEventFunc  func(ctx context.Context, apiEvent *api.ReplicateAPIEvent) error
+)
+
 type ChannelWriter struct {
 	dataHandler    api.DataHandler
 	messageManager api.MessageManager
+	opMessageFuncs map[commonpb.MsgType]opMessageFunc
+	apiEventFuncs  map[api.ReplicateAPIEventType]apiEventFunc
 }
 
 func NewChannelWriter(dataHandler api.DataHandler, messageBufferSize int) api.Writer {
-	return &ChannelWriter{
+	w := &ChannelWriter{
 		dataHandler:    dataHandler,
 		messageManager: NewReplicateMessageManager(dataHandler, messageBufferSize),
 	}
+	w.initAPIEventFuncs()
+	w.initOPMessageFuncs()
+
+	return w
+}
+
+func (c *ChannelWriter) initAPIEventFuncs() {
+	c.apiEventFuncs = map[api.ReplicateAPIEventType]apiEventFunc{
+		api.ReplicateCreateCollection: c.createCollection,
+		api.ReplicateDropCollection:   c.dropCollection,
+		api.ReplicateCreatePartition:  c.createPartition,
+		api.ReplicateDropPartition:    c.dropPartition,
+	}
+}
+
+func (c *ChannelWriter) initOPMessageFuncs() {
+	c.opMessageFuncs = map[commonpb.MsgType]opMessageFunc{
+		commonpb.MsgType_CreateDatabase:    c.createDatabase,
+		commonpb.MsgType_DropDatabase:      c.dropDatabase,
+		commonpb.MsgType_Flush:             c.flush,
+		commonpb.MsgType_CreateIndex:       c.createIndex,
+		commonpb.MsgType_DropIndex:         c.dropIndex,
+		commonpb.MsgType_LoadCollection:    c.loadCollection,
+		commonpb.MsgType_ReleaseCollection: c.releaseCollection,
+	}
 }
 
 func (c *ChannelWriter) HandleReplicateAPIEvent(ctx context.Context, apiEvent *api.ReplicateAPIEvent) error {
@@ -37,86 +69,12 @@ func (c *ChannelWriter) HandleReplicateAPIEvent(ctx context.Context, apiEvent *a
 		log.Info("finish to handle replicate api event", zap.Any("event", apiEvent.EventType))
 	}()
 
-	waitDatabaseReady := func() error {
-		if apiEvent.ReplicateParam.Database == "" {
-			return nil
-		}
-		if !c.WaitDatabaseReady(ctx, apiEvent.ReplicateParam.Database) {
-			log.Warn("database is not ready", zap.Any("event", apiEvent))
-			return errors.New("database is not ready")
-		}
-		return nil
-	}
-
-	switch apiEvent.EventType {
-	case api.ReplicateCreateCollection:
-		if err := waitDatabaseReady(); err != nil {
-			return err
-		}
-		collectionInfo := apiEvent.CollectionInfo
-		entitySchema := &entity.Schema{}
-		entitySchema = entitySchema.ReadProto(collectionInfo.Schema)
-		createParam := &api.CreateCollectionParam{
-			MsgBaseParam:     api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}},
-			ReplicateParam:   apiEvent.ReplicateParam,
-			Schema:           entitySchema,
-			ShardsNum:        collectionInfo.ShardsNum,
-			ConsistencyLevel: collectionInfo.ConsistencyLevel,
-			Properties:       collectionInfo.Properties,
-		}
-		err := c.dataHandler.CreateCollection(ctx, createParam)
-		if err != nil {
-			log.Warn("fail to create collection", zap.Any("event", apiEvent), zap.Error(err))
-		}
-		return err
-	case api.ReplicateDropCollection:
-		if err := waitDatabaseReady(); err != nil {
-			return err
-		}
-		dropParam := &api.DropCollectionParam{
-			MsgBaseParam:   api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}},
-			ReplicateParam: apiEvent.ReplicateParam,
-			CollectionName: apiEvent.CollectionInfo.Schema.GetName(),
-		}
-		err := c.dataHandler.DropCollection(ctx, dropParam)
-		if err != nil {
-			log.Warn("fail to drop collection", zap.Any("event", apiEvent), zap.Error(err))
-		}
-		return err
-	case api.ReplicateCreatePartition:
-		if err := waitDatabaseReady(); err != nil {
-			return err
-		}
-		createParam := &api.CreatePartitionParam{
-			MsgBaseParam:   api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}},
-			ReplicateParam: apiEvent.ReplicateParam,
-			CollectionName: apiEvent.CollectionInfo.Schema.GetName(),
-			PartitionName:  apiEvent.PartitionInfo.PartitionName,
-		}
-		err := c.dataHandler.CreatePartition(ctx, createParam)
-		if err != nil {
-			log.Warn("fail to create partition", zap.Any("event", apiEvent), zap.Error(err))
-		}
-		return err
-	case api.ReplicateDropPartition:
-		if err := waitDatabaseReady(); err != nil {
-			return err
-		}
-		dropParam := &api.DropPartitionParam{
-			MsgBaseParam:   api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}},
-			ReplicateParam: apiEvent.ReplicateParam,
-			CollectionName: apiEvent.CollectionInfo.Schema.GetName(),
-			PartitionName:  apiEvent.PartitionInfo.PartitionName,
-		}
-		err := c.dataHandler.DropPartition(ctx, dropParam)
-		if err != nil {
-			log.Warn("fail to drop partition", zap.Any("event", apiEvent), zap.Error(err))
-		}
-		return err
-	default:
+	f, ok := c.apiEventFuncs[apiEvent.EventType]
+	if !ok {
 		log.Warn("unknown replicate api event", zap.Any("event", apiEvent))
 		return errors.New("unknown replicate api event")
 	}
+	return f(ctx, apiEvent)
 }
 
 func (c *ChannelWriter) HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error) {
@@ -170,7 +128,7 @@ func (c *ChannelWriter) HandleReplicateMessage(ctx context.Context, channelName
 	if err != nil {
 		return nil, nil, err
 	}
-	return endPosition.MsgID, targetMsgBytes, err
+	return endPosition.MsgID, targetMsgBytes, nil
 }
 
 func (c *ChannelWriter) HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error) {
@@ -187,141 +145,15 @@ func (c *ChannelWriter) HandleOpMessagePack(ctx context.Context, msgPack *msgstr
 	msgBase := &commonpb.MsgBase{ReplicateInfo: &commonpb.ReplicateInfo{IsReplicate: true, MsgTimestamp: endTs}}
 	for _, msg := range msgPack.Msgs {
 		log.Info("receive msg", zap.String("type", msg.Type().String()))
-		switch msg.Type() {
-		case commonpb.MsgType_CreateDatabase:
-			createDatabaseMsg := msg.(*msgstream.CreateDatabaseMsg)
-			err := c.dataHandler.CreateDatabase(ctx, &api.CreateDatabaseParam{
-				CreateDatabaseRequest: milvuspb.CreateDatabaseRequest{
-					Base:   msgBase,
-					DbName: createDatabaseMsg.GetDbName(),
-				},
-			})
-			if err != nil {
-				log.Warn("failed to create database", zap.Any("msg", createDatabaseMsg), zap.Error(err))
-				return nil, err
-			}
-		case commonpb.MsgType_DropDatabase:
-			dropDatabaseMsg := msg.(*msgstream.DropDatabaseMsg)
-			err := c.dataHandler.DropDatabase(ctx, &api.DropDatabaseParam{
-				DropDatabaseRequest: milvuspb.DropDatabaseRequest{
-					Base:   msgBase,
-					DbName: dropDatabaseMsg.GetDbName(),
-				},
-			})
-			if err != nil {
-				log.Warn("failed to drop database", zap.Any("msg", dropDatabaseMsg), zap.Error(err))
-				return nil, err
-			}
-		case commonpb.MsgType_Flush:
-			flushMsg := msg.(*msgstream.FlushMsg)
-			if !c.WaitDatabaseReady(ctx, flushMsg.GetDbName()) {
-				log.Warn("database is not ready", zap.Any("msg", flushMsg))
-				return nil, errors.New("database is not ready")
-			}
-			for _, s := range flushMsg.GetCollectionNames() {
-				if !c.WaitCollectionReady(ctx, s, flushMsg.GetDbName()) {
-					log.Warn("collection is not ready", zap.Any("msg", flushMsg))
-					return nil, errors.New("collection is not ready")
-				}
-			}
-			err := c.dataHandler.Flush(ctx, &api.FlushParam{
-				ReplicateParam: api.ReplicateParam{
-					Database: flushMsg.GetDbName(),
-				},
-				FlushRequest: milvuspb.FlushRequest{
-					Base:            msgBase,
-					CollectionNames: flushMsg.GetCollectionNames(),
-				},
-			})
-			if err != nil {
-				log.Warn("failed to flush", zap.Any("msg", flushMsg), zap.Error(err))
-				return nil, err
-			}
-		case commonpb.MsgType_CreateIndex:
-			createIndexMsg := msg.(*msgstream.CreateIndexMsg)
-			if !c.WaitDatabaseReady(ctx, createIndexMsg.GetDbName()) {
-				log.Warn("database is not ready", zap.Any("msg", createIndexMsg))
-				return nil, errors.New("database is not ready")
-			}
-			if !c.WaitCollectionReady(ctx, createIndexMsg.GetCollectionName(), createIndexMsg.GetDbName()) {
-				log.Warn("collection is not ready", zap.Any("msg", createIndexMsg))
-				return nil, errors.New("collection is not ready")
-			}
-			err := c.dataHandler.CreateIndex(ctx, &api.CreateIndexParam{
-				ReplicateParam: api.ReplicateParam{
-					Database: createIndexMsg.GetDbName(),
-				},
-				CreateIndexRequest: milvuspb.CreateIndexRequest{
-					Base:           msgBase,
-					CollectionName: createIndexMsg.GetCollectionName(),
-					FieldName:      createIndexMsg.GetFieldName(),
-					IndexName:      createIndexMsg.GetIndexName(),
-					ExtraParams:    createIndexMsg.GetExtraParams(),
-				},
-			})
-			if err != nil {
-				log.Warn("fail to create index", zap.Any("msg", createIndexMsg), zap.Error(err))
-				return nil, err
-			}
-		case commonpb.MsgType_DropIndex:
-			dropIndexMsg := msg.(*msgstream.DropIndexMsg)
-			err := c.dataHandler.DropIndex(ctx, &api.DropIndexParam{
-				ReplicateParam: api.ReplicateParam{
-					Database: dropIndexMsg.GetDbName(),
-				},
-				DropIndexRequest: milvuspb.DropIndexRequest{
-					Base:           msgBase,
-					CollectionName: dropIndexMsg.GetCollectionName(),
-					FieldName:      dropIndexMsg.GetFieldName(),
-					IndexName:      dropIndexMsg.GetIndexName(),
-				},
-			})
-			if err != nil {
-				log.Warn("fail to drop index", zap.Any("msg", dropIndexMsg), zap.Error(err))
-				return nil, err
-			}
-		case commonpb.MsgType_LoadCollection:
-			loadCollectionMsg := msg.(*msgstream.LoadCollectionMsg)
-			if !c.WaitDatabaseReady(ctx, loadCollectionMsg.GetDbName()) {
-				log.Warn("database is not ready", zap.Any("msg", loadCollectionMsg))
-				return nil, errors.New("database is not ready")
-			}
-			if !c.WaitCollectionReady(ctx, loadCollectionMsg.GetCollectionName(), loadCollectionMsg.GetDbName()) {
-				log.Warn("collection is not ready", zap.Any("msg", loadCollectionMsg))
-				return nil, errors.New("collection is not ready")
-			}
-			err := c.dataHandler.LoadCollection(ctx, &api.LoadCollectionParam{
-				ReplicateParam: api.ReplicateParam{
-					Database: loadCollectionMsg.GetDbName(),
-				},
-				LoadCollectionRequest: milvuspb.LoadCollectionRequest{
-					Base:           msgBase,
-					CollectionName: loadCollectionMsg.GetCollectionName(),
-				},
-			})
-			if err != nil {
-				log.Warn("fail to load collection", zap.Any("msg", loadCollectionMsg), zap.Error(err))
-				return nil, err
-			}
-		case commonpb.MsgType_ReleaseCollection:
-			releaseCollectionMsg := msg.(*msgstream.ReleaseCollectionMsg)
-			err := c.dataHandler.ReleaseCollection(ctx, &api.ReleaseCollectionParam{
-				ReplicateParam: api.ReplicateParam{
-					Database: releaseCollectionMsg.GetDbName(),
-				},
-				ReleaseCollectionRequest: milvuspb.ReleaseCollectionRequest{
-					Base:           msgBase,
-					CollectionName: releaseCollectionMsg.GetCollectionName(),
-				},
-			})
-			if err != nil {
-				log.Warn("fail to release collection", zap.Any("msg", releaseCollectionMsg), zap.Error(err))
-				return nil, err
-			}
-		default:
+		f, ok := c.opMessageFuncs[msg.Type()]
+		if !ok {
 			log.Warn("unknown msg type", zap.Any("msg", msg))
 			return nil, errors.New("unknown msg type")
 		}
+		err := f(ctx, msgBase, msg)
+		if err != nil {
+			return nil, err
+		}
 		log.Info("finish to handle msg", zap.String("type", msg.Type().String()))
 	}
 
@@ -351,3 +183,234 @@ func (c *ChannelWriter) WaitDatabaseReady(ctx context.Context, databaseName stri
 	}, util.GetRetryOptionsFor25s()...)
 	return err == nil
 }
+
+func (c *ChannelWriter) createCollection(ctx context.Context, apiEvent *api.ReplicateAPIEvent) error {
+	if ready := c.WaitDatabaseReady(ctx, apiEvent.ReplicateParam.Database); !ready {
+		log.Warn("database is not ready", zap.String("database", apiEvent.ReplicateParam.Database))
+		return errors.New("database is not ready")
+	}
+	collectionInfo := apiEvent.CollectionInfo
+	entitySchema := &entity.Schema{}
+	entitySchema = entitySchema.ReadProto(collectionInfo.Schema)
+	createParam := &api.CreateCollectionParam{
+		MsgBaseParam:     api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}},
+		ReplicateParam:   apiEvent.ReplicateParam,
+		Schema:           entitySchema,
+		ShardsNum:        collectionInfo.ShardsNum,
+		ConsistencyLevel: collectionInfo.ConsistencyLevel,
+		Properties:       collectionInfo.Properties,
+	}
+	err := c.dataHandler.CreateCollection(ctx, createParam)
+	if err != nil {
+		log.Warn("fail to create collection", zap.Any("event", apiEvent), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) dropCollection(ctx context.Context, apiEvent *api.ReplicateAPIEvent) error {
+	if ready := c.WaitDatabaseReady(ctx, apiEvent.ReplicateParam.Database); !ready {
+		log.Warn("database is not ready", zap.String("database", apiEvent.ReplicateParam.Database))
+		return errors.New("database is not ready")
+	}
+	dropParam := &api.DropCollectionParam{
+		MsgBaseParam:   api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}},
+		ReplicateParam: apiEvent.ReplicateParam,
+		CollectionName: apiEvent.CollectionInfo.Schema.GetName(),
+	}
+	err := c.dataHandler.DropCollection(ctx, dropParam)
+	if err != nil {
+		log.Warn("fail to drop collection", zap.Any("event", apiEvent), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) createPartition(ctx context.Context, apiEvent *api.ReplicateAPIEvent) error {
+	if ready := c.WaitDatabaseReady(ctx, apiEvent.ReplicateParam.Database); !ready {
+		log.Warn("database is not ready", zap.String("database", apiEvent.ReplicateParam.Database))
+		return errors.New("database is not ready")
+	}
+	createParam := &api.CreatePartitionParam{
+		MsgBaseParam:   api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}},
+		ReplicateParam: apiEvent.ReplicateParam,
+		CollectionName: apiEvent.CollectionInfo.Schema.GetName(),
+		PartitionName:  apiEvent.PartitionInfo.PartitionName,
+	}
+	err := c.dataHandler.CreatePartition(ctx, createParam)
+	if err != nil {
+		log.Warn("fail to create partition", zap.Any("event", apiEvent), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) dropPartition(ctx context.Context, apiEvent *api.ReplicateAPIEvent) error {
+	if ready := c.WaitDatabaseReady(ctx, apiEvent.ReplicateParam.Database); !ready {
+		log.Warn("database is not ready", zap.String("database", apiEvent.ReplicateParam.Database))
+		return errors.New("database is not ready")
+	}
+	dropParam := &api.DropPartitionParam{
+		MsgBaseParam:   api.MsgBaseParam{Base: &commonpb.MsgBase{ReplicateInfo: apiEvent.ReplicateInfo}},
+		ReplicateParam: apiEvent.ReplicateParam,
+		CollectionName: apiEvent.CollectionInfo.Schema.GetName(),
+		PartitionName:  apiEvent.PartitionInfo.PartitionName,
+	}
+	err := c.dataHandler.DropPartition(ctx, dropParam)
+	if err != nil {
+		log.Warn("fail to drop partition", zap.Any("event", apiEvent), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) createDatabase(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error {
+	createDatabaseMsg := msg.(*msgstream.CreateDatabaseMsg)
+	err := c.dataHandler.CreateDatabase(ctx, &api.CreateDatabaseParam{
+		CreateDatabaseRequest: milvuspb.CreateDatabaseRequest{
+			Base:   msgBase,
+			DbName: createDatabaseMsg.GetDbName(),
+		},
+	})
+	if err != nil {
+		log.Warn("failed to create database", zap.Any("msg", createDatabaseMsg), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) dropDatabase(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error {
+	dropDatabaseMsg := msg.(*msgstream.DropDatabaseMsg)
+	err := c.dataHandler.DropDatabase(ctx, &api.DropDatabaseParam{
+		DropDatabaseRequest: milvuspb.DropDatabaseRequest{
+			Base:   msgBase,
+			DbName: dropDatabaseMsg.GetDbName(),
+		},
+	})
+	if err != nil {
+		log.Warn("failed to drop database", zap.Any("msg", dropDatabaseMsg), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) flush(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error {
+	flushMsg := msg.(*msgstream.FlushMsg)
+	if !c.WaitDatabaseReady(ctx, flushMsg.GetDbName()) {
+		log.Warn("database is not ready", zap.Any("msg", flushMsg))
+		return errors.New("database is not ready")
+	}
+	for _, s := range flushMsg.GetCollectionNames() {
+		if !c.WaitCollectionReady(ctx, s, flushMsg.GetDbName()) {
+			log.Warn("collection is not ready", zap.Any("msg", flushMsg))
+			return errors.New("collection is not ready")
+		}
+	}
+	err := c.dataHandler.Flush(ctx, &api.FlushParam{
+		ReplicateParam: api.ReplicateParam{
+			Database: flushMsg.GetDbName(),
+		},
+		FlushRequest: milvuspb.FlushRequest{
+			Base:            msgBase,
+			CollectionNames: flushMsg.GetCollectionNames(),
+		},
+	})
+	if err != nil {
+		log.Warn("failed to flush", zap.Any("msg", flushMsg), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) createIndex(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error {
+	createIndexMsg := msg.(*msgstream.CreateIndexMsg)
+	if !c.WaitDatabaseReady(ctx, createIndexMsg.GetDbName()) {
+		log.Warn("database is not ready", zap.Any("msg", createIndexMsg))
+		return errors.New("database is not ready")
+	}
+	if !c.WaitCollectionReady(ctx, createIndexMsg.GetCollectionName(), createIndexMsg.GetDbName()) {
+		log.Warn("collection is not ready", zap.Any("msg", createIndexMsg))
+		return errors.New("collection is not ready")
+	}
+	err := c.dataHandler.CreateIndex(ctx, &api.CreateIndexParam{
+		ReplicateParam: api.ReplicateParam{
+			Database: createIndexMsg.GetDbName(),
+		},
+		CreateIndexRequest: milvuspb.CreateIndexRequest{
+			Base:           msgBase,
+			CollectionName: createIndexMsg.GetCollectionName(),
+			FieldName:      createIndexMsg.GetFieldName(),
+			IndexName:      createIndexMsg.GetIndexName(),
+			ExtraParams:    createIndexMsg.GetExtraParams(),
+		},
+	})
+	if err != nil {
+		log.Warn("fail to create index", zap.Any("msg", createIndexMsg), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) dropIndex(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error {
+	dropIndexMsg := msg.(*msgstream.DropIndexMsg)
+	err := c.dataHandler.DropIndex(ctx, &api.DropIndexParam{
+		ReplicateParam: api.ReplicateParam{
+			Database: dropIndexMsg.GetDbName(),
+		},
+		DropIndexRequest: milvuspb.DropIndexRequest{
+			Base:           msgBase,
+			CollectionName: dropIndexMsg.GetCollectionName(),
+			FieldName:      dropIndexMsg.GetFieldName(),
+			IndexName:      dropIndexMsg.GetIndexName(),
+		},
+	})
+	if err != nil {
+		log.Warn("fail to drop index", zap.Any("msg", dropIndexMsg), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) loadCollection(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error {
+	loadCollectionMsg := msg.(*msgstream.LoadCollectionMsg)
+	if !c.WaitDatabaseReady(ctx, loadCollectionMsg.GetDbName()) {
+		log.Warn("database is not ready", zap.Any("msg", loadCollectionMsg))
+		return errors.New("database is not ready")
+	}
+	if !c.WaitCollectionReady(ctx, loadCollectionMsg.GetCollectionName(), loadCollectionMsg.GetDbName()) {
+		log.Warn("collection is not ready", zap.Any("msg", loadCollectionMsg))
+		return errors.New("collection is not ready")
+	}
+	err := c.dataHandler.LoadCollection(ctx, &api.LoadCollectionParam{
+		ReplicateParam: api.ReplicateParam{
+			Database: loadCollectionMsg.GetDbName(),
+		},
+		LoadCollectionRequest: milvuspb.LoadCollectionRequest{
+			Base:           msgBase,
+			CollectionName: loadCollectionMsg.GetCollectionName(),
+		},
+	})
+	if err != nil {
+		log.Warn("fail to load collection", zap.Any("msg", loadCollectionMsg), zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelWriter) releaseCollection(ctx context.Context, msgBase *commonpb.MsgBase, msg msgstream.TsMsg) error {
+	releaseCollectionMsg := msg.(*msgstream.ReleaseCollectionMsg)
+	err := c.dataHandler.ReleaseCollection(ctx, &api.ReleaseCollectionParam{
+		ReplicateParam: api.ReplicateParam{
+			Database: releaseCollectionMsg.GetDbName(),
+		},
+		ReleaseCollectionRequest: milvuspb.ReleaseCollectionRequest{
+			Base:           msgBase,
+			CollectionName: releaseCollectionMsg.GetCollectionName(),
+		},
+	})
+	if err != nil {
+		log.Warn("fail to release collection", zap.Any("msg", releaseCollectionMsg), zap.Error(err))
+		return err
+	}
+	return nil
+}
diff --git a/server/cdc_impl.go b/server/cdc_impl.go
index af64367f..a5c48e09 100644
--- a/server/cdc_impl.go
+++ b/server/cdc_impl.go
@@ -42,6 +42,7 @@ import (
 	cdcwriter "github.com/zilliztech/milvus-cdc/core/writer"
 	serverapi "github.com/zilliztech/milvus-cdc/server/api"
 	servererror "github.com/zilliztech/milvus-cdc/server/error"
+	"github.com/zilliztech/milvus-cdc/server/metrics"
 	"github.com/zilliztech/milvus-cdc/server/model"
 	"github.com/zilliztech/milvus-cdc/server/model/meta"
 	"github.com/zilliztech/milvus-cdc/server/model/request"
@@ -156,6 +157,8 @@ func (e *MetaCDC) ReloadTask() {
 		e.cdcTasks.data[taskInfo.TaskID] = taskInfo
 		e.cdcTasks.Unlock()
 
+		metrics.TaskNumVec.Add(taskInfo.State)
+		metrics.TaskStateVec.WithLabelValues(taskInfo.TaskID).Set(float64(taskInfo.State))
 		if err := e.startInternal(taskInfo, taskInfo.State == meta.TaskStateRunning); err != nil {
 			log.Warn("fail to start the task", zap.Any("task_info", taskInfo), zap.Error(err))
 			_ = e.pauseTaskWithReason(taskInfo.TaskID, "fail to start task, err: "+err.Error(), []meta.TaskState{})
@@ -274,6 +277,8 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
 		revertCollectionNames()
 		return nil, servererror.NewServerError(errors.WithMessage(err, "fail to put the task info to etcd"))
 	}
+	metrics.TaskNumVec.Add(info.State)
+	metrics.TaskStateVec.WithLabelValues(info.TaskID).Set(float64(info.State))
 	e.cdcTasks.Lock()
 	e.cdcTasks.data[info.TaskID] = info
 	e.cdcTasks.Unlock()
@@ -383,182 +388,9 @@ func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) err
 	replicateEntity, ok := e.replicateEntityMap.data[milvusAddress]
 	e.replicateEntityMap.RUnlock()
 
-	newReplicateEntity := func() (*ReplicateEntity, error) {
-		ctx := context.TODO()
-		timeoutCtx, cancelFunc := context.WithTimeout(ctx, time.Duration(milvusConnectParam.ConnectTimeout)*time.Second)
-		milvusClient, err := cdcreader.NewTarget(timeoutCtx, cdcreader.TargetConfig{
-			Address:   milvusAddress,
-			Username:  milvusConnectParam.Username,
-			Password:  milvusConnectParam.Password,
-			EnableTLS: milvusConnectParam.EnableTLS,
-		})
-		cancelFunc()
-		if err != nil {
-			taskLog.Warn("fail to new target", zap.String("address", milvusAddress), zap.Error(err))
-			return nil, servererror.NewClientError("fail to connect target milvus server")
-		}
-		// TODO improve the config
-		sourceConfig := e.config.SourceConfig
-		metaOp, err := cdcreader.NewEtcdOp(sourceConfig.EtcdAddress, sourceConfig.EtcdRootPath, sourceConfig.EtcdMetaSubPath, sourceConfig.DefaultPartitionName)
-		if err != nil {
-			taskLog.Warn("fail to new the meta op", zap.Error(err))
-			return nil, servererror.NewClientError("fail to new the meta op")
-		}
-
-		bufferSize := e.config.SourceConfig.ReadChanLen
-		channelManager, err := cdcreader.NewReplicateChannelManager(config.MQConfig{
-			Pulsar: e.config.SourceConfig.Pulsar,
-			Kafka:  e.config.SourceConfig.Kafka,
-		}, e.mqFactoryCreator, milvusClient, bufferSize, metaOp)
-		if err != nil {
-			taskLog.Warn("fail to create replicate channel manager", zap.Error(err))
-			return nil, servererror.NewClientError("fail to create replicate channel manager")
-		}
-		targetConfig := milvusConnectParam
-		dataHandler, err := cdcwriter.NewMilvusDataHandler(
-			cdcwriter.AddressOption(fmt.Sprintf("%s:%d", targetConfig.Host, targetConfig.Port)),
-			cdcwriter.UserOption(targetConfig.Username, targetConfig.Password),
-			cdcwriter.TLSOption(targetConfig.EnableTLS),
-			cdcwriter.IgnorePartitionOption(targetConfig.IgnorePartition),
-			cdcwriter.ConnectTimeoutOption(targetConfig.ConnectTimeout))
-		if err != nil {
-			taskLog.Warn("fail to new the data handler", zap.Error(err))
-			return nil, servererror.NewClientError("fail to new the data handler, task_id: ")
-		}
-		writerObj := cdcwriter.NewChannelWriter(dataHandler, bufferSize)
-
-		e.replicateEntityMap.Lock()
-		defer e.replicateEntityMap.Unlock()
-		entity, ok := e.replicateEntityMap.data[milvusAddress]
-		if !ok {
-			replicateCtx, cancelReplicateFunc := context.WithCancel(ctx)
-			channelManager.SetCtx(replicateCtx)
-			entity = &ReplicateEntity{
-				targetClient:   milvusClient,
-				channelManager: channelManager,
-				metaOp:         metaOp,
-				writerObj:      writerObj,
-				quitFunc:       cancelReplicateFunc,
-			}
-			e.replicateEntityMap.data[milvusAddress] = entity
-			go func() {
-				for {
-					select {
-					case <-replicateCtx.Done():
-						log.Warn("event chan, the replicate context has closed")
-						return
-					case replicateAPIEvent, ok := <-entity.channelManager.GetEventChan():
-						if !ok {
-							taskLog.Warn("the replicate api event channel has closed")
-							return
-						}
-						if !e.isRunningTask(info.TaskID) {
-							taskLog.Warn("not running task", zap.Any("event", replicateAPIEvent))
-							return
-						}
-						if replicateAPIEvent.EventType == api.ReplicateError {
-							taskLog.Warn("receive the error event", zap.Any("event", replicateAPIEvent))
-							_ = e.pauseTaskWithReason(info.TaskID, "fail to read the replicate event", []meta.TaskState{})
-							return
-						}
-						err := entity.writerObj.HandleReplicateAPIEvent(replicateCtx, replicateAPIEvent)
-						if err != nil {
-							taskLog.Warn("fail to handle replicate event", zap.Any("event", replicateAPIEvent), zap.Error(err))
-							_ = e.pauseTaskWithReason(info.TaskID, "fail to handle the replicate event, err: "+err.Error(), []meta.TaskState{})
-							return
-						}
-					}
-				}
-			}()
-			go func() {
-				writeCallback := NewWriteCallback(e.metaStoreFactory, e.rootPath, info.TaskID)
-				for {
-					select {
-					case <-replicateCtx.Done():
-						log.Warn("channel chan, the replicate context has closed")
-						return
-					case channelName, ok := <-entity.channelManager.GetChannelChan():
-						taskLog.Info("start to replicate channel", zap.String("channel", channelName))
-						if !ok {
-							taskLog.Warn("the channel name channel has closed")
-							return
-						}
-						if !e.isRunningTask(info.TaskID) {
-							taskLog.Warn("not running task")
-							return
-						}
-						go func(c string) {
-							msgChan := entity.channelManager.GetMsgChan(c)
-							if msgChan == nil {
-								log.Warn("not found the message channel", zap.String("channel", c))
-								return
-							}
-							for {
-								select {
-								case <-replicateCtx.Done():
-									log.Warn("msg chan, the replicate context has closed")
-									return
-								case msgPack, ok := <-msgChan:
-									if !ok {
-										taskLog.Warn("the data channel has closed")
-										return
-									}
-									if !e.isRunningTask(info.TaskID) {
-										taskLog.Warn("not running task", zap.Any("pack", msgPack))
-										return
-									}
-									if msgPack == nil {
-										log.Warn("the message pack is nil, the task may be stopping")
-										return
-									}
-									pChannel := msgPack.EndPositions[0].GetChannelName()
-									position, targetPosition, err := entity.writerObj.HandleReplicateMessage(replicateCtx, pChannel, msgPack)
-									if err != nil {
-										taskLog.Warn("fail to handle the replicate message", zap.Any("pack", msgPack), zap.Error(err))
-										_ = e.pauseTaskWithReason(info.TaskID, "fail to handle replicate message, err:"+err.Error(), []meta.TaskState{})
-										return
-									}
-									msgTime, _ := tsoutil.ParseHybridTs(msgPack.EndTs)
-									// TODO should delete the position when drop collection
-									metaPosition := &meta.PositionInfo{
-										Time: msgTime,
-										DataPair: &commonpb.KeyDataPair{
-											Key:  c,
-											Data: position,
-										},
-									}
-									var metaOpPosition *meta.PositionInfo
-									if msgPack.Msgs != nil && len(msgPack.Msgs) > 0 && msgPack.Msgs[0].Type() != commonpb.MsgType_TimeTick {
-										metaOpPosition = metaPosition
-									}
-									metaTargetPosition := &meta.PositionInfo{
-										Time: msgTime,
-										DataPair: &commonpb.KeyDataPair{
-											Key:  pChannel,
-											Data: targetPosition,
-										},
-									}
-									if position != nil {
-										err = writeCallback.UpdateTaskCollectionPosition(TmpCollectionID, TmpCollectionName, c,
-											metaPosition, metaOpPosition, metaTargetPosition)
-										if err != nil {
-											log.Warn("fail to update the collection position", zap.Any("pack", msgPack), zap.Error(err))
-											_ = e.pauseTaskWithReason(info.TaskID, "fail to update task position, err:"+err.Error(), []meta.TaskState{})
-											return
-										}
-									}
-								}
-							}
-						}(channelName)
-					}
-				}
-			}()
-		}
-		return entity, nil
-	}
 	if !ok {
 		var err error
-		replicateEntity, err = newReplicateEntity()
+		replicateEntity, err = e.newReplicateEntity(info)
 		if err != nil {
 			return err
 		}
@@ -603,8 +435,255 @@ func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) err
 	if rpcRequestPosition == "" && channelSeekPosition[rpcRequestChannelName] != nil {
 		rpcRequestPosition = base64.StdEncoding.EncodeToString(channelSeekPosition[rpcRequestChannelName].MsgID)
 	}
-	channelReader, err := cdcreader.NewChannelReader(rpcRequestChannelName,
-		rpcRequestPosition, config.MQConfig{
+	channelReader, err := e.getChannelReader(info, replicateEntity, rpcRequestChannelName, rpcRequestPosition)
+	if err != nil {
+		return err
+	}
+	readCtx, cancelReadFunc := context.WithCancel(context.Background())
+	e.replicateEntityMap.Lock()
+	originQuitFunc := replicateEntity.quitFunc
+	replicateEntity.quitFunc = func() {
+		collectionReader.QuitRead(readCtx)
+		channelReader.QuitRead(readCtx)
+		cancelReadFunc()
+		if originQuitFunc != nil {
+			originQuitFunc()
+		}
+	}
+	e.replicateEntityMap.Unlock()
+
+	if !ignoreUpdateState {
+		err = store.UpdateTaskState(e.metaStoreFactory.GetTaskInfoMetaStore(ctx), info.TaskID, meta.TaskStateRunning, []meta.TaskState{meta.TaskStateInitial, meta.TaskStatePaused}, "")
+		if err != nil {
+			taskLog.Warn("fail to update the task meta", zap.Error(err))
+			return servererror.NewServerError(errors.WithMessage(err, "fail to update the task meta, task_id: "+info.TaskID))
+		}
+	}
+	e.cdcTasks.Lock()
+	info.State = meta.TaskStateRunning
+	info.Reason = ""
+	e.cdcTasks.Unlock()
+
+	collectionReader.StartRead(readCtx)
+	channelReader.StartRead(readCtx)
+	return nil
+}
+
+func (e *MetaCDC) newReplicateEntity(info *meta.TaskInfo) (*ReplicateEntity, error) {
+	taskLog := log.With(zap.String("task_id", info.TaskID))
+	milvusConnectParam := info.MilvusConnectParam
+	milvusAddress := fmt.Sprintf("%s:%d", milvusConnectParam.Host, milvusConnectParam.Port)
+
+	ctx := context.TODO()
+	timeoutCtx, cancelFunc := context.WithTimeout(ctx, time.Duration(milvusConnectParam.ConnectTimeout)*time.Second)
+	milvusClient, err := cdcreader.NewTarget(timeoutCtx, cdcreader.TargetConfig{
+		Address:   milvusAddress,
+		Username:  milvusConnectParam.Username,
+		Password:  milvusConnectParam.Password,
+		EnableTLS: milvusConnectParam.EnableTLS,
+	})
+	cancelFunc()
+	if err != nil {
+		taskLog.Warn("fail to new target", zap.String("address", milvusAddress), zap.Error(err))
+		return nil, servererror.NewClientError("fail to connect target milvus server")
+	}
+	sourceConfig := e.config.SourceConfig
+	metaOp, err := cdcreader.NewEtcdOp(sourceConfig.EtcdAddress, sourceConfig.EtcdRootPath, sourceConfig.EtcdMetaSubPath, sourceConfig.DefaultPartitionName)
+	if err != nil {
+		taskLog.Warn("fail to new the meta op", zap.Error(err))
+		return nil, servererror.NewClientError("fail to new the meta op")
+	}
+
+	bufferSize := e.config.SourceConfig.ReadChanLen
+	channelManager, err := cdcreader.NewReplicateChannelManager(config.MQConfig{
+		Pulsar: e.config.SourceConfig.Pulsar,
+		Kafka:  e.config.SourceConfig.Kafka,
+	}, e.mqFactoryCreator, milvusClient, bufferSize, metaOp, func(s string, pack *msgstream.MsgPack) {
+		replicateMetric(info, s, pack, metrics.OPTypeRead)
+	})
+	if err != nil {
+		taskLog.Warn("fail to create replicate channel manager", zap.Error(err))
+		return nil, servererror.NewClientError("fail to create replicate channel manager")
+	}
+	targetConfig := milvusConnectParam
+	dataHandler, err := cdcwriter.NewMilvusDataHandler(
+		cdcwriter.AddressOption(fmt.Sprintf("%s:%d", targetConfig.Host, targetConfig.Port)),
+		cdcwriter.UserOption(targetConfig.Username, targetConfig.Password),
+		cdcwriter.TLSOption(targetConfig.EnableTLS),
+		cdcwriter.IgnorePartitionOption(targetConfig.IgnorePartition),
+		cdcwriter.ConnectTimeoutOption(targetConfig.ConnectTimeout))
+	if err != nil {
+		taskLog.Warn("fail to new the data handler", zap.Error(err))
+		return nil, servererror.NewClientError("fail to new the data handler, task_id: ")
+	}
+	writerObj := cdcwriter.NewChannelWriter(dataHandler, bufferSize)
+
+	e.replicateEntityMap.Lock()
+	defer e.replicateEntityMap.Unlock()
+	entity, ok := e.replicateEntityMap.data[milvusAddress]
+	if !ok {
+		replicateCtx, cancelReplicateFunc := context.WithCancel(ctx)
+		channelManager.SetCtx(replicateCtx)
+		entity = &ReplicateEntity{
+			targetClient:   milvusClient,
+			channelManager: channelManager,
+			metaOp:         metaOp,
+			writerObj:      writerObj,
+			quitFunc:       cancelReplicateFunc,
+		}
+		e.replicateEntityMap.data[milvusAddress] = entity
+		e.startReplicateAPIEvent(replicateCtx, info, entity)
+		e.startReplicateDMLChannel(replicateCtx, info, entity)
+	}
+	return entity, nil
+}
+
+func (e *MetaCDC) startReplicateAPIEvent(replicateCtx context.Context, info *meta.TaskInfo, entity *ReplicateEntity) {
+	go func() {
+		taskLog := log.With(zap.String("task_id", info.TaskID))
+		for {
+			select {
+			case <-replicateCtx.Done():
+				log.Warn("event chan, the replicate context has closed")
+				return
+			case replicateAPIEvent, ok := <-entity.channelManager.GetEventChan():
+				if !ok {
+					taskLog.Warn("the replicate api event channel has closed")
+					return
+				}
+				if !e.isRunningTask(info.TaskID) {
+					taskLog.Warn("not running task", zap.Any("event", replicateAPIEvent))
+					return
+				}
+				if replicateAPIEvent.EventType == api.ReplicateError {
+					taskLog.Warn("receive the error event", zap.Any("event", replicateAPIEvent))
+					_ = e.pauseTaskWithReason(info.TaskID, "fail to read the replicate event", []meta.TaskState{})
+					return
+				}
+				err := entity.writerObj.HandleReplicateAPIEvent(replicateCtx, replicateAPIEvent)
+				if err != nil {
+					taskLog.Warn("fail to handle replicate event", zap.Any("event", replicateAPIEvent), zap.Error(err))
+					_ = e.pauseTaskWithReason(info.TaskID, "fail to handle the replicate event, err: "+err.Error(), []meta.TaskState{})
+					return
+				}
+				metrics.APIExecuteCountVec.WithLabelValues(info.TaskID, replicateAPIEvent.EventType.String()).Inc()
+			}
+		}
+	}()
+}
+
+func (e *MetaCDC) startReplicateDMLChannel(replicateCtx context.Context, info *meta.TaskInfo, entity *ReplicateEntity) {
+	go func() {
+		taskLog := log.With(zap.String("task_id", info.TaskID))
+		for {
+			select {
+			case <-replicateCtx.Done():
+				log.Warn("channel chan, the replicate context has closed")
+				return
+			case channelName, ok := <-entity.channelManager.GetChannelChan():
+				taskLog.Info("start to replicate channel", zap.String("channel", channelName))
+				if !ok {
+					taskLog.Warn("the channel name channel has closed")
+					return
+				}
+				if !e.isRunningTask(info.TaskID) {
+					taskLog.Warn("not running task")
+					return
+				}
+				e.startReplicateDMLMsg(replicateCtx, info, entity, channelName)
+			}
+		}
+	}()
+}
+
+func (e *MetaCDC) startReplicateDMLMsg(replicateCtx context.Context, info *meta.TaskInfo, entity *ReplicateEntity, channelName string) {
+	go func() {
+		taskLog := log.With(zap.String("task_id", info.TaskID))
+		writeCallback := NewWriteCallback(e.metaStoreFactory, e.rootPath, info.TaskID)
+
+		msgChan := entity.channelManager.GetMsgChan(channelName)
+		if msgChan == nil {
+			log.Warn("not found the message channel", zap.String("channel", channelName))
+			return
+		}
+		for {
+			select {
+			case <-replicateCtx.Done():
+				log.Warn("msg chan, the replicate context has closed")
+				return
+			case msgPack, ok := <-msgChan:
+				if !ok {
+					taskLog.Warn("the data channel has closed")
+					return
+				}
+				if !e.isRunningTask(info.TaskID) {
+					taskLog.Warn("not running task", zap.Any("pack", msgPack))
+					return
+				}
+				if msgPack == nil {
+					log.Warn("the message pack is nil, the task may be stopping")
+					return
+				}
+				pChannel := msgPack.EndPositions[0].GetChannelName()
+				position, targetPosition, err := entity.writerObj.HandleReplicateMessage(replicateCtx, pChannel, msgPack)
+				if err != nil {
+					taskLog.Warn("fail to handle the replicate message", zap.Any("pack", msgPack), zap.Error(err))
+					_ = e.pauseTaskWithReason(info.TaskID, "fail to handle replicate message, err:"+err.Error(), []meta.TaskState{})
+					return
+				}
+				msgTime, _ := tsoutil.ParseHybridTs(msgPack.EndTs)
+				replicateMetric(info, channelName, msgPack, metrics.OPTypeWrite)
+
+				// TODO should delete the position when drop collection
+				metaPosition := &meta.PositionInfo{
+					Time: msgTime,
+					DataPair: &commonpb.KeyDataPair{
+						Key:  channelName,
+						Data: position,
+					},
+				}
+				var metaOpPosition *meta.PositionInfo
+				if msgPack.Msgs != nil && len(msgPack.Msgs) > 0 && msgPack.Msgs[0].Type() != commonpb.MsgType_TimeTick {
+					metaOpPosition = metaPosition
+				}
+				metaTargetPosition := &meta.PositionInfo{
+					Time: msgTime,
+					DataPair: &commonpb.KeyDataPair{
+						Key:  pChannel,
+						Data: targetPosition,
+					},
+				}
+				if position != nil {
+					err = writeCallback.UpdateTaskCollectionPosition(TmpCollectionID, TmpCollectionName, channelName,
+						metaPosition, metaOpPosition, metaTargetPosition)
+					if err != nil {
+						log.Warn("fail to update the collection position", zap.Any("pack", msgPack), zap.Error(err))
+						_ = e.pauseTaskWithReason(info.TaskID, "fail to update task position, err:"+err.Error(), []meta.TaskState{})
+						return
+					}
+				}
+			}
+		}
+	}()
}
+
+func replicateMetric(info *meta.TaskInfo, channelName string, msgPack *msgstream.MsgPack, op string) {
+	msgTime, _ := tsoutil.ParseHybridTs(msgPack.EndTs)
+	metrics.ReplicateTimeDifferenceVec.
+		WithLabelValues(info.TaskID, channelName, op).
+		Observe(float64(time.Since(time.UnixMilli(msgTime)).Milliseconds()))
+	var packSize int
+	for _, msg := range msgPack.Msgs {
+		packSize += msg.Size()
+	}
+	metrics.ReplicateDataSizeVec.WithLabelValues(info.TaskID, channelName, op).Observe(float64(packSize))
+}
+
+func (e *MetaCDC) getChannelReader(info *meta.TaskInfo, replicateEntity *ReplicateEntity, channelName, channelPosition string) (api.Reader, error) {
+	taskLog := log.With(zap.String("task_id", info.TaskID))
+
+	channelReader, err := cdcreader.NewChannelReader(channelName,
+		channelPosition, config.MQConfig{
 			Pulsar: e.config.SourceConfig.Pulsar,
 			Kafka:  e.config.SourceConfig.Kafka,
 		}, func(funcCtx context.Context, pack *msgstream.MsgPack) bool {
@@ -612,13 +691,24 @@ func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) err
 				taskLog.Warn("not running task", zap.Any("pack", pack))
 				return false
 			}
+			msgTime, _ := tsoutil.ParseHybridTs(pack.EndTs)
+
+			metrics.ReplicateTimeDifferenceVec.
+				WithLabelValues(info.TaskID, channelName, metrics.OPTypeRead).
+				Observe(float64(time.Since(time.UnixMilli(msgTime)).Milliseconds()))
+
 			positionBytes, err := replicateEntity.writerObj.HandleOpMessagePack(funcCtx, pack)
 			if err != nil {
 				taskLog.Warn("fail to handle the op message pack", zap.Error(err))
 				_ = e.pauseTaskWithReason(info.TaskID, "fail to handle the op message pack, err:"+err.Error(), []meta.TaskState{})
 				return false
 			}
-			msgTime, _ := tsoutil.ParseHybridTs(pack.EndTs)
+
+			metrics.ReplicateTimeDifferenceVec.
+				WithLabelValues(info.TaskID, channelName, metrics.OPTypeWrite).
+				Observe(float64(time.Since(time.UnixMilli(msgTime)).Milliseconds()))
+			metrics.APIExecuteCountVec.WithLabelValues(info.TaskID, pack.Msgs[0].Type().String()).Inc()
+
 			channelName := info.RPCRequestChannelInfo.Name
 			metaPosition := &meta.PositionInfo{
 				Time: msgTime,
@@ -639,36 +729,9 @@ func (e *MetaCDC) startInternal(info *meta.TaskInfo, ignoreUpdateState bool) err
 		}, e.mqFactoryCreator)
 	if err != nil {
 		taskLog.Warn("fail to new the channel reader", zap.Error(err))
-		return servererror.NewServerError(errors.WithMessage(err, "fail to new the channel reader"))
+		return nil, servererror.NewServerError(errors.WithMessage(err, "fail to new the channel reader"))
 	}
-	readCtx, cancelReadFunc := context.WithCancel(context.Background())
-	e.replicateEntityMap.Lock()
-	originQuitFunc := replicateEntity.quitFunc
-	replicateEntity.quitFunc = func() {
-		collectionReader.QuitRead(readCtx)
-		channelReader.QuitRead(readCtx)
-		cancelReadFunc()
-		if originQuitFunc != nil {
-			originQuitFunc()
-		}
-	}
-	e.replicateEntityMap.Unlock()
-
-	if !ignoreUpdateState {
-		err = store.UpdateTaskState(e.metaStoreFactory.GetTaskInfoMetaStore(ctx), info.TaskID, meta.TaskStateRunning, []meta.TaskState{meta.TaskStateInitial, meta.TaskStatePaused}, "")
-		if err != nil {
-			taskLog.Warn("fail to update the task meta", zap.Error(err))
-			return servererror.NewServerError(errors.WithMessage(err, "fail to update the task meta, task_id: "+info.TaskID))
-		}
-	}
-	e.cdcTasks.Lock()
-	info.State = meta.TaskStateRunning
-	info.Reason = ""
-	e.cdcTasks.Unlock()
-
-	collectionReader.StartRead(readCtx)
-	channelReader.StartRead(readCtx)
-	return nil
+	return channelReader, nil
 }
 
 func (e *MetaCDC) isRunningTask(taskID string) bool {
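replicateMetric above converts the pack's hybrid timestamp into a wall-clock lag: tsoutil.ParseHybridTs yields the physical milliseconds, and the observation is the distance from now. (Note the data-size observation now uses the op parameter; hardcoding the write label there would have mislabeled read-side sizes.) A sketch of the arithmetic, assuming the usual Milvus layout of 18 logical bits in the low end of the timestamp (the helper is a stand-in, not the real tsoutil):

    const logicalBits = 18

    // parseHybridTs mimics tsoutil.ParseHybridTs under the assumed layout:
    // physical milliseconds in the high bits, a logical counter in the low 18.
    func parseHybridTs(ts uint64) (physicalMs int64, logical int64) {
        return int64(ts >> logicalBits), int64(ts & ((1 << logicalBits) - 1))
    }

    func lagMs(packEndTs uint64) float64 {
        physicalMs, _ := parseHybridTs(packEndTs)
        return float64(time.Since(time.UnixMilli(physicalMs)).Milliseconds())
    }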
diff --git a/server/metrics/metrics.go b/server/metrics/metrics.go
index 6a233597..e5f747bf 100644
--- a/server/metrics/metrics.go
+++ b/server/metrics/metrics.go
@@ -38,15 +38,9 @@ const (
 	ReadErrorStatusLabel      = "read_error"
 	UnmarshalErrorStatusLabel = "unmarshal_error"
 
-	// write fail func label
-	WriteFailOnUpdatePosition = "on_update_position"
-	WriteFailOnFail           = "on_fail"
-	WriteFailOnDrop           = "on_drop"
-
-	// read fail func label
-	ReadFailUnknown           = "unknown"
-	ReadFailGetCollectionInfo = "get_collection_info"
-	ReadFailReadStream        = "read_stream"
+	// op type
+	OPTypeRead  = "read"
+	OPTypeWrite = "write"
 
 	taskStateLabelName   = "task_state"
 	requestTypeLabelName = "request_type"
@@ -61,23 +55,33 @@ const (
 	messageTypeLabelName = "msg_type"
 	apiTypeLabelName     = "api_type"
 	apiStatusLabelName   = "api_status"
+	opTypeName           = "op_type" // read or write
 )
 
 var (
 	registry *prometheus.Registry
 
-	buckets = prometheus.ExponentialBuckets(1, 2, 18)
-
 	TaskNumVec = &TaskNumMetric{
 		metricDesc: prometheus.NewDesc(prometheus.BuildFQName(milvusNamespace, systemName, "task_num"),
 			"cdc task number", []string{taskStateLabelName}, nil),
 	}
+
+	TaskStateVec = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: systemName,
+			Name:      "task_state",
+			Help:      "cdc task state",
+		}, []string{taskIDLabelName})
+
 	TaskRequestLatencyVec = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: milvusNamespace,
 			Subsystem: systemName,
 			Name:      "request_latency",
 			Help:      "cdc request latency on the client side ",
-			Buckets:   buckets,
+			// 0.1 0.2 0.4 0.8 1.6 3.2 6.4 12.8
+			Buckets: prometheus.ExponentialBuckets(0.1, 2, 8),
 		}, []string{requestTypeLabelName})
 
 	TaskRequestCountVec = prometheus.NewCounterVec(
@@ -88,66 +92,41 @@ var (
 			Help:      "cdc request count",
 		}, []string{requestTypeLabelName, requestStatusLabelName})
 
-	StreamingCollectionCountVec = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
+	ReplicateTimeDifferenceVec = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
 			Namespace: milvusNamespace,
 			Subsystem: systemName,
-			Name:      "streaming_collection_total",
-			Help:      "the number of collections that are synchronizing data",
-		}, []string{taskIDLabelName, streamingCollectionStatusLabelName})
-
-	ReaderFailCountVec = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: milvusNamespace,
-		Subsystem: systemName,
-		Name:      "reader_fail_total",
-		Help:      "the fail count when the reader reads milvus data",
-	}, []string{taskIDLabelName, readFailFuncLabelName})
+			Name:      "replicate_tt_lag",
+			Help:      "the time difference between the current time and the current message timestamp, unit: ms",
+			Buckets:   []float64{100, 500, 1000, 5000, 10000, 50000, 100000, 200000, 400000, 800000},
+		}, []string{taskIDLabelName, vchannelLabelName, opTypeName})
 
-	WriterFailCountVec = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: milvusNamespace,
-		Subsystem: systemName,
-		Name:      "writer_callback_fail_total",
-		Help:      "the fail count when the writer executes the callback function",
-	}, []string{taskIDLabelName, writeFailFuncLabelName})
-
-	WriterTimeDifferenceVec = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
+	ReplicateDataSizeVec = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
 			Namespace: milvusNamespace,
 			Subsystem: systemName,
-			Name:      "writer_tt_lag_ms",
-			Help:      "the time difference between the current time and the current message timestamp",
-		}, []string{taskIDLabelName, collectionIDLabelName, vchannelLabelName})
-	ReadMsgRowCountVec = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: milvusNamespace,
-		Subsystem: systemName,
-		Name:      "read_msg_row_total",
-		Help:      "the counter of messages that the reader has read",
-	}, []string{taskIDLabelName, collectionIDLabelName, messageTypeLabelName})
-	WriteMsgRowCountVec = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Namespace: milvusNamespace,
-		Subsystem: systemName,
-		Name:      "write_msg_row_total",
-		Help:      "the counter of messages that the writer has written",
-	}, []string{taskIDLabelName, collectionIDLabelName, messageTypeLabelName})
+			Name:      "replicate_data_size",
+			Help:      "the size of the message",
+			// 16 64 256 1k 4k 16k 64k 256k 1M 4M 16M 64M 256M
+			Buckets: prometheus.ExponentialBuckets(16, 4, 13),
+		}, []string{taskIDLabelName, vchannelLabelName, opTypeName})
+
 	APIExecuteCountVec = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: milvusNamespace,
 		Subsystem: systemName,
 		Name:      "api_execute_total",
 		Help:      "the counter of executing api",
-	}, []string{taskIDLabelName, collectionNameLabelName, apiTypeLabelName, apiStatusLabelName})
+	}, []string{taskIDLabelName, apiTypeLabelName})
 )
 
 func init() {
 	registry = prometheus.NewRegistry()
 	registry.MustRegister(TaskNumVec)
+	registry.MustRegister(TaskStateVec)
 	registry.MustRegister(TaskRequestLatencyVec)
 	registry.MustRegister(TaskRequestCountVec)
-	registry.MustRegister(StreamingCollectionCountVec)
-	registry.MustRegister(ReaderFailCountVec)
-	registry.MustRegister(WriterFailCountVec)
-	registry.MustRegister(WriterTimeDifferenceVec)
-	registry.MustRegister(ReadMsgRowCountVec)
-	registry.MustRegister(WriteMsgRowCountVec)
+	registry.MustRegister(ReplicateTimeDifferenceVec)
+	registry.MustRegister(ReplicateDataSizeVec)
 	registry.MustRegister(APIExecuteCountVec)
 }
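The bucket comments above follow directly from prometheus.ExponentialBuckets(start, factor, count), which returns count values start, start*factor, start*factor^2, and so on. A quick check of both series:

    fmt.Println(prometheus.ExponentialBuckets(0.1, 2, 8))
    // [0.1 0.2 0.4 0.8 1.6 3.2 6.4 12.8] (request latency buckets)
    fmt.Println(prometheus.ExponentialBuckets(16, 4, 13))
    // 16 B, 64 B, 256 B, 1 KB, ... up to 16*4^12 = 256 MB (data size buckets)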
diff --git a/server/metrics/metrics_task_num.go b/server/metrics/metrics_task_num.go
index 3a130ddf..764ebc54 100644
--- a/server/metrics/metrics_task_num.go
+++ b/server/metrics/metrics_task_num.go
@@ -31,10 +31,6 @@ type TaskNumMetric struct {
 	pauseNum   int
 }
 
-func (t *TaskNumMetric) AddInitial() {
-	t.initialNum++
-}
-
 // add it should be adapted if you modify the meta.MinTaskState or the meta.MaxTaskState
 func (t *TaskNumMetric) add(s meta.TaskState, errTip string) {
 	innerLog := log.With(zap.String("tip", errTip), zap.Int("state", int(s)))
@@ -84,6 +80,10 @@ func (t *TaskNumMetric) Delete(state meta.TaskState) {
 	t.reduce(state, "delete")
 }
 
+func (t *TaskNumMetric) Add(state meta.TaskState) {
+	t.add(state, "add state")
+}
+
 // Describe it should be adapted if you modify the meta.MinTaskState or the meta.MaxTaskState
 func (t *TaskNumMetric) Describe(descs chan<- *prometheus.Desc) {
 	descs <- t.metricDesc
diff --git a/server/metrics/metrics_task_num_test.go b/server/metrics/metrics_task_num_test.go
new file mode 100644
index 00000000..715c2200
--- /dev/null
+++ b/server/metrics/metrics_task_num_test.go
@@ -0,0 +1,36 @@
+package metrics
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/zilliztech/milvus-cdc/server/model/meta"
+)
+
+func TestMetricTaskNum(t *testing.T) {
+	m := &TaskNumMetric{}
+	m.Add(meta.TaskStateInitial)
+	assertNum(t, m, 1, 0, 0)
+
+	m.Add(meta.TaskStateRunning)
+	assertNum(t, m, 1, 1, 0)
+
+	m.UpdateState(meta.TaskStatePaused, meta.TaskStateRunning)
+	assertNum(t, m, 1, 0, 1)
+
+	m.UpdateState(meta.TaskStateRunning, meta.TaskStateInitial)
+	assertNum(t, m, 0, 1, 1)
+
+	m.Delete(meta.TaskStatePaused)
+	assertNum(t, m, 0, 1, 0)
+
+	m.UpdateState(meta.MinTaskState-1, meta.MaxTaskState+1)
+	assertNum(t, m, 0, 1, 0)
+}
+
+func assertNum(t *testing.T, metric *TaskNumMetric, initialNum, runningNum, pauseNum int) {
+	assert.Equal(t, initialNum, metric.initialNum)
+	assert.Equal(t, runningNum, metric.runningNum)
+	assert.Equal(t, pauseNum, metric.pauseNum)
+}
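TaskNumMetric remains a hand-written prometheus.Collector (registered in metrics.go above) rather than a GaugeVec, which is why it implements Describe/Collect itself and why the new test can assert on its internal counters directly. A minimal sketch of that collector pattern, with illustrative names:

    type stateCountCollector struct {
        desc   *prometheus.Desc
        mu     sync.Mutex
        counts map[string]int // task-state label -> count
    }

    func (c *stateCountCollector) Describe(ch chan<- *prometheus.Desc) {
        ch <- c.desc
    }

    func (c *stateCountCollector) Collect(ch chan<- prometheus.Metric) {
        c.mu.Lock()
        defer c.mu.Unlock()
        for state, n := range c.counts {
            ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(n), state)
        }
    }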
diff --git a/server/model/meta/task.go b/server/model/meta/task.go
index a9695bec..01906d24 100644
--- a/server/model/meta/task.go
+++ b/server/model/meta/task.go
@@ -30,8 +30,6 @@ const (
 	TaskStateInitial TaskState = iota
 	TaskStateRunning
 	TaskStatePaused
-
-	TaskStateTerminate TaskState = 100
 )
 
 const (
@@ -53,8 +51,6 @@ func (t TaskState) String() string {
 		return "Running"
 	case TaskStatePaused:
 		return "Paused"
-	case TaskStateTerminate:
-		return "Terminate"
 	default:
 		return fmt.Sprintf("Unknown value[%d]", t)
 	}
diff --git a/server/model/meta/task_test.go b/server/model/meta/task_test.go
index 1d52eec7..acc12705 100644
--- a/server/model/meta/task_test.go
+++ b/server/model/meta/task_test.go
@@ -40,10 +40,6 @@ func TestTaskState(t *testing.T) {
 	state = TaskStateRunning
 	assert.True(t, state.IsValidTaskState())
 	assert.Equal(t, "Running", state.String())
-
-	state = TaskStateTerminate
-	assert.False(t, state.IsValidTaskState())
-	assert.Equal(t, "Terminate", state.String())
 }
 
 func TestTaskInfo(t *testing.T) {
diff --git a/server/store/etcd.go b/server/store/etcd.go
index 7b210619..d6a17de7 100644
--- a/server/store/etcd.go
+++ b/server/store/etcd.go
@@ -77,6 +77,10 @@ func (e *EtcdMetaStore) Txn(ctx context.Context) (any, func(err error) error, er
 		if err == nil {
 			txn.Then(e.txnMap[txn]...)
 			_, err = txn.Commit()
+			if err != nil {
+				return err
+			}
+			return nil
 		}
 		return err
 	}
@@ -117,14 +121,10 @@ func (t *TaskInfoEtcdStore) Put(ctx context.Context, metaObj *meta.TaskInfo, txn
 		return err
 	}
 	taskInfoKey := getTaskInfoKey(t.rootPath, metaObj.TaskID)
-	defer func() {
-		if err != nil {
-			t.log.Warn("fail to put task info", zap.Error(err))
-		}
-	}()
 
 	if txn != nil {
 		if _, ok := t.txnMap[txn]; !ok {
+			t.log.Warn("txn not exist")
 			return errors.New("txn not exist")
 		}
 		t.txnMap[txn] = append(t.txnMap[txn], clientv3.OpPut(taskInfoKey, util.ToString(objBytes)))
@@ -132,7 +132,11 @@ func (t *TaskInfoEtcdStore) Put(ctx context.Context, metaObj *meta.TaskInfo, txn
 	}
 
 	_, err = t.etcdClient.Put(timeCtx, taskInfoKey, util.ToString(objBytes))
-	return err
+	if err != nil {
+		t.log.Warn("fail to put task info", zap.Error(err))
+		return err
+	}
+	return nil
 }
 
 func (t *TaskInfoEtcdStore) Get(ctx context.Context, metaObj *meta.TaskInfo, txn any) ([]*meta.TaskInfo, error) {
@@ -186,14 +190,11 @@ func (t *TaskInfoEtcdStore) Delete(ctx context.Context, metaObj *meta.TaskInfo,
 		t.txnMap[txn] = append(t.txnMap[txn], clientv3.OpDelete(taskInfoKey))
 		return nil
 	}
-	var err error
-	defer func() {
-		if err != nil {
-			t.log.Warn("fail to put task info", zap.Error(err))
-		}
-	}()
-
-	_, err = t.etcdClient.Delete(timeCtx, taskInfoKey)
+	_, err := t.etcdClient.Delete(timeCtx, taskInfoKey)
+	if err != nil {
+		t.log.Warn("fail to delete task info", zap.Error(err))
+		return err
+	}
 	return err
 }
 
@@ -230,21 +231,21 @@ func (t *TaskCollectionPositionEtcdStore) Put(ctx context.Context, metaObj *meta
 		t.log.Warn("fail to marshal task position", zap.Error(err))
 		return err
 	}
-	defer func() {
-		if err != nil {
-			t.log.Warn("fail to put task position", zap.Error(err))
-		}
-	}()
 
 	if txn != nil {
 		if _, ok := t.txnMap[txn]; !ok {
+			t.log.Warn("txn not exist")
 			return errors.New("txn not exist")
 		}
 		t.txnMap[txn] = append(t.txnMap[txn], clientv3.OpPut(taskPositionKey, util.ToString(positionBytes)))
 		return nil
 	}
 
 	_, err = t.etcdClient.Put(timeCtx, taskPositionKey, util.ToString(positionBytes))
-	return err
+	if err != nil {
+		t.log.Warn("fail to put task position", zap.Error(err))
+		return err
+	}
+	return nil
 }
 
 func (t *TaskCollectionPositionEtcdStore) Get(ctx context.Context, metaObj *meta.TaskCollectionPosition, txn any) ([]*meta.TaskCollectionPosition, error) {
@@ -261,6 +262,7 @@ func (t *TaskCollectionPositionEtcdStore) Get(ctx context.Context, metaObj *meta
 	}
 	if txn != nil {
 		if _, ok := t.txnMap[txn]; !ok {
+			t.log.Warn("txn not exist")
 			return nil, errors.New("txn not exist")
 		}
 		t.txnMap[txn] = append(t.txnMap[txn], clientv3.OpGet(key, ops...))
@@ -289,6 +291,7 @@ func (t *TaskCollectionPositionEtcdStore) Get(ctx context.Context, metaObj *meta
 
 func (t *TaskCollectionPositionEtcdStore) Delete(ctx context.Context, metaObj *meta.TaskCollectionPosition, txn any) error {
 	if metaObj.TaskID == "" {
+		t.log.Warn("task id is empty")
 		return errors.New("task id is empty")
 	}
 	timeCtx, cancel := context.WithTimeout(ctx, EtcdOpTimeout)
@@ -301,20 +304,19 @@ func (t *TaskCollectionPositionEtcdStore) Delete(ctx context.Context, metaObj *m
 	}
 	if txn != nil {
 		if _, ok := t.txnMap[txn]; !ok {
+			t.log.Warn("txn not exist")
 			return errors.New("txn not exist")
 		}
 		t.txnMap[txn] = append(t.txnMap[txn], clientv3.OpDelete(key, ops...))
 		return nil
 	}
-	var err error
-	defer func() {
-		if err != nil {
-			t.log.Warn("fail to delete task position", zap.Error(err))
-		}
-	}()
-	_, err = t.etcdClient.Delete(timeCtx, key, ops...)
-	return err
+	_, err := t.etcdClient.Delete(timeCtx, key, ops...)
+	if err != nil {
+		t.log.Warn("fail to delete task position", zap.Error(err))
+		return err
+	}
+	return nil
 }
 
 func EtcdStatus(ctx context.Context, etcdClient *clientv3.Client) error {
diff --git a/server/store/meta_op.go b/server/store/meta_op.go
index a7c9b55e..a5b4a0ea 100644
--- a/server/store/meta_op.go
+++ b/server/store/meta_op.go
@@ -89,6 +89,7 @@ func UpdateTaskState(taskInfoStore api.MetaStore[*meta.TaskInfo], taskID string,
 		log.Warn("fail to put the task info to etcd", zap.String("task_id", taskID), zap.Error(err))
 		return err
 	}
+	metrics.TaskStateVec.WithLabelValues(taskID).Set(float64(newState))
 	metrics.TaskNumVec.UpdateState(newState, oldState)
 	return nil
 }
@@ -196,6 +197,7 @@ func DeleteTask(factory api.MetaStoreFactory, taskID string) (*meta.TaskInfo, er
 	commitErr := commitFunc(err)
 	if commitErr == nil {
 		metrics.TaskNumVec.Delete(info.State)
+		metrics.TaskStateVec.WithLabelValues(info.TaskID).Set(-1)
 		return info, nil
 	}
 	return nil, commitErr
diff --git a/server/store/mysql.go b/server/store/mysql.go
index 84a45b15..8a81373f 100644
--- a/server/store/mysql.go
+++ b/server/store/mysql.go
@@ -31,8 +31,9 @@ func NewMySQLMetaStore(ctx context.Context, dataSourceName string, rootPath stri
 	err := s.init(ctx, dataSourceName, rootPath)
 	if err != nil {
 		s.log.Warn("fail to init db object", zap.Error(err))
+		return nil, err
 	}
-	return s, err
+	return s, nil
 }
 
 func (s *MySQLMetaStore) init(ctx context.Context, dataSourceName string, rootPath string) error {
@@ -116,8 +117,9 @@ func NewTaskInfoMysqlStore(ctx context.Context, db *sql.DB, rootPath string, txn
 	err := m.init(ctx, db, rootPath)
 	if err != nil {
 		m.log.Warn("fail to init task info store", zap.Error(err))
+		return nil, err
 	}
-	return m, err
+	return m, nil
 }
 
 func (m *TaskInfoMysqlStore) init(ctx context.Context, db *sql.DB, rootPath string) error {
@@ -134,10 +136,11 @@ func (m *TaskInfoMysqlStore) init(ctx context.Context, db *sql.DB, rootPath stri
 	`)
 	if err != nil {
 		m.log.Warn("fail to create table", zap.Error(err))
+		return err
 	}
 	m.db = db
 	m.rootPath = rootPath
-	return err
+	return nil
 }
 
 func (m *TaskInfoMysqlStore) Put(ctx context.Context, metaObj *meta.TaskInfo, txn any) error {
@@ -164,10 +167,16 @@ func (m *TaskInfoMysqlStore) Put(ctx context.Context, metaObj *meta.TaskInfo, tx
 		}
 		defer stmt.Close()
 		_, err = stmt.ExecContext(ctx, taskInfoKey, metaObj.TaskID, util.ToString(objBytes), util.ToString(objBytes))
-		return err
+		if err != nil {
+			return err
+		}
+		return nil
 	}
 	_, err = m.db.ExecContext(ctx, sqlStr, taskInfoKey, metaObj.TaskID, util.ToString(objBytes), util.ToString(objBytes))
-	return err
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
 func (m *TaskInfoMysqlStore) Get(ctx context.Context, metaObj *meta.TaskInfo, txn any) ([]*meta.TaskInfo, error) {
@@ -244,15 +253,20 @@ func (m *TaskInfoMysqlStore) Delete(ctx context.Context, metaObj *meta.TaskInfo,
 		}
 		stmt, err := m.txnMap[txn]().PrepareContext(ctx, sqlStr)
 		if err != nil {
-			m.log.Warn("fail to prepare delete statement", zap.Error(err))
 			return err
 		}
 		defer stmt.Close()
 		_, err = stmt.ExecContext(ctx, taskID)
-		return err
+		if err != nil {
+			return err
+		}
+		return nil
 	}
 	_, err = m.db.ExecContext(ctx, sqlStr, taskID)
-	return err
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
 type TaskCollectionPositionMysqlStore struct {
@@ -271,8 +285,9 @@ func NewTaskCollectionPositionMysqlStore(ctx context.Context, db *sql.DB, rootPa
 	err := s.init(ctx, db, rootPath)
 	if err != nil {
 		s.log.Warn("fail to init task 
collection position store", zap.Error(err)) + return nil, err } - return s, err + return s, nil } func (m *TaskCollectionPositionMysqlStore) init(ctx context.Context, db *sql.DB, rootPath string) error { @@ -294,10 +309,11 @@ func (m *TaskCollectionPositionMysqlStore) init(ctx context.Context, db *sql.DB, `) if err != nil { m.log.Warn("fail to create table", zap.Error(err)) + return err } m.db = db m.rootPath = rootPath - return err + return nil } func (m *TaskCollectionPositionMysqlStore) Put(ctx context.Context, metaObj *meta.TaskCollectionPosition, txn any) error { @@ -318,15 +334,20 @@ func (m *TaskCollectionPositionMysqlStore) Put(ctx context.Context, metaObj *met } stmt, err := m.txnMap[txn]().PrepareContext(ctx, sqlStr) if err != nil { - m.log.Warn("fail to prepare put statement", zap.Error(err)) return err } defer stmt.Close() _, err = stmt.ExecContext(ctx, taskPositionKey, metaObj.TaskID, metaObj.CollectionID, metaObj.CollectionName, util.ToString(positionBytes), util.ToString(opPositionBytes), util.ToString(targetPositionBytes), util.ToString(positionBytes), util.ToString(opPositionBytes), util.ToString(targetPositionBytes)) - return err + if err != nil { + return err + } + return nil } _, err = m.db.ExecContext(ctx, sqlStr, taskPositionKey, metaObj.TaskID, metaObj.CollectionID, metaObj.CollectionName, util.ToString(positionBytes), util.ToString(opPositionBytes), util.ToString(targetPositionBytes), util.ToString(positionBytes), util.ToString(opPositionBytes), util.ToString(targetPositionBytes)) - return err + if err != nil { + return err + } + return nil } func (m *TaskCollectionPositionMysqlStore) Get(ctx context.Context, metaObj *meta.TaskCollectionPosition, txn any) ([]*meta.TaskCollectionPosition, error) { @@ -434,8 +455,14 @@ func (m *TaskCollectionPositionMysqlStore) Delete(ctx context.Context, metaObj * } defer stmt.Close() _, err = stmt.ExecContext(ctx, sqlArgs...) - return err + if err != nil { + return err + } + return nil } _, err = m.db.ExecContext(ctx, sqlStr, sqlArgs...) - return err + if err != nil { + return err + } + return nil } diff --git a/server/writer_callback.go b/server/writer_callback.go index 183742bf..d0ac0ad7 100644 --- a/server/writer_callback.go +++ b/server/writer_callback.go @@ -25,7 +25,6 @@ import ( "github.com/zilliztech/milvus-cdc/core/util" "github.com/zilliztech/milvus-cdc/server/api" - "github.com/zilliztech/milvus-cdc/server/metrics" "github.com/zilliztech/milvus-cdc/server/model/meta" "github.com/zilliztech/milvus-cdc/server/store" ) @@ -63,7 +62,7 @@ func (w *WriteCallback) UpdateTaskCollectionPosition(collectionID int64, collect zap.String("vchannel_name", pChannelName), zap.String("position", util.Base64Encode(position)), zap.Error(err)) - metrics.WriterFailCountVec.WithLabelValues(w.taskID, metrics.WriteFailOnUpdatePosition).Inc() + return err } - return err + return nil }