Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Reactive support for Pub/Sub subscription (#1461)
Browse files Browse the repository at this point in the history
The polling Pub/Sub flux is demand-sensitive, implementing the pull/push strategy.
Unbounded/bounded demand is treated differently:
* For unlimited demand, the Pub/Sub subscription will be polled regularly, at intervals determined by `pollingPeriodMs` parameter passed in when creating the `Flux`.
* For bounded demand, as many messages as possible (up to the requested number) are delivered immediately, with the remaining messages delivered as they become available.
  • Loading branch information
elefeint authored Mar 22, 2019
1 parent 72d7464 commit 8df2a6b
Show file tree
Hide file tree
Showing 14 changed files with 643 additions and 3 deletions.
39 changes: 38 additions & 1 deletion docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ This starter is also available from https://start.spring.io[Spring Initializr] t

=== Sample

A https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-sample[sample application] is available.
Sample applications for https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-sample[using the template] and https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-reactive-sample[using a subscription-backed reactive stream] are available.

=== Pub/Sub Operations & Template

Expand Down Expand Up @@ -150,6 +150,43 @@ include::../../test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/P

Please refer to our https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-integration-pubsub-json-sample[Pub/Sub JSON Payload Sample App] as a reference for using this functionality.

=== Reactive Stream Subscription

It is also possible to acquire a reactive stream backed by a subscription.
To do so, a Project Reactor dependency (`io.projectreactor:reactor-core`) must be added to the project.
The combination of the Pub/Sub starter and the Project Reactor dependencies will then make a `PubSubReactiveFactory` bean available, which can then be used to get a `Publisher`.

[source,java]
----
@Autowired
PubSubReactiveFactory reactiveFactory;
// ...
Flux<AcknowledgeablePubsubMessage> flux
= reactiveFactory.createPolledFlux("exampleSubscription", 1000);
----

The `Flux` then represents an infinite stream of GCP Pub/Sub messages coming in through the specified subscription.
For unlimited demand, the Pub/Sub subscription will be polled regularly, at intervals determined by `pollingPeriodMs` parameter passed in when creating the `Flux`.
For bounded demand, the `pollingPeriodMs` parameter is unused.
Instead, as many messages as possible (up to the requested number) are delivered immediately, with the remaining messages delivered as they become available.

The full range of Project Reactor operations can be applied to the stream.
For example, if you only want to fetch 5 messages, you can use `limitRequest` operation to turn the infinite stream into a finite one:

[source,java]
----
Flux<AcknowledgeablePubsubMessage> fiveMessageFlux = flux.limitRequest(5);
----

Acknowledging messages flowing through the `Flux` should be manually acknowledged.

[source,java]
----
flux.doOnNext(AcknowledgeablePubsubMessage::ack);
----

=== Pub/Sub management

`PubSubAdmin` is the abstraction provided by Spring Cloud GCP to manage Google Cloud Pub/Sub resources.
Expand Down
6 changes: 6 additions & 0 deletions spring-cloud-gcp-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<optional>true</optional>
</dependency>

<!-- Cloud SQL -->
<dependency>
<groupId>org.springframework</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,4 +381,5 @@ public SubscriptionAdminClient subscriptionAdminClient(
public TransportChannelProvider transportChannelProvider() {
return InstantiatingGrpcChannelProvider.newBuilder().build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gcp.autoconfigure.pubsub;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate;
import org.springframework.cloud.gcp.pubsub.reactive.PubSubReactiveFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* Reactive Pub/Sub support autoconfiguration.
*
* @author Elena Felder
*
* @since 1.2
*/
@Configuration
@AutoConfigureAfter(GcpPubSubAutoConfiguration.class)
@ConditionalOnClass(Flux.class)
@ConditionalOnProperty(value = "spring.cloud.gcp.pubsub.reactive.enabled", matchIfMissing = true)
public class GcpPubSubReactiveAutoConfiguration {

@Bean
@ConditionalOnMissingBean(name = "pubSubReactiveScheduler")
Scheduler pubSubReactiveScheduler() {
return Schedulers.elastic();
}

@Bean
@ConditionalOnMissingBean
public PubSubReactiveFactory pubSubReactiveFactory(
PubSubSubscriberTemplate subscriberTemplate, @Qualifier("pubSubReactiveScheduler") Scheduler scheduler) {
return new PubSubReactiveFactory(subscriberTemplate, scheduler);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ org.springframework.cloud.gcp.autoconfigure.pubsub.GcpPubSubEmulatorConfiguratio
org.springframework.cloud.gcp.autoconfigure.core.GcpContextAutoConfiguration,\
org.springframework.cloud.gcp.autoconfigure.logging.StackdriverLoggingAutoConfiguration,\
org.springframework.cloud.gcp.autoconfigure.pubsub.GcpPubSubAutoConfiguration,\
org.springframework.cloud.gcp.autoconfigure.pubsub.GcpPubSubReactiveAutoConfiguration,\
org.springframework.cloud.gcp.autoconfigure.spanner.GcpSpannerAutoConfiguration,\
org.springframework.cloud.gcp.autoconfigure.datastore.GcpDatastoreAutoConfiguration,\
org.springframework.cloud.gcp.autoconfigure.datastore.health.DatastoreHealthIndicatorAutoConfiguration,\
Expand Down
11 changes: 9 additions & 2 deletions spring-cloud-gcp-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
<artifactId>spring-integration-core</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand All @@ -34,8 +41,8 @@

<!-- Tests -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gcp.pubsub.reactive;

import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.api.gax.rpc.DeadlineExceededException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;

import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberOperations;
import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import org.springframework.util.Assert;

/**
* A factory for procuring {@link Flux} instances backed by GCP Pub/Sub Subscriptions.
*
* @author Elena Felder
*
* @since 1.2
*/
public final class PubSubReactiveFactory {

private static final Log LOGGER = LogFactory.getLog(PubSubReactiveFactory.class);

private final PubSubSubscriberOperations subscriberOperations;

private final Scheduler scheduler;

/**
* Instantiate `PubSubReactiveFactory` capable of generating subscription-based streams.
* @param subscriberOperations template for interacting with GCP Pub/Sub subscriber operations.
* @param scheduler scheduler to use for asynchronously retrieving Pub/Sub messages.
*/
public PubSubReactiveFactory(PubSubSubscriberOperations subscriberOperations, Scheduler scheduler) {
Assert.notNull(subscriberOperations, "subscriberOperations cannot be null.");
Assert.notNull(scheduler, "scheduler cannot be null.");
this.subscriberOperations = subscriberOperations;
this.scheduler = scheduler;
}

/**
* Create an infinite stream {@link Flux} of {@link AcknowledgeablePubsubMessage} objects.
* <p>The {@link Flux} respects backpressure by using of Pub/Sub Synchronous Pull to retrieve
* batches of up to the requested number of messages until the full demand is fulfilled
* or subscription terminated.
* <p>For unlimited demand, the underlying subscription will be polled at a regular interval,
* requesting up to {@code Integer.MAX_VALUE} messages at each poll.
* <p>For specific demand, as many messages as are available will be returned immediately,
* with remaining demand being fulfilled in the future.
* Pub/Sub timeout will cause a retry with the same demand.
* @param subscriptionName subscription from which to retrieve messages.
* @param pollingPeriodMs how frequently to poll the source subscription in case of unlimited demand, in milliseconds.
* @return infinite stream of {@link AcknowledgeablePubsubMessage} objects.
*/
public Flux<AcknowledgeablePubsubMessage> poll(String subscriptionName, long pollingPeriodMs) {

return Flux.create(sink -> {

Scheduler.Worker subscriptionWorker = this.scheduler.createWorker();

sink.onRequest((numRequested) -> {
if (numRequested == Long.MAX_VALUE) {
// unlimited demand
subscriptionWorker.schedulePeriodically(
new NonBlockingUnlimitedDemandPullTask(subscriptionName, sink), 0, pollingPeriodMs, TimeUnit.MILLISECONDS);
}
else {
subscriptionWorker.schedule(new BlockingLimitedDemandPullTask(subscriptionName, numRequested, sink));
}
});

sink.onCancel(subscriptionWorker);

});
}

private abstract class PubSubPullTask implements Runnable {

protected final String subscriptionName;

protected final FluxSink<AcknowledgeablePubsubMessage> sink;

PubSubPullTask(String subscriptionName, FluxSink<AcknowledgeablePubsubMessage> sink) {
this.subscriptionName = subscriptionName;
this.sink = sink;
}

/**
* Retrieve up to a specified number of messages, sending them to the subscription.
* @param demand maximum number of messages to retrieve
* @param block whether to wait for the first message to become available
* @return number of messages retrieved
*/
protected int pullToSink(int demand, boolean block) {

List<AcknowledgeablePubsubMessage> messages =
PubSubReactiveFactory.this.subscriberOperations.pull(this.subscriptionName, demand, !block);

if (!this.sink.isCancelled()) {
messages.forEach(sink::next);
}

return messages.size();
}

}

/**
* Runnable task issuing blocking Pub/Sub Pull requests until the specified number of
* messages has been retrieved.
*/
private class BlockingLimitedDemandPullTask extends PubSubPullTask {

private final long initialDemand;

BlockingLimitedDemandPullTask(String subscriptionName, long initialDemand, FluxSink<AcknowledgeablePubsubMessage> sink) {
super(subscriptionName, sink);
this.initialDemand = initialDemand;
}

@Override
public void run() {
long demand = this.initialDemand;
List<AcknowledgeablePubsubMessage> messages;

while (demand > 0 && !this.sink.isCancelled()) {
try {
int intDemand = demand > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) demand;
demand -= pullToSink(intDemand, true);
}
catch (DeadlineExceededException e) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Blocking pull timed out due to empty subscription "
+ this.subscriptionName
+ "; retrying.");
}
}
}
}

}

/**
* Runnable task issuing a single Pub/Sub Pull request for all available messages.
* Terminates immediately if no messages are available.
*/
private class NonBlockingUnlimitedDemandPullTask extends PubSubPullTask {

NonBlockingUnlimitedDemandPullTask(String subscriptionName, FluxSink<AcknowledgeablePubsubMessage> sink) {
super(subscriptionName, sink);
}

@Override
public void run() {
pullToSink(Integer.MAX_VALUE, false);
}

}

}
Loading

0 comments on commit 8df2a6b

Please sign in to comment.