Skip to content

Commit

Permalink
group: improves group offset and lag results. fix #62
Browse files Browse the repository at this point in the history
* don't display sarama.OffsetNewest / sarama.OffsetOldest values. instead,
  display `null`. displaying -1/-2 led to incorrect lag calculation #62 and
  confusion when reading the results. i hope `null` is more explicit in that no
  offset has been marked for this topic/group/partition combination.

* don't display a result when no offset has been marked. instead display
  `null`. cf #62

* change result output structure from:
  {"name": "grp", "topic": "news", "offsets": {"1": {"offset": 2, "lag": 23}}}
  to
  {"name": "grp", "topic": "news", "offsets": [{"partition": 1, "offset": 2, "lag": 23}]}
  • Loading branch information
fgeller committed Aug 4, 2017
1 parent 8e4a96f commit d1b2589
Showing 1 changed file with 31 additions and 29 deletions.
60 changes: 31 additions & 29 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ type groupCmd struct {
}

type group struct {
Name string `json:"name"`
Topic string `json:"topic,omitempty"`
Offsets map[int32]groupOffset `json:"offsets,omitempty"`
Name string `json:"name"`
Topic string `json:"topic,omitempty"`
Offsets []groupOffset `json:"offsets,omitempty"`
}

type groupOffset struct {
Offset int64 `json:"offset"`
Lag int64 `json:"lag"`
Partition int32 `json:"partition"`
Offset *int64 `json:"offset"`
Lag *int64 `json:"lag"`
}

const (
Expand Down Expand Up @@ -119,8 +120,8 @@ func (cmd *groupCmd) run(args []string, q chan struct{}) {
}

func (cmd *groupCmd) printGroupTopicOffset(out chan printContext, grp, top string) {
target := group{Name: grp, Topic: top, Offsets: map[int32]groupOffset{}}
results := make(chan groupOffsetResult)
target := group{Name: grp, Topic: top, Offsets: []groupOffset{}}
results := make(chan groupOffset)
done := make(chan struct{})
parts := cmd.partitions
if len(cmd.partitions) == 0 {
Expand All @@ -137,7 +138,7 @@ awaitGroupOffsets:
for {
select {
case res := <-results:
target.Offsets[res.partition] = groupOffset{Offset: res.offset, Lag: res.lag}
target.Offsets = append(target.Offsets, res)
case <-done:
break awaitGroupOffsets
}
Expand All @@ -150,17 +151,22 @@ awaitGroupOffsets:
}
}

type groupOffsetResult struct {
partition int32
offset int64
lag int64
func (cmd *groupCmd) resolveOffset(top string, part int32, off int64) int64 {
resolvedOff, err := cmd.client.GetOffset(top, part, off)
if err != nil {
failf("failed to get offset to reset to for partition=%d err=%v", part, err)
}

if cmd.verbose {
fmt.Fprintf(os.Stderr, "resolved offset %v for topic=%s partition=%d to %v\n", off, top, part, resolvedOff)
}

return resolvedOff
}

func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, grp, top string, part int32, results chan groupOffsetResult) {
func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, grp, top string, part int32, results chan groupOffset) {
var (
err error
groupOff int64
partOff int64
offsetManager sarama.OffsetManager
shouldReset = cmd.reset >= 0 || cmd.reset == sarama.OffsetNewest || cmd.reset == sarama.OffsetOldest
)
Expand All @@ -178,29 +184,25 @@ func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, grp, top string, part
}
defer logClose("partition offset manager", pom)

groupOff, _ = pom.NextOffset()
groupOff, _ := pom.NextOffset()
if shouldReset {
resolvedOff := cmd.reset
switch resolvedOff {
case sarama.OffsetNewest, sarama.OffsetOldest:
off, err := cmd.client.GetOffset(top, part, cmd.reset)
if err != nil {
failf("failed to get offset to reset to for partition=%d err=%v", part, err)
}
resolvedOff = off
if cmd.verbose {
fmt.Fprintf(os.Stderr, "resolved reset offset for partition=%d to %v\n", part, resolvedOff)
}
if resolvedOff == sarama.OffsetNewest || resolvedOff == sarama.OffsetOldest {
resolvedOff = cmd.resolveOffset(top, part, cmd.reset)
}
groupOff = resolvedOff
pom.MarkOffset(resolvedOff, "")
}

if partOff, err = cmd.client.GetOffset(top, part, sarama.OffsetNewest); err != nil {
failf("failed to read partition offset for topic=%s partition=%d err=%v", top, part, err)
// we haven't reset it, and it wasn't set before - lag depends on client's config
if groupOff == sarama.OffsetNewest || groupOff == sarama.OffsetOldest {
results <- groupOffset{Partition: part}
return
}

results <- groupOffsetResult{partition: part, offset: groupOff, lag: partOff - groupOff}
partOff := cmd.resolveOffset(top, part, sarama.OffsetNewest)
lag := partOff - groupOff
results <- groupOffset{Partition: part, Offset: &groupOff, Lag: &lag}
}

func (cmd *groupCmd) fetchTopics() []string {
Expand Down

0 comments on commit d1b2589

Please sign in to comment.