From 973b7ad3e65bd0d6608af9defaab18ac0b057e3a Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 18 Jun 2024 22:53:11 +0000 Subject: [PATCH] Add ordering key size validation to validatePubsubMessageSize --- .../io/gcp/pubsub/PreparePubsubWriteDoFn.java | 14 +++++++ .../pubsub/PreparePubsubWriteDoFnTest.java | 39 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java index 47033451ab89..dde37138d21a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java @@ -63,6 +63,20 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS } int totalSize = payloadSize; + @Nullable String orderingKey = message.getOrderingKey(); + if (orderingKey != null) { + int orderingKeySize = orderingKey.getBytes(StandardCharsets.UTF_8).length; + if (orderingKeySize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES) { + throw new SizeLimitExceededException( + "Pubsub message ordering key of length " + + orderingKeySize + + " exceeds maximum of " + + PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES + + " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits"); + } + totalSize += orderingKeySize; + } + @Nullable Map attributes = message.getAttributeMap(); if (attributes != null) { if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java index 494189d43f36..a125a7b67e69 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java @@ -44,6 +44,19 @@ public void testValidatePubsubMessageSizeOnlyPayload() throws SizeLimitExceededE assertEquals(data.length, messageSize); } + @Test + public void testValidatePubsubMessageSizePayloadAndOrderingKey() + throws SizeLimitExceededException { + byte[] data = new byte[1024]; + String orderingKey = "key"; + PubsubMessage message = new PubsubMessage(data, null, null, orderingKey); + + int messageSize = + PreparePubsubWriteDoFn.validatePubsubMessageSize(message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE); + + assertEquals(data.length + orderingKey.getBytes(StandardCharsets.UTF_8).length, messageSize); + } + @Test public void testValidatePubsubMessageSizePayloadAndAttributes() throws SizeLimitExceededException { @@ -76,6 +89,19 @@ public void testValidatePubsubMessageSizePayloadTooLarge() { message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); } + @Test + public void testValidatePubsubMessageSizePayloadPlusOrderingKeyTooLarge() { + byte[] data = new byte[(10 << 20)]; + String orderingKey = "key"; + PubsubMessage message = new PubsubMessage(data, null, null, orderingKey); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + @Test public void testValidatePubsubMessageSizePayloadPlusAttributesTooLarge() { byte[] data = new byte[(10 << 20)]; @@ -121,6 +147,19 @@ public void testValidatePubsubMessageSizeAttributeValueTooLarge() { message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); } + @Test + public void testValidatePubsubMessageSizeOrderingKeyTooLarge() { + byte[] data = new byte[1024]; + String orderingKey = RandomStringUtils.randomAscii(1025); + PubsubMessage message = new PubsubMessage(data, null, null, orderingKey); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + @Test public void testValidatePubsubMessagePayloadTooLarge() { byte[] data = new byte[(10 << 20) + 1];