Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consume: enable resuming #92

Merged
merged 3 commits into from
Oct 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 53 additions & 9 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
)

type consumeCmd struct {
sync.Mutex

topic string
brokers []string
tlsCA string
Expand All @@ -35,8 +37,11 @@ type consumeCmd struct {
client sarama.Client
consumer sarama.Consumer
offsetManager sarama.OffsetManager
poms map[int32]sarama.PartitionOffsetManager
}

var offsetResume int64 = -3

type offset struct {
relative bool
start int64
Expand All @@ -63,6 +68,13 @@ func (cmd *consumeCmd) resolveOffset(o offset, partition int32) (int64, error) {
}

return res + o.diff, nil
} else if o.start == offsetResume {
if cmd.group == "" {
return 0, fmt.Errorf("cannot resume without -group argument")
}
pom := cmd.getPOM(partition)
next, _ := pom.NextOffset()
return next, nil
}

return o.start + o.diff, nil
Expand Down Expand Up @@ -91,7 +103,7 @@ type consumeArgs struct {

func parseOffset(str string) (offset, error) {
result := offset{}
re := regexp.MustCompile("(oldest|newest)?(-|\\+)?(\\d+)?")
re := regexp.MustCompile("(oldest|newest|resume)?(-|\\+)?(\\d+)?")
matches := re.FindAllStringSubmatch(str, -1)

if len(matches) == 0 || len(matches[0]) < 4 {
Expand Down Expand Up @@ -124,6 +136,9 @@ func parseOffset(str string) (offset, error) {
case "oldest":
result.relative = true
result.start = sarama.OffsetOldest
case "resume":
result.relative = true
result.start = offsetResume
}

return result, nil
Expand Down Expand Up @@ -265,7 +280,7 @@ func (cmd *consumeCmd) parseFlags(as []string) consumeArgs {
flags.StringVar(&args.version, "version", "", "Kafka protocol version")
flags.StringVar(&args.encodeValue, "encodevalue", "string", "Present message value as (string|hex|base64), defaults to string.")
flags.StringVar(&args.encodeKey, "encodekey", "string", "Present message key as (string|hex|base64), defaults to string.")
flags.StringVar(&args.group, "group", "", "Consumer group to use for marking offsets.")
flags.StringVar(&args.group, "group", "", "Consumer group to use for marking offsets. kt will mark offsets if this arg is supplied.")

flags.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage of consume:")
Expand Down Expand Up @@ -327,6 +342,7 @@ func (cmd *consumeCmd) run(args []string) {
if len(partitions) == 0 {
failf("Found no partitions to consume")
}
defer cmd.closePOMs()

cmd.consume(partitions)
}
Expand Down Expand Up @@ -430,21 +446,47 @@ func encodeBytes(data []byte, encoding string) *string {
return &str
}

func (cmd *consumeCmd) closePOMs() {
cmd.Lock()
for p, pom := range cmd.poms {
if err := pom.Close(); err != nil {
fmt.Fprintf(os.Stderr, "failed to close partition offset manager for partition %v err=%v", p, err)
}
}
cmd.Unlock()
}

func (cmd *consumeCmd) getPOM(p int32) sarama.PartitionOffsetManager {
cmd.Lock()
if cmd.poms == nil {
cmd.poms = map[int32]sarama.PartitionOffsetManager{}
}
pom, ok := cmd.poms[p]
if ok {
cmd.Unlock()
return pom
}

pom, err := cmd.offsetManager.ManagePartition(cmd.topic, p)
if err != nil {
cmd.Unlock()
failf("failed to create partition offset manager err=%v", err)
}
cmd.poms[p] = pom
cmd.Unlock()
return pom
}

func (cmd *consumeCmd) partitionLoop(out chan printContext, pc sarama.PartitionConsumer, p int32, end int64) {
defer logClose(fmt.Sprintf("partition consumer %v", p), pc)
var (
timer *time.Timer
pom sarama.PartitionOffsetManager
err error
timeout = make(<-chan time.Time)
)

if cmd.group != "" {
pom, err = cmd.offsetManager.ManagePartition(cmd.topic, p)
if err != nil {
failf("failed to create partition offset manager err=%v", err)
}
defer pom.Close()
pom = cmd.getPOM(p)
}

for {
Expand Down Expand Up @@ -527,11 +569,13 @@ The default is to consume from the oldest offset on every partition for the give

The following syntax is supported for each offset:

(oldest|newest)?(+|-)?(\d+)?
(oldest|newest|resume)?(+|-)?(\d+)?

- "oldest" and "newest" refer to the oldest and newest offsets known for a
given partition.

- "resume" can be used in combination with -group.

- You can use "+" with a numeric value to skip the given number of messages
since the oldest offset. For example, "1=+20" will skip 20 offset value since
the oldest offset for partition 1.
Expand Down
45 changes: 44 additions & 1 deletion system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,49 @@ func TestSystem(t *testing.T) {
require.Contains(t, stdErr, fmt.Sprintf("found partitions=[0] for topic=%v", topicName))
require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":1,"lag":0}]}`, topicName))

fmt.Printf(">> ✓\n")
//
// kt produce
//

req = map[string]interface{}{
"value": fmt.Sprintf("hello, %s", randomString(6)),
"key": "boom",
"partition": float64(0),
}
buf, err = json.Marshal(req)
require.NoError(t, err)
status, stdOut, stdErr = newCmd().stdIn(string(buf)).run("./kt", "produce", "-topic", topicName)
fmt.Printf(">> system test kt produce -topic %v stdout:\n%s\n", topicName, stdOut)
fmt.Printf(">> system test kt produce -topic %v stderr:\n%s\n", topicName, stdErr)
require.Zero(t, status)
require.Empty(t, stdErr)

err = json.Unmarshal([]byte(stdOut), &produceMessage)
require.NoError(t, err)
require.Equal(t, 1, produceMessage["count"])
require.Equal(t, 0, produceMessage["partition"])
require.Equal(t, 1, produceMessage["startOffset"])

fmt.Printf(">> ✓\n")
//
// kt consume
//

status, stdOut, stdErr = newCmd().run("./kt", "consume", "-topic", topicName, "-offsets", "all=resume", "-timeout", "500ms", "-group", "hans")
fmt.Printf(">> system test kt consume -topic %v -offsets all=resume stdout:\n%s\n", topicName, stdOut)
fmt.Printf(">> system test kt consume -topic %v -offsets all=resume stderr:\n%s\n", topicName, stdErr)
require.Zero(t, status)

lines = strings.Split(stdOut, "\n")
require.True(t, len(lines) == 2) // actual line and an empty one

err = json.Unmarshal([]byte(lines[len(lines)-2]), &lastConsumed)
require.NoError(t, err)
require.Equal(t, req["value"], lastConsumed["value"])
require.Equal(t, req["key"], lastConsumed["key"])
require.Equal(t, req["partition"], lastConsumed["partition"])

fmt.Printf(">> ✓\n")
//
// kt group reset
Expand Down Expand Up @@ -174,7 +217,7 @@ func TestSystem(t *testing.T) {
fmt.Printf(">> system test kt group -topic %v stderr:\n%s\n", topicName, stdErr)
require.Zero(t, status)
require.Contains(t, stdErr, fmt.Sprintf("found partitions=[0] for topic=%v", topicName))
require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":0,"lag":1}]}`, topicName))
require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":0,"lag":2}]}`, topicName))

fmt.Printf(">> ✓\n")
//
Expand Down