-
Notifications
You must be signed in to change notification settings - Fork 0
/
kazoo-offsets.go
171 lines (138 loc) · 4.16 KB
/
kazoo-offsets.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"text/tabwriter"
"time"
"github.com/Shopify/sarama"
"github.com/cznic/sortutil"
"github.com/wvanbergen/kazoo-go"
)
// Command-line flags. The zookeeper, topic and group-id flags take their
// defaults from the ZOOKEEPER_PEERS, KAFKA_TOPIC and KAFKA_GROUP_ID
// environment variables respectively.
var (
// zookeeper is the Zookeeper connection string; it may include a chroot.
zookeeper = flag.String("zookeeper", os.Getenv("ZOOKEEPER_PEERS"), "Zookeeper connection string. It can include a chroot.")
// zookeeperTimeout is the Zookeeper timeout in milliseconds (default 1000).
zookeeperTimeout = flag.Int("zookeeper-timeout", 1000, "Zookeeper timeout in milliseconds.")
// topic is the Kafka topic whose offsets are reported.
topic = flag.String("topic", os.Getenv("KAFKA_TOPIC"), "Kafka topic")
// group is the Kafka consumer group ID whose offsets are reported.
group = flag.String("group-id", os.Getenv("KAFKA_GROUP_ID"), "Kafka Consumer Group ID")
)
// ConsumerOffsets holds the offset information of one consumer group for one
// topic, keyed by partition.
type ConsumerOffsets struct {
// Group is the consumer group ID; printed in the "Group ID" column.
Group string
// Topic is the topic name; printed in the "Topic" column.
Topic string
// Partitions maps a partition number to the offsets known for it.
Partitions map[int32]*PartitionOffsets
}
// Print writes one tab-aligned row per partition to stdout, ordered by
// partition number. Each row shows the group, topic, partition, current
// offset, log size (Newest - Oldest), lag (Newest - Current) and owner.
func (co ConsumerOffsets) Print() {
	// Map iteration order is random, so collect and sort the partition IDs.
	var partitionIDs sortutil.Int32Slice
	for id := range co.Partitions {
		partitionIDs = append(partitionIDs, id)
	}
	partitionIDs.Sort()

	table := tabwriter.NewWriter(os.Stdout, 0, 8, 4, '\t', 0)
	fmt.Fprintf(table, "Group ID\tTopic\tPartition\tOffset\tLog Size\tLag\tOwner\n")

	for _, id := range partitionIDs {
		po := co.Partitions[id]
		logSize := po.Newest - po.Oldest
		lag := po.Newest - po.Current
		fmt.Fprintf(table, "%s\t%s\t%d\t%d\t%d\t%d\t%s\n", co.Group, co.Topic, id, po.Current, logSize, lag, po.Owner)
	}

	// The tabwriter buffers its cells; nothing is aligned or emitted until Flush.
	table.Flush()
}
// PartitionOffsets holds the offsets and owner of a single partition.
type PartitionOffsets struct {
// Owner is the ID of the consumer instance owning the partition; main
// sets it to "UNCLAIMED" when no owner is reported.
Owner string
// Oldest is the offset Kafka returns for sarama.OffsetOldest.
Oldest int64
// Newest is the offset Kafka returns for sarama.OffsetNewest.
Newest int64
// Current is the consumer group's offset for the partition as fetched
// from Zookeeper.
Current int64
}
// main reads the consumer group's committed offsets for the requested topic
// from Zookeeper, fetches the oldest and newest offset of every partition of
// that topic from Kafka, and prints the combined table to stdout.
//
// Every failure is reported on stderr through printErrorAndExit with exit
// code 69 (EX_UNAVAILABLE in the BSD sysexits.h convention). Because that
// helper calls os.Exit, the deferred Close calls below only run on the
// successful path.
func main() {
	flag.Parse()
	checkFlags()

	// Prevent the zk library from writing its log output to the terminal.
	log.SetOutput(ioutil.Discard)

	conf := kazoo.NewConfig()
	conf.Timeout = time.Duration(*zookeeperTimeout) * time.Millisecond

	zk, err := kazoo.NewKazooFromConnectionString(*zookeeper, conf)
	if err != nil {
		printErrorAndExit(69, "Failed to connect to Zookeeper: %v", err)
	}
	defer zk.Close()

	groups, err := zk.Consumergroups()
	if err != nil {
		printErrorAndExit(69, "Failed to get Kafka Consumer Groups from Zookeeper: %v", err)
	}

	// Find yields nil for an unknown group; fail with a clear message instead
	// of dereferencing a nil consumer group below.
	g := groups.Find(*group)
	if g == nil {
		printErrorAndExit(69, "Kafka Consumer Group %s not found in Zookeeper", *group)
	}

	offsets, err := g.FetchAllOffsets()
	if err != nil {
		printErrorAndExit(69, "Failed to get offsets for Kafka Consumer Group %s: %v", *group, err)
	}

	// ownerID returns the ID of the consumer instance owning partition p, or
	// "UNCLAIMED" when PartitionOwner reports no owner.
	ownerID := func(p int32) string {
		owner, err := g.PartitionOwner(*topic, p)
		if err != nil {
			printErrorAndExit(69, "Failed to get partition owner for Consumer Group %s topic %s partition %d: %v", *group, *topic, p, err)
		}
		if owner == nil {
			return "UNCLAIMED"
		}
		return owner.ID
	}

	co := ConsumerOffsets{
		Group:      *group,
		Topic:      *topic,
		Partitions: make(map[int32]*PartitionOffsets),
	}

	// Seed the table with the offsets the group has committed for the topic.
	for p, o := range offsets[*topic] {
		co.Partitions[p] = &PartitionOffsets{
			Owner:   ownerID(p),
			Current: o,
		}
	}

	b, err := zk.BrokerList()
	if err != nil {
		printErrorAndExit(69, "Failed to get kafka broker list from zookeeper: %v", err)
	}

	k, err := sarama.NewClient(b, sarama.NewConfig())
	if err != nil {
		printErrorAndExit(69, "Failed to connect to kafka: %v", err)
	}
	defer k.Close()

	ps, err := k.Partitions(*topic)
	if err != nil {
		printErrorAndExit(69, "Failed to get partitions for topic %s from kafka: %v", *topic, err)
	}

	// Complete every partition Kafka knows about with its log boundaries.
	for _, p := range ps {
		po := co.Partitions[p]
		if po == nil {
			// The group has no committed offset for this partition. Add a
			// row (Current stays 0) rather than dereferencing a missing
			// map entry.
			po = &PartitionOffsets{Owner: ownerID(p)}
			co.Partitions[p] = po
		}

		oldest, err := k.GetOffset(*topic, p, sarama.OffsetOldest)
		if err != nil {
			printErrorAndExit(69, "Failed to get oldest offset for topic %s, partition %d, from kafka: %v", *topic, p, err)
		}
		po.Oldest = oldest

		newest, err := k.GetOffset(*topic, p, sarama.OffsetNewest)
		if err != nil {
			printErrorAndExit(69, "Failed to get newest offset for topic %s, partition %d, from kafka: %v", *topic, p, err)
		}
		po.Newest = newest
	}

	co.Print()
}
// printUsageErrorAndExit formats the message printf-style, writes it to
// stderr prefixed with "ERROR: ", follows it with the list of command-line
// flags and their defaults, and terminates the process with exit code 64.
func printUsageErrorAndExit(format string, values ...interface{}) {
	message := fmt.Sprintf(format, values...)
	out := os.Stderr

	fmt.Fprintf(out, "ERROR: %s\n", message)
	fmt.Fprintln(out)
	fmt.Fprintln(out, "Available command line options:")
	flag.PrintDefaults()

	os.Exit(64)
}
// printErrorAndExit formats the message printf-style, writes it to stderr
// prefixed with "ERROR: " and followed by an empty line, then terminates the
// process with the given exit code.
func printErrorAndExit(code int, format string, values ...interface{}) {
	message := fmt.Sprintf(format, values...)
	out := os.Stderr

	fmt.Fprintf(out, "ERROR: %s\n", message)
	fmt.Fprintln(out)

	os.Exit(code)
}
// checkFlags verifies that the zookeeper, group-id and topic flags are all
// non-empty. Every missing value is collected first so that a single usage
// error listing all of them is reported through printUsageErrorAndExit.
func checkFlags() {
	var problems string

	if *zookeeper == "" {
		problems += "A zookeeper host string is required\n"
	}
	if *group == "" {
		problems += "A consumer group id is required.\n"
	}
	if *topic == "" {
		problems += "A topic name is required.\n"
	}

	if problems != "" {
		// Pass the text as an argument rather than as the format string so
		// it can never be interpreted as printf verbs.
		printUsageErrorAndExit("%s", problems)
	}
}