diff --git a/docs/end-to-end.md b/docs/end-to-end.md
index fe7dd10..d5c9581 100644
--- a/docs/end-to-end.md
+++ b/docs/end-to-end.md
@@ -3,66 +3,77 @@
 This page describes the end-to-end monitoring feature in KMinion, how it works, and what metrics it provides.

 ## Motivation
+
 > What is the issue? Why did we build this feature?

-We can monitor metrics like CPU usage, free disk space, or even consumer group lag.
-However, these metrics don't give us a good idea of the performance characteristics an actual, real-world, client
-experiences when connected to the cluster.
+We can monitor metrics like CPU usage, free disk space, or even consumer group lag. However, these metrics don't give us
+a good idea of the performance characteristics an actual, real-world client experiences when connected to the cluster.
+
+With the "classic" metrics lots of questions go unanswered:

-With the "classic" metrics lots of questions go unanswered:
 - Can a client produce messages to the cluster?
 - Can clients produce & consume messages as well as commit group offsets with an acceptable latency?
 - Is the cluster in a healthy state from a client's perspective?

 ## Approach & Implementation
+
 > How do we solve those issues? How does the feature work?

-The most reliably way to get real-world performance and availability metrics is to actually run a producer/consumer
+The most reliable way to get real-world performance and availability metrics is to actually run a producer/consumer
 ourselves. This is exactly what the end-to-end monitoring feature does!

 ## High Level Overview
-In order to determine if the cluster is fully operational, and it's performance is within acceptable limits,
-KMinion continuously produces and consumes messages to/from the cluster. That way we can measure things like ack-latency,
+
+In order to determine if the cluster is fully operational and its performance is within acceptable limits, KMinion
+continuously produces and consumes messages to/from the cluster. That way we can measure things like ack-latency,
 commit-latency, and roundtrip-time.

 KMinion creates and manages its own topic for the end-to-end test messages. The name of the topic can be configured.

 **The first step** is to create a message and send it to the cluster.
-- Every produced message is added to an internal tracker, so we can recognize messages being "lost".
-  A message is considered lost if it doesn't arrive back at the consumer within the configured time span.
+
+- Every produced message is added to an internal tracker, so we can recognize messages being "lost". A message is
+  considered lost if it doesn't arrive back at the consumer within the configured time span.

 **The second step** is to continuously consume the topic.
-- As each message arrives, we calculate its roundtrip time (time from the point the message was created, until KMinion received it again)
+
+- As each message arrives, we calculate its roundtrip time (time from the point the message was created, until KMinion
+  received it again)
 - Consumer group offsets are committed periodically, while also recording the time each commit takes.

 ### Topic Management
+
 The topic KMinion uses, is created and managed completely automatically (the topic name can be configured though).
 KMinion continuously checks the topic and fixes issues/imbalances automatically:
-- Add partitions to the topic, so it has at least as many partitions as there are brokers.
-- Will reassign partitions to ensure every broker leads at least one partition, and that all partitions' replicas
-  are distributed evenly across the brokers. KMinion tries to assign partitionIDs to brokers that have the same broker id.
+- Add partitions to the topic, so it has at least as many partitions as there are brokers.
+- Reassign partitions to ensure every broker leads at least one partition, and that all partitions' replicas are
+  distributed evenly across the brokers. KMinion tries to assign each partition to the broker whose broker ID matches
+  the partition ID.

 ### Consumer Group Management
-On startup each KMinion instance generates a unique identifier (UUID) that is used to create its own consumer group.
-It incorporates the shared prefix from the config.
+
+On startup each KMinion instance generates a unique identifier (UUID) that is used to create its own consumer group. It
+incorporates the shared prefix from the config.

 That is necessary because:
+
 - Offsets must not be shared among multiple instances.
 - Each instance must always consume **all** partitions of the topic.

-The instances' UUID is also embedded in every message, so each instance can easily filter out messages it didn't produce.
-That's why it is perfectly fine to run multiple KMinion instances against the same cluster, using the same topic.
-
-KMinion also monitors and deletes consumer groups that use it's configured prefix.
-That way, when an instance exits/restarts, previous consumer groups will be cleaned up quickly (check happens every 20s).
+The instance's UUID is also embedded in every message, so each instance can easily filter out messages it didn't
+produce. That's why it is perfectly fine to run multiple KMinion instances against the same cluster, using the same
+topic.
+
+KMinion also monitors and deletes consumer groups that use its configured prefix. That way, when an instance
+exits/restarts, previous consumer groups will be cleaned up quickly (check happens every 20s).

 ## Available Metrics
+
 The end-to-end monitoring feature exports the following metrics.

 ### Counters
+
 | Name | Description |
 | --- | --- |
 | `kminion_end_to_end_messages_produced_total ` | Messages KMinion *tried* to send |
@@ -70,8 +81,8 @@ The end-to-end monitoring feature exports the following metrics.
 | `kminion_end_to_end_messages_received_total ` | Number of messages received (only counts those that match, i.e. that this instance actually produced itself) |
 | `kminion_end_to_end_commits_total` | Number of successful offset commits |

-
 ### Histograms
+
 | Name | Description |
 | --- | --- |
 | `kminion_end_to_end_produce_latency_seconds ` | Duration until the cluster acknowledged a message. |
@@ -79,6 +90,7 @@
 | `kminion_end_to_end_roundtrip_latency_seconds ` | Duration from creation of a message, until it was received/consumed again. |

 ## Config Properties
+
 All config properties related to this feature are located in `minion.endToEnd`.

 ```yaml
@@ -117,6 +129,11 @@ All config properties related to this feature are located in `minion.endToEnd`.
   consumer:
     # Prefix kminion uses when creating its consumer groups. Current kminion instance id will be appended automatically
     groupIdPrefix: kminion-end-to-end
+
+    # Whether KMinion should try to delete empty consumer groups with the same prefix. This can be used if you want
+    # KMinion to clean up its old consumer groups. It should only be used if you use a unique prefix for KMinion.
+    deleteStaleConsumerGroups: false
+
     # Defines the time limit beyond which a message is considered "lost" (failed the roundtrip),
     # also used as the upper bound for histogram buckets in "roundtrip_latency"
     roundtripSla: 20s
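The probe described in docs/end-to-end.md boils down to stamping each produced message with the producing instance's ID and its creation time, then measuring how long it takes until the same instance consumes it again. The following standalone sketch only illustrates that idea; it is not KMinion's actual message format, and all names (`probeMessage`, `encodeProbe`, `onProbeReceived`, the IDs) are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// probeMessage is an illustrative payload; KMinion's real message format is not shown in this diff.
type probeMessage struct {
	MinionID  string    `json:"minionId"`  // unique instance ID, used to filter out foreign messages
	MessageID string    `json:"messageId"` // identifies the message inside the internal tracker
	CreatedAt time.Time `json:"createdAt"` // creation timestamp used to compute the roundtrip latency
}

// encodeProbe serializes a probe message right before it is produced.
func encodeProbe(minionID, messageID string) ([]byte, error) {
	return json.Marshal(probeMessage{
		MinionID:  minionID,
		MessageID: messageID,
		CreatedAt: time.Now(),
	})
}

// onProbeReceived decodes a consumed record value and returns the roundtrip latency,
// or ok=false if the message was produced by a different instance (or is unreadable).
func onProbeReceived(value []byte, ownMinionID string, receivedAt time.Time) (latency time.Duration, ok bool) {
	var msg probeMessage
	if err := json.Unmarshal(value, &msg); err != nil {
		return 0, false
	}
	if msg.MinionID != ownMinionID {
		return 0, false // produced by another KMinion instance, ignore it
	}
	return receivedAt.Sub(msg.CreatedAt), true
}

func main() {
	payload, _ := encodeProbe("minion-a", "msg-1")

	// Pretend the message came back 150ms later.
	if rt, ok := onProbeReceived(payload, "minion-a", time.Now().Add(150*time.Millisecond)); ok {
		fmt.Printf("roundtrip latency: %v\n", rt)
	}
}
```

In KMinion itself the produced message additionally feeds the internal tracker, so messages that do not return within `roundtripSla` can be counted as lost.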
diff --git a/docs/reference-config.yaml b/docs/reference-config.yaml
index 08ba2f0..bcbed41 100644
--- a/docs/reference-config.yaml
+++ b/docs/reference-config.yaml
@@ -22,7 +22,7 @@ logger:
   level: info

 kafka:
-  brokers: []
+  brokers: [ ]
   clientId: "kminion"
   rackId: ""
   tls:
@@ -72,10 +72,10 @@ minion:
     # AllowedGroups are regex strings of group ids that shall be exported
     # You can specify allowed groups by providing literals like "my-consumergroup-name" or by providing regex expressions
     # like "/internal-.*/".
-    allowedGroups: [".*"]
+    allowedGroups: [ ".*" ]
     # IgnoredGroups are regex strings of group ids that shall be ignored/skipped when exporting metrics. Ignored groups
     # take precedence over allowed groups.
-    ignoredGroups: []
+    ignoredGroups: [ ]
   topics:
     # Granularity can be per topic or per partition. If you want to reduce the number of exported metric series and
     # you aren't interested in per partition metrics you could choose "topic".
@@ -83,10 +83,10 @@ minion:
     # AllowedTopics are regex strings of topic names whose topic metrics that shall be exported.
     # You can specify allowed topics by providing literals like "my-topic-name" or by providing regex expressions
     # like "/internal-.*/".
-    allowedTopics: [".*"]
+    allowedTopics: [ ".*" ]
    # IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
     # take precedence over allowed topics.
-    ignoredTopics: []
+    ignoredTopics: [ ]
   logDirs:
     # Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior
     # to version 1.0.0 as describing log dirs was not supported back then.
@@ -132,6 +132,11 @@ minion:
     consumer:
       # Prefix kminion uses when creating its consumer groups. Current kminion instance id will be appended automatically
       groupIdPrefix: kminion-end-to-end
+
+      # Whether KMinion should try to delete empty consumer groups with the same prefix. This can be used if you want
+      # KMinion to clean up its old consumer groups. It should only be used if you use a unique prefix for KMinion.
+      deleteStaleConsumerGroups: false
+
       # This defines:
       # - Upper bound for histogram buckets in "roundtrip_latency"
       # - Time limit beyond which a message is considered "lost" (failed the roundtrip)
diff --git a/e2e/config_consumer.go b/e2e/config_consumer.go
index 960577a..602bdef 100644
--- a/e2e/config_consumer.go
+++ b/e2e/config_consumer.go
@@ -6,7 +6,8 @@ import (
 )

 type EndToEndConsumerConfig struct {
-	GroupIdPrefix string `koanf:"groupIdPrefix"`
+	GroupIdPrefix             string `koanf:"groupIdPrefix"`
+	DeleteStaleConsumerGroups bool   `koanf:"deleteStaleConsumerGroups"`

 	RoundtripSla time.Duration `koanf:"roundtripSla"`
 	CommitSla    time.Duration `koanf:"commitSla"`
@@ -14,11 +15,15 @@ type EndToEndConsumerConfig struct {

 func (c *EndToEndConsumerConfig) SetDefaults() {
 	c.GroupIdPrefix = "kminion-end-to-end"
+	c.DeleteStaleConsumerGroups = false
 	c.RoundtripSla = 20 * time.Second
 	c.CommitSla = 10 * time.Second // no idea what to use as a good default value
 }

 func (c *EndToEndConsumerConfig) Validate() error {
+	if len(c.GroupIdPrefix) < 3 {
+		return fmt.Errorf("consumer.groupIdPrefix must be at least 3 characters long")
+	}

 	if c.RoundtripSla <= 0 {
 		return fmt.Errorf("consumer.roundtripSla must be greater than zero")
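To make the new `DeleteStaleConsumerGroups` default and the minimum prefix-length check concrete, here is a sketch of a unit test that could sit next to `e2e/config_consumer.go`. The test itself is not part of this diff, and it assumes the remaining checks in `Validate()` accept the values set by `SetDefaults()`.

```go
package e2e

import "testing"

// TestEndToEndConsumerConfig is an illustrative test (not part of this diff) for the new
// DeleteStaleConsumerGroups default and the minimum groupIdPrefix length check.
func TestEndToEndConsumerConfig(t *testing.T) {
	var cfg EndToEndConsumerConfig
	cfg.SetDefaults()

	if cfg.DeleteStaleConsumerGroups {
		t.Error("expected DeleteStaleConsumerGroups to default to false")
	}
	if err := cfg.Validate(); err != nil {
		t.Errorf("default config should be valid, got: %v", err)
	}

	// Prefixes shorter than 3 characters are now rejected.
	cfg.GroupIdPrefix = "ab"
	if err := cfg.Validate(); err == nil {
		t.Error("expected an error for a too-short groupIdPrefix")
	}
}
```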
diff --git a/e2e/consumer.go b/e2e/consumer.go
index a825d5e..551bf36 100644
--- a/e2e/consumer.go
+++ b/e2e/consumer.go
@@ -35,9 +35,7 @@ func (s *Service) startConsumeMessages(ctx context.Context) {

 		// Process messages
 		fetches.EachRecord(func(record *kgo.Record) {
-			if record != nil {
-				s.processMessage(record, receiveTimestamp)
-			}
+			s.processMessage(record, receiveTimestamp)
 		})
 	}
 }
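For context on the change above: `startConsumeMessages` polls with franz-go and hands every fetched record to `processMessage`. The standalone sketch below shows that poll/`EachRecord` shape with placeholder broker and topic names; it is illustrative only and not the code KMinion runs.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// consumeLoop shows the poll/EachRecord shape used by startConsumeMessages.
// EachRecord only hands successfully fetched (never nil) records to the callback,
// which is presumably why the explicit nil check was dropped in this diff.
func consumeLoop(ctx context.Context, client *kgo.Client) {
	for {
		fetches := client.PollFetches(ctx)
		if fetches.IsClientClosed() || ctx.Err() != nil {
			return
		}
		receiveTimestamp := time.Now()

		fetches.EachRecord(func(record *kgo.Record) {
			log.Printf("offset %d, broker-to-consumer delay %v",
				record.Offset, receiveTimestamp.Sub(record.Timestamp))
		})
	}
}

func main() {
	// Broker address and topic name are placeholders.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("kminion-end-to-end"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	consumeLoop(ctx, client)
}
```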
diff --git a/e2e/group_tracker.go b/e2e/group_tracker.go
index 83d59d4..cadc795 100644
--- a/e2e/group_tracker.go
+++ b/e2e/group_tracker.go
@@ -2,7 +2,6 @@ package e2e

 import (
 	"context"
-	"fmt"
 	"strings"
 	"time"

@@ -23,75 +22,56 @@ const (
 // Whenever a kminion instance starts up it creates a consumer-group for itself in order to not "collide" with other kminion instances.
 // When an instance restarts (for whatever reason), it creates a new group again, so we'd end up with a lot of unused groups.
 type groupTracker struct {
-	svc    *Service // used to obtain stuff like logger, kafka client, ...
-	logger *zap.Logger
-	ctx    context.Context // cancellation context
-
-	client *kgo.Client // kafka client
-
+	cfg     Config
+	logger  *zap.Logger
+	client  *kgo.Client // kafka client
 	groupId string // our own groupId

 	potentiallyEmptyGroups map[string]time.Time // groupName -> utc timestamp when the group was first seen
-
-	isNotAuthorized bool // if we get a not authorized response while trying to delete old groups, this will be set to true, essentially disabling the tracker
 }

-func newGroupTracker(svc *Service, ctx context.Context) *groupTracker {
-
-	tracker := groupTracker{
-		svc:    svc,
-		logger: svc.logger.Named("groupTracker"),
-		ctx:    ctx,
-
-		client: svc.client,
-
-		groupId:                svc.groupId,
+func newGroupTracker(cfg Config, logger *zap.Logger, client *kgo.Client, groupID string) *groupTracker {
+	return &groupTracker{
+		cfg:                    cfg,
+		logger:                 logger.Named("groupTracker"),
+		client:                 client,
+		groupId:                groupID,
 		potentiallyEmptyGroups: make(map[string]time.Time),
-
-		isNotAuthorized: false,
 	}
-
-	return &tracker
 }

-func (g *groupTracker) start() {
+func (g *groupTracker) start(ctx context.Context) {
 	g.logger.Debug("starting group tracker")

 	deleteOldGroupsTicker := time.NewTicker(oldGroupCheckInterval)
-	// stop ticker when context is cancelled
-	go func() {
-		<-g.ctx.Done()
-		g.logger.Debug("stopping group tracker, context was cancelled")
-		deleteOldGroupsTicker.Stop()
-	}()
-
-	// look for old consumer groups and delete them
-	go func() {
-		for range deleteOldGroupsTicker.C {
-			err := g.checkAndDeleteOldConsumerGroups()
+	for {
+		select {
+		case <-ctx.Done():
+			g.logger.Debug("stopping group tracker, context was cancelled")
+			return
+		case <-deleteOldGroupsTicker.C:
+			childCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			err := g.checkAndDeleteOldConsumerGroups(childCtx)
 			if err != nil {
 				g.logger.Error("failed to check for old consumer groups: %w", zap.Error(err))
 			}
+			cancel()
 		}
-	}()
-}
-
-func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {
-	if g.isNotAuthorized {
-		return nil
 	}
+}

+func (g *groupTracker) checkAndDeleteOldConsumerGroups(ctx context.Context) error {
 	groupsRq := kmsg.NewListGroupsRequest()
 	groupsRq.StatesFilter = []string{"Empty"}

-	g.logger.Debug("checking for empty kminion consumer groups...")
+	g.logger.Debug("checking for stale kminion consumer groups")

-	shardedResponse := g.client.RequestSharded(g.ctx, &groupsRq)
+	shardedResponse := g.client.RequestSharded(ctx, &groupsRq)

 	// find groups that start with the kminion prefix
-	matchingGroups := make([]string, 0, 10)
+	matchingGroups := make([]string, 0)
 	for _, shard := range shardedResponse {
 		if shard.Err != nil {
-			g.logger.Error("error in response to ListGroupsRequest", zap.Error(shard.Err))
+			g.logger.Error("error in response to ListGroupsRequest", zap.Int32("broker_id", shard.Meta.NodeID), zap.Error(shard.Err))
 			continue
 		}

@@ -108,21 +88,20 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {
 				continue // skip our own consumer group
 			}

-			if strings.HasPrefix(name, g.svc.config.Consumer.GroupIdPrefix) {
+			if strings.HasPrefix(name, g.cfg.Consumer.GroupIdPrefix) {
 				matchingGroups = append(matchingGroups, name)
 			}
 		}
 	}

 	// save new (previously unseen) groups to tracker
-	g.logger.Debug(fmt.Sprintf("found %v matching kminion consumer groups", len(matchingGroups)), zap.Strings("groups", matchingGroups))
+	g.logger.Debug("checked for stale consumer groups", zap.Int("found_groups", len(matchingGroups)), zap.Strings("groups", matchingGroups))
 	for _, name := range matchingGroups {
 		_, exists := g.potentiallyEmptyGroups[name]
 		if !exists {
 			// add it with the current timestamp
-			now := time.Now()
-			g.potentiallyEmptyGroups[name] = now
-			g.logger.Debug("new empty kminion group, adding to tracker", zap.String("group", name), zap.Time("firstSeen", now))
+			g.potentiallyEmptyGroups[name] = time.Now()
+			g.logger.Debug("found new empty kminion group, adding it to the tracker", zap.String("group", name))
 		}
 	}

@@ -154,7 +133,7 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {
 	deleteRq := kmsg.NewDeleteGroupsRequest()
 	deleteRq.Groups = groupsToDelete

-	deleteResp := g.client.RequestSharded(g.ctx, &deleteRq)
+	deleteResp := g.client.RequestSharded(ctx, &deleteRq)

 	// done, now just errors
 	// if we get a not authorized error we'll disable deleting groups
@@ -190,7 +169,6 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {

 	if foundNotAuthorizedError {
 		g.logger.Info("disabling trying to delete old kminion consumer-groups since one of the last delete results had an 'GroupAuthorizationFailed' error")
-		g.isNotAuthorized = true
 	}

 	return nil
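The group tracker's cleanup decision can be summarised as a pure function: an empty group that matches the configured prefix, is not the instance's own group, and has stayed empty past a threshold becomes a deletion candidate. The sketch below illustrates only that decision; the function name, threshold value, and example group names are hypothetical, while the real logic (including the `GroupAuthorizationFailed` handling) lives in `e2e/group_tracker.go`.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// staleGroups is an illustrative stand-in for the tracker's decision: return the empty
// groups that match the prefix, are not our own group, and have been seen as empty for
// at least the given threshold.
func staleGroups(firstSeen map[string]time.Time, prefix, ownGroupID string, threshold time.Duration, now time.Time) []string {
	var toDelete []string
	for group, seenAt := range firstSeen {
		if group == ownGroupID || !strings.HasPrefix(group, prefix) {
			continue
		}
		if now.Sub(seenAt) >= threshold {
			toDelete = append(toDelete, group)
		}
	}
	return toDelete
}

func main() {
	now := time.Now()
	seen := map[string]time.Time{
		"kminion-end-to-end-1111": now.Add(-2 * time.Minute), // empty long enough, delete
		"kminion-end-to-end-2222": now.Add(-5 * time.Second), // seen too recently, keep
		"some-other-group":        now.Add(-1 * time.Hour),   // wrong prefix, ignore
	}
	fmt.Println(staleGroups(seen, "kminion-end-to-end", "kminion-end-to-end-2222", time.Minute, now))
}
```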
diff --git a/e2e/service.go b/e2e/service.go
index d08ddcf..f138ecc 100644
--- a/e2e/service.go
+++ b/e2e/service.go
@@ -86,7 +86,7 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricN
 		clientHooks: hooks,
 	}

-	svc.groupTracker = newGroupTracker(svc, ctx)
+	svc.groupTracker = newGroupTracker(cfg, logger, client, groupID)
 	svc.messageTracker = newMessageTracker(svc)

 	makeCounter := func(name string, help string) prometheus.Counter {
@@ -141,53 +141,55 @@ func (s *Service) Start(ctx context.Context) error {
 	s.partitionCount = partitions

 	// finally start everything else (producing, consuming, continous validation, consumer group tracking)
-	go s.initEndToEnd(ctx)
+	go s.startReconciliation(ctx)
+	go s.startConsumeMessages(ctx)
+	go s.startProducer(ctx)
+
+	// keep track of groups, delete old unused groups
+	if s.config.Consumer.DeleteStaleConsumerGroups {
+		go s.groupTracker.start(ctx)
+	}

 	return nil
 }

-func (s *Service) initEndToEnd(ctx context.Context) {
-
+func (s *Service) startReconciliation(ctx context.Context) {
 	validateTopicTicker := time.NewTicker(s.config.TopicManagement.ReconciliationInterval)
-	produceTicker := time.NewTicker(s.config.ProbeInterval)
-	commitTicker := time.NewTicker(5 * time.Second)
-
-	// stop tickers when context is cancelled
-	go func() {
-		<-ctx.Done()
-		produceTicker.Stop()
-		validateTopicTicker.Stop()
-		commitTicker.Stop()
-	}()
-
-	// keep checking end-to-end topic
-	go func() {
-		for range validateTopicTicker.C {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-validateTopicTicker.C:
 			err := s.validateManagementTopic(ctx)
 			if err != nil {
 				s.logger.Error("failed to validate end-to-end topic", zap.Error(err))
 			}
 		}
-	}()
-
-	// keep track of groups, delete old unused groups
-	go s.groupTracker.start()
-
-	// start consuming topic
-	go s.startConsumeMessages(ctx)
+	}
+}

-	// start comitting offsets
-	go func() {
-		for range commitTicker.C {
-			s.commitOffsets(ctx)
+func (s *Service) startProducer(ctx context.Context) {
+	produceTicker := time.NewTicker(s.config.ProbeInterval)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-produceTicker.C:
+			s.produceLatencyMessages(ctx)
 		}
-	}()
+	}
+}

-	// start producing to topic
-	go func() {
-		for range produceTicker.C {
-			s.produceLatencyMessages(ctx)
+func (s *Service) startOffsetCommits(ctx context.Context) {
+	commitTicker := time.NewTicker(5 * time.Second)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-commitTicker.C:
+			s.commitOffsets(ctx)
 		}
-	}()
+	}
 }
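All three new service loops (`startReconciliation`, `startProducer`, `startOffsetCommits`) share the same shape: a ticker plus a `select` on the context. A generic version of that pattern, purely as an illustration and not part of the diff, could look like this:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// every runs fn on every tick until the context is cancelled. It mirrors the loop
// shape of startReconciliation, startProducer and startOffsetCommits; the helper
// itself is illustrative and not part of the diff.
func every(ctx context.Context, interval time.Duration, fn func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fn(ctx)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Stands in for calls like s.produceLatencyMessages(ctx) or s.commitOffsets(ctx).
	every(ctx, 500*time.Millisecond, func(context.Context) {
		fmt.Println("tick at", time.Now().Format(time.RFC3339Nano))
	})
}
```

Unlike the goroutines in the diff, this helper also stops the ticker on exit via `defer ticker.Stop()`, which releases the ticker once the loop returns.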