Skip to content

Commit

Permalink
Upgrade the kafka library to set the timestamp correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
jsternberg committed Sep 19, 2019
1 parent 72e88b9 commit 3665c5d
Show file tree
Hide file tree
Showing 45 changed files with 5,636 additions and 2,253 deletions.
324 changes: 291 additions & 33 deletions Gopkg.lock

Large diffs are not rendered by default.

19 changes: 18 additions & 1 deletion integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8642,6 +8642,7 @@ stream
Offset: 0,
Key: "kapacitor/cpu/serverA",
Message: "kapacitor/cpu/serverA is CRITICAL",
Time: time.Now().UTC(),
},
}

Expand All @@ -8658,7 +8659,23 @@ stream
got[i] = m
}

if err := compareListIgnoreOrder(got, exp, nil); err != nil {
cmpopts := []cmp.Option{
cmp.Comparer(func(a, b time.Time) bool {
diff := a.Sub(b)
if diff < 0 {
diff = -diff
}
// It is ok as long as the timestamp is within
// 5 seconds of the current time. If we are that close,
// then it likely means the timestamp was correctly
// written.
return diff < 5*time.Second
}),
}
cmpF := func(got, exp interface{}) bool {
return cmp.Equal(exp, got, cmpopts...)
}
if err := compareListIgnoreOrder(got, exp, cmpF); err != nil {
t.Error(err)
}
}
Expand Down
16 changes: 15 additions & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9634,8 +9634,22 @@ func TestServer_AlertHandlers(t *testing.T) {
Offset: 0,
Key: "id",
Message: string(adJSON) + "\n",
Time: time.Now().UTC(),
}}
if !cmp.Equal(exp, got) {
cmpopts := []cmp.Option{
cmp.Comparer(func(a, b time.Time) bool {
diff := a.Sub(b)
if diff < 0 {
diff = -diff
}
// It is ok as long as the timestamp is within
// 5 seconds of the current time. If we are that close,
// then it likely means the timestamp was correctly
// written.
return diff < 5*time.Second
}),
}
if !cmp.Equal(exp, got, cmpopts...) {
return fmt.Errorf("unexpected kafka messages -exp/+got:\n%s", cmp.Diff(exp, got))
}
return nil
Expand Down
54 changes: 49 additions & 5 deletions services/kafka/kafkatest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"sync"
"time"
)

// Provides an incomplete Kafka Server implementation.
Expand Down Expand Up @@ -60,6 +61,7 @@ func (s *Server) prepareBrokerMsg() {
s.brokerMessage = writeStr(s.brokerMessage, host)
portN, _ := strconv.Atoi(port)
s.brokerMessage = writeInt32(s.brokerMessage, int32(portN))
s.brokerMessage = writeInt16(s.brokerMessage, -1)
}

func (s *Server) preparePartitionMsg() {
Expand Down Expand Up @@ -122,11 +124,17 @@ func (s *Server) run(l net.Listener) {
go func() {
defer s.wg.Done()
defer c.Close()
if err := s.handle(c); err != nil {
s.errors = append(s.errors, err)
for {
if err := s.handle(c); err != nil {
if err == io.EOF {
return
}
s.errors = append(s.errors, err)
}
}
}()
case <-s.closing:
l.Close()
return
}
}
Expand Down Expand Up @@ -165,25 +173,46 @@ func (s *Server) handle(c net.Conn) error {
response = writeInt32(response, partition)
response = writeInt16(response, 0) // Error Code
response = writeInt64(response, offset)
response = writeInt64(response, -1) // Timestamp
response = writeInt32(response, 0) // ThrottleTime
response = writeInt32(response, 0) // ThrottleTime
case 3: // Metadata
topics, _ := readStrList(request)

// Write broker message
response = writeArray(response, [][]byte{s.brokerMessage})

// Write controller id
response = writeInt32(response, 0)

// Write topic metadata
response = writeArrayHeader(response, int32(len(topics)))
for _, t := range topics {
// Write Error Code
response = writeInt16(response, 0)
// Write topic name
response = writeStr(response, t)
// Write is_internal
response = writeBool(response, false)

// Write partition
response = writeArray(response, [][]byte{s.partitionMessage})
}
case 18:
// Hard code the api versions we are implementing to ensure
// the client knows which ones we plan on implementing.
response = writeInt16(response, 0) // Error Code

// Write api versions
response = writeArrayHeader(response, 2)

// Write produce request version
response = writeInt16(response, 0)
response = writeInt16(response, 2)
response = writeInt16(response, 2)

// Write metadata request version
response = writeInt16(response, 3)
response = writeInt16(response, 1)
response = writeInt16(response, 1)
default:
return fmt.Errorf("unsupported apiKey %d", apiKey)
}
Expand Down Expand Up @@ -213,7 +242,10 @@ func (s *Server) readProduceRequest(request []byte) (topic string, partition int
offset = readInt64(request[pos:])
pos += 8

pos += 4 + 4 + 1 + 1 + 8 // skip size, crc, magic, attributes, timestamp
pos += 4 + 4 + 1 + 1 // skip size, crc, magic, attributes

msecs := readInt64(request[pos:])
pos += 8

key, n := readByteArray(request[pos:])
pos += n
Expand All @@ -227,6 +259,7 @@ func (s *Server) readProduceRequest(request []byte) (topic string, partition int
Offset: offset,
Key: string(key),
Message: string(message),
Time: time.Unix(msecs/1000, msecs%1000*1000000).UTC(),
})
return
}
Expand Down Expand Up @@ -285,6 +318,16 @@ func writeInt16(dst []byte, n int) []byte {
return dst
}

func writeBool(dst []byte, b bool) []byte {
v := int8(0)
if b {
v = 1
}
return writeInt8(dst, v)
}
func writeInt8(dst []byte, n int8) []byte {
return append(dst, byte(n))
}
func writeInt32(dst []byte, n int32) []byte {
l := len(dst)
dst = append(dst, []byte{0, 0, 0, 0}...)
Expand Down Expand Up @@ -326,4 +369,5 @@ type Message struct {
Offset int64
Key string
Message string
Time time.Time
}
2 changes: 1 addition & 1 deletion vendor/github.com/mailru/easyjson/parser/parser_windows.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

145 changes: 141 additions & 4 deletions vendor/github.com/segmentio/kafka-go/balancer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3665c5d

Please sign in to comment.