Skip to content

Commit

Permalink
consume: extracts entity constructor.
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Sep 19, 2017
1 parent e3f60ba commit 1abcd68
Showing 1 changed file with 41 additions and 37 deletions.
78 changes: 41 additions & 37 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,46 @@ type consumedMessage struct {
Timestamp *time.Time `json:"timestamp,omitempty"`
}

func newConsumedMessage(m *sarama.ConsumerMessage, encodeKey, encodeValue string) consumedMessage {
result := consumedMessage{Partition: m.Partition, Offset: m.Offset}

if !m.Timestamp.IsZero() {
result.Timestamp = &m.Timestamp
}

if len(m.Value) == 0 {
result.Value = nil
} else {
var str string
switch encodeValue {
case "hex":
str = hex.EncodeToString(m.Value)
case "base64":
str = base64.StdEncoding.EncodeToString(m.Value)
default:
str = string(m.Value)
}
result.Value = &str
}

if len(m.Key) == 0 {
result.Key = nil
} else {
var str string
switch encodeKey {
case "hex":
str = hex.EncodeToString(m.Key)
case "base64":
str = base64.StdEncoding.EncodeToString(m.Key)
default:
str = string(m.Key)
}
result.Key = &str
}

return result
}

func (cmd *consumeCmd) partitionLoop(out chan printContext, pc sarama.PartitionConsumer, p int32, end int64) {
defer logClose(fmt.Sprintf("partition consumer %v", p), pc)
var (
Expand Down Expand Up @@ -389,43 +429,7 @@ func (cmd *consumeCmd) partitionLoop(out chan printContext, pc sarama.PartitionC
return
}

// TODO: move the below into a constructor
m := consumedMessage{Partition: msg.Partition, Offset: msg.Offset}

if !msg.Timestamp.IsZero() {
m.Timestamp = &msg.Timestamp
}

if len(msg.Value) == 0 {
m.Value = nil
} else {
var str string
switch cmd.encodeValue {
case "hex":
str = hex.EncodeToString(msg.Value)
case "base64":
str = base64.StdEncoding.EncodeToString(msg.Value)
default:
str = string(msg.Value)
}
m.Value = &str
}

if len(msg.Key) == 0 {
m.Key = nil
} else {
var str string
switch cmd.encodeKey {
case "hex":
str = hex.EncodeToString(msg.Key)
case "base64":
str = base64.StdEncoding.EncodeToString(msg.Key)
default:
str = string(msg.Key)
}
m.Key = &str
}

m := newConsumedMessage(msg, cmd.encodeKey, cmd.encodeValue)
ctx := printContext{output: m, done: make(chan struct{})}
out <- ctx
<-ctx.done
Expand Down

0 comments on commit 1abcd68

Please sign in to comment.