diff --git a/consume.go b/consume.go index ed9aec3..6f70265 100644 --- a/consume.go +++ b/consume.go @@ -18,6 +18,8 @@ import ( ) type consumeCmd struct { + sync.Mutex + topic string brokers []string tlsCA string @@ -35,8 +37,11 @@ type consumeCmd struct { client sarama.Client consumer sarama.Consumer offsetManager sarama.OffsetManager + poms map[int32]sarama.PartitionOffsetManager } +var offsetResume int64 = -3 + type offset struct { relative bool start int64 @@ -63,6 +68,13 @@ func (cmd *consumeCmd) resolveOffset(o offset, partition int32) (int64, error) { } return res + o.diff, nil + } else if o.start == offsetResume { + if cmd.group == "" { + return 0, fmt.Errorf("cannot resume without -group argument") + } + pom := cmd.getPOM(partition) + next, _ := pom.NextOffset() + return next, nil } return o.start + o.diff, nil @@ -91,7 +103,7 @@ type consumeArgs struct { func parseOffset(str string) (offset, error) { result := offset{} - re := regexp.MustCompile("(oldest|newest)?(-|\\+)?(\\d+)?") + re := regexp.MustCompile("(oldest|newest|resume)?(-|\\+)?(\\d+)?") matches := re.FindAllStringSubmatch(str, -1) if len(matches) == 0 || len(matches[0]) < 4 { @@ -124,6 +136,9 @@ func parseOffset(str string) (offset, error) { case "oldest": result.relative = true result.start = sarama.OffsetOldest + case "resume": + result.relative = true + result.start = offsetResume } return result, nil @@ -265,7 +280,7 @@ func (cmd *consumeCmd) parseFlags(as []string) consumeArgs { flags.StringVar(&args.version, "version", "", "Kafka protocol version") flags.StringVar(&args.encodeValue, "encodevalue", "string", "Present message value as (string|hex|base64), defaults to string.") flags.StringVar(&args.encodeKey, "encodekey", "string", "Present message key as (string|hex|base64), defaults to string.") - flags.StringVar(&args.group, "group", "", "Consumer group to use for marking offsets.") + flags.StringVar(&args.group, "group", "", "Consumer group to use for marking offsets. kt will mark offsets if this arg is supplied.") flags.Usage = func() { fmt.Fprintln(os.Stderr, "Usage of consume:") @@ -327,6 +342,7 @@ func (cmd *consumeCmd) run(args []string) { if len(partitions) == 0 { failf("Found no partitions to consume") } + defer cmd.closePOMs() cmd.consume(partitions) } @@ -430,21 +446,47 @@ func encodeBytes(data []byte, encoding string) *string { return &str } +func (cmd *consumeCmd) closePOMs() { + cmd.Lock() + for p, pom := range cmd.poms { + if err := pom.Close(); err != nil { + fmt.Fprintf(os.Stderr, "failed to close partition offset manager for partition %v err=%v", p, err) + } + } + cmd.Unlock() +} + +func (cmd *consumeCmd) getPOM(p int32) sarama.PartitionOffsetManager { + cmd.Lock() + if cmd.poms == nil { + cmd.poms = map[int32]sarama.PartitionOffsetManager{} + } + pom, ok := cmd.poms[p] + if ok { + cmd.Unlock() + return pom + } + + pom, err := cmd.offsetManager.ManagePartition(cmd.topic, p) + if err != nil { + cmd.Unlock() + failf("failed to create partition offset manager err=%v", err) + } + cmd.poms[p] = pom + cmd.Unlock() + return pom +} + func (cmd *consumeCmd) partitionLoop(out chan printContext, pc sarama.PartitionConsumer, p int32, end int64) { defer logClose(fmt.Sprintf("partition consumer %v", p), pc) var ( timer *time.Timer pom sarama.PartitionOffsetManager - err error timeout = make(<-chan time.Time) ) if cmd.group != "" { - pom, err = cmd.offsetManager.ManagePartition(cmd.topic, p) - if err != nil { - failf("failed to create partition offset manager err=%v", err) - } - defer pom.Close() + pom = cmd.getPOM(p) } for { @@ -527,11 +569,13 @@ The default is to consume from the oldest offset on every partition for the give The following syntax is supported for each offset: - (oldest|newest)?(+|-)?(\d+)? + (oldest|newest|resume)?(+|-)?(\d+)? - "oldest" and "newest" refer to the oldest and newest offsets known for a given partition. + - "resume" can be used in combination with -group. + - You can use "+" with a numeric value to skip the given number of messages since the oldest offset. For example, "1=+20" will skip 20 offset value since the oldest offset for partition 1. diff --git a/system_test.go b/system_test.go index 0224abe..b51b840 100644 --- a/system_test.go +++ b/system_test.go @@ -140,6 +140,49 @@ func TestSystem(t *testing.T) { require.Contains(t, stdErr, fmt.Sprintf("found partitions=[0] for topic=%v", topicName)) require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":1,"lag":0}]}`, topicName)) + fmt.Printf(">> ✓\n") + // + // kt produce + // + + req = map[string]interface{}{ + "value": fmt.Sprintf("hello, %s", randomString(6)), + "key": "boom", + "partition": float64(0), + } + buf, err = json.Marshal(req) + require.NoError(t, err) + status, stdOut, stdErr = newCmd().stdIn(string(buf)).run("./kt", "produce", "-topic", topicName) + fmt.Printf(">> system test kt produce -topic %v stdout:\n%s\n", topicName, stdOut) + fmt.Printf(">> system test kt produce -topic %v stderr:\n%s\n", topicName, stdErr) + require.Zero(t, status) + require.Empty(t, stdErr) + + err = json.Unmarshal([]byte(stdOut), &produceMessage) + require.NoError(t, err) + require.Equal(t, 1, produceMessage["count"]) + require.Equal(t, 0, produceMessage["partition"]) + require.Equal(t, 1, produceMessage["startOffset"]) + + fmt.Printf(">> ✓\n") + // + // kt consume + // + + status, stdOut, stdErr = newCmd().run("./kt", "consume", "-topic", topicName, "-offsets", "all=resume", "-timeout", "500ms", "-group", "hans") + fmt.Printf(">> system test kt consume -topic %v -offsets all=resume stdout:\n%s\n", topicName, stdOut) + fmt.Printf(">> system test kt consume -topic %v -offsets all=resume stderr:\n%s\n", topicName, stdErr) + require.Zero(t, status) + + lines = strings.Split(stdOut, "\n") + require.True(t, len(lines) == 2) // actual line and an empty one + + err = json.Unmarshal([]byte(lines[len(lines)-2]), &lastConsumed) + require.NoError(t, err) + require.Equal(t, req["value"], lastConsumed["value"]) + require.Equal(t, req["key"], lastConsumed["key"]) + require.Equal(t, req["partition"], lastConsumed["partition"]) + fmt.Printf(">> ✓\n") // // kt group reset @@ -174,7 +217,7 @@ func TestSystem(t *testing.T) { fmt.Printf(">> system test kt group -topic %v stderr:\n%s\n", topicName, stdErr) require.Zero(t, status) require.Contains(t, stdErr, fmt.Sprintf("found partitions=[0] for topic=%v", topicName)) - require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":0,"lag":1}]}`, topicName)) + require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":0,"lag":2}]}`, topicName)) fmt.Printf(">> ✓\n") //