Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Publish idempotence #1323

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,27 @@
<className>com/google/cloud/pubsublite/AdminClient</className>
<method>*</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/PublisherSettings$Builder</className>
<method>*</method>
</difference>
<difference>
<differenceType>4001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<to>**</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
<to>**</to>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ public abstract class MessageMetadata {
/** The partition a message was published to. */
public abstract Partition partition();

/** The offset a message was assigned. */
/**
* The offset a message was assigned.
*
* <p>If this MessageMetadata was returned for a publish result and publish idempotence was
* enabled, the offset may be -1 when the message was identified as a duplicate of an already
* successfully published message, but the server did not have sufficient information to return
* the message's offset at publish time. Messages received by subscribers will always have the
* correct offset.
*/
public abstract Offset offset();

/** Construct a MessageMetadata from a Partition and Offset. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.internal.wire.UuidBuilder;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Optional;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -79,6 +81,12 @@ public abstract class PublisherSettings {
/** Batching settings for this publisher to use. Apply per-partition. */
abstract BatchingSettings batchingSettings();

/**
* Whether to enable publish idempotence, where the server will ensure that unique messages within
* a single publisher session are stored only once. Default true.
*/
abstract boolean enableIdempotence();

/** A provider for credentials. */
abstract CredentialsProvider credentialsProvider();

Expand Down Expand Up @@ -106,6 +114,7 @@ public static Builder newBuilder() {
.setCredentialsProvider(
PublisherServiceSettings.defaultCredentialsProviderBuilder().build())
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
.setEnableIdempotence(true)
.setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder());
}

Expand All @@ -127,6 +136,12 @@ public abstract Builder setMessageTransformer(
/** Batching settings for this publisher to use. Apply per-partition. */
public abstract Builder setBatchingSettings(BatchingSettings batchingSettings);

/**
* Whether to enable publish idempotence, where the server will ensure that unique messages
* within a single publisher session are stored only once. Default true.
*/
public abstract Builder setEnableIdempotence(boolean enableIdempotence);

/** A provider for credentials. */
public abstract Builder setCredentialsProvider(CredentialsProvider credentialsProvider);

Expand Down Expand Up @@ -163,6 +178,7 @@ private PublisherServiceClient newServiceClient() throws ApiException {

private PartitionPublisherFactory getPartitionPublisherFactory() {
PublisherServiceClient client = newServiceClient();
ByteString publisherClientId = UuidBuilder.toByteString(UuidBuilder.generate());
return new PartitionPublisherFactory() {
@Override
public com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> newPublisher(
Expand All @@ -180,6 +196,9 @@ public com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> newPublis
RoutingMetadata.of(topicPath(), partition));
return client.publishCallable().splitCall(responseStream, context);
});
if (enableIdempotence()) {
singlePartitionBuilder.setClientId(publisherClientId);
}
return singlePartitionBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;

import com.google.auto.value.AutoValue;
import java.io.Serializable;

/** A sequence number for a published message, for implementing publish idempotency. */
@AutoValue
public abstract class PublishSequenceNumber implements Serializable {

/** Create a publish sequence number from its long value. */
public static PublishSequenceNumber of(long sequenceNumber) {
return new AutoValue_PublishSequenceNumber(sequenceNumber);
}

/** The sequence number that should be set for the first message in a publisher session. */
public static final PublishSequenceNumber FIRST = PublishSequenceNumber.of(0);

/** Returns the next sequence number that follows the current. */
public PublishSequenceNumber next() {
return PublishSequenceNumber.of(value() + 1);
}

/** The long value of this publish sequence number. */
public abstract long value();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Message;
import java.io.Flushable;

/**
* A PubSub Lite publisher that requires a sequence number assigned to every message, for publish
* idempotency. Errors are handled out of band. Thread safe.
*/
public interface SequencedPublisher<ResponseT> extends ApiService, Flushable {
/**
* Publish a new message with an assigned sequence number.
*
* <p>Behavior is undefined if a call to flush() is outstanding or close() has already been
* called. This method never blocks.
*
* <p>Guarantees that if a single publish future has an exception set, all publish calls made
* after that will also have an exception set.
*/
ApiFuture<ResponseT> publish(Message message, PublishSequenceNumber sequenceNumber);

/** Attempts to cancel all outstanding publishes. */
void cancelOutstandingPublishes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient;
import com.google.common.flogger.GoogleLogger;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.util.UUID;

@AutoValue
Expand All @@ -39,7 +37,7 @@ public abstract class AssignerSettings {
abstract UUID uuid();

public static Builder newBuilder() {
return new AutoValue_AssignerSettings.Builder().setUuid(UUID.randomUUID());
return new AutoValue_AssignerSettings.Builder().setUuid(UuidBuilder.generate());
}

@AutoValue.Builder
Expand All @@ -58,16 +56,13 @@ public abstract static class Builder {
}

public Assigner instantiate() {
ByteBuffer uuidBuffer = ByteBuffer.allocate(16);
uuidBuffer.putLong(uuid().getMostSignificantBits());
uuidBuffer.putLong(uuid().getLeastSignificantBits());
logger.atInfo().log(
"Subscription %s using UUID %s for assignment.", subscriptionPath(), uuid());

InitialPartitionAssignmentRequest initial =
InitialPartitionAssignmentRequest.newBuilder()
.setSubscription(subscriptionPath().toString())
.setClientId(ByteString.copyFrom(uuidBuffer.array()))
.setClientId(UuidBuilder.toByteString(uuid()))
.build();
return new AssignerImpl(serviceClient(), initial, receiver());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.util.Collection;

interface BatchPublisher extends AutoCloseable {
/** Publish the batch of messages. Failures are communicated out of band. */
void publish(Collection<PubSubMessage> messages);
/**
* Publish the batch of messages, with the given sequence number of the first message in the
* batch. Failures are communicated out of band.
*/
void publish(Collection<PubSubMessage> messages, PublishSequenceNumber firstSequenceNumber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;

interface BatchPublisherFactory
extends SingleConnectionFactory<PublishRequest, PublishResponse, Offset, BatchPublisher> {}
extends SingleConnectionFactory<
PublishRequest, PublishResponse, MessagePublishResponse, BatchPublisher> {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,52 @@
import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;

import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
import com.google.cloud.pubsublite.proto.MessagePublishRequest;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Collection;
import java.util.Optional;

class BatchPublisherImpl extends SingleConnection<PublishRequest, PublishResponse, Offset>
class BatchPublisherImpl
extends SingleConnection<PublishRequest, PublishResponse, MessagePublishResponse>
implements BatchPublisher {
private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
private Optional<Offset> lastOffset = Optional.empty();

static class Factory implements BatchPublisherFactory {
@Override
public BatchPublisherImpl New(
StreamFactory<PublishRequest, PublishResponse> streamFactory,
ResponseObserver<Offset> clientStream,
ResponseObserver<MessagePublishResponse> clientStream,
PublishRequest initialRequest) {
return new BatchPublisherImpl(streamFactory::New, clientStream, initialRequest);
}
}

private final boolean sendSequenceNumbers;

private BatchPublisherImpl(
PublishStreamFactory streamFactory,
ResponseObserver<Offset> publishCompleteStream,
ResponseObserver<MessagePublishResponse> publishCompleteStream,
PublishRequest initialRequest) {
super(streamFactory, publishCompleteStream);
initialize(initialRequest);

// Publish idempotency is enabled when a publisher client id is specified. Otherwise do not send
// sequence numbers to the stream.
this.sendSequenceNumbers = !initialRequest.getInitialRequest().getClientId().isEmpty();
}

@Override
public void publish(Collection<PubSubMessage> messages) {
public void publish(
Collection<PubSubMessage> messages, PublishSequenceNumber firstSequenceNumber) {
PublishRequest.Builder builder = PublishRequest.newBuilder();
builder.getMessagePublishRequestBuilder().addAllMessages(messages);
MessagePublishRequest.Builder publishRequestBuilder = builder.getMessagePublishRequestBuilder();
publishRequestBuilder.addAllMessages(messages);
if (sendSequenceNumbers) {
publishRequestBuilder.setFirstSequenceNumber(firstSequenceNumber.value());
}
sendToStream(builder.build());
}

Expand All @@ -77,18 +81,6 @@ protected void handleStreamResponse(PublishResponse response) throws CheckedApiE
checkState(
response.hasMessageResponse(),
"Received response on stream which was neither a message or initial response.");
onMessageResponse(response.getMessageResponse());
}

private void onMessageResponse(MessagePublishResponse response) throws CheckedApiException {
Offset offset = Offset.of(response.getStartCursor().getOffset());
try (CloseableMonitor.Hold h = monitor.enter()) {
if (lastOffset.isPresent() && offset.value() <= lastOffset.get().value()) {
throw new CheckedApiException(
"Received out of order offsets on stream.", Code.FAILED_PRECONDITION);
}
lastOffset = Optional.of(offset);
}
sendToClient(offset);
sendToClient(response.getMessageResponse());
}
}
Loading