Skip to content

Commit

Permalink
consume: test for resume. fix #78
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Oct 30, 2018
1 parent 79e1b73 commit bad38f8
Showing 1 changed file with 44 additions and 1 deletion.
45 changes: 44 additions & 1 deletion system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,49 @@ func TestSystem(t *testing.T) {
require.Contains(t, stdErr, fmt.Sprintf("found partitions=[0] for topic=%v", topicName))
require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":1,"lag":0}]}`, topicName))

fmt.Printf(">> ✓\n")
//
// kt produce
//

req = map[string]interface{}{
"value": fmt.Sprintf("hello, %s", randomString(6)),
"key": "boom",
"partition": float64(0),
}
buf, err = json.Marshal(req)
require.NoError(t, err)
status, stdOut, stdErr = newCmd().stdIn(string(buf)).run("./kt", "produce", "-topic", topicName)
fmt.Printf(">> system test kt produce -topic %v stdout:\n%s\n", topicName, stdOut)
fmt.Printf(">> system test kt produce -topic %v stderr:\n%s\n", topicName, stdErr)
require.Zero(t, status)
require.Empty(t, stdErr)

err = json.Unmarshal([]byte(stdOut), &produceMessage)
require.NoError(t, err)
require.Equal(t, 1, produceMessage["count"])
require.Equal(t, 0, produceMessage["partition"])
require.Equal(t, 1, produceMessage["startOffset"])

fmt.Printf(">> ✓\n")
//
// kt consume
//

status, stdOut, stdErr = newCmd().run("./kt", "consume", "-topic", topicName, "-offsets", "all=resume", "-timeout", "500ms", "-group", "hans")
fmt.Printf(">> system test kt consume -topic %v -offsets all=resume stdout:\n%s\n", topicName, stdOut)
fmt.Printf(">> system test kt consume -topic %v -offsets all=resume stderr:\n%s\n", topicName, stdErr)
require.Zero(t, status)

lines = strings.Split(stdOut, "\n")
require.True(t, len(lines) == 2) // actual line and an empty one

err = json.Unmarshal([]byte(lines[len(lines)-2]), &lastConsumed)
require.NoError(t, err)
require.Equal(t, req["value"], lastConsumed["value"])
require.Equal(t, req["key"], lastConsumed["key"])
require.Equal(t, req["partition"], lastConsumed["partition"])

fmt.Printf(">> ✓\n")
//
// kt group reset
Expand Down Expand Up @@ -174,7 +217,7 @@ func TestSystem(t *testing.T) {
fmt.Printf(">> system test kt group -topic %v stderr:\n%s\n", topicName, stdErr)
require.Zero(t, status)
require.Contains(t, stdErr, fmt.Sprintf("found partitions=[0] for topic=%v", topicName))
require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":0,"lag":1}]}`, topicName))
require.Contains(t, stdOut, fmt.Sprintf(`{"name":"hans","topic":"%v","offsets":[{"partition":0,"offset":0,"lag":2}]}`, topicName))

fmt.Printf(">> ✓\n")
//
Expand Down

0 comments on commit bad38f8

Please sign in to comment.