diff --git a/consume.go b/consume.go index c21759d..0f41b6e 100644 --- a/consume.go +++ b/consume.go @@ -361,43 +361,36 @@ type consumedMessage struct { } func newConsumedMessage(m *sarama.ConsumerMessage, encodeKey, encodeValue string) consumedMessage { - result := consumedMessage{Partition: m.Partition, Offset: m.Offset} + result := consumedMessage{ + Partition: m.Partition, + Offset: m.Offset, + Key: encodeBytes(m.Key, encodeKey), + Value: encodeBytes(m.Value, encodeValue), + } if !m.Timestamp.IsZero() { result.Timestamp = &m.Timestamp } - if len(m.Value) == 0 { - result.Value = nil - } else { - var str string - switch encodeValue { - case "hex": - str = hex.EncodeToString(m.Value) - case "base64": - str = base64.StdEncoding.EncodeToString(m.Value) - default: - str = string(m.Value) - } - result.Value = &str - } - - if len(m.Key) == 0 { - result.Key = nil - } else { - var str string - switch encodeKey { - case "hex": - str = hex.EncodeToString(m.Key) - case "base64": - str = base64.StdEncoding.EncodeToString(m.Key) - default: - str = string(m.Key) - } - result.Key = &str + return result +} + +func encodeBytes(data []byte, encoding string) *string { + if data == nil { + return nil } - return result + var str string + switch encoding { + case "hex": + str = hex.EncodeToString(data) + case "base64": + str = base64.StdEncoding.EncodeToString(data) + default: + str = string(data) + } + + return &str } func (cmd *consumeCmd) partitionLoop(out chan printContext, pc sarama.PartitionConsumer, p int32, end int64) {