From 968c05851a7d094bcaab63fb7ec2af77150e690e Mon Sep 17 00:00:00 2001 From: kimkyung-goog Date: Tue, 26 Feb 2019 08:32:05 -0500 Subject: [PATCH] Change the order of publishing batches when a large message is requested; Add unit tests --- .../com/google/cloud/pubsub/v1/Publisher.java | 4 +-- .../cloud/pubsub/v1/PublisherImplTest.java | 33 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 3673efd6a593..a49f5a96828a 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -269,8 +269,8 @@ && hasBatchingBytes() if (batchToSend != null) { logger.log(Level.FINER, "Scheduling a batch for immediate sending."); - publishOutstandingBatch(batchToSend); publishAllOutstanding(); + publishOutstandingBatch(batchToSend); } // If the message is over the size limit, it was not added to the pending messages and it will @@ -278,9 +278,9 @@ && hasBatchingBytes() if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) { logger.log( Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send."); + publishAllOutstanding(); publishOutstandingBatch( new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize, orderingKey)); - publishAllOutstanding(); } return publishResult; diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 3d0557e97719..8f60719c59fa 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -338,6 +338,39 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { publisher.shutdown(); } + @Test + public void testLargeMessagesDoNotReorderBatches() throws Exception { + // Set the maximum batching size to 20 bytes. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(10L) + .setRequestByteThreshold(20L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + + ApiFuture publishFuture3 = + sendTestMessageWithOrderingKey(publisher, "VeryLargeMessage", "OrderB"); + // Verify that messages with "OrderB" were delivered in order. + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture3.get())); + + assertTrue(publishFuture1.isDone()); + assertTrue(publishFuture2.isDone()); + assertTrue(publishFuture3.isDone()); + + publisher.shutdown(); + } + @Test public void testOrderingKeyWhenDisabled_throwsException() throws Exception { // Message ordering is disabled by default.