Skip to content

Commit

Permalink
Expose EventTime consistently as a non-pointer (apache#186)
Browse files Browse the repository at this point in the history
* Expose EventTime consistently as a non-pointer

* Expanded docs
  • Loading branch information
merlimat authored Feb 14, 2020
1 parent be8ad8d commit fc390a6
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func TestConsumerEventTime(t *testing.T) {
et := timeFromUnixTimestampMillis(uint64(5))
_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("test"),
EventTime: &et,
EventTime: et,
})
assert.Nil(t, err)

Expand Down
3 changes: 1 addition & 2 deletions pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,11 @@ func (r *dlqRouter) run() {

msg := cm.Message.(*message)
msgID := msg.ID()
eventTime := msg.EventTime()
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
Properties: msg.Properties(),
EventTime: &eventTime,
EventTime: msg.EventTime(),
ReplicationClusters: msg.replicationClusters,
}, func(MessageID, *ProducerMessage, error) {
r.log.WithField("msgID", msgID).Debug("Sent message to DLQ")
Expand Down
8 changes: 6 additions & 2 deletions pulsar/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ type ProducerMessage struct {
Properties map[string]string

// EventTime set the event time for a given message
EventTime *time.Time
// By default, messages don't have an event time associated, while the publish
// time will be be always present.
// Set the event time to a non-zero timestamp to explicitly declare the time
// that the event "happened", as opposed to when the message is being published.
EventTime time.Time

// ReplicationClusters override the replication clusters for this message.
ReplicationClusters []string
Expand Down Expand Up @@ -77,7 +81,7 @@ type Message interface {

// EventTime get the event time associated with this message. It is typically set by the applications via
// `ProducerMessage.EventTime`.
// If there isn't any event time associated with this event, it will be nil.
// If EventTime is 0, it means there isn't any event time associated with this message.
EventTime() time.Time

// Key get the key of the message, if any
Expand Down
4 changes: 2 additions & 2 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
PayloadSize: proto.Int(len(msg.Payload)),
}

if msg.EventTime != nil {
smm.EventTime = proto.Uint64(internal.TimestampMillis(*msg.EventTime))
if msg.EventTime.UnixNano() != 0 {
smm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
}

if msg.Key != "" {
Expand Down
2 changes: 1 addition & 1 deletion pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestEventTime(t *testing.T) {
eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("test-event-time")),
EventTime: &eventTime,
EventTime: eventTime,
})
assert.Nil(t, err)
assert.NotNil(t, ID)
Expand Down

0 comments on commit fc390a6

Please sign in to comment.