diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 08c994b0e26e7..3e256f8af8686 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -503,7 +503,7 @@ func TestConsumerEventTime(t *testing.T) { et := timeFromUnixTimestampMillis(uint64(5)) _, err = producer.Send(ctx, &ProducerMessage{ Payload: []byte("test"), - EventTime: &et, + EventTime: et, }) assert.Nil(t, err) diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 3373a9fa6d8fe..69b45d7a4d7a1 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -92,12 +92,11 @@ func (r *dlqRouter) run() { msg := cm.Message.(*message) msgID := msg.ID() - eventTime := msg.EventTime() producer.SendAsync(context.Background(), &ProducerMessage{ Payload: msg.Payload(), Key: msg.Key(), Properties: msg.Properties(), - EventTime: &eventTime, + EventTime: msg.EventTime(), ReplicationClusters: msg.replicationClusters, }, func(MessageID, *ProducerMessage, error) { r.log.WithField("msgID", msgID).Debug("Sent message to DLQ") diff --git a/pulsar/message.go b/pulsar/message.go index 735ca41391c9c..bc38ebb5abac3 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -34,7 +34,11 @@ type ProducerMessage struct { Properties map[string]string // EventTime set the event time for a given message - EventTime *time.Time + // By default, messages don't have an event time associated, while the publish + // time will be be always present. + // Set the event time to a non-zero timestamp to explicitly declare the time + // that the event "happened", as opposed to when the message is being published. + EventTime time.Time // ReplicationClusters override the replication clusters for this message. ReplicationClusters []string @@ -77,7 +81,7 @@ type Message interface { // EventTime get the event time associated with this message. It is typically set by the applications via // `ProducerMessage.EventTime`. - // If there isn't any event time associated with this event, it will be nil. + // If EventTime is 0, it means there isn't any event time associated with this message. EventTime() time.Time // Key get the key of the message, if any diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index a24ff667f4dea..f10adcf6f7d90 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -249,8 +249,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) { PayloadSize: proto.Int(len(msg.Payload)), } - if msg.EventTime != nil { - smm.EventTime = proto.Uint64(internal.TimestampMillis(*msg.EventTime)) + if msg.EventTime.UnixNano() != 0 { + smm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime)) } if msg.Key != "" { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 5e40ec2d9245a..66fa8a0f746d2 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -244,7 +244,7 @@ func TestEventTime(t *testing.T) { eventTime := timeFromUnixTimestampMillis(uint64(1565161612)) ID, err := producer.Send(context.Background(), &ProducerMessage{ Payload: []byte(fmt.Sprintf("test-event-time")), - EventTime: &eventTime, + EventTime: eventTime, }) assert.Nil(t, err) assert.NotNil(t, ID)