diff --git a/pom.xml b/pom.xml
index 0b588f45..6c766244 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
pubsublite-kafka-auth
- 1.9.2
+ 1.11.1
diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java
index cd3cd6da..8fb63de8 100644
--- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java
+++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java
@@ -16,19 +16,22 @@
package com.google.cloud.pubsublite.kafka;
+import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.protobuf.ByteString;
import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
class LiteHeaders implements Headers {
- private ImmutableListMultimap attributes;
+ private Map attributes;
- LiteHeaders(ImmutableListMultimap attributes) {
+ LiteHeaders(Map attributes) {
this.attributes = attributes;
}
@@ -68,23 +71,24 @@ public Header lastHeader(String s) {
@Override
public Iterable headers(String s) {
- if (attributes.containsKey(s))
- return Iterables.transform(attributes.get(s), value -> toHeader(s, value));
- return ImmutableList.of();
+ @Nullable AttributeValues values = attributes.get(s);
+ if (values == null) {
+ return ImmutableList.of();
+ }
+ return values.getValuesList().stream().map(v -> toHeader(s, v)).collect(Collectors.toList());
}
@Override
public Header[] toArray() {
- ImmutableList.Builder arrayBuilder = ImmutableList.builder();
- attributes
- .entries()
- .forEach(entry -> arrayBuilder.add(toHeader(entry.getKey(), entry.getValue())));
- return (Header[]) arrayBuilder.build().toArray();
+ return Iterators.toArray(iterator(), Header.class);
}
@Override
public Iterator iterator() {
- return Iterators.transform(
- attributes.entries().iterator(), entry -> toHeader(entry.getKey(), entry.getValue()));
+ return attributes.entrySet().stream()
+ .flatMap(
+ entry ->
+ entry.getValue().getValuesList().stream().map(v -> toHeader(entry.getKey(), v)))
+ .iterator();
}
}
diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java
index 61839d3c..e59982d5 100644
--- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java
+++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java
@@ -16,10 +16,11 @@
package com.google.cloud.pubsublite.kafka;
-import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.proto.AttributeValues;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
@@ -32,44 +33,52 @@
class RecordTransforms {
private RecordTransforms() {}
- static Message toMessage(ProducerRecord record) {
- Message.Builder builder =
- Message.builder()
+ static PubSubMessage toMessage(ProducerRecord record) {
+ PubSubMessage.Builder builder =
+ PubSubMessage.newBuilder()
.setKey(ByteString.copyFrom(record.key()))
.setData(ByteString.copyFrom(record.value()));
if (record.timestamp() != null) {
- builder = builder.setEventTime(Timestamps.fromMillis(record.timestamp()));
+ builder.setEventTime(Timestamps.fromMillis(record.timestamp()));
}
ImmutableListMultimap.Builder attributes = ImmutableListMultimap.builder();
record
.headers()
.forEach(header -> attributes.put(header.key(), ByteString.copyFrom(header.value())));
- return builder.setAttributes(attributes.build()).build();
+ attributes
+ .build()
+ .asMap()
+ .forEach(
+ (key, values) ->
+ builder.putAttributes(
+ key, AttributeValues.newBuilder().addAllValues(values).build()));
+ return builder.build();
}
static ConsumerRecord fromMessage(
- SequencedMessage message, TopicPath topic, Partition partition) {
- Headers headers = new LiteHeaders(message.message().attributes());
+ SequencedMessage sequenced, TopicPath topic, Partition partition) {
+ PubSubMessage message = sequenced.getMessage();
+ Headers headers = new LiteHeaders(message.getAttributesMap());
TimestampType type;
Timestamp timestamp;
- if (message.message().eventTime().isPresent()) {
+ if (message.hasEventTime()) {
type = TimestampType.CREATE_TIME;
- timestamp = message.message().eventTime().get();
+ timestamp = message.getEventTime();
} else {
type = TimestampType.LOG_APPEND_TIME;
- timestamp = message.publishTime();
+ timestamp = sequenced.getPublishTime();
}
return new ConsumerRecord<>(
topic.toString(),
(int) partition.value(),
- message.offset().value(),
+ sequenced.getCursor().getOffset(),
Timestamps.toMillis(timestamp),
type,
0L,
- message.message().key().size(),
- message.message().data().size(),
- message.message().key().toByteArray(),
- message.message().data().toByteArray(),
+ message.getKey().size(),
+ message.getData().size(),
+ message.getKey().toByteArray(),
+ message.getData().toByteArray(),
headers);
}
}
diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java
index 28806255..4c34be0e 100644
--- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java
+++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java
@@ -22,13 +22,13 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SeekRequest;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -115,7 +115,7 @@ ArrayDeque getMessages() throws CheckedApiException {
try (CloseableMonitor.Hold h = monitor.enter()) {
ArrayDeque messages = pullMessages();
if (!messages.isEmpty()) {
- lastReceived = Optional.of(Iterables.getLast(messages).offset());
+ lastReceived = Optional.of(Offset.of(Iterables.getLast(messages).getCursor().getOffset()));
needsCommitting = true;
}
return messages;
diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java
index f80b18cd..323b2c0e 100644
--- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java
+++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java
@@ -26,12 +26,12 @@
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
diff --git a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java
index 85a92f8e..12e63921 100644
--- a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java
+++ b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java
@@ -27,7 +27,6 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.AdminClient;
-import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
@@ -36,6 +35,7 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.concurrent.Future;
@@ -63,7 +63,7 @@ abstract static class FakePublisher extends FakeApiService
private static final ProducerRecord RECORD =
new ProducerRecord<>(
example(TopicPath.class).toString(), "abc".getBytes(), "defg".getBytes());
- private static final Message MESSAGE = RecordTransforms.toMessage(RECORD);
+ private static final PubSubMessage MESSAGE = RecordTransforms.toMessage(RECORD);
private static final TopicPartition TOPIC_PARTITION =
new TopicPartition(
example(TopicPath.class).toString(), (int) example(Partition.class).value());
diff --git a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java
index 20c2a8e9..1347960a 100644
--- a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java
+++ b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java
@@ -22,8 +22,11 @@
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.proto.AttributeValues;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
@@ -37,21 +40,24 @@
@RunWith(JUnit4.class)
public class RecordTransformsTest {
- private static final Message MESSAGE =
- Message.builder()
+ private static final PubSubMessage MESSAGE =
+ PubSubMessage.newBuilder()
.setKey(ByteString.copyFromUtf8("abc"))
.setData(ByteString.copyFromUtf8("def"))
.setEventTime(Timestamp.newBuilder().setSeconds(1).setNanos(1000000).build())
- .setAttributes(
- ImmutableListMultimap.of(
- "xxx",
- ByteString.copyFromUtf8("yyy"),
- "zzz",
- ByteString.copyFromUtf8("zzz"),
- "zzz",
- ByteString.copyFromUtf8("zzz")))
+ .putAttributes("xxx", single("yyy"))
+ .putAttributes(
+ "zzz",
+ AttributeValues.newBuilder()
+ .addValues(ByteString.copyFromUtf8("zzz"))
+ .addValues(ByteString.copyFromUtf8("zzz"))
+ .build())
.build();
+ private static AttributeValues single(String v) {
+ return AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8(v)).build();
+ }
+
@Test
public void publishTransform() {
ProducerRecord record =
@@ -65,15 +71,19 @@ public void publishTransform() {
LiteHeaders.toHeader("xxx", ByteString.copyFromUtf8("yyy")),
LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz")),
LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz"))));
- Message message = RecordTransforms.toMessage(record);
+ PubSubMessage message = RecordTransforms.toMessage(record);
assertThat(message).isEqualTo(MESSAGE);
}
@Test
public void subscribeTransform() {
SequencedMessage sequencedMessage =
- SequencedMessage.of(
- MESSAGE, Timestamp.newBuilder().setNanos(12345).build(), example(Offset.class), 123L);
+ SequencedMessage.newBuilder()
+ .setMessage(MESSAGE)
+ .setPublishTime(Timestamp.newBuilder().setNanos(12345))
+ .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value()))
+ .setSizeBytes(123)
+ .build();
ConsumerRecord record =
RecordTransforms.fromMessage(
sequencedMessage, example(TopicPath.class), example(Partition.class));
@@ -85,7 +95,7 @@ public void subscribeTransform() {
record
.headers()
.forEach(header -> headers.put(header.key(), ByteString.copyFrom(header.value())));
- assertThat(headers.build()).isEqualTo(MESSAGE.attributes());
+ assertThat(headers.build()).isEqualTo(Message.fromProto(MESSAGE).attributes());
assertThat(record.offset()).isEqualTo(example(Offset.class).value());
assertThat(record.topic()).isEqualTo(example(TopicPath.class).toString());
assertThat(record.partition()).isEqualTo(example(Partition.class).value());
diff --git a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java
index 4b3e2482..e35278bf 100644
--- a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java
+++ b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java
@@ -22,25 +22,22 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import com.google.api.core.ApiFutures;
-import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler;
+import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
-import com.google.protobuf.Timestamp;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.Optional;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -73,16 +70,8 @@ public void setUp() throws CheckedApiException {
.thenReturn(pullSubscriber);
}
- @After
- public void tearDown() throws Exception {
- verifyNoMoreInteractions(subscriberFactory);
- verifyNoMoreInteractions(pullSubscriber);
- verifyNoMoreInteractions(committer);
- }
-
private static SequencedMessage message(long offset) {
- return SequencedMessage.of(
- Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset), 0L);
+ return SequencedMessage.newBuilder().setCursor(Cursor.newBuilder().setOffset(offset)).build();
}
@Test
diff --git a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java
index 48855977..fd00d0d4 100644
--- a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java
+++ b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java
@@ -31,10 +31,8 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
-import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
@@ -44,12 +42,12 @@
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
-import com.google.protobuf.Timestamp;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
@@ -102,8 +100,9 @@ public void setUp() throws CheckedApiException {
}
private static SequencedMessage message(Offset offset) {
- return SequencedMessage.of(
- Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset.value()), 0L);
+ return SequencedMessage.newBuilder()
+ .setCursor(Cursor.newBuilder().setOffset(offset.value()))
+ .build();
}
private static SequencedMessage message(long offset) {