Skip to content

Commit

Permalink
fix: Use message ordering enabled property that comes with streaming …
Browse files Browse the repository at this point in the history
…pull responses (googleapis#1851)

* samples: schema evolution

* samples: schema evolution

* Format fixes

* Fix documentation for field.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add back in working asserts

* Formatting fixes

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Version/delete fixes

* samples: schema evolution

* samples: schema evolution

* Format fixes

* Fix documentation for field.

* Add back in working asserts

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Formatting fixes

* Version/delete fixes

* samples: Schema evolution (googleapis#1499)

* samples: schema evolution

* samples: schema evolution

* Format fixes

* Fix documentation for field.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add back in working asserts

* Formatting fixes

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Version/delete fixes

* samples: schema evolution

* samples: schema evolution

* Format fixes

* Fix documentation for field.

* Add back in working asserts

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Formatting fixes

* Version/delete fixes

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

* Minor fixes for comments

* samples: Schema evolution (googleapis#1499)

* samples: schema evolution

* samples: schema evolution

* Format fixes

* Fix documentation for field.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add back in working asserts

* Formatting fixes

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Version/delete fixes

* samples: schema evolution

* samples: schema evolution

* Format fixes

* Fix documentation for field.

* Add back in working asserts

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Formatting fixes

* Version/delete fixes

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

* Fix rollback example

* Formatting

* Formatting and wording fixes

* Add new schemas to test directory

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Samples: Fix exception handling

* fix: Set x-goog-request-params for streaming pull request

* Revert "fix: Set x-goog-request-params for streaming pull request"

This reverts commit 3185a3e.

* Revert "Revert "fix: Set x-goog-request-params for streaming pull request""

This reverts commit 3b1f4d9.

* Thread example

* Add examples for limited and unlimited exeuctors

* Add back missing semicolon

* Revert changes to original async example

* Revert changes to original async example

* Add examples of different threading models

* Make variables final to conform to style.

* Fix catches

* Fix ids

* Fix naming

* Revert "Merge pull request googleapis#2 from kamalaboulhosn/ML_experiments"

This reverts commit 5a435fa, reversing
changes made to c3a5725.

* Set blunderbuss config to auto-assign issues and PRs

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: Swap writer and reader schema to correct places in sample

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: Use message ordering enabled property that comes with streaming pull responses so that messages are only delivered to the callback one at a time in order when ordering is actually enabled

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
kamalaboulhosn and gcf-owl-bot[bot] authored Jan 5, 2024
1 parent ca693b1 commit d816138
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class MessageDispatcher {
private final FlowController flowController;

private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false);
private AtomicBoolean messageOrderingEnabled = new AtomicBoolean(false);

private final Waiter messagesWaiter;

Expand Down Expand Up @@ -343,6 +344,11 @@ void setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
}
}

@InternalApi
void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
this.messageOrderingEnabled.set(messageOrderingEnabled);
}

private static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;
Expand Down Expand Up @@ -506,7 +512,7 @@ public void run() {
}
}
};
if (message.getOrderingKey().isEmpty()) {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,12 @@ public void onResponse(StreamingPullResponse response) {

boolean exactlyOnceDeliveryEnabledResponse =
response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled();
boolean messageOrderingEnabledResponse =
response.getSubscriptionProperties().getMessageOrderingEnabled();

setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse);
messageDispatcher.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse);
messageDispatcher.setMessageOrderingEnabled(messageOrderingEnabledResponse);
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());

// Only request more if we're not shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,41 @@
import java.util.concurrent.*;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.threeten.bp.Duration;

public class MessageDispatcherTest {
private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data");
private static final int DELIVERY_INFO_COUNT = 3;
private static final String ACK_ID = "ACK-ID";
private static final String ORDERING_KEY = "KEY";
private static final ReceivedMessage TEST_MESSAGE =
ReceivedMessage.newBuilder()
.setAckId(ACK_ID)
.setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build())
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
.build();
private static final ByteString ORDERED_MESSAGE_DATA_1 = ByteString.copyFromUtf8("message-data1");
private static final ReceivedMessage ORDERED_TEST_MESSAGE_1 =
ReceivedMessage.newBuilder()
.setAckId("ACK-ID-1")
.setMessage(
PubsubMessage.newBuilder()
.setData(ORDERED_MESSAGE_DATA_1)
.setOrderingKey(ORDERING_KEY)
.build())
.build();
private static final ByteString ORDERED_MESSAGE_DATA_2 = ByteString.copyFromUtf8("message-data2");
private static final ReceivedMessage ORDERED_TEST_MESSAGE_2 =
ReceivedMessage.newBuilder()
.setAckId("ACK-ID-2")
.setMessage(
PubsubMessage.newBuilder()
.setData(ORDERED_MESSAGE_DATA_2)
.setOrderingKey(ORDERING_KEY)
.build())
.build();
private static final int MAX_SECONDS_PER_ACK_EXTENSION = 60;
private static final int MIN_ACK_DEADLINE_SECONDS = 10;
private static final Duration MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
Expand Down Expand Up @@ -494,6 +517,84 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryEnabledThenDisabled() {
Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()));
}

@Test
public void testOrderedDeliveryOrderingDisabled() throws Exception {
MessageReceiver mockMessageReceiver = mock(MessageReceiver.class);
MessageDispatcher messageDispatcher =
getMessageDispatcher(mockMessageReceiver, Executors.newFixedThreadPool(5));

// This would normally be set from the streaming pull response in the
// StreamingSubscriberConnection
messageDispatcher.setMessageOrderingEnabled(false);

CountDownLatch receiveCalls = new CountDownLatch(2);

doAnswer(
new Answer<Void>() {
public Void answer(InvocationOnMock invocation) throws Exception {
Thread.sleep(1000);
receiveCalls.countDown();
return null;
}
})
.when(mockMessageReceiver)
.receiveMessage(eq(ORDERED_TEST_MESSAGE_1.getMessage()), any(AckReplyConsumer.class));
doAnswer(
new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
// Ensure the previous method didn't finish and we could process in parallel.
assertEquals(2, receiveCalls.getCount());
receiveCalls.countDown();
return null;
}
})
.when(mockMessageReceiver)
.receiveMessage(eq(ORDERED_TEST_MESSAGE_2.getMessage()), any(AckReplyConsumer.class));

messageDispatcher.processReceivedMessages(
Arrays.asList(ORDERED_TEST_MESSAGE_1, ORDERED_TEST_MESSAGE_2));
receiveCalls.await();
}

@Test
public void testOrderedDeliveryOrderingEnabled() throws Exception {
MessageReceiver mockMessageReceiver = mock(MessageReceiver.class);
MessageDispatcher messageDispatcher =
getMessageDispatcher(mockMessageReceiver, Executors.newFixedThreadPool(5));

// This would normally be set from the streaming pull response in the
// StreamingSubscriberConnection
messageDispatcher.setMessageOrderingEnabled(true);

CountDownLatch receiveCalls = new CountDownLatch(2);

doAnswer(
new Answer<Void>() {
public Void answer(InvocationOnMock invocation) throws Exception {
Thread.sleep(1000);
receiveCalls.countDown();
return null;
}
})
.when(mockMessageReceiver)
.receiveMessage(eq(ORDERED_TEST_MESSAGE_1.getMessage()), any(AckReplyConsumer.class));
doAnswer(
new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
// Ensure the previous method has finished completely.
assertEquals(1, receiveCalls.getCount());
receiveCalls.countDown();
return null;
}
})
.when(mockMessageReceiver)
.receiveMessage(eq(ORDERED_TEST_MESSAGE_2.getMessage()), any(AckReplyConsumer.class));

messageDispatcher.processReceivedMessages(
Arrays.asList(ORDERED_TEST_MESSAGE_1, ORDERED_TEST_MESSAGE_2));
receiveCalls.await();
}

@Test
public void testAckExtensionCustomMinExactlyOnceDeliveryDisabledThenEnabled() {
int customMinSeconds = 30;
Expand Down Expand Up @@ -569,20 +670,28 @@ private void assertMinAndMaxAckDeadlines(
}

private MessageDispatcher getMessageDispatcher() {
return getMessageDispatcher(mock(MessageReceiver.class));
return getMessageDispatcher(mock(MessageReceiver.class), MoreExecutors.directExecutor());
}

private MessageDispatcher getMessageDispatcher(MessageReceiver messageReceiver) {
return getMessageDispatcherFromBuilder(MessageDispatcher.newBuilder(messageReceiver));
return getMessageDispatcherFromBuilder(
MessageDispatcher.newBuilder(messageReceiver), MoreExecutors.directExecutor());
}

private MessageDispatcher getMessageDispatcher(
MessageReceiver messageReceiver, Executor executor) {
return getMessageDispatcherFromBuilder(MessageDispatcher.newBuilder(messageReceiver), executor);
}

private MessageDispatcher getMessageDispatcher(
MessageReceiverWithAckResponse messageReceiverWithAckResponse) {
return getMessageDispatcherFromBuilder(
MessageDispatcher.newBuilder(messageReceiverWithAckResponse));
MessageDispatcher.newBuilder(messageReceiverWithAckResponse),
MoreExecutors.directExecutor());
}

private MessageDispatcher getMessageDispatcherFromBuilder(MessageDispatcher.Builder builder) {
private MessageDispatcher getMessageDispatcherFromBuilder(
MessageDispatcher.Builder builder, Executor executor) {
MessageDispatcher messageDispatcher =
builder
.setAckProcessor(mockAckProcessor)
Expand All @@ -594,7 +703,7 @@ private MessageDispatcher getMessageDispatcherFromBuilder(MessageDispatcher.Buil
.setMaxDurationPerAckExtensionDefaultUsed(true)
.setAckLatencyDistribution(mock(Distribution.class))
.setFlowController(mock(FlowController.class))
.setExecutor(MoreExecutors.directExecutor())
.setExecutor(executor)
.setSystemExecutor(systemExecutor)
.setApiClock(clock)
.build();
Expand Down

0 comments on commit d816138

Please sign in to comment.