Skip to content

Commit

Permalink
consume: distinguishes nil and empty slice. #67
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Sep 19, 2017
1 parent 1abcd68 commit 9310f92
Showing 1 changed file with 23 additions and 30 deletions.
53 changes: 23 additions & 30 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,43 +361,36 @@ type consumedMessage struct {
}

func newConsumedMessage(m *sarama.ConsumerMessage, encodeKey, encodeValue string) consumedMessage {
result := consumedMessage{Partition: m.Partition, Offset: m.Offset}
result := consumedMessage{
Partition: m.Partition,
Offset: m.Offset,
Key: encodeBytes(m.Key, encodeKey),
Value: encodeBytes(m.Value, encodeValue),
}

if !m.Timestamp.IsZero() {
result.Timestamp = &m.Timestamp
}

if len(m.Value) == 0 {
result.Value = nil
} else {
var str string
switch encodeValue {
case "hex":
str = hex.EncodeToString(m.Value)
case "base64":
str = base64.StdEncoding.EncodeToString(m.Value)
default:
str = string(m.Value)
}
result.Value = &str
}

if len(m.Key) == 0 {
result.Key = nil
} else {
var str string
switch encodeKey {
case "hex":
str = hex.EncodeToString(m.Key)
case "base64":
str = base64.StdEncoding.EncodeToString(m.Key)
default:
str = string(m.Key)
}
result.Key = &str
return result
}

func encodeBytes(data []byte, encoding string) *string {
if data == nil {
return nil
}

return result
var str string
switch encoding {
case "hex":
str = hex.EncodeToString(data)
case "base64":
str = base64.StdEncoding.EncodeToString(data)
default:
str = string(data)
}

return &str
}

func (cmd *consumeCmd) partitionLoop(out chan printContext, pc sarama.PartitionConsumer, p int32, end int64) {
Expand Down

0 comments on commit 9310f92

Please sign in to comment.