diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9128a1f6b58b..43f38c7dae30 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -100,6 +100,7 @@ import org.apache.beam.sdk.io.WriteFilesResult; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; @@ -1161,11 +1162,12 @@ PubsubUnboundedSource getOverriddenTransform() { @Override public PCollection expand(PBegin input) { + Coder coder = + transform.getNeedsMessageId() + ? new PubsubMessageWithAttributesAndMessageIdCoder() + : new PubsubMessageWithAttributesCoder(); return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - IsBounded.UNBOUNDED, - new PubsubMessageWithAttributesCoder()); + input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, coder); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java index 819e29b5a5c7..5d3dd9185b6b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java @@ -122,7 +122,9 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce value = parseFn.apply( new PubsubMessage( - pubsubMessage.getData().toByteArray(), pubsubMessage.getAttributes())); + pubsubMessage.getData().toByteArray(), + pubsubMessage.getAttributesMap(), + pubsubMessage.getMessageId())); } else { value = coder.decode(data, Coder.Context.OUTER); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java index 4be3c8ed38bd..c10c60d717a5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java @@ -32,6 +32,11 @@ public class PubsubCoderProviderRegistrar implements CoderProviderRegistrar { public List getCoderProviders() { return ImmutableList.of( CoderProviders.forCoder( - TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithAttributesCoder.of())); + TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithAttributesCoder.of()), + CoderProviders.forCoder( + TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithMessageIdCoder.of()), + CoderProviders.forCoder( + TypeDescriptor.of(PubsubMessage.class), + PubsubMessageWithAttributesAndMessageIdCoder.of())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 592e058fc086..afb0f5c07b30 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -441,6 +441,7 @@ public String toString() { private static Read read() { return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .build(); } @@ -456,6 +457,23 @@ public static Read readMessages() { .setCoder(PubsubMessagePayloadOnlyCoder.of()) .setParseFn(new IdentityMessageFn()) .setNeedsAttributes(false) + .setNeedsMessageId(false) + .build(); + } + + /** + * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The + * messages will only contain a {@link PubsubMessage#getPayload() payload} with the {@link + * PubsubMessage#getMessageId() messageId} from PubSub, but no {@link + * PubsubMessage#getAttributeMap() attributes}. + */ + public static Read readMessagesWithMessageId() { + return new AutoValue_PubsubIO_Read.Builder() + .setPubsubClientFactory(FACTORY) + .setCoder(PubsubMessageWithMessageIdCoder.of()) + .setParseFn(new IdentityMessageFn()) + .setNeedsAttributes(false) + .setNeedsMessageId(true) .build(); } @@ -470,6 +488,23 @@ public static Read readMessagesWithAttributes() { .setCoder(PubsubMessageWithAttributesCoder.of()) .setParseFn(new IdentityMessageFn()) .setNeedsAttributes(true) + .setNeedsMessageId(false) + .build(); + } + + /** + * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The + * messages will contain both a {@link PubsubMessage#getPayload() payload} and {@link + * PubsubMessage#getAttributeMap() attributes}, along with the {@link PubsubMessage#getMessageId() + * messageId} from PubSub. + */ + public static Read readMessagesWithAttributesAndMessageId() { + return new AutoValue_PubsubIO_Read.Builder() + .setPubsubClientFactory(FACTORY) + .setCoder(PubsubMessageWithAttributesAndMessageIdCoder.of()) + .setParseFn(new IdentityMessageFn()) + .setNeedsAttributes(true) + .setNeedsMessageId(true) .build(); } @@ -480,6 +515,7 @@ public static Read readMessagesWithAttributes() { public static Read readStrings() { return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setCoder(StringUtf8Coder.of()) .setParseFn(new ParsePayloadAsUtf8()) @@ -497,6 +533,7 @@ public static Read readProtos(Class messageClass) { ProtoCoder coder = ProtoCoder.of(messageClass); return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setCoder(coder) .setParseFn(new ParsePayloadUsingCoder<>(coder)) @@ -514,6 +551,7 @@ public static Read readAvros(Class clazz) { AvroCoder coder = AvroCoder.of(clazz); return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setCoder(coder) .setParseFn(new ParsePayloadUsingCoder<>(coder)) @@ -533,6 +571,7 @@ public static Read readAvroGenericRecords(org.apache.avro.Schema AvroCoder coder = AvroCoder.of(GenericRecord.class, avroSchema); return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setBeamSchema(schema) .setToRowFn(AvroUtils.getToRowFunction(GenericRecord.class, avroSchema)) @@ -558,6 +597,7 @@ public static Read readAvrosWithBeamSchema(Class clazz) { Schema schema = AvroUtils.getSchema(clazz, null); return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setBeamSchema(schema) .setToRowFn(AvroUtils.getToRowFunction(clazz, avroSchema)) @@ -645,6 +685,8 @@ public abstract static class Read extends PTransform> abstract boolean getNeedsAttributes(); + abstract boolean getNeedsMessageId(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -671,6 +713,8 @@ abstract static class Builder { abstract Builder setNeedsAttributes(boolean needsAttributes); + abstract Builder setNeedsMessageId(boolean needsMessageId); + abstract Builder setClock(@Nullable Clock clock); abstract Read build(); @@ -832,7 +876,8 @@ public PCollection expand(PBegin input) { subscriptionPath, getTimestampAttribute(), getIdAttribute(), - getNeedsAttributes()); + getNeedsAttributes(), + getNeedsMessageId()); PCollection read = input.apply(source).apply(MapElements.via(getParseFn())); return (getBeamSchema() != null) ? read.setSchema(getBeamSchema(), getToRowFn(), getFromRowFn()) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java index 0021d6608d09..b437c0ad58d3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -23,17 +23,25 @@ import javax.annotation.Nullable; /** - * Class representing a Pub/Sub message. Each message contains a single message payload and a map of - * attached attributes. + * Class representing a Pub/Sub message. Each message contains a single message payload, a map of + * attached attributes, and a message id. */ public class PubsubMessage { private byte[] message; private Map attributes; + private String messageId; public PubsubMessage(byte[] payload, Map attributes) { this.message = payload; this.attributes = attributes; + this.messageId = null; + } + + public PubsubMessage(byte[] payload, Map attributes, String messageId) { + this.message = payload; + this.attributes = attributes; + this.messageId = messageId; } /** Returns the main PubSub message. */ @@ -52,4 +60,10 @@ public String getAttribute(String attribute) { public Map getAttributeMap() { return attributes; } + + /** Returns the messageId of the message populated by Cloud Pub/Sub. */ + @Nullable + public String getMessageId() { + return messageId; + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java new file mode 100644 index 000000000000..377bfc6a481b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A coder for PubsubMessage including attributes and the message id from the PubSub server. */ +public class PubsubMessageWithAttributesAndMessageIdCoder extends CustomCoder { + // A message's payload cannot be null + private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); + // A message's attributes can be null. + private static final Coder> ATTRIBUTES_CODER = + NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + // A message's messageId cannot be null + private static final Coder MESSAGE_ID_CODER = StringUtf8Coder.of(); + + public static Coder of(TypeDescriptor ignored) { + return of(); + } + + public static PubsubMessageWithAttributesAndMessageIdCoder of() { + return new PubsubMessageWithAttributesAndMessageIdCoder(); + } + + @Override + public void encode(PubsubMessage value, OutputStream outStream) throws IOException { + PAYLOAD_CODER.encode(value.getPayload(), outStream); + ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream); + MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); + } + + @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + byte[] payload = PAYLOAD_CODER.decode(inStream); + Map attributes = ATTRIBUTES_CODER.decode(inStream); + String messageId = MESSAGE_ID_CODER.decode(inStream); + return new PubsubMessage(payload, attributes, messageId); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java new file mode 100644 index 000000000000..f38e14d9a3cc --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** + * A coder for PubsubMessage treating the raw bytes being decoded as the message's payload, with the + * message id from the PubSub server. + */ +public class PubsubMessageWithMessageIdCoder extends CustomCoder { + private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); + // A message's messageId cannot be null + private static final Coder MESSAGE_ID_CODER = StringUtf8Coder.of(); + + public static PubsubMessageWithMessageIdCoder of() { + return new PubsubMessageWithMessageIdCoder(); + } + + @Override + public void encode(PubsubMessage value, OutputStream outStream) throws IOException { + PAYLOAD_CODER.encode(value.getPayload(), outStream); + MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); + } + + @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + byte[] payload = PAYLOAD_CODER.decode(inStream); + String messageId = MESSAGE_ID_CODER.decode(inStream); + return new PubsubMessage(payload, ImmutableMap.of(), messageId); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 89452a2339d1..d8abfe17c407 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -826,9 +826,9 @@ public boolean start() throws IOException { } /** - * BLOCKING Return {@literal true} if a Pubsub messaage is available, {@literal false} if none - * is available at this time or we are over-subscribed. May BLOCK while extending ACKs or - * fetching available messages. Will not block waiting for messages. + * BLOCKING Return {@literal true} if a Pubsub message is available, {@literal false} if none is + * available at this time or we are over-subscribed. May BLOCK while extending ACKs or fetching + * available messages. Will not block waiting for messages. */ @Override public boolean advance() throws IOException { @@ -884,7 +884,7 @@ public PubsubMessage getCurrent() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - return new PubsubMessage(current.elementBytes, current.attributes); + return new PubsubMessage(current.elementBytes, current.attributes, current.recordId); } @Override @@ -1083,9 +1083,15 @@ public Coder getCheckpointMarkCoder() { @Override public Coder getOutputCoder() { - return outer.getNeedsAttributes() - ? PubsubMessageWithAttributesCoder.of() - : PubsubMessagePayloadOnlyCoder.of(); + if (outer.getNeedsMessageId()) { + return outer.getNeedsAttributes() + ? PubsubMessageWithAttributesAndMessageIdCoder.of() + : PubsubMessageWithMessageIdCoder.of(); + } else { + return outer.getNeedsAttributes() + ? PubsubMessageWithAttributesCoder.of() + : PubsubMessagePayloadOnlyCoder.of(); + } } @Override @@ -1188,6 +1194,9 @@ public void populateDisplayData(Builder builder) { /** Whether this source should load the attributes of the PubsubMessage, or only the payload. */ private final boolean needsAttributes; + /** Whether this source should include the messageId from PubSub. */ + private final boolean needsMessageId; + @VisibleForTesting PubsubUnboundedSource( Clock clock, @@ -1197,7 +1206,8 @@ public void populateDisplayData(Builder builder) { @Nullable ValueProvider subscription, @Nullable String timestampAttribute, @Nullable String idAttribute, - boolean needsAttributes) { + boolean needsAttributes, + boolean needsMessageId) { checkArgument( (topic == null) != (subscription == null), "Exactly one of topic and subscription must be given"); @@ -1209,6 +1219,7 @@ public void populateDisplayData(Builder builder) { this.timestampAttribute = timestampAttribute; this.idAttribute = idAttribute; this.needsAttributes = needsAttributes; + this.needsMessageId = needsMessageId; } /** Construct an unbounded source to consume from the Pubsub {@code subscription}. */ @@ -1228,7 +1239,52 @@ public PubsubUnboundedSource( subscription, timestampAttribute, idAttribute, - needsAttributes); + needsAttributes, + false); + } + + /** Construct an unbounded source to consume from the Pubsub {@code subscription}. */ + public PubsubUnboundedSource( + Clock clock, + PubsubClientFactory pubsubFactory, + @Nullable ValueProvider project, + @Nullable ValueProvider topic, + @Nullable ValueProvider subscription, + @Nullable String timestampAttribute, + @Nullable String idAttribute, + boolean needsAttributes) { + this( + clock, + pubsubFactory, + project, + topic, + subscription, + timestampAttribute, + idAttribute, + needsAttributes, + false); + } + + /** Construct an unbounded source to consume from the Pubsub {@code subscription}. */ + public PubsubUnboundedSource( + PubsubClientFactory pubsubFactory, + @Nullable ValueProvider project, + @Nullable ValueProvider topic, + @Nullable ValueProvider subscription, + @Nullable String timestampAttribute, + @Nullable String idAttribute, + boolean needsAttributes, + boolean needsMessageId) { + this( + null, + pubsubFactory, + project, + topic, + subscription, + timestampAttribute, + idAttribute, + needsAttributes, + needsMessageId); } /** Get the project path. */ @@ -1277,6 +1333,10 @@ public boolean getNeedsAttributes() { return needsAttributes; } + public boolean getNeedsMessageId() { + return needsMessageId; + } + @Override public PCollection expand(PBegin input) { return input diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java index 0eb5bf768b80..f4f8b18ffc54 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java @@ -336,7 +336,7 @@ public POutput expand(PCollection input) { * Stateful {@link DoFn} which caches the elements it sees and checks whether they satisfy the * predicate. * - *

When predicate is satisfied outputs "SUCCESS". If predicate throws execption, outputs + *

When predicate is satisfied outputs "SUCCESS". If predicate throws exception, outputs * "FAILURE". */ static class StatefulPredicateCheck extends DoFn, String> { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoderTest.java new file mode 100644 index 000000000000..b132814921de --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoderTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessagePayloadOnlyCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessagePayloadOnlyCoderTest { + + private static final String DATA = "testData"; + private static final Coder TEST_CODER = PubsubMessagePayloadOnlyCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), null); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java new file mode 100644 index 000000000000..5a061ad2cd56 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessageWithAttributesAndMessageIdCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessageWithAttributesAndMessageIdCoderTest { + + private static final String DATA = "testData"; + private static final String MESSAGE_ID = "testMessageId"; + private static final Map ATTRIBUTES = + new ImmutableMap.Builder().put("1", "hello").build(); + private static final Coder TEST_CODER = + PubsubMessageWithAttributesAndMessageIdCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_ID); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java new file mode 100644 index 000000000000..eb33fd3895a4 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessageWithAttributesCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessageWithAttributesCoderTest { + + private static final String DATA = "testData"; + private static final Map ATTRIBUTES = + new ImmutableMap.Builder().put("1", "hello").build(); + private static final Coder TEST_CODER = PubsubMessageWithAttributesCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoderTest.java new file mode 100644 index 000000000000..586b53687f39 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoderTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessageWithMessageIdCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessageWithMessageIdCoderTest { + + private static final String DATA = "testData"; + private static final String MESSAGE_ID = "testMessageId"; + private static final Coder TEST_CODER = PubsubMessageWithMessageIdCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), null, MESSAGE_ID); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java index e2bd88e67b14..9856606be2a0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java @@ -17,20 +17,26 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; +import java.util.Set; import org.apache.beam.runners.direct.DirectOptions; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Integration test for PubsubIO. */ @RunWith(JUnit4.class) public class PubsubReadIT { + private static final Logger LOG = LoggerFactory.getLogger(PubsubReadIT.class); @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create(); @Rule public transient TestPipeline pipeline = TestPipeline.create(); @@ -61,4 +67,45 @@ public void testReadPublicData() throws Exception { // noop } } + + @Test + public void testReadPubsubMessageId() throws Exception { + // The pipeline will never terminate on its own + pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false); + + PCollection messages = + pipeline.apply( + PubsubIO.readMessagesWithAttributesAndMessageId() + .fromTopic("projects/pubsub-public-data/topics/taxirides-realtime")); + + messages.apply( + "isMessageIdNonNull", + signal.signalSuccessWhen(messages.getCoder(), new NonEmptyMessageIdCheck())); + + Supplier start = signal.waitForStart(Duration.standardMinutes(5)); + pipeline.apply(signal.signalStart()); + PipelineResult job = pipeline.run(); + start.get(); + + signal.waitForSuccess(Duration.standardMinutes(1)); + // A runner may not support cancel + try { + job.cancel(); + } catch (UnsupportedOperationException exc) { + // noop + } + } + + private static class NonEmptyMessageIdCheck + implements SerializableFunction, Boolean> { + @Override + public Boolean apply(Set input) { + for (PubsubMessage message : input) { + if (Strings.isNullOrEmpty(message.getMessageId())) { + return false; + } + } + return true; + } + } }