Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventcollector: add more metrics to help debug the lag #660

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import (
)

var (
handleEventDuration = metrics.EventCollectorHandleEventDuration
handleEventDuration = metrics.EventCollectorHandleEventDuration
metricsDSInputChanLen = metrics.DynamicStreamEventChanSize.WithLabelValues("event-collector")
metricsDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("event-collector")
)

type DispatcherRequest struct {
Expand All @@ -49,7 +51,7 @@ type DispatcherRequest struct {
OnlyUse bool
}

type TargetAndDispatcherRequest struct {
type DispatcherRequestWithTarget struct {
Target node.ID
Topic string
Req DispatcherRequest
Expand All @@ -70,14 +72,13 @@ EventCollector is the relay between EventService and DispatcherManager, responsi
EventCollector is an instance-level component.
*/
type EventCollector struct {
serverId node.ID
dispatcherMap sync.Map
globalMemoryQuota int64
mc messaging.MessageCenter
wg sync.WaitGroup
serverId node.ID
dispatcherMap sync.Map
mc messaging.MessageCenter
wg sync.WaitGroup

// dispatcherRequestChan is used cached dispatcher request when some error occurs.
dispatcherRequestChan *chann.DrainableChann[TargetAndDispatcherRequest]
dispatcherRequestChan *chann.DrainableChann[DispatcherRequestWithTarget]

logCoordinatorRequestChan *chann.DrainableChann[*logservicepb.ReusableEventServiceRequest]

Expand All @@ -99,9 +100,8 @@ type EventCollector struct {
func New(ctx context.Context, globalMemoryQuota int64, serverId node.ID) *EventCollector {
eventCollector := EventCollector{
serverId: serverId,
globalMemoryQuota: globalMemoryQuota,
dispatcherMap: sync.Map{},
dispatcherRequestChan: chann.NewAutoDrainChann[TargetAndDispatcherRequest](),
dispatcherRequestChan: chann.NewAutoDrainChann[DispatcherRequestWithTarget](),
logCoordinatorRequestChan: chann.NewAutoDrainChann[*logservicepb.ReusableEventServiceRequest](),
mc: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter),
metricDispatcherReceivedKVEventCount: metrics.DispatcherReceivedEventCount.WithLabelValues("KVEvent"),
Expand Down Expand Up @@ -200,7 +200,7 @@ func (c *EventCollector) ResetDispatcherStat(stat *DispatcherStat) {
}

func (c *EventCollector) addDispatcherRequestToSendingQueue(serverId node.ID, topic string, req DispatcherRequest) {
c.dispatcherRequestChan.In() <- TargetAndDispatcherRequest{
c.dispatcherRequestChan.In() <- DispatcherRequestWithTarget{
Target: serverId,
Topic: topic,
Req: req,
Expand Down Expand Up @@ -268,15 +268,15 @@ func (c *EventCollector) mustSendDispatcherRequest(target node.ID, topic string,
ChangefeedId: req.Dispatcher.GetChangefeedID().ToPB(),
DispatcherId: req.Dispatcher.GetId().ToPB(),
ActionType: req.ActionType,
// FIXME: It can be another server id in the future.
// ServerId is the id of the request sender.
ServerId: c.serverId.String(),
TableSpan: req.Dispatcher.GetTableSpan(),
StartTs: req.StartTs,
OnlyReuse: req.OnlyUse,
},
}

// If the action type is register, we need fill all config related fields.
// If the action type is register and reset, we need fill all config related fields.
if req.ActionType == eventpb.ActionType_ACTION_TYPE_REGISTER ||
req.ActionType == eventpb.ActionType_ACTION_TYPE_RESET {
message.RegisterDispatcherRequest.FilterConfig = req.Dispatcher.GetFilterConfig()
Expand All @@ -297,7 +297,7 @@ func (c *EventCollector) mustSendDispatcherRequest(target node.ID, topic string,
zap.Stringer("target", target),
zap.Error(err))
// Put the request back to the channel for later retry.
c.dispatcherRequestChan.In() <- TargetAndDispatcherRequest{
c.dispatcherRequestChan.In() <- DispatcherRequestWithTarget{
Target: target,
Topic: topic,
Req: req,
Expand All @@ -309,8 +309,9 @@ func (c *EventCollector) mustSendDispatcherRequest(target node.ID, topic string,

// RecvEventsMessage is the handler for the events message from EventService.
func (c *EventCollector) RecvEventsMessage(_ context.Context, targetMessage *messaging.TargetMessage) error {
inflightDuration := time.Since(time.UnixMilli(targetMessage.CreateAt)).Milliseconds()
c.metricReceiveEventLagDuration.Observe(float64(inflightDuration))
inflightDuration := time.Since(time.UnixMilli(targetMessage.CreateAt)).Seconds()
c.metricReceiveEventLagDuration.Observe(inflightDuration)

start := time.Now()
for _, msg := range targetMessage.Message {
switch msg.(type) {
Expand Down Expand Up @@ -341,7 +342,8 @@ func (c *EventCollector) RecvEventsMessage(_ context.Context, targetMessage *mes
log.Panic("invalid message type", zap.Any("msg", msg))
}
}
handleEventDuration.Observe(float64(time.Since(start).Milliseconds()))

handleEventDuration.Observe(time.Since(start).Seconds())
return nil
}

Expand All @@ -355,6 +357,9 @@ func (c *EventCollector) updateMetrics(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
dsMetrics := c.ds.GetMetrics()
metricsDSInputChanLen.Set(float64(dsMetrics.EventChanSize))
metricsDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
c.updateResolvedTsMetric()
}
}
Expand Down Expand Up @@ -597,6 +602,7 @@ func (d *DispatcherStat) unregisterDispatcher(eventCollector *EventCollector) {
}
}

// FIXME: Implement this method.
func (d *DispatcherStat) resetDispatcher(eventCollector *EventCollector) {
d.eventServiceInfo.RLock()
defer d.eventServiceInfo.RUnlock()
Expand Down
66 changes: 33 additions & 33 deletions downstreamadapter/eventcollector/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,39 +68,39 @@ func (h *EventsHandler) Handle(stat *DispatcherStat, events ...dispatcher.Dispat
if len(events) == 0 {
return false
}
switch events[0].GetType() {
case commonEvent.TypeDDLEvent,
commonEvent.TypeSyncPointEvent,
commonEvent.TypeHandshakeEvent,
commonEvent.TypeReadyEvent,
commonEvent.TypeNotReusableEvent:
if len(events) > 1 {
log.Panic("receive multiple non-batchable events",
zap.String("changefeedID", stat.target.GetChangefeedID().ID().String()),
zap.Stringer("dispatcher", stat.target.GetId()),
zap.Any("events", events))
}
case commonEvent.TypeResolvedEvent,
commonEvent.TypeDMLEvent:
// TypeResolvedEvent and TypeDMLEvent can be in the same batch
for i := 0; i < len(events); i++ {
if events[i].GetType() != commonEvent.TypeResolvedEvent && events[i].GetType() != commonEvent.TypeDMLEvent {
log.Panic("receive multiple events with upexpected types",
zap.String("changefeedID", stat.target.GetChangefeedID().ID().String()),
zap.Stringer("dispatcher", stat.target.GetId()),
zap.Any("events", events))
}
}
default:
for i := 1; i < len(events); i++ {
if events[i].GetType() != events[0].GetType() {
log.Panic("receive multiple events with different types",
zap.String("changefeedID", stat.target.GetChangefeedID().ID().String()),
zap.Stringer("dispatcher", stat.target.GetId()),
zap.Any("events", events))
}
}
}
// switch events[0].GetType() {
// case commonEvent.TypeDDLEvent,
// commonEvent.TypeSyncPointEvent,
// commonEvent.TypeHandshakeEvent,
// commonEvent.TypeReadyEvent,
// commonEvent.TypeNotReusableEvent:
// if len(events) > 1 {
// log.Panic("receive multiple non-batchable events",
// zap.String("changefeedID", stat.target.GetChangefeedID().ID().String()),
// zap.Stringer("dispatcher", stat.target.GetId()),
// zap.Any("events", events))
// }
// case commonEvent.TypeResolvedEvent,
// commonEvent.TypeDMLEvent:
// // TypeResolvedEvent and TypeDMLEvent can be in the same batch
// for i := 0; i < len(events); i++ {
// if events[i].GetType() != commonEvent.TypeResolvedEvent && events[i].GetType() != commonEvent.TypeDMLEvent {
// log.Panic("receive multiple events with upexpected types",
// zap.String("changefeedID", stat.target.GetChangefeedID().ID().String()),
// zap.Stringer("dispatcher", stat.target.GetId()),
// zap.Any("events", events))
// }
// }
// default:
// for i := 1; i < len(events); i++ {
// if events[i].GetType() != events[0].GetType() {
// log.Panic("receive multiple events with different types",
// zap.String("changefeedID", stat.target.GetChangefeedID().ID().String()),
// zap.Stringer("dispatcher", stat.target.GetId()),
// zap.Any("events", events))
// }
// }
// }

// just check the first event type, because all event types should be same
switch events[0].GetType() {
Expand Down
Loading
Loading