diff --git a/CHANGES.md b/CHANGES.md index 92b578d16c85..ad93b497e549 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -120,6 +120,7 @@ ## New Features / Improvements * Previously available in Java sdk, Python sdk now also supports logging level overrides per module. ([#18222](https://github.com/apache/beam/issues/18222)). +* Added support for accessing GCP PubSub Message ordering keys (Java) ([BEAM-13592](https://issues.apache.org/jira/browse/BEAM-13592)) ## Breaking Changes diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index 8af2971e4f22..48d647779917 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -49,6 +49,13 @@ public interface PubsubClientFactory extends Serializable { * {@code timestampAttribute} and {@code idAttribute} to store custom timestamps/ids within * message metadata. */ + PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException; + PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) throws IOException; @@ -318,6 +325,9 @@ public static OutgoingMessage of( if (message.getAttributeMap() != null) { builder.putAllAttributes(message.getAttributeMap()); } + if (message.getOrderingKey() != null) { + builder.setOrderingKey(message.getOrderingKey()); + } return of(builder.build(), timestampMsSinceEpoch, recordId); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java index c10c60d717a5..dacd5b6ebe58 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java @@ -37,6 +37,9 @@ public List getCoderProviders() { TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithMessageIdCoder.of()), CoderProviders.forCoder( TypeDescriptor.of(PubsubMessage.class), - PubsubMessageWithAttributesAndMessageIdCoder.of())); + PubsubMessageWithAttributesAndMessageIdCoder.of()), + CoderProviders.forCoder( + TypeDescriptor.of(PubsubMessage.class), + PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index f70d30a55d63..6db088faccdd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -57,6 +57,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; @@ -84,11 +85,22 @@ private static class PubsubGrpcClientFactory implements PubsubClientFactory { public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) throws IOException { + + return newClient(timestampAttribute, idAttribute, options, null); + } + + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + String rootUrlOverride) + throws IOException { return new PubsubGrpcClient( timestampAttribute, idAttribute, DEFAULT_TIMEOUT_S, - channelForRootUrl(options.getPubsubRootUrl()), + channelForRootUrl(MoreObjects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())), options.getGcpCredential()); } @@ -190,7 +202,8 @@ private SubscriberBlockingStub subscriberStub() throws IOException { public int publish(TopicPath topic, List outgoingMessages) throws IOException { PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath()); for (OutgoingMessage outgoingMessage : outgoingMessages) { - PubsubMessage.Builder message = outgoingMessage.message().toBuilder(); + PubsubMessage.Builder message = + outgoingMessage.message().toBuilder().clearMessageId().clearPublishTime(); if (timestampAttribute != null) { message.putAttributes( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index a15dda36152f..a95b456aa52b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -556,6 +556,19 @@ public static Read readMessagesWithAttributesAndMessageId() { .build(); } + /** + * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The + * messages will contain a {@link PubsubMessage#getPayload() payload}, {@link + * PubsubMessage#getAttributeMap() attributes}, along with the {@link PubsubMessage#getMessageId() + * messageId} and {PubsubMessage#getOrderingKey() orderingKey} from PubSub. + */ + public static Read readMessagesWithAttributesAndMessageIdAndOrderingKey() { + return Read.newBuilder() + .setCoder(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of()) + .setNeedsOrderingKey(true) + .build(); + } + /** * Returns A {@link PTransform} that continuously reads UTF-8 encoded strings from a Google Cloud * Pub/Sub stream. @@ -767,6 +780,8 @@ public abstract static class Read extends PTransform> abstract boolean getNeedsMessageId(); + abstract boolean getNeedsOrderingKey(); + abstract Builder toBuilder(); static Builder newBuilder(SerializableFunction parseFn) { @@ -775,6 +790,7 @@ static Builder newBuilder(SerializableFunction parseFn) builder.setPubsubClientFactory(FACTORY); builder.setNeedsAttributes(false); builder.setNeedsMessageId(false); + builder.setNeedsOrderingKey(false); return builder; } @@ -814,6 +830,8 @@ abstract static class Builder { abstract Builder setNeedsMessageId(boolean needsMessageId); + abstract Builder setNeedsOrderingKey(boolean needsOrderingKey); + abstract Builder setClock(Clock clock); abstract Read build(); @@ -1021,7 +1039,8 @@ public PCollection expand(PBegin input) { getTimestampAttribute(), getIdAttribute(), getNeedsAttributes(), - getNeedsMessageId()); + getNeedsMessageId(), + getNeedsOrderingKey()); PCollection read; PCollection preParse = input.apply(source); @@ -1126,6 +1145,8 @@ public abstract static class Write extends PTransform, PDone> /** The format function for input PubsubMessage objects. */ abstract SerializableFunction getFormatFn(); + abstract @Nullable String getPubsubRootUrl(); + abstract Builder toBuilder(); static Builder newBuilder(SerializableFunction formatFn) { @@ -1155,6 +1176,8 @@ abstract static class Builder { abstract Builder setFormatFn(SerializableFunction formatFn); + abstract Builder setPubsubRootUrl(String pubsubRootUrl); + abstract Write build(); } @@ -1234,6 +1257,10 @@ public Write withIdAttribute(String idAttribute) { return toBuilder().setIdAttribute(idAttribute).build(); } + public Write withPubsubRootUrl(String pubsubRootUrl) { + return toBuilder().setPubsubRootUrl(pubsubRootUrl).build(); + } + @Override public PDone expand(PCollection input) { if (getTopicProvider() == null) { @@ -1273,8 +1300,8 @@ public PDone expand(PCollection input) { MoreObjects.firstNonNull( getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), MoreObjects.firstNonNull( - getMaxBatchBytesSize(), - PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES))); + getMaxBatchBytesSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES), + getPubsubRootUrl())); } throw new RuntimeException(); // cases are exhaustive. } @@ -1341,16 +1368,19 @@ public void processElement(ProcessContext c) throws IOException, SizeLimitExceed byte[] payload = message.getPayload(); Map attributes = message.getAttributeMap(); + String orderingKey = message.getOrderingKey(); + + com.google.pubsub.v1.PubsubMessage.Builder msgBuilder = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(payload)) + .putAllAttributes(attributes); + + if (orderingKey != null) { + msgBuilder.setOrderingKey(orderingKey); + } // NOTE: The record id is always null. - output.add( - OutgoingMessage.of( - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(payload)) - .putAllAttributes(attributes) - .build(), - c.timestamp().getMillis(), - null)); + output.add(OutgoingMessage.of(msgBuilder.build(), c.timestamp().getMillis(), null)); currentOutputBytes += messageSize; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index d33ebf8917d4..9a008041fc68 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; @@ -73,6 +74,17 @@ private static HttpRequestInitializer chainHttpRequestInitializer( public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) throws IOException { + + return newClient(timestampAttribute, idAttribute, options, null); + } + + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + String rootUrlOverride) + throws IOException { Pubsub pubsub = new Pubsub.Builder( Transport.getTransport(), @@ -82,7 +94,7 @@ public PubsubClient newClient( // Do not log 404. It clutters the output and is possibly even required by the // caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setRootUrl(options.getPubsubRootUrl()) + .setRootUrl(MoreObjects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())) .setApplicationName(options.getAppName()) .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) .build(); @@ -133,6 +145,8 @@ public int publish(TopicPath topic, List outgoingMessages) thro if (!outgoingMessage.message().getOrderingKey().isEmpty()) { pubsubMessage.setOrderingKey(outgoingMessage.message().getOrderingKey()); } + + // N.B. publishTime and messageId are intentionally not set on the message that is published pubsubMessages.add(pubsubMessage); } PublishRequest request = new PublishRequest().setMessages(pubsubMessages); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java index 3801c2d71f1e..549daf92657f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -25,7 +25,7 @@ /** * Class representing a Pub/Sub message. Each message contains a single message payload, a map of - * attached attributes, and a message id. + * attached attributes, a message id and an ordering key. */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -40,21 +40,34 @@ abstract static class Impl { abstract @Nullable String getMessageId(); + abstract @Nullable String getOrderingKey(); + static Impl create( - byte[] payload, @Nullable Map attributes, @Nullable String messageId) { - return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId); + byte[] payload, + @Nullable Map attributes, + @Nullable String messageId, + @Nullable String orderingKey) { + return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId, orderingKey); } } private Impl impl; public PubsubMessage(byte[] payload, @Nullable Map attributes) { - this(payload, attributes, null); + this(payload, attributes, null, null); } public PubsubMessage( byte[] payload, @Nullable Map attributes, @Nullable String messageId) { - impl = Impl.create(payload, attributes, messageId); + impl = Impl.create(payload, attributes, messageId, null); + } + + public PubsubMessage( + byte[] payload, + @Nullable Map attributes, + @Nullable String messageId, + @Nullable String orderingKey) { + impl = Impl.create(payload, attributes, messageId, orderingKey); } /** Returns the main PubSub message. */ @@ -78,6 +91,11 @@ public byte[] getPayload() { return impl.getMessageId(); } + /** Returns the ordering key of the message. */ + public @Nullable String getOrderingKey() { + return impl.getOrderingKey(); + } + @Override public String toString() { return impl.toString(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java new file mode 100644 index 000000000000..7c2a4250e87c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A coder for PubsubMessage including all fields of a PubSub message from server. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder + extends CustomCoder { + // A message's payload cannot be null + private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); + // A message's attributes can be null. + private static final Coder> ATTRIBUTES_CODER = + NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + // A message's messageId cannot be null + private static final Coder MESSAGE_ID_CODER = StringUtf8Coder.of(); + // A message's publish time, populated by server + private static final Coder PUBLISH_TIME_CODER = ProtoCoder.of(Timestamp.class); + // A message's ordering key can be null + private static final Coder ORDERING_KEY_CODER = NullableCoder.of(StringUtf8Coder.of()); + + public static Coder of(TypeDescriptor ignored) { + return of(); + } + + public static PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder of() { + return new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder(); + } + + @Override + public void encode(PubsubMessage value, OutputStream outStream) throws IOException { + PAYLOAD_CODER.encode(value.getPayload(), outStream); + ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream); + MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); + // TODO(discuss what to do with publish_time field) + PUBLISH_TIME_CODER.encode(Timestamp.getDefaultInstance(), outStream); + ORDERING_KEY_CODER.encode(value.getOrderingKey(), outStream); + } + + @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + byte[] payload = PAYLOAD_CODER.decode(inStream); + Map attributes = ATTRIBUTES_CODER.decode(inStream); + String messageId = MESSAGE_ID_CODER.decode(inStream); + PUBLISH_TIME_CODER.decode(inStream); + String orderingKey = ORDERING_KEY_CODER.decode(inStream); + return new PubsubMessage(payload, attributes, messageId, orderingKey); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java index 1f0025a003a1..89879657a2b1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java @@ -40,12 +40,20 @@ public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) { if (messageId != null) { message.setMessageId(messageId); } + + String orderingKey = input.getOrderingKey(); + if (orderingKey != null) { + message.setOrderingKey(orderingKey); + } return message.build(); } public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) { return new PubsubMessage( - input.getData().toByteArray(), input.getAttributesMap(), input.getMessageId()); + input.getData().toByteArray(), + input.getAttributesMap(), + input.getMessageId(), + input.getOrderingKey()); } // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index d025d85240c4..6c2d3af1877f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -106,6 +106,16 @@ public static PubsubTestClientFactory createFactoryForPublish( activate( () -> setPublishState(expectedTopic, expectedOutgoingMessages, failingOutgoingMessages)); return new PubsubTestClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException { + return newClient(timestampAttribute, idAttribute, options); + } + @Override public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) @@ -137,6 +147,16 @@ public static PubsubTestClientFactory createFactoryForPull( activate( () -> setPullState(expectedSubscription, clock, ackTimeoutSec, expectedIncomingMessages)); return new PubsubTestClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException { + return newClient(timestampAttribute, idAttribute, options); + } + @Override public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) @@ -183,6 +203,16 @@ public void close() throws IOException { }); } + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException { + return newClient(timestampAttribute, idAttribute, options); + } + @Override public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) @@ -294,6 +324,16 @@ public void close() throws IOException { numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls); } + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) + throws IOException { + return newClient(timestampAttribute, idAttribute, options); + } + @Override public PubsubClient newClient( @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index b3ea42ed6522..cc3009c73131 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -198,6 +198,8 @@ private static class WriterFn extends DoFn private final int publishBatchSize; private final int publishBatchBytes; + private final String pubsubRootUrl; + /** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */ private transient @Nullable PubsubClient pubsubClient; @@ -218,6 +220,24 @@ private static class WriterFn extends DoFn this.idAttribute = idAttribute; this.publishBatchSize = publishBatchSize; this.publishBatchBytes = publishBatchBytes; + this.pubsubRootUrl = null; + } + + WriterFn( + PubsubClientFactory pubsubFactory, + ValueProvider topic, + String timestampAttribute, + String idAttribute, + int publishBatchSize, + int publishBatchBytes, + String pubsubRootUrl) { + this.pubsubFactory = pubsubFactory; + this.topic = topic; + this.timestampAttribute = timestampAttribute; + this.idAttribute = idAttribute; + this.publishBatchSize = publishBatchSize; + this.publishBatchBytes = publishBatchBytes; + this.pubsubRootUrl = pubsubRootUrl; } /** BLOCKING Send {@code messages} as a batch to Pubsub. */ @@ -238,7 +258,10 @@ public void startBundle(StartBundleContext c) throws Exception { checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); pubsubClient = pubsubFactory.newClient( - timestampAttribute, idAttribute, c.getPipelineOptions().as(PubsubOptions.class)); + timestampAttribute, + idAttribute, + c.getPipelineOptions().as(PubsubOptions.class), + pubsubRootUrl); } @ProcessElement @@ -327,6 +350,8 @@ public void populateDisplayData(DisplayData.Builder builder) { */ private final RecordIdMethod recordIdMethod; + private final String pubsubRootUrl; + @VisibleForTesting PubsubUnboundedSink( PubsubClientFactory pubsubFactory, @@ -337,7 +362,8 @@ public void populateDisplayData(DisplayData.Builder builder) { int publishBatchSize, int publishBatchBytes, Duration maxLatency, - RecordIdMethod recordIdMethod) { + RecordIdMethod recordIdMethod, + String pubsubRootUrl) { this.pubsubFactory = pubsubFactory; this.topic = topic; this.timestampAttribute = timestampAttribute; @@ -346,6 +372,7 @@ public void populateDisplayData(DisplayData.Builder builder) { this.publishBatchSize = publishBatchSize; this.publishBatchBytes = publishBatchBytes; this.maxLatency = maxLatency; + this.pubsubRootUrl = pubsubRootUrl; this.recordIdMethod = idAttribute == null ? RecordIdMethod.NONE : recordIdMethod; } @@ -364,7 +391,28 @@ public PubsubUnboundedSink( DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, - RecordIdMethod.RANDOM); + RecordIdMethod.RANDOM, + null); + } + + public PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + ValueProvider topic, + String timestampAttribute, + String idAttribute, + int numShards, + String pubsubRootUrl) { + this( + pubsubFactory, + topic, + timestampAttribute, + idAttribute, + numShards, + DEFAULT_PUBLISH_BATCH_SIZE, + DEFAULT_PUBLISH_BATCH_BYTES, + DEFAULT_MAX_LATENCY, + RecordIdMethod.RANDOM, + pubsubRootUrl); } public PubsubUnboundedSink( @@ -384,7 +432,30 @@ public PubsubUnboundedSink( publishBatchSize, publishBatchBytes, DEFAULT_MAX_LATENCY, - RecordIdMethod.RANDOM); + RecordIdMethod.RANDOM, + null); + } + + public PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + ValueProvider topic, + String timestampAttribute, + String idAttribute, + int numShards, + int publishBatchSize, + int publishBatchBytes, + String pubsubRootUrl) { + this( + pubsubFactory, + topic, + timestampAttribute, + idAttribute, + numShards, + publishBatchSize, + publishBatchBytes, + DEFAULT_MAX_LATENCY, + RecordIdMethod.RANDOM, + pubsubRootUrl); } /** Get the topic being written to. */ public TopicPath getTopic() { @@ -451,7 +522,8 @@ public PDone expand(PCollection input) { outer.timestampAttribute, outer.idAttribute, outer.publishBatchSize, - outer.publishBatchBytes))); + outer.publishBatchBytes, + outer.pubsubRootUrl))); return PDone.in(input.getPipeline()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 857ef1cceb86..562ce824a51a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -1200,6 +1200,9 @@ public void populateDisplayData(DisplayData.Builder builder) { /** Whether this source should include the messageId from PubSub. */ private final boolean needsMessageId; + /** Whether this source should include the orderingKey from PubSub. */ + private final boolean needsOrderingKey; + @VisibleForTesting PubsubUnboundedSource( Clock clock, @@ -1210,7 +1213,8 @@ public void populateDisplayData(DisplayData.Builder builder) { @Nullable String timestampAttribute, @Nullable String idAttribute, boolean needsAttributes, - boolean needsMessageId) { + boolean needsMessageId, + boolean needsOrderingKey) { checkArgument( (topic == null) != (subscription == null), "Exactly one of topic and subscription must be given"); @@ -1223,6 +1227,7 @@ public void populateDisplayData(DisplayData.Builder builder) { this.idAttribute = idAttribute; this.needsAttributes = needsAttributes; this.needsMessageId = needsMessageId; + this.needsOrderingKey = needsOrderingKey; } /** Construct an unbounded source to consume from the Pubsub {@code subscription}. */ @@ -1243,6 +1248,7 @@ public PubsubUnboundedSource( timestampAttribute, idAttribute, needsAttributes, + false, false); } @@ -1265,6 +1271,7 @@ public PubsubUnboundedSource( timestampAttribute, idAttribute, needsAttributes, + false, false); } @@ -1287,7 +1294,8 @@ public PubsubUnboundedSource( timestampAttribute, idAttribute, needsAttributes, - needsMessageId); + needsMessageId, + false); } /** Get the project path. */ @@ -1333,6 +1341,10 @@ public boolean getNeedsMessageId() { return needsMessageId; } + public boolean getNeedsOrderingKey() { + return needsOrderingKey; + } + @Override public PCollection expand(PBegin input) { SerializableFunction function; @@ -1342,16 +1354,20 @@ public PCollection expand(PBegin input) { function = new DeserializeBytesIntoPubsubMessagePayloadOnly(); } Coder messageCoder; - if (getNeedsMessageId()) { - messageCoder = - getNeedsAttributes() - ? PubsubMessageWithAttributesAndMessageIdCoder.of() - : PubsubMessageWithMessageIdCoder.of(); + if (getNeedsOrderingKey()) { + messageCoder = PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of(); } else { - messageCoder = - getNeedsAttributes() - ? PubsubMessageWithAttributesCoder.of() - : PubsubMessagePayloadOnlyCoder.of(); + if (getNeedsMessageId()) { + messageCoder = + getNeedsAttributes() + ? PubsubMessageWithAttributesAndMessageIdCoder.of() + : PubsubMessageWithMessageIdCoder.of(); + } else { + messageCoder = + getNeedsAttributes() + ? PubsubMessageWithAttributesCoder.of() + : PubsubMessagePayloadOnlyCoder.of(); + } } PCollection messages = input diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java index ea722ac70c48..75f484d9d29a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java @@ -67,6 +67,7 @@ public void testTranslateSinkWithTopic() throws Exception { 0, 0, Duration.ZERO, + null, null); PubsubUnboundedSink.PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSink); PCollection input = pipeline.apply(Create.of(new byte[0])); @@ -96,7 +97,16 @@ public void testTranslateSinkWithTopicOverridden() throws Exception { ValueProvider runtimeProvider = pipeline.newProvider(TOPIC); PubsubUnboundedSink pubsubUnboundedSinkSink = new PubsubUnboundedSink( - null, runtimeProvider, TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, 0, 0, Duration.ZERO, null); + null, + runtimeProvider, + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + 0, + 0, + 0, + Duration.ZERO, + null, + null); PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSinkSink); PCollection input = pipeline.apply(Create.of(new byte[0])); PDone output = input.apply(pubsubSink); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoderTest.java new file mode 100644 index 000000000000..15d4a75f1793 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoderTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoderTest { + + private static final String DATA = "testData"; + private static final String MESSAGE_ID = "testMessageId"; + private static final Map ATTRIBUTES = + new ImmutableMap.Builder().put("1", "hello").build(); + private static final String ORDERING_KEY = "key123"; + private static final Coder TEST_CODER = + PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage( + DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_ID, ORDERING_KEY); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index f8cd86ee463c..418f65551e1a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -119,7 +119,8 @@ public void sendOneMessage() throws IOException { batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + RecordIdMethod.DETERMINISTIC, + null); p.apply(Create.of(ImmutableList.of(DATA))).apply(ParDo.of(new Stamp(ATTRIBUTES))).apply(sink); p.run(); } @@ -149,7 +150,8 @@ public void sendOneMessageWithoutAttributes() throws IOException { 1 /* batchSize */, 1 /* batchBytes */, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + RecordIdMethod.DETERMINISTIC, + null); p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp(null /* attributes */))) .apply(sink); @@ -188,7 +190,8 @@ public void sendMoreThanOneBatchByNumMessages() throws IOException { batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + RecordIdMethod.DETERMINISTIC, + null); p.apply(Create.of(data)).apply(ParDo.of(new Stamp())).apply(sink); p.run(); } @@ -231,7 +234,8 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { batchSize, batchBytes, Duration.standardSeconds(2), - RecordIdMethod.DETERMINISTIC); + RecordIdMethod.DETERMINISTIC, + null); p.apply(Create.of(data)).apply(ParDo.of(new Stamp())).apply(sink); p.run(); }