Skip to content

Commit 070ebc0

Browse files
sijiemerlimat
authored andcommittedAug 31, 2017
Issue 733: Event Time support in java client (#734)
- add `event_time` field in message metadata proto - expose `setEventTime` in MessageBuilder - expose `getEventTime` in Message - add event_time test cases for message & message builder
1 parent f8b051a commit 070ebc0

File tree

7 files changed

+170
-0
lines changed

7 files changed

+170
-0
lines changed
 

‎pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java

+17
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,25 @@ public interface Message {
7373
*/
7474
MessageId getMessageId();
7575

76+
/**
77+
* Get the publish time of this message. The publish time is the timestamp that a client publish the message.
78+
*
79+
* @return publish time of this message.
80+
* @see #getEventTime()
81+
*/
7682
long getPublishTime();
7783

84+
/**
85+
* Get the event time associated with this message. It is typically set by the applications via
86+
* {@link MessageBuilder#setEventTime(long)}.
87+
*
88+
* <p>If there isn't any event time associated with this event, it will return 0.
89+
*
90+
* @see MessageBuilder#setEventTime(long)
91+
* @since 1.20.0
92+
*/
93+
long getEventTime();
94+
7895
/**
7996
* Check whether the message has a key
8097
*

‎pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java

+12
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,18 @@ public static MessageBuilder create() {
9797
*/
9898
MessageBuilder setKey(String key);
9999

100+
/**
101+
* Set the event time for a given message.
102+
*
103+
* <p>Applications can retrieve the event time by calling {@link Message#getEventTime()}.
104+
*
105+
* <p>Note: currently pulsar doesn't support event-time based index. so the subscribers can't
106+
* seek the messages by event time.
107+
*
108+
* @since 1.20.0
109+
*/
110+
MessageBuilder setEventTime(long timestamp);
111+
100112
/**
101113
* Override the replication clusters for this message.
102114
*

‎pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java

+9
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import static com.google.common.base.Preconditions.checkArgument;
22+
2123
import java.nio.ByteBuffer;
2224
import java.util.List;
2325
import java.util.Map;
@@ -79,6 +81,13 @@ public MessageBuilder setKey(String key) {
7981
return this;
8082
}
8183

84+
@Override
85+
public MessageBuilder setEventTime(long timestamp) {
86+
checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
87+
msgMetadataBuilder.setEventTime(timestamp);
88+
return this;
89+
}
90+
8291
@Override
8392
public MessageBuilder setReplicationClusters(List<String> clusters) {
8493
Preconditions.checkNotNull(clusters);

‎pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java

+9
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,15 @@ public long getPublishTime() {
152152
return msgMetadataBuilder.getPublishTime();
153153
}
154154

155+
@Override
156+
public long getEventTime() {
157+
checkNotNull(msgMetadataBuilder);
158+
if (msgMetadataBuilder.hasEventTime()) {
159+
return msgMetadataBuilder.getEventTime();
160+
}
161+
return 0;
162+
}
163+
155164
public boolean isExpired(int messageTTLInSeconds) {
156165
return messageTTLInSeconds != 0
157166
&& System.currentTimeMillis() > (getPublishTime() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl;
20+
21+
import static org.junit.Assert.assertEquals;
22+
23+
import org.apache.pulsar.client.api.Message;
24+
import org.apache.pulsar.client.api.MessageBuilder;
25+
import org.junit.Test;
26+
27+
/**
28+
* Unit test of {@link MessageBuilderImpl}.
29+
*/
30+
public class MessageBuilderTest {
31+
32+
@Test(expected = IllegalArgumentException.class)
33+
public void testSetEventTimeNegative() {
34+
MessageBuilder builder = MessageBuilder.create();
35+
builder.setEventTime(-1L);
36+
}
37+
38+
@Test(expected = IllegalArgumentException.class)
39+
public void testSetEventTimeZero() {
40+
MessageBuilder builder = MessageBuilder.create();
41+
builder.setEventTime(0L);
42+
}
43+
44+
@Test
45+
public void testSetEventTimePositive() {
46+
long eventTime = System.currentTimeMillis();
47+
MessageBuilder builder = MessageBuilder.create();
48+
builder.setContent(new byte[0]);
49+
builder.setEventTime(eventTime);
50+
Message msg = builder.build();
51+
assertEquals(eventTime, msg.getEventTime());
52+
}
53+
54+
@Test
55+
public void testBuildMessageWithoutEventTime() {
56+
MessageBuilder builder = MessageBuilder.create();
57+
builder.setContent(new byte[0]);
58+
Message msg = builder.build();
59+
assertEquals(0L, msg.getEventTime());
60+
}
61+
62+
}

‎pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java

+57
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pulsar-common/src/main/proto/PulsarApi.proto

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ message MessageMetadata {
6060
//optional sfixed64 checksum = 10;
6161
// differentiate single and batch message metadata
6262
optional int32 num_messages_in_batch = 11 [default = 1];
63+
64+
// the timestamp that this event occurs. it is typically set by applications.
65+
// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
66+
optional uint64 event_time = 12 [default = 0];
6367
}
6468

6569

0 commit comments

Comments
 (0)
Please sign in to comment.