7013
com/google/cloud/pubsublite/internal/**
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/MessageMetadata.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/MessageMetadata.java
index b7fdb87a8..ce9d469c0 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/MessageMetadata.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/MessageMetadata.java
@@ -33,7 +33,15 @@ public abstract class MessageMetadata {
/** The partition a message was published to. */
public abstract Partition partition();
- /** The offset a message was assigned. */
+ /**
+ * The offset a message was assigned.
+ *
+ * If this MessageMetadata was returned for a publish result and publish idempotence was
+ * enabled, the offset may be -1 when the message was identified as a duplicate of an already
+ * successfully published message, but the server did not have sufficient information to return
+ * the message's offset at publish time. Messages received by subscribers will always have the
+ * correct offset.
+ */
public abstract Offset offset();
/** Construct a MessageMetadata from a Partition and Offset. */
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java
index e1e6807e2..8a4f20f82 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java
@@ -42,11 +42,13 @@
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
+import com.google.cloud.pubsublite.internal.wire.UuidBuilder;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Optional;
import org.threeten.bp.Duration;
@@ -79,6 +81,12 @@ public abstract class PublisherSettings {
/** Batching settings for this publisher to use. Apply per-partition. */
abstract BatchingSettings batchingSettings();
+ /**
+ * Whether to enable publish idempotence, where the server will ensure that unique messages within
+ * a single publisher session are stored only once. Default true.
+ */
+ abstract boolean enableIdempotence();
+
/** A provider for credentials. */
abstract CredentialsProvider credentialsProvider();
@@ -106,6 +114,7 @@ public static Builder newBuilder() {
.setCredentialsProvider(
PublisherServiceSettings.defaultCredentialsProviderBuilder().build())
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
+ .setEnableIdempotence(true)
.setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder());
}
@@ -127,6 +136,12 @@ public abstract Builder setMessageTransformer(
/** Batching settings for this publisher to use. Apply per-partition. */
public abstract Builder setBatchingSettings(BatchingSettings batchingSettings);
+ /**
+ * Whether to enable publish idempotence, where the server will ensure that unique messages
+ * within a single publisher session are stored only once. Default true.
+ */
+ public abstract Builder setEnableIdempotence(boolean enableIdempotence);
+
/** A provider for credentials. */
public abstract Builder setCredentialsProvider(CredentialsProvider credentialsProvider);
@@ -163,6 +178,7 @@ private PublisherServiceClient newServiceClient() throws ApiException {
private PartitionPublisherFactory getPartitionPublisherFactory() {
PublisherServiceClient client = newServiceClient();
+ ByteString publisherClientId = UuidBuilder.toByteString(UuidBuilder.generate());
return new PartitionPublisherFactory() {
@Override
public com.google.cloud.pubsublite.internal.Publisher newPublisher(
@@ -180,6 +196,9 @@ public com.google.cloud.pubsublite.internal.Publisher newPublis
RoutingMetadata.of(topicPath(), partition));
return client.publishCallable().splitCall(responseStream, context);
});
+ if (enableIdempotence()) {
+ singlePartitionBuilder.setClientId(publisherClientId);
+ }
return singlePartitionBuilder.build();
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/PublishSequenceNumber.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/PublishSequenceNumber.java
new file mode 100644
index 000000000..7e06305e9
--- /dev/null
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/PublishSequenceNumber.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.internal;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+
+/** A sequence number for a published message, for implementing publish idempotency. */
+@AutoValue
+public abstract class PublishSequenceNumber implements Serializable {
+
+ /** Create a publish sequence number from its long value. */
+ public static PublishSequenceNumber of(long sequenceNumber) {
+ return new AutoValue_PublishSequenceNumber(sequenceNumber);
+ }
+
+ /** The sequence number that should be set for the first message in a publisher session. */
+ public static final PublishSequenceNumber FIRST = PublishSequenceNumber.of(0);
+
+ /** Returns the next sequence number that follows the current. */
+ public PublishSequenceNumber next() {
+ return PublishSequenceNumber.of(value() + 1);
+ }
+
+ /** The long value of this publish sequence number. */
+ public abstract long value();
+}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SequencedPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SequencedPublisher.java
new file mode 100644
index 000000000..fe3dcc595
--- /dev/null
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SequencedPublisher.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiService;
+import com.google.cloud.pubsublite.Message;
+import java.io.Flushable;
+
+/**
+ * A PubSub Lite publisher that requires a sequence number assigned to every message, for publish
+ * idempotency. Errors are handled out of band. Thread safe.
+ */
+public interface SequencedPublisher extends ApiService, Flushable {
+ /**
+ * Publish a new message with an assigned sequence number.
+ *
+ * Behavior is undefined if a call to flush() is outstanding or close() has already been
+ * called. This method never blocks.
+ *
+ *
Guarantees that if a single publish future has an exception set, all publish calls made
+ * after that will also have an exception set.
+ */
+ ApiFuture publish(Message message, PublishSequenceNumber sequenceNumber);
+
+ /** Attempts to cancel all outstanding publishes. */
+ void cancelOutstandingPublishes();
+}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerSettings.java
index ea0c52200..51fb06921 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerSettings.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerSettings.java
@@ -21,8 +21,6 @@
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient;
import com.google.common.flogger.GoogleLogger;
-import com.google.protobuf.ByteString;
-import java.nio.ByteBuffer;
import java.util.UUID;
@AutoValue
@@ -39,7 +37,7 @@ public abstract class AssignerSettings {
abstract UUID uuid();
public static Builder newBuilder() {
- return new AutoValue_AssignerSettings.Builder().setUuid(UUID.randomUUID());
+ return new AutoValue_AssignerSettings.Builder().setUuid(UuidBuilder.generate());
}
@AutoValue.Builder
@@ -58,16 +56,13 @@ public abstract static class Builder {
}
public Assigner instantiate() {
- ByteBuffer uuidBuffer = ByteBuffer.allocate(16);
- uuidBuffer.putLong(uuid().getMostSignificantBits());
- uuidBuffer.putLong(uuid().getLeastSignificantBits());
logger.atInfo().log(
"Subscription %s using UUID %s for assignment.", subscriptionPath(), uuid());
InitialPartitionAssignmentRequest initial =
InitialPartitionAssignmentRequest.newBuilder()
.setSubscription(subscriptionPath().toString())
- .setClientId(ByteString.copyFrom(uuidBuffer.array()))
+ .setClientId(UuidBuilder.toByteString(uuid()))
.build();
return new AssignerImpl(serviceClient(), initial, receiver());
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisher.java
index e1b26128c..9aa4616b9 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisher.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisher.java
@@ -16,10 +16,14 @@
package com.google.cloud.pubsublite.internal.wire;
+import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.util.Collection;
interface BatchPublisher extends AutoCloseable {
- /** Publish the batch of messages. Failures are communicated out of band. */
- void publish(Collection messages);
+ /**
+ * Publish the batch of messages, with the given sequence number of the first message in the
+ * batch. Failures are communicated out of band.
+ */
+ void publish(Collection messages, PublishSequenceNumber firstSequenceNumber);
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherFactory.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherFactory.java
index 8f922f941..e0296bfa7 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherFactory.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherFactory.java
@@ -16,9 +16,10 @@
package com.google.cloud.pubsublite.internal.wire;
-import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.MessagePublishResponse;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;
interface BatchPublisherFactory
- extends SingleConnectionFactory {}
+ extends SingleConnectionFactory<
+ PublishRequest, PublishResponse, MessagePublishResponse, BatchPublisher> {}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherImpl.java
index 3609d2a5a..77a29b5a3 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherImpl.java
@@ -19,48 +19,52 @@
import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
import com.google.api.gax.rpc.ResponseObserver;
-import com.google.api.gax.rpc.StatusCode.Code;
-import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
+import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
+import com.google.cloud.pubsublite.proto.MessagePublishRequest;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Collection;
-import java.util.Optional;
-class BatchPublisherImpl extends SingleConnection
+class BatchPublisherImpl
+ extends SingleConnection
implements BatchPublisher {
- private final CloseableMonitor monitor = new CloseableMonitor();
-
- @GuardedBy("monitor.monitor")
- private Optional lastOffset = Optional.empty();
-
static class Factory implements BatchPublisherFactory {
@Override
public BatchPublisherImpl New(
StreamFactory streamFactory,
- ResponseObserver clientStream,
+ ResponseObserver clientStream,
PublishRequest initialRequest) {
return new BatchPublisherImpl(streamFactory::New, clientStream, initialRequest);
}
}
+ private final boolean sendSequenceNumbers;
+
private BatchPublisherImpl(
PublishStreamFactory streamFactory,
- ResponseObserver publishCompleteStream,
+ ResponseObserver publishCompleteStream,
PublishRequest initialRequest) {
super(streamFactory, publishCompleteStream);
initialize(initialRequest);
+
+ // Publish idempotency is enabled when a publisher client id is specified. Otherwise do not send
+ // sequence numbers to the stream.
+ this.sendSequenceNumbers = !initialRequest.getInitialRequest().getClientId().isEmpty();
}
@Override
- public void publish(Collection messages) {
+ public void publish(
+ Collection messages, PublishSequenceNumber firstSequenceNumber) {
PublishRequest.Builder builder = PublishRequest.newBuilder();
- builder.getMessagePublishRequestBuilder().addAllMessages(messages);
+ MessagePublishRequest.Builder publishRequestBuilder = builder.getMessagePublishRequestBuilder();
+ publishRequestBuilder.addAllMessages(messages);
+ if (sendSequenceNumbers) {
+ publishRequestBuilder.setFirstSequenceNumber(firstSequenceNumber.value());
+ }
sendToStream(builder.build());
}
@@ -77,18 +81,6 @@ protected void handleStreamResponse(PublishResponse response) throws CheckedApiE
checkState(
response.hasMessageResponse(),
"Received response on stream which was neither a message or initial response.");
- onMessageResponse(response.getMessageResponse());
- }
-
- private void onMessageResponse(MessagePublishResponse response) throws CheckedApiException {
- Offset offset = Offset.of(response.getStartCursor().getOffset());
- try (CloseableMonitor.Hold h = monitor.enter()) {
- if (lastOffset.isPresent() && offset.value() <= lastOffset.get().value()) {
- throw new CheckedApiException(
- "Received out of order offsets on stream.", Code.FAILED_PRECONDITION);
- }
- lastOffset = Optional.of(offset);
- }
- sendToClient(offset);
+ sendToClient(response.getMessageResponse());
}
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilder.java
index 90fbed348..85a566e88 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilder.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilder.java
@@ -25,9 +25,12 @@
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.SequencedPublisher;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.util.Optional;
/**
* A builder for a PubSub Lite Publisher. Basic usage:
@@ -65,6 +68,13 @@ public abstract class PublisherBuilder {
abstract PublishStreamFactory streamFactory();
// Optional parameters.
+
+ /**
+ * Set the publisher client id to enable publish idempotency. The same client id must be set for
+ * all partitions.
+ */
+ abstract Optional clientId();
+
public static Builder builder() {
return new AutoValue_PublisherBuilder.Builder();
}
@@ -80,16 +90,31 @@ public abstract static class Builder {
public abstract Builder setStreamFactory(PublishStreamFactory streamFactory);
+ // Optional parameters.
+ public abstract Builder setClientId(ByteString clientId);
+
abstract PublisherBuilder autoBuild();
public Publisher build() throws ApiException {
+ return new SequenceAssigningPublisher(buildSequenced());
+ }
+
+ /**
+ * Builds the underlying publisher that can accept externally assigned sequence numbers for each
+ * message.
+ */
+ public SequencedPublisher buildSequenced() throws ApiException {
PublisherBuilder autoBuilt = autoBuild();
- return new PublisherImpl(
- autoBuilt.streamFactory(),
+ InitialPublishRequest.Builder requestBuilder =
InitialPublishRequest.newBuilder()
.setTopic(autoBuilt.topic().toString())
- .setPartition(autoBuilt.partition().value())
- .build(),
+ .setPartition(autoBuilt.partition().value());
+ if (autoBuilt.clientId().isPresent()) {
+ requestBuilder.setClientId(autoBuilt.clientId().get());
+ }
+ return new PublisherImpl(
+ autoBuilt.streamFactory(),
+ requestBuilder.build(),
validateBatchingSettings(autoBuilt.batching()));
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java
index 6a4aa8289..8bf00adb7 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java
@@ -17,11 +17,13 @@
package com.google.cloud.pubsublite.internal.wire;
import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toCollection;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
-import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
@@ -32,14 +34,18 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
-import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
+import com.google.cloud.pubsublite.internal.SequencedPublisher;
import com.google.cloud.pubsublite.internal.wire.SerialBatcher.UnbatchedMessage;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
+import com.google.cloud.pubsublite.proto.MessagePublishResponse;
+import com.google.cloud.pubsublite.proto.MessagePublishResponse.CursorRange;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Monitor;
import java.time.Duration;
@@ -50,11 +56,10 @@
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Future;
-import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
public final class PublisherImpl extends ProxyService
- implements Publisher, RetryingConnectionObserver {
+ implements SequencedPublisher, RetryingConnectionObserver {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final AlarmFactory alarmFactory;
@@ -79,7 +84,7 @@ public boolean isSatisfied() {
private boolean shutdown = false;
@GuardedBy("monitor.monitor")
- private Optional lastSentOffset = Optional.empty();
+ private Offset lastSentOffset = Offset.of(-1);
// batcherMonitor is always acquired after monitor.monitor when both are held.
private final CloseableMonitor batcherMonitor = new CloseableMonitor();
@@ -88,12 +93,24 @@ public boolean isSatisfied() {
private final SerialBatcher batcher;
private static class InFlightBatch {
- final List messages;
- final List> messageFutures;
+ final List messages;
- InFlightBatch(Collection toBatch) {
- messages = toBatch.stream().map(UnbatchedMessage::message).collect(Collectors.toList());
- messageFutures = toBatch.stream().map(UnbatchedMessage::future).collect(Collectors.toList());
+ InFlightBatch(List toBatch) {
+ this.messages = toBatch;
+ }
+
+ List messagesToSend() {
+ return messages.stream().map(UnbatchedMessage::message).collect(toImmutableList());
+ }
+
+ PublishSequenceNumber firstSequenceNumber() {
+ return messages.get(0).sequenceNumber();
+ }
+
+ void failBatch(int startIdx, CheckedApiException e) {
+ for (int i = startIdx; i < messages.size(); i++) {
+ messages.get(i).future().setException(e);
+ }
}
}
@@ -139,12 +156,10 @@ public PublisherImpl(
@GuardedBy("monitor.monitor")
private void rebatchForRestart() {
- Queue messages = new ArrayDeque<>();
- for (InFlightBatch batch : batchesInFlight) {
- for (int i = 0; i < batch.messages.size(); ++i) {
- messages.add(UnbatchedMessage.of(batch.messages.get(i), batch.messageFutures.get(i)));
- }
- }
+ Queue messages =
+ batchesInFlight.stream()
+ .flatMap(b -> b.messages.stream())
+ .collect(toCollection(ArrayDeque::new));
logger.atFiner().log(
"Re-publishing %s messages after reconnection for partition %s",
messages.size(), initialRequest.getInitialRequest().getPartition());
@@ -157,8 +172,8 @@ private void rebatchForRestart() {
if (size + messageSize > Constants.MAX_PUBLISH_BATCH_BYTES
|| count + 1 > Constants.MAX_PUBLISH_BATCH_COUNT) {
if (!currentBatch.isEmpty()) {
- batchesInFlight.add(new InFlightBatch(currentBatch));
- currentBatch = new ArrayDeque<>();
+ batchesInFlight.add(new InFlightBatch(ImmutableList.copyOf(currentBatch)));
+ currentBatch.clear();
count = 0;
size = 0;
}
@@ -168,7 +183,7 @@ private void rebatchForRestart() {
count += 1;
}
if (!currentBatch.isEmpty()) {
- batchesInFlight.add(new InFlightBatch(currentBatch));
+ batchesInFlight.add(new InFlightBatch(ImmutableList.copyOf(currentBatch)));
}
}
@@ -181,7 +196,11 @@ public void triggerReinitialize(CheckedApiException streamError) {
connection.modifyConnection(
connectionOr -> {
if (!connectionOr.isPresent()) return;
- batches.forEach(batch -> connectionOr.get().publish(batch.messages));
+ batches.forEach(
+ batch ->
+ connectionOr
+ .get()
+ .publish(batch.messagesToSend(), batch.firstSequenceNumber()));
});
} catch (CheckedApiException e) {
onPermanentError(e);
@@ -219,7 +238,7 @@ protected void stop() {
@GuardedBy("monitor.monitor")
private void terminateOutstandingPublishes(CheckedApiException e) {
batchesInFlight.forEach(
- batch -> batch.messageFutures.forEach(future -> future.setException(e)));
+ batch -> batch.messages.forEach(message -> message.future().setException(e)));
try (CloseableMonitor.Hold h = batcherMonitor.enter()) {
batcher.flush().forEach(batch -> batch.forEach(m -> m.future().setException(e)));
}
@@ -227,7 +246,7 @@ private void terminateOutstandingPublishes(CheckedApiException e) {
}
@Override
- public ApiFuture publish(Message message) {
+ public ApiFuture publish(Message message, PublishSequenceNumber sequenceNumber) {
PubSubMessage proto = message.toProto();
try (CloseableMonitor.Hold h = batcherMonitor.enter()) {
ApiService.State currentState = state();
@@ -239,7 +258,7 @@ public ApiFuture publish(Message message) {
Code.FAILED_PRECONDITION);
case STARTING:
case RUNNING:
- return batcher.add(proto);
+ return batcher.add(proto, sequenceNumber);
default:
throw new CheckedApiException(
"Cannot publish when Publisher state is " + currentState.name(),
@@ -275,14 +294,16 @@ private void flushToStream() {
}
@GuardedBy("monitor.monitor")
- private void processBatch(Collection batch) throws CheckedApiException {
+ private void processBatch(List batch) throws CheckedApiException {
if (batch.isEmpty()) return;
InFlightBatch inFlightBatch = new InFlightBatch(batch);
batchesInFlight.add(inFlightBatch);
connection.modifyConnection(
connectionOr -> {
checkState(connectionOr.isPresent(), "Published after the stream shut down.");
- connectionOr.get().publish(inFlightBatch.messages);
+ connectionOr
+ .get()
+ .publish(inFlightBatch.messagesToSend(), inFlightBatch.firstSequenceNumber());
});
}
@@ -294,23 +315,55 @@ public void flush() {
}
@Override
- public void onClientResponse(Offset value) throws CheckedApiException {
+ public void onClientResponse(MessagePublishResponse publishResponse) throws CheckedApiException {
+ // Ensure cursor ranges are sorted by increasing message batch index.
+ ImmutableList ranges =
+ ImmutableList.sortedCopyOf(
+ comparing(CursorRange::getStartIndex), publishResponse.getCursorRangesList());
try (CloseableMonitor.Hold h = monitor.enter()) {
checkState(
!batchesInFlight.isEmpty(), "Received publish response with no batches in flight.");
- if (lastSentOffset.isPresent() && lastSentOffset.get().value() >= value.value()) {
- throw new CheckedApiException(
- String.format(
- "Received publish response with offset %s that is inconsistent with previous"
- + " responses max %s",
- value, lastSentOffset.get()),
- Code.FAILED_PRECONDITION);
- }
InFlightBatch batch = batchesInFlight.remove();
- lastSentOffset = Optional.of(Offset.of(value.value() + batch.messages.size() - 1));
- for (int i = 0; i < batch.messageFutures.size(); i++) {
- Offset offset = Offset.of(value.value() + i);
- batch.messageFutures.get(i).set(offset);
+ int rangeIndex = 0;
+ for (int messageIndex = 0; messageIndex < batch.messages.size(); messageIndex++) {
+ UnbatchedMessage message = batch.messages.get(messageIndex);
+ try {
+ if (rangeIndex < ranges.size() && ranges.get(rangeIndex).getEndIndex() <= messageIndex) {
+ rangeIndex++;
+ if (rangeIndex < ranges.size()
+ && ranges.get(rangeIndex).getStartIndex()
+ < ranges.get(rangeIndex - 1).getEndIndex()) {
+ throw new CheckedApiException(
+ String.format(
+ "Server sent invalid cursor ranges in message publish response: %s",
+ publishResponse),
+ Code.FAILED_PRECONDITION);
+ }
+ }
+ if (rangeIndex < ranges.size()
+ && messageIndex >= ranges.get(rangeIndex).getStartIndex()
+ && messageIndex < ranges.get(rangeIndex).getEndIndex()) {
+ CursorRange range = ranges.get(rangeIndex);
+ Offset offset =
+ Offset.of(
+ range.getStartCursor().getOffset() + messageIndex - range.getStartIndex());
+ if (lastSentOffset.value() >= offset.value()) {
+ throw new CheckedApiException(
+ String.format(
+ "Received publish response with offset %s that is inconsistent with"
+ + " previous offset %s",
+ offset, lastSentOffset),
+ Code.FAILED_PRECONDITION);
+ }
+ message.future().set(offset);
+ lastSentOffset = offset;
+ } else {
+ message.future().set(Offset.of(-1));
+ }
+ } catch (CheckedApiException e) {
+ batch.failBatch(messageIndex, e);
+ throw e;
+ }
}
}
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SequenceAssigningPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SequenceAssigningPublisher.java
new file mode 100644
index 000000000..1c32e02c9
--- /dev/null
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SequenceAssigningPublisher.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.internal.wire;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.ProxyService;
+import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
+import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.SequencedPublisher;
+import java.io.IOException;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A publisher that assigns sequence numbers to messages and delegates to an underlying
+ * SequencedPublisher.
+ */
+public class SequenceAssigningPublisher extends ProxyService implements Publisher {
+ private final SequencedPublisher publisher;
+
+ @GuardedBy("this")
+ private PublishSequenceNumber nextSequence = PublishSequenceNumber.FIRST;
+
+ SequenceAssigningPublisher(SequencedPublisher publisher) throws ApiException {
+ super(publisher);
+ this.publisher = publisher;
+ }
+
+ // Publisher implementation.
+ @Override
+ public synchronized ApiFuture publish(Message message) {
+ ApiFuture future = publisher.publish(message, nextSequence);
+ nextSequence = nextSequence.next();
+ return future;
+ }
+
+ @Override
+ public void cancelOutstandingPublishes() {
+ publisher.cancelOutstandingPublishes();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ publisher.flush();
+ }
+}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java
index 9512910a5..bbeb71d40 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java
@@ -18,8 +18,11 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.StatusCode.Code;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -36,21 +39,43 @@ class SerialBatcher {
public abstract static class UnbatchedMessage {
public abstract PubSubMessage message();
+ public abstract PublishSequenceNumber sequenceNumber();
+
public abstract SettableApiFuture future();
- public static UnbatchedMessage of(PubSubMessage message, SettableApiFuture future) {
- return new AutoValue_SerialBatcher_UnbatchedMessage(message, future);
+ public static UnbatchedMessage of(
+ PubSubMessage message,
+ PublishSequenceNumber sequenceNumber,
+ SettableApiFuture future) {
+ return new AutoValue_SerialBatcher_UnbatchedMessage(message, sequenceNumber, future);
}
}
+ // Returns whether there is a sequence number discontinuity between two consecutive messages,
+ // which is not expected. Only the sequence number of the first message in a batch can be
+ // specified, and the remaining messages must have contiguous sequence numbers.
+ private static boolean hasSequenceDiscontinuity(
+ PublishSequenceNumber previous, PublishSequenceNumber current) {
+ return previous.value() + 1 != current.value();
+ }
+
SerialBatcher(long byteLimit, long messageLimit) {
this.byteLimit = byteLimit;
this.messageLimit = messageLimit;
}
- ApiFuture add(PubSubMessage message) {
+ ApiFuture add(PubSubMessage message, PublishSequenceNumber sequenceNumber)
+ throws CheckedApiException {
+ if (!messages.isEmpty()
+ && hasSequenceDiscontinuity(messages.peekLast().sequenceNumber(), sequenceNumber)) {
+ throw new CheckedApiException(
+ String.format(
+ "Discontinuity in publish sequence numbers; previous: %s, next: %s",
+ messages.peekLast().sequenceNumber(), sequenceNumber),
+ Code.FAILED_PRECONDITION);
+ }
SettableApiFuture future = SettableApiFuture.create();
- messages.add(UnbatchedMessage.of(message, future));
+ messages.add(UnbatchedMessage.of(message, sequenceNumber, future));
return future;
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java
index 60fb2d86d..e858979ed 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java
@@ -25,6 +25,8 @@
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import java.util.Optional;
@AutoValue
public abstract class SinglePartitionPublisherBuilder {
@@ -37,6 +39,9 @@ public abstract class SinglePartitionPublisherBuilder {
abstract BatchingSettings batchingSettings();
+ // Optional parameters.
+ abstract Optional clientId();
+
// For testing.
abstract PublisherBuilder.Builder underlyingBuilder();
@@ -57,6 +62,9 @@ public abstract static class Builder {
public abstract Builder setBatchingSettings(BatchingSettings batchingSettings);
+ // Optional parameters.
+ public abstract Builder setClientId(ByteString clientId);
+
// For testing.
@VisibleForTesting
abstract Builder setUnderlyingBuilder(PublisherBuilder.Builder underlyingBuilder);
@@ -72,6 +80,9 @@ public Publisher build() throws ApiException {
.setPartition(builder.partition())
.setStreamFactory(builder.streamFactory())
.setBatching(builder.batchingSettings());
+ if (builder.clientId().isPresent()) {
+ publisherBuilder.setClientId(builder.clientId().get());
+ }
return new SinglePartitionPublisher(publisherBuilder.build(), builder.partition());
}
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/UuidBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/UuidBuilder.java
new file mode 100644
index 000000000..e6eccc401
--- /dev/null
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/UuidBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.internal.wire;
+
+import com.google.protobuf.ByteString;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/** Utilities for generating and converting 128-bit UUIDs. */
+public final class UuidBuilder {
+
+ /** Generates a random UUID. */
+ public static UUID generate() {
+ return UUID.randomUUID();
+ }
+
+ /** Converts a UUID to a ByteString. */
+ public static ByteString toByteString(UUID uuid) {
+ ByteBuffer uuidBuffer = ByteBuffer.allocate(16);
+ uuidBuffer.putLong(uuid.getMostSignificantBits());
+ uuidBuffer.putLong(uuid.getLeastSignificantBits());
+ return ByteString.copyFrom(uuidBuffer.array());
+ }
+
+ private UuidBuilder() {}
+}
diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherImplTest.java
index 1c43b9512..61fb6c0cf 100755
--- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherImplTest.java
+++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherImplTest.java
@@ -36,12 +36,14 @@
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.ApiExceptionMatcher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
import com.google.cloud.pubsublite.proto.InitialPublishResponse;
import com.google.cloud.pubsublite.proto.MessagePublishRequest;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
+import com.google.cloud.pubsublite.proto.MessagePublishResponse.CursorRange;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;
@@ -63,7 +65,9 @@
@RunWith(JUnit4.class)
public class BatchPublisherImplTest {
- private static PublishRequest initialRequest() {
+ private static final ByteString CLIENT_ID = ByteString.copyFromUtf8("publisher");
+
+ private static PublishRequest initialRequest(ByteString clientId) {
return PublishRequest.newBuilder()
.setInitialRequest(
InitialPublishRequest.newBuilder()
@@ -75,17 +79,44 @@ private static PublishRequest initialRequest() {
.build()
.toString())
.setPartition(1024)
+ .setClientId(clientId)
.build())
.build();
}
+ private static PublishRequest messagePublishRequest(PubSubMessage... messages) {
+ return PublishRequest.newBuilder()
+ .setMessagePublishRequest(
+ MessagePublishRequest.newBuilder().addAllMessages(Arrays.asList(messages)))
+ .build();
+ }
+
+ private static PublishRequest messagePublishRequest(
+ PublishSequenceNumber firstSequence, PubSubMessage... messages) {
+ return PublishRequest.newBuilder()
+ .setMessagePublishRequest(
+ MessagePublishRequest.newBuilder()
+ .addAllMessages(Arrays.asList(messages))
+ .setFirstSequenceNumber(firstSequence.value()))
+ .build();
+ }
+
+ private static MessagePublishResponse messageResponse(Offset startOffset) {
+ Cursor startCursor = Cursor.newBuilder().setOffset(startOffset.value()).build();
+ return MessagePublishResponse.newBuilder()
+ .setStartCursor(startCursor)
+ .addCursorRanges(
+ CursorRange.newBuilder().setStartCursor(startCursor).setStartIndex(0).setEndIndex(5))
+ .build();
+ }
+
private static final BatchPublisherImpl.Factory FACTORY = new BatchPublisherImpl.Factory();
@Mock private PublishStreamFactory streamFactory;
@Mock private ClientStream mockRequestStream;
- @Mock private ResponseObserver mockOutputStream;
+ @Mock private ResponseObserver mockOutputStream;
private Optional> leakedResponseStream = Optional.empty();
@@ -98,8 +129,8 @@ public void setUp() throws IOException {
(Answer>)
args -> {
Preconditions.checkArgument(!leakedResponseStream.isPresent());
- ResponseObserver ResponseObserver = args.getArgument(0);
- leakedResponseStream = Optional.of(ResponseObserver);
+ ResponseObserver responseObserver = args.getArgument(0);
+ leakedResponseStream = Optional.of(responseObserver);
return mockRequestStream;
})
.when(streamFactory)
@@ -113,11 +144,13 @@ public void tearDown() {
}
}
- private class OffsetAnswer implements Answer {
- private final Offset offset;
+ private class MessageResponseAnswer implements Answer {
+ private final MessagePublishResponse response;
+ private final ByteString clientId;
- OffsetAnswer(Offset offset) {
- this.offset = offset;
+ MessageResponseAnswer(ByteString clientId, MessagePublishResponse response) {
+ this.response = response;
+ this.clientId = clientId;
}
@Override
@@ -125,13 +158,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
Preconditions.checkArgument(leakedResponseStream.isPresent());
leakedResponseStream
.get()
- .onResponse(
- PublishResponse.newBuilder()
- .setMessageResponse(
- MessagePublishResponse.newBuilder()
- .setStartCursor(Cursor.newBuilder().setOffset(offset.value())))
- .build());
- verify(mockRequestStream).send(initialRequest());
+ .onResponse(PublishResponse.newBuilder().setMessageResponse(response).build());
+ verify(mockRequestStream).send(initialRequest(clientId));
return null;
}
}
@@ -151,9 +179,9 @@ public void construct_SendsInitialThenResponse() {
return null;
})
.when(mockRequestStream)
- .send(initialRequest());
+ .send(initialRequest(ByteString.EMPTY));
try (BatchPublisherImpl publisher =
- FACTORY.New(streamFactory, mockOutputStream, initialRequest())) {}
+ FACTORY.New(streamFactory, mockOutputStream, initialRequest(ByteString.EMPTY))) {}
}
@Test
@@ -167,9 +195,9 @@ public void construct_SendsInitialThenError() {
return null;
})
.when(mockRequestStream)
- .send(initialRequest());
+ .send(initialRequest(ByteString.EMPTY));
try (BatchPublisherImpl publisher =
- FACTORY.New(streamFactory, mockOutputStream, initialRequest())) {
+ FACTORY.New(streamFactory, mockOutputStream, initialRequest(ByteString.EMPTY))) {
verify(mockOutputStream).onError(argThat(new ApiExceptionMatcher(Code.INTERNAL)));
verifyNoMoreInteractions(mockOutputStream);
}
@@ -177,16 +205,18 @@ public void construct_SendsInitialThenError() {
@Test
public void construct_SendsMessagePublishResponseError() {
- doAnswer(new OffsetAnswer(Offset.of(10))).when(mockRequestStream).send(initialRequest());
+ doAnswer(new MessageResponseAnswer(ByteString.EMPTY, messageResponse(Offset.of(10))))
+ .when(mockRequestStream)
+ .send(initialRequest(ByteString.EMPTY));
try (BatchPublisherImpl publisher =
- FACTORY.New(streamFactory, mockOutputStream, initialRequest())) {
+ FACTORY.New(streamFactory, mockOutputStream, initialRequest(ByteString.EMPTY))) {
verify(mockOutputStream).onError(argThat(new ApiExceptionMatcher(Code.FAILED_PRECONDITION)));
verifyNoMoreInteractions(mockOutputStream);
}
leakedResponseStream = Optional.empty();
}
- private BatchPublisherImpl initialize() {
+ private BatchPublisherImpl initialize(ByteString clientId) {
doAnswer(
(Answer)
args -> {
@@ -200,22 +230,23 @@ private BatchPublisherImpl initialize() {
return null;
})
.when(mockRequestStream)
- .send(initialRequest());
- return FACTORY.New(streamFactory, mockOutputStream, initialRequest());
+ .send(initialRequest(clientId));
+ return FACTORY.New(streamFactory, mockOutputStream, initialRequest(clientId));
}
@Test
public void responseAfterClose_Dropped() throws Exception {
- BatchPublisher publisher = initialize();
+ BatchPublisher publisher = initialize(ByteString.EMPTY);
publisher.close();
verify(mockRequestStream).closeSend();
- publisher.publish(ImmutableList.of(PubSubMessage.getDefaultInstance()));
+ publisher.publish(
+ ImmutableList.of(PubSubMessage.getDefaultInstance()), PublishSequenceNumber.FIRST);
verify(mockOutputStream, never()).onResponse(any());
}
@Test
public void duplicateInitial_Abort() {
- BatchPublisher unusedPublisher = initialize();
+ BatchPublisher unusedPublisher = initialize(ByteString.EMPTY);
PublishResponse.Builder builder = PublishResponse.newBuilder();
builder.getInitialResponseBuilder();
leakedResponseStream.get().onResponse(builder.build());
@@ -223,60 +254,63 @@ public void duplicateInitial_Abort() {
leakedResponseStream = Optional.empty();
}
- private static PublishRequest messagePublishRequest(PubSubMessage... messages) {
- return PublishRequest.newBuilder()
- .setMessagePublishRequest(
- MessagePublishRequest.newBuilder().addAllMessages(Arrays.asList(messages)))
- .build();
- }
-
@Test
- public void offsetResponseInOrder_Ok() {
- BatchPublisher publisher = initialize();
- doAnswer(new OffsetAnswer(Offset.of(10)))
+ public void setsSequenceNumbersWhenClientIdPresent() {
+ BatchPublisher publisher = initialize(CLIENT_ID);
+ doAnswer(new MessageResponseAnswer(CLIENT_ID, messageResponse(Offset.of(10))))
.when(mockRequestStream)
- .send(messagePublishRequest(PubSubMessage.getDefaultInstance()));
- doAnswer(new OffsetAnswer(Offset.of(20)))
+ .send(
+ messagePublishRequest(
+ PublishSequenceNumber.of(100), PubSubMessage.getDefaultInstance()));
+ doAnswer(new MessageResponseAnswer(CLIENT_ID, messageResponse(Offset.of(20))))
.when(mockRequestStream)
.send(
messagePublishRequest(
+ PublishSequenceNumber.of(200),
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()));
- publisher.publish(ImmutableList.of(PubSubMessage.getDefaultInstance()));
+ publisher.publish(
+ ImmutableList.of(PubSubMessage.getDefaultInstance()), PublishSequenceNumber.of(100));
publisher.publish(
ImmutableList.of(
- PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()));
+ PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()),
+ PublishSequenceNumber.of(200));
InOrder requests = inOrder(mockRequestStream);
requests
.verify(mockRequestStream)
- .send(messagePublishRequest(PubSubMessage.getDefaultInstance()));
+ .send(
+ messagePublishRequest(
+ PublishSequenceNumber.of(100), PubSubMessage.getDefaultInstance()));
requests
.verify(mockRequestStream)
.send(
messagePublishRequest(
+ PublishSequenceNumber.of(200),
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()));
InOrder outputs = inOrder(mockOutputStream);
- outputs.verify(mockOutputStream).onResponse(Offset.of(10));
- outputs.verify(mockOutputStream).onResponse(Offset.of(20));
+ outputs.verify(mockOutputStream).onResponse(messageResponse(Offset.of(10)));
+ outputs.verify(mockOutputStream).onResponse(messageResponse(Offset.of(20)));
verifyNoMoreInteractions(mockRequestStream);
verifyNoMoreInteractions(mockOutputStream);
}
@Test
- public void offsetResponseOutOfOrder_Exception() {
- BatchPublisher publisher = initialize();
- doAnswer(new OffsetAnswer(Offset.of(10)))
+ public void omitsSequenceNumbersWhenClientIdAbsent() {
+ BatchPublisher publisher = initialize(ByteString.EMPTY);
+ doAnswer(new MessageResponseAnswer(ByteString.EMPTY, messageResponse(Offset.of(10))))
.when(mockRequestStream)
.send(messagePublishRequest(PubSubMessage.getDefaultInstance()));
- doAnswer(new OffsetAnswer(Offset.of(5)))
+ doAnswer(new MessageResponseAnswer(ByteString.EMPTY, messageResponse(Offset.of(20))))
.when(mockRequestStream)
.send(
messagePublishRequest(
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()));
- publisher.publish(ImmutableList.of(PubSubMessage.getDefaultInstance()));
+ publisher.publish(
+ ImmutableList.of(PubSubMessage.getDefaultInstance()), PublishSequenceNumber.of(100));
publisher.publish(
ImmutableList.of(
- PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()));
+ PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()),
+ PublishSequenceNumber.of(200));
InOrder requests = inOrder(mockRequestStream);
requests
@@ -287,14 +321,10 @@ public void offsetResponseOutOfOrder_Exception() {
.send(
messagePublishRequest(
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()));
- requests.verify(mockRequestStream).closeSendWithError(argThat(new ApiExceptionMatcher()));
InOrder outputs = inOrder(mockOutputStream);
- outputs.verify(mockOutputStream).onResponse(Offset.of(10));
- outputs
- .verify(mockOutputStream)
- .onError(argThat(new ApiExceptionMatcher(Code.FAILED_PRECONDITION)));
- verifyNoMoreInteractions(mockRequestStream, mockOutputStream);
-
- leakedResponseStream = Optional.empty();
+ outputs.verify(mockOutputStream).onResponse(messageResponse(Offset.of(10)));
+ outputs.verify(mockOutputStream).onResponse(messageResponse(Offset.of(20)));
+ verifyNoMoreInteractions(mockRequestStream);
+ verifyNoMoreInteractions(mockOutputStream);
}
}
diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilderTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilderTest.java
index cb2beb030..be849c22e 100644
--- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilderTest.java
+++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilderTest.java
@@ -27,7 +27,9 @@
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.SequencedPublisher;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
+import com.google.protobuf.ByteString;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -36,7 +38,7 @@
public class PublisherBuilderTest {
@Test
public void testBuilder() {
- Publisher unusedPublisher =
+ PublisherBuilder.Builder builder =
PublisherBuilder.builder()
.setBatching(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
.setTopic(
@@ -46,7 +48,13 @@ public void testBuilder() {
.setName(TopicName.of("abc"))
.build())
.setPartition(Partition.of(85))
- .setStreamFactory(mock(PublishStreamFactory.class))
- .build();
+ .setStreamFactory(mock(PublishStreamFactory.class));
+ Publisher unusedPublisher = builder.build();
+ SequencedPublisher unusedSequencedPublisher = builder.buildSequenced();
+
+ // Optional parameters.
+ builder.setClientId(ByteString.copyFromUtf8("publisher"));
+ unusedPublisher = builder.build();
+ unusedSequencedPublisher = builder.buildSequenced();
}
}
diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java
index 543101a7b..98b061abc 100755
--- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java
+++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java
@@ -46,13 +46,18 @@
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
+import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
+import com.google.cloud.pubsublite.proto.MessagePublishResponse;
+import com.google.cloud.pubsublite.proto.MessagePublishResponse.CursorRange;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -84,6 +89,16 @@ public class PublisherImplTest {
.setElementCountThreshold(1000000L)
.build();
+ private static MessagePublishResponse messageResponse(Offset startOffset, int messageCount) {
+ return MessagePublishResponse.newBuilder()
+ .addCursorRanges(
+ CursorRange.newBuilder()
+ .setStartCursor(Cursor.newBuilder().setOffset(startOffset.value()))
+ .setStartIndex(0)
+ .setEndIndex(messageCount))
+ .build();
+ }
+
@Mock private PublishStreamFactory unusedStreamFactory;
@Mock private BatchPublisher mockBatchPublisher;
@Mock private BatchPublisherFactory mockPublisherFactory;
@@ -91,7 +106,7 @@ public class PublisherImplTest {
private PublisherImpl publisher;
private Future errorOccurredFuture;
- private ResponseObserver leakedOffsetStream;
+ private ResponseObserver leakedMessageResponseStream;
private Runnable leakedBatchAlarm;
@Before
@@ -99,7 +114,7 @@ public void setUp() throws CheckedApiException {
initMocks(this);
doAnswer(
args -> {
- leakedOffsetStream = args.getArgument(1);
+ leakedMessageResponseStream = args.getArgument(1);
return mockBatchPublisher;
})
.when(mockPublisherFactory)
@@ -123,80 +138,90 @@ public void setUp() throws CheckedApiException {
private void startPublisher() {
publisher.startAsync().awaitRunning();
- assertThat(leakedOffsetStream).isNotNull();
+ assertThat(leakedMessageResponseStream).isNotNull();
verify(mockPublisherFactory).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
}
@Test
- public void construct_CallsFactoryNew() {
+ public void construct_callsFactoryNew() {
startPublisher();
verifyNoMoreInteractions(mockPublisherFactory);
verifyNoInteractions(mockBatchPublisher);
}
@Test
- public void construct_FlushSendsBatched() throws Exception {
+ public void construct_flushSendsBatched() throws Exception {
startPublisher();
Message message = Message.builder().build();
- Future future = publisher.publish(message);
+ Future future = publisher.publish(message, PublishSequenceNumber.of(0));
doAnswer(
(Answer)
args -> {
- leakedOffsetStream.onResponse(Offset.of(10));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(10), 1));
return null;
})
.when(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message.toProto())),
+ eq(PublishSequenceNumber.of(0)));
publisher.flush();
verify(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message.toProto())),
+ eq(PublishSequenceNumber.of(0)));
assertThat(future.get()).isEqualTo(Offset.of(10));
verifyNoMoreInteractions(mockBatchPublisher);
}
@Test
- public void construct_CloseSendsBatched() throws Exception {
+ public void construct_closeSendsBatched() throws Exception {
startPublisher();
Message message = Message.builder().build();
- Future future = publisher.publish(message);
+ Future future = publisher.publish(message, PublishSequenceNumber.of(0));
doAnswer(
(Answer)
args -> {
- leakedOffsetStream.onResponse(Offset.of(10));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(10), 1));
return null;
})
.when(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message.toProto())),
+ eq(PublishSequenceNumber.of(0)));
publisher.stopAsync().awaitTerminated();
verify(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message.toProto())),
+ eq(PublishSequenceNumber.of(0)));
assertThat(future.get()).isEqualTo(Offset.of(10));
verify(mockBatchPublisher).close();
verifyNoMoreInteractions(mockBatchPublisher);
}
@Test
- public void publishBeforeStart_IsPermanentError() throws Exception {
+ public void publishBeforeStart_isPermanentError() throws Exception {
Message message = Message.builder().build();
- assertThrows(IllegalStateException.class, () -> publisher.publish(message));
+ assertThrows(
+ IllegalStateException.class, () -> publisher.publish(message, PublishSequenceNumber.of(0)));
assertThrows(IllegalStateException.class, () -> publisher.startAsync().awaitRunning());
verifyNoInteractions(mockPublisherFactory);
verifyNoInteractions(mockBatchPublisher);
}
@Test
- public void publishAfterError_IsError() throws Exception {
+ public void publishAfterError_isError() throws Exception {
startPublisher();
- leakedOffsetStream.onError(new CheckedApiException(Code.FAILED_PRECONDITION).underlying);
+ leakedMessageResponseStream.onError(
+ new CheckedApiException(Code.FAILED_PRECONDITION).underlying);
assertThrows(IllegalStateException.class, publisher::awaitTerminated);
errorOccurredFuture.get();
assertThrowableMatches(publisher.failureCause(), Code.FAILED_PRECONDITION);
Message message = Message.builder().build();
- Future future = publisher.publish(message);
+ Future future = publisher.publish(message, PublishSequenceNumber.of(0));
ExecutionException e = assertThrows(ExecutionException.class, future::get);
Optional statusOr = ExtractStatus.extract(e.getCause());
assertThat(statusOr.isPresent()).isTrue();
@@ -206,34 +231,37 @@ public void publishAfterError_IsError() throws Exception {
}
@Test
- public void multipleBatches_Ok() throws Exception {
+ public void multipleBatches_ok() throws Exception {
startPublisher();
Message message1 = Message.builder().build();
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();
- Future future1 = publisher.publish(message1);
- Future future2 = publisher.publish(message2);
+ Future future1 = publisher.publish(message1, PublishSequenceNumber.of(0));
+ Future future2 = publisher.publish(message2, PublishSequenceNumber.of(1));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
.publish(
- (Collection) argThat(hasItems(message1.toProto(), message2.toProto())));
- Future future3 = publisher.publish(message3);
+ (Collection) argThat(hasItems(message1.toProto(), message2.toProto())),
+ eq(PublishSequenceNumber.of(0)));
+ Future future3 = publisher.publish(message3, PublishSequenceNumber.of(2));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message3.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message3.toProto())),
+ eq(PublishSequenceNumber.of(2)));
assertThat(future1.isDone()).isFalse();
assertThat(future2.isDone()).isFalse();
assertThat(future3.isDone()).isFalse();
- leakedOffsetStream.onResponse(Offset.of(10));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(10), 2));
assertThat(future1.isDone()).isTrue();
assertThat(future1.get()).isEqualTo(Offset.of(10));
assertThat(future2.isDone()).isTrue();
assertThat(future2.get()).isEqualTo(Offset.of(11));
assertThat(future3.isDone()).isFalse();
- leakedOffsetStream.onResponse(Offset.of(12));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(12), 1));
assertThat(future3.isDone()).isTrue();
assertThat(future3.get()).isEqualTo(Offset.of(12));
@@ -241,7 +269,134 @@ public void multipleBatches_Ok() throws Exception {
}
@Test
- public void retryableError_RecreatesAndRetriesAll() throws Exception {
+ public void missingCursorRanges_ok() throws Exception {
+ startPublisher();
+ Message message1 = Message.builder().setData(ByteString.copyFromUtf8("data1")).build();
+ Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data2")).build();
+ Message message3 = Message.builder().setData(ByteString.copyFromUtf8("data3")).build();
+ Message message4 = Message.builder().setData(ByteString.copyFromUtf8("data4")).build();
+ Message message5 = Message.builder().setData(ByteString.copyFromUtf8("data5")).build();
+ Message message6 = Message.builder().setData(ByteString.copyFromUtf8("data6")).build();
+ Message message7 = Message.builder().setData(ByteString.copyFromUtf8("data7")).build();
+ Future future1 = publisher.publish(message1, PublishSequenceNumber.of(0));
+ Future future2 = publisher.publish(message2, PublishSequenceNumber.of(1));
+ Future future3 = publisher.publish(message3, PublishSequenceNumber.of(2));
+ Future future4 = publisher.publish(message4, PublishSequenceNumber.of(3));
+ Future future5 = publisher.publish(message5, PublishSequenceNumber.of(4));
+ Future future6 = publisher.publish(message6, PublishSequenceNumber.of(5));
+ Future future7 = publisher.publish(message7, PublishSequenceNumber.of(6));
+ leakedBatchAlarm.run();
+ verify(mockBatchPublisher)
+ .publish(
+ (Collection)
+ argThat(
+ hasItems(
+ message1.toProto(),
+ message2.toProto(),
+ message3.toProto(),
+ message4.toProto(),
+ message5.toProto(),
+ message6.toProto(),
+ message7.toProto())),
+ eq(PublishSequenceNumber.of(0)));
+
+ // The server should not respond with unsorted cursor ranges, but check that it is handled.
+ leakedMessageResponseStream.onResponse(
+ MessagePublishResponse.newBuilder()
+ .addCursorRanges(
+ CursorRange.newBuilder()
+ .setStartCursor(Cursor.newBuilder().setOffset(50))
+ .setStartIndex(4)
+ .setEndIndex(5))
+ .addCursorRanges(
+ CursorRange.newBuilder()
+ .setStartCursor(Cursor.newBuilder().setOffset(80))
+ .setStartIndex(5)
+ .setEndIndex(6))
+ .addCursorRanges(
+ CursorRange.newBuilder()
+ .setStartCursor(Cursor.newBuilder().setOffset(10))
+ .setStartIndex(1)
+ .setEndIndex(3))
+ .build());
+ assertThat(future1.isDone()).isTrue();
+ assertThat(future1.get()).isEqualTo(Offset.of(-1));
+ assertThat(future2.isDone()).isTrue();
+ assertThat(future2.get()).isEqualTo(Offset.of(10));
+ assertThat(future3.isDone()).isTrue();
+ assertThat(future3.get()).isEqualTo(Offset.of(11));
+ assertThat(future4.isDone()).isTrue();
+ assertThat(future4.get()).isEqualTo(Offset.of(-1));
+ assertThat(future5.isDone()).isTrue();
+ assertThat(future5.get()).isEqualTo(Offset.of(50));
+ assertThat(future6.isDone()).isTrue();
+ assertThat(future6.get()).isEqualTo(Offset.of(80));
+ assertThat(future7.isDone()).isTrue();
+ assertThat(future7.get()).isEqualTo(Offset.of(-1));
+
+ verifyNoMoreInteractions(mockBatchPublisher);
+ }
+
+ @Test
+ public void invalidCursorRanges_setsPermanentException() throws Exception {
+ startPublisher();
+ Message message1 = Message.builder().setData(ByteString.copyFromUtf8("data1")).build();
+ Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data2")).build();
+ Future future1 = publisher.publish(message1, PublishSequenceNumber.of(0));
+ Future future2 = publisher.publish(message2, PublishSequenceNumber.of(1));
+ leakedBatchAlarm.run();
+ verify(mockBatchPublisher)
+ .publish(
+ (Collection) argThat(hasItems(message1.toProto(), message2.toProto())),
+ eq(PublishSequenceNumber.of(0)));
+
+ leakedMessageResponseStream.onResponse(
+ MessagePublishResponse.newBuilder()
+ .addCursorRanges(
+ CursorRange.newBuilder()
+ .setStartCursor(Cursor.newBuilder().setOffset(100))
+ .setStartIndex(0)
+ .setEndIndex(1))
+ .addCursorRanges(
+ CursorRange.newBuilder()
+ .setStartCursor(Cursor.newBuilder().setOffset(200))
+ .setStartIndex(0)
+ .setEndIndex(1))
+ .build());
+ assertThat(future1.isDone()).isTrue();
+ assertThat(future1.get()).isEqualTo(Offset.of(100));
+
+ assertThrows(IllegalStateException.class, publisher::awaitTerminated);
+ assertThat(future2.isDone()).isTrue();
+ assertThrows(Exception.class, future2::get);
+ errorOccurredFuture.get();
+ assertThrowableMatches(publisher.failureCause(), Code.FAILED_PRECONDITION);
+
+ verifyNoMoreInteractions(mockBatchPublisher);
+ }
+
+ @Test
+ public void sequenceNumberDiscontinuity_setsPermanentException() throws Exception {
+ startPublisher();
+ Message message1 = Message.builder().setData(ByteString.copyFromUtf8("data1")).build();
+ Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data2")).build();
+ Future future1 = publisher.publish(message1, PublishSequenceNumber.of(10));
+ Future future2 = publisher.publish(message2, PublishSequenceNumber.of(10));
+
+ assertThrows(IllegalStateException.class, publisher::awaitTerminated);
+ assertThat(future1.isDone()).isTrue();
+ assertThat(future2.isDone()).isTrue();
+ assertThrows(Exception.class, future1::get);
+ assertThrows(Exception.class, future2::get);
+ errorOccurredFuture.get();
+ assertThrowableMatches(publisher.failureCause(), Code.FAILED_PRECONDITION);
+
+ verify(mockBatchPublisher).close();
+ verifyNoMoreInteractions(mockBatchPublisher);
+ }
+
+ @Test
+ public void retryableError_recreatesAndRetriesAll() throws Exception {
startPublisher();
Message message1 =
Message.builder()
@@ -251,15 +406,19 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception {
Message.builder()
.setData(ByteString.copyFromUtf8(String.join("", Collections.nCopies(21, "a"))))
.build();
- Future future1 = publisher.publish(message1);
+ Future future1 = publisher.publish(message1, PublishSequenceNumber.of(0));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message1.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message1.toProto())),
+ eq(PublishSequenceNumber.of(0)));
leakedBatchAlarm.run();
- Future future2 = publisher.publish(message2);
+ Future future2 = publisher.publish(message2, PublishSequenceNumber.of(1));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message2.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message2.toProto())),
+ eq(PublishSequenceNumber.of(1)));
assertThat(future1.isDone()).isFalse();
assertThat(future2.isDone()).isFalse();
@@ -268,7 +427,7 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception {
doReturn(mockBatchPublisher2)
.when(mockPublisherFactory)
.New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
- leakedOffsetStream.onError(new CheckedApiException(Code.UNKNOWN));
+ leakedMessageResponseStream.onError(new CheckedApiException(Code.UNKNOWN));
// wait for retry to complete
Thread.sleep(500);
@@ -277,19 +436,23 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception {
verifyNoMoreInteractions(mockBatchPublisher);
verify(mockPublisherFactory, times(2)).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
verify(mockBatchPublisher2)
- .publish((Collection) argThat(hasItems(message1.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message1.toProto())),
+ eq(PublishSequenceNumber.of(0)));
verify(mockBatchPublisher2)
- .publish((Collection) argThat(hasItems(message2.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message2.toProto())),
+ eq(PublishSequenceNumber.of(1)));
assertThat(future1.isDone()).isFalse();
assertThat(future2.isDone()).isFalse();
- leakedOffsetStream.onResponse(Offset.of(10));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(10), 1));
assertThat(future1.isDone()).isTrue();
assertThat(future1.get()).isEqualTo(Offset.of(10));
assertThat(future2.isDone()).isFalse();
- leakedOffsetStream.onResponse(Offset.of(50));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(50), 1));
assertThat(future2.isDone()).isTrue();
assertThat(future2.get()).isEqualTo(Offset.of(50));
@@ -297,7 +460,7 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception {
}
@Test
- public void retryableError_RebatchesProperly() throws Exception {
+ public void retryableError_rebatchesProperly() throws Exception {
startPublisher();
Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build();
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build();
@@ -314,23 +477,30 @@ public void retryableError_RebatchesProperly() throws Exception {
.mapToObj(x -> Message.builder().setData(ByteString.copyFromUtf8("clone-" + x)).build())
.collect(Collectors.toList());
- Future future1 = publisher.publish(message1);
- Future future2 = publisher.publish(message2);
+ Future future1 = publisher.publish(message1, PublishSequenceNumber.of(0));
+ Future future2 = publisher.publish(message2, PublishSequenceNumber.of(1));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
.publish(
- (Collection) argThat(hasItems(message1.toProto(), message2.toProto())));
+ (Collection) argThat(hasItems(message1.toProto(), message2.toProto())),
+ eq(PublishSequenceNumber.of(0)));
leakedBatchAlarm.run();
- Future future3 = publisher.publish(message3);
+ Future future3 = publisher.publish(message3, PublishSequenceNumber.of(2));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message3.toProto())));
- Future future4 = publisher.publish(message4);
+ .publish(
+ (Collection) argThat(hasItems(message3.toProto())),
+ eq(PublishSequenceNumber.of(2)));
+ Future future4 = publisher.publish(message4, PublishSequenceNumber.of(3));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message4.toProto())));
- List> remainingFutures =
- remaining.stream().map(publisher::publish).collect(Collectors.toList());
+ .publish(
+ (Collection) argThat(hasItems(message4.toProto())),
+ eq(PublishSequenceNumber.of(3)));
+ List> remainingFutures = new ArrayList<>();
+ for (int i = 0; i < remaining.size(); ++i) {
+ remainingFutures.add(publisher.publish(remaining.get(i), PublishSequenceNumber.of(i + 4)));
+ }
leakedBatchAlarm.run();
assertThat(future1.isDone()).isFalse();
@@ -345,7 +515,7 @@ public void retryableError_RebatchesProperly() throws Exception {
doReturn(mockBatchPublisher2)
.when(mockPublisherFactory)
.New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
- leakedOffsetStream.onError(new CheckedApiException(Code.UNKNOWN));
+ leakedMessageResponseStream.onError(new CheckedApiException(Code.UNKNOWN));
// wait for retry to complete
Thread.sleep(500);
@@ -356,10 +526,13 @@ public void retryableError_RebatchesProperly() throws Exception {
order
.verify(mockBatchPublisher2)
.publish(
- (Collection) argThat(hasItems(message1.toProto(), message2.toProto())));
+ (Collection) argThat(hasItems(message1.toProto(), message2.toProto())),
+ eq(PublishSequenceNumber.of(0)));
order
.verify(mockBatchPublisher2)
- .publish((Collection) argThat(hasItems(message3.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message3.toProto())),
+ eq(PublishSequenceNumber.of(2)));
ImmutableList.Builder expectedRebatch = ImmutableList.builder();
expectedRebatch.add(message4.toProto());
for (int i = 0; i < (Constants.MAX_PUBLISH_BATCH_COUNT - 1); ++i) {
@@ -367,11 +540,14 @@ public void retryableError_RebatchesProperly() throws Exception {
}
order
.verify(mockBatchPublisher2)
- .publish((Collection) argThat(contains(expectedRebatch.build().toArray())));
+ .publish(
+ (Collection) argThat(contains(expectedRebatch.build().toArray())),
+ eq(PublishSequenceNumber.of(3)));
order
.verify(mockBatchPublisher2)
.publish(
- (Collection) argThat(hasItems(Iterables.getLast(remaining).toProto())));
+ (Collection) argThat(hasItems(Iterables.getLast(remaining).toProto())),
+ eq(PublishSequenceNumber.of(1003)));
assertThat(future1.isDone()).isFalse();
assertThat(future2.isDone()).isFalse();
@@ -381,19 +557,20 @@ public void retryableError_RebatchesProperly() throws Exception {
assertThat(future.isDone()).isFalse();
}
- leakedOffsetStream.onResponse(Offset.of(10));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(10), 2));
assertThat(future1.isDone()).isTrue();
assertThat(future1.get()).isEqualTo(Offset.of(10));
assertThat(future2.isDone()).isTrue();
assertThat(future2.get()).isEqualTo(Offset.of(11));
assertThat(future3.isDone()).isFalse();
- leakedOffsetStream.onResponse(Offset.of(50));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(50), 1));
assertThat(future3.isDone()).isTrue();
assertThat(future3.get()).isEqualTo(Offset.of(50));
assertThat(future4.isDone()).isFalse();
- leakedOffsetStream.onResponse(Offset.of(100));
+ leakedMessageResponseStream.onResponse(
+ messageResponse(Offset.of(100), (int) Constants.MAX_PUBLISH_BATCH_COUNT));
assertThat(future4.isDone()).isTrue();
assertThat(future4.get()).isEqualTo(Offset.of(100));
for (int i = 0; i < (Constants.MAX_PUBLISH_BATCH_COUNT - 1); ++i) {
@@ -404,40 +581,43 @@ public void retryableError_RebatchesProperly() throws Exception {
Future lastFuture = Iterables.getLast(remainingFutures);
assertThat(lastFuture.isDone()).isFalse();
- leakedOffsetStream.onResponse(Offset.of(10000));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(10000), 1));
assertThat(lastFuture.isDone()).isTrue();
assertThat(lastFuture.get()).isEqualTo(Offset.of(10000));
}
@Test
- public void invalidOffsetSequence_SetsPermanentException() throws Exception {
+ public void invalidOffsetSequence_setsPermanentException() throws Exception {
startPublisher();
Message message1 = Message.builder().build();
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
Message message3 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();
- Future future1 = publisher.publish(message1);
- Future future2 = publisher.publish(message2);
+ Future future1 = publisher.publish(message1, PublishSequenceNumber.of(0));
+ Future future2 = publisher.publish(message2, PublishSequenceNumber.of(1));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
.publish(
- (Collection) argThat(hasItems(message1.toProto(), message2.toProto())));
- Future future3 = publisher.publish(message3);
+ (Collection) argThat(hasItems(message1.toProto(), message2.toProto())),
+ eq(PublishSequenceNumber.of(0)));
+ Future future3 = publisher.publish(message3, PublishSequenceNumber.of(2));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message3.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message3.toProto())),
+ eq(PublishSequenceNumber.of(2)));
assertThat(future1.isDone()).isFalse();
assertThat(future2.isDone()).isFalse();
assertThat(future3.isDone()).isFalse();
- leakedOffsetStream.onResponse(Offset.of(10));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(10), 2));
assertThat(future1.isDone()).isTrue();
assertThat(future1.get()).isEqualTo(Offset.of(10));
assertThat(future2.isDone()).isTrue();
assertThat(future2.get()).isEqualTo(Offset.of(11));
assertThat(future3.isDone()).isFalse();
- leakedOffsetStream.onResponse(Offset.of(11));
+ leakedMessageResponseStream.onResponse(messageResponse(Offset.of(11), 1));
assertThrows(IllegalStateException.class, publisher::awaitTerminated);
assertThat(future3.isDone()).isTrue();
assertThrows(Exception.class, future3::get);
@@ -453,14 +633,16 @@ public void cancelOutstandingPublishes_terminatesFutures() throws Exception {
// Publish a message and flush to stream.
Message message1 = Message.builder().setData(ByteString.copyFromUtf8("data")).build();
- Future future1 = publisher.publish(message1);
+ Future future1 = publisher.publish(message1, PublishSequenceNumber.of(0));
leakedBatchAlarm.run();
verify(mockBatchPublisher)
- .publish((Collection) argThat(hasItems(message1.toProto())));
+ .publish(
+ (Collection) argThat(hasItems(message1.toProto())),
+ eq(PublishSequenceNumber.of(0)));
// Publish another message but do not flush to stream yet.
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("other_data")).build();
- Future future2 = publisher.publish(message2);
+ Future future2 = publisher.publish(message2, PublishSequenceNumber.of(1));
// Cancel outstanding publishes and verify that both futures complete with a cancelled status.
assertThat(future1.isDone()).isFalse();
diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SequenceAssigningPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SequenceAssigningPublisherTest.java
new file mode 100644
index 000000000..b8527699a
--- /dev/null
+++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SequenceAssigningPublisherTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.internal.wire;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
+import com.google.cloud.pubsublite.internal.SequencedPublisher;
+import com.google.cloud.pubsublite.internal.testing.FakeApiService;
+import com.google.protobuf.ByteString;
+import java.util.concurrent.Future;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Spy;
+
+@RunWith(JUnit4.class)
+public final class SequenceAssigningPublisherTest {
+
+ private static Message makeMessage(String data) {
+ return Message.builder().setData(ByteString.copyFromUtf8(data)).build();
+ }
+
+ abstract static class FakeSequencedPublisher extends FakeApiService
+ implements SequencedPublisher {}
+
+ @Spy private FakeSequencedPublisher underlyingPublisher;
+
+ private SequenceAssigningPublisher publisher;
+
+ @Before
+ public void setUp() {
+ initMocks(this);
+ publisher = new SequenceAssigningPublisher(underlyingPublisher);
+ }
+
+ @Test
+ public void publishAssignsSequenceNumbers() throws Exception {
+ Message message1 = makeMessage("msg1");
+ Message message2 = makeMessage("msg2");
+ Message message3 = makeMessage("msg3");
+
+ when(underlyingPublisher.publish(eq(message1), eq(PublishSequenceNumber.of(0))))
+ .thenReturn(ApiFutures.immediateFuture(Offset.of(100)));
+ when(underlyingPublisher.publish(eq(message2), eq(PublishSequenceNumber.of(1))))
+ .thenReturn(ApiFutures.immediateFuture(Offset.of(200)));
+ when(underlyingPublisher.publish(eq(message3), eq(PublishSequenceNumber.of(2))))
+ .thenReturn(ApiFutures.immediateFuture(Offset.of(300)));
+
+ Future future1 = publisher.publish(message1);
+ verify(underlyingPublisher).publish(eq(message1), eq(PublishSequenceNumber.of(0)));
+ Future future2 = publisher.publish(message2);
+ verify(underlyingPublisher).publish(eq(message2), eq(PublishSequenceNumber.of(1)));
+ Future future3 = publisher.publish(message3);
+ verify(underlyingPublisher).publish(eq(message3), eq(PublishSequenceNumber.of(2)));
+
+ assertThat(future1.isDone()).isTrue();
+ assertThat(future1.get()).isEqualTo(Offset.of(100));
+ assertThat(future2.isDone()).isTrue();
+ assertThat(future2.get()).isEqualTo(Offset.of(200));
+ assertThat(future3.isDone()).isTrue();
+ assertThat(future3.get()).isEqualTo(Offset.of(300));
+ }
+
+ @Test
+ public void cancelOutstandingPublishesDelegatesToUnderlying() throws Exception {
+ publisher.cancelOutstandingPublishes();
+ verify(underlyingPublisher).cancelOutstandingPublishes();
+ }
+
+ @Test
+ public void flushDelegatesToUnderlying() throws Exception {
+ publisher.flush();
+ verify(underlyingPublisher).flush();
+ }
+}
diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SerialBatcherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SerialBatcherTest.java
index 347fe93b7..2cc3fc5bd 100755
--- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SerialBatcherTest.java
+++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SerialBatcherTest.java
@@ -17,9 +17,12 @@
package com.google.cloud.pubsublite.internal.wire;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.wire.SerialBatcher.UnbatchedMessage;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.protobuf.ByteString;
@@ -48,10 +51,16 @@ private static List extractMessagesFromBatch(List extractSequenceNumbersFromBatch(
+ List messages) {
+ return messages.stream().map(UnbatchedMessage::sequenceNumber).collect(Collectors.toList());
+ }
+
@Test
public void needsImmediateFlushAtMessageLimit() throws Exception {
- SerialBatcher batcher = new SerialBatcher(/*byteLimit=*/ 10000, /*messageLimit=*/ 1);
- ApiFuture future = batcher.add(PubSubMessage.getDefaultInstance());
+ SerialBatcher batcher = new SerialBatcher(/* byteLimit= */ 10000, /* messageLimit= */ 1);
+ ApiFuture future =
+ batcher.add(PubSubMessage.getDefaultInstance(), PublishSequenceNumber.of(0));
List> batches = batcher.flush();
assertThat(batches).hasSize(1);
List messages = batches.get(0);
@@ -66,55 +75,87 @@ public void needsImmediateFlushAtMessageLimit() throws Exception {
public void moreThanLimitMultipleBatches() throws Exception {
SerialBatcher batcher =
new SerialBatcher(
- /*byteLimit=*/ MESSAGE_1.getSerializedSize() + MESSAGE_2.getSerializedSize(),
- /*messageLimit=*/ 1000);
- batcher.add(MESSAGE_1);
- batcher.add(MESSAGE_2);
- batcher.add(MESSAGE_3);
+ /* byteLimit= */ MESSAGE_1.getSerializedSize() + MESSAGE_2.getSerializedSize(),
+ /* messageLimit= */ 1000);
+ batcher.add(MESSAGE_1, PublishSequenceNumber.of(0));
+ batcher.add(MESSAGE_2, PublishSequenceNumber.of(1));
+ batcher.add(MESSAGE_3, PublishSequenceNumber.of(2));
List> batches = batcher.flush();
assertThat(batches).hasSize(2);
assertThat(extractMessagesFromBatch(batches.get(0))).containsExactly(MESSAGE_1, MESSAGE_2);
assertThat(extractMessagesFromBatch(batches.get(1))).containsExactly(MESSAGE_3);
+ assertThat(extractSequenceNumbersFromBatch(batches.get(0)))
+ .containsExactly(PublishSequenceNumber.of(0), PublishSequenceNumber.of(1));
+ assertThat(extractSequenceNumbersFromBatch(batches.get(1)))
+ .containsExactly(PublishSequenceNumber.of(2));
}
@Test
@SuppressWarnings({"CheckReturnValue", "FutureReturnValueIgnored"})
- public void flushMessageLimit() {
- SerialBatcher batcher = new SerialBatcher(/*byteLimit=*/ 10000, /*messageLimit=*/ 2);
- batcher.add(MESSAGE_1);
- batcher.add(MESSAGE_2);
- batcher.add(MESSAGE_3);
+ public void flushMessageLimit() throws Exception {
+ SerialBatcher batcher = new SerialBatcher(/* byteLimit= */ 10000, /* messageLimit= */ 2);
+ batcher.add(MESSAGE_1, PublishSequenceNumber.of(10));
+ batcher.add(MESSAGE_2, PublishSequenceNumber.of(11));
+ batcher.add(MESSAGE_3, PublishSequenceNumber.of(12));
List> batches = batcher.flush();
- assertThat(batches.size()).isEqualTo(2);
+ assertThat(batches).hasSize(2);
assertThat(extractMessagesFromBatch(batches.get(0))).containsExactly(MESSAGE_1, MESSAGE_2);
assertThat(extractMessagesFromBatch(batches.get(1))).containsExactly(MESSAGE_3);
+ assertThat(extractSequenceNumbersFromBatch(batches.get(0)))
+ .containsExactly(PublishSequenceNumber.of(10), PublishSequenceNumber.of(11));
+ assertThat(extractSequenceNumbersFromBatch(batches.get(1)))
+ .containsExactly(PublishSequenceNumber.of(12));
}
@Test
@SuppressWarnings({"CheckReturnValue", "FutureReturnValueIgnored"})
- public void flushByteLimit() {
+ public void flushByteLimit() throws Exception {
SerialBatcher batcher =
new SerialBatcher(
- /*byteLimit=*/ MESSAGE_1.getSerializedSize() + MESSAGE_2.getSerializedSize() + 1,
- /*messageLimit=*/ 10000);
- batcher.add(MESSAGE_1);
- batcher.add(MESSAGE_2);
- batcher.add(MESSAGE_3);
+ /* byteLimit= */ MESSAGE_1.getSerializedSize() + MESSAGE_2.getSerializedSize() + 1,
+ /* messageLimit= */ 10000);
+ batcher.add(MESSAGE_1, PublishSequenceNumber.of(100));
+ batcher.add(MESSAGE_2, PublishSequenceNumber.of(101));
+ batcher.add(MESSAGE_3, PublishSequenceNumber.of(102));
List> batches = batcher.flush();
- assertThat(batches.size()).isEqualTo(2);
+ assertThat(batches).hasSize(2);
assertThat(extractMessagesFromBatch(batches.get(0))).containsExactly(MESSAGE_1, MESSAGE_2);
assertThat(extractMessagesFromBatch(batches.get(1))).containsExactly(MESSAGE_3);
+ assertThat(extractSequenceNumbersFromBatch(batches.get(0)))
+ .containsExactly(PublishSequenceNumber.of(100), PublishSequenceNumber.of(101));
+ assertThat(extractSequenceNumbersFromBatch(batches.get(1)))
+ .containsExactly(PublishSequenceNumber.of(102));
}
@Test
@SuppressWarnings({"CheckReturnValue", "FutureReturnValueIgnored"})
- public void batchesMessagesAtLimit() {
+ public void batchesMessagesAtLimit() throws Exception {
SerialBatcher batcher =
new SerialBatcher(
- /*byteLimit=*/ MESSAGE_1.getSerializedSize() + MESSAGE_2.getSerializedSize(),
- /*messageLimit=*/ 10000);
- batcher.add(MESSAGE_2);
- batcher.add(MESSAGE_1);
+ /* byteLimit= */ MESSAGE_1.getSerializedSize() + MESSAGE_2.getSerializedSize(),
+ /* messageLimit= */ 10000);
+ batcher.add(MESSAGE_2, PublishSequenceNumber.of(0));
+ batcher.add(MESSAGE_1, PublishSequenceNumber.of(1));
assertThat(extractMessages(batcher.flush())).containsExactly(MESSAGE_2, MESSAGE_1);
}
+
+ @Test
+ @SuppressWarnings({"CheckReturnValue", "FutureReturnValueIgnored"})
+ public void failsSequenceDiscontinuities() throws Exception {
+ SerialBatcher batcher = new SerialBatcher(/* byteLimit= */ 10000, /* messageLimit= */ 10000);
+ batcher.add(MESSAGE_1, PublishSequenceNumber.of(100));
+
+ assertThrows(
+ CheckedApiException.class, () -> batcher.add(MESSAGE_2, PublishSequenceNumber.of(99)));
+ assertThrows(
+ CheckedApiException.class, () -> batcher.add(MESSAGE_2, PublishSequenceNumber.of(100)));
+ assertThrows(
+ CheckedApiException.class, () -> batcher.add(MESSAGE_2, PublishSequenceNumber.of(102)));
+
+ List> batches = batcher.flush();
+ assertThat(batches).hasSize(1);
+ assertThat(extractMessagesFromBatch(batches.get(0))).containsExactly(MESSAGE_1);
+ assertThat(extractSequenceNumbersFromBatch(batches.get(0)))
+ .containsExactly(PublishSequenceNumber.of(100));
+ }
}