From f1d02e9c6a741088479c9221af4c0250d5678b1f Mon Sep 17 00:00:00 2001 From: Thinh Ha Date: Sun, 21 Apr 2019 15:56:01 +0100 Subject: [PATCH 1/4] [BEAM-3489] add PubSub messageId to PubsubMessage --- .../runners/dataflow/worker/PubsubReader.java | 4 +- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 28 +++++++ .../beam/sdk/io/gcp/pubsub/PubsubMessage.java | 17 ++++- ...essageWithAttributesAndMessageIdCoder.java | 74 +++++++++++++++++++ .../PubsubMessageWithMessageIdCoder.java | 65 ++++++++++++++++ .../io/gcp/pubsub/PubsubUnboundedSource.java | 8 +- .../beam/sdk/io/gcp/pubsub/PubsubReadIT.java | 32 ++++++++ 7 files changed, 221 insertions(+), 7 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java index 819e29b5a5c7..5d3dd9185b6b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java @@ -122,7 +122,9 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce value = parseFn.apply( new PubsubMessage( - pubsubMessage.getData().toByteArray(), pubsubMessage.getAttributes())); + pubsubMessage.getData().toByteArray(), + pubsubMessage.getAttributesMap(), + pubsubMessage.getMessageId())); } else { value = coder.decode(data, Coder.Context.OUTER); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 592e058fc086..509fb6dbb904 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -459,6 +459,20 @@ public static Read readMessages() { .build(); } + /** + * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The + * messages will only contain a {@link PubsubMessage#getPayload() payload} with the messageId from + * PubSub, but no {@link PubsubMessage#getAttributeMap() attributes}. + */ + public static Read readMessagesWithMessageId() { + return new AutoValue_PubsubIO_Read.Builder() + .setPubsubClientFactory(FACTORY) + .setCoder(PubsubMessageWithMessageIdCoder.of()) + .setParseFn(new IdentityMessageFn()) + .setNeedsAttributes(false) + .build(); + } + /** * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The * messages will contain both a {@link PubsubMessage#getPayload() payload} and {@link @@ -473,6 +487,20 @@ public static Read readMessagesWithAttributes() { .build(); } + /** + * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The + * messages will contain both a {@link PubsubMessage#getPayload() payload} and {@link + * PubsubMessage#getAttributeMap() attributes}, along with the messageId from PubSub. + */ + public static Read readMessagesWithAttributesAndMessageId() { + return new AutoValue_PubsubIO_Read.Builder() + .setPubsubClientFactory(FACTORY) + .setCoder(PubsubMessageWithAttributesAndMessageIdCoder.of()) + .setParseFn(new IdentityMessageFn()) + .setNeedsAttributes(true) + .build(); + } + /** * Returns A {@link PTransform} that continuously reads UTF-8 encoded strings from a Google Cloud * Pub/Sub stream. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java index 0021d6608d09..a4da89c62e7e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -23,17 +23,25 @@ import javax.annotation.Nullable; /** - * Class representing a Pub/Sub message. Each message contains a single message payload and a map of - * attached attributes. + * Class representing a Pub/Sub message. Each message contains a single message payload, a map of + * attached attributes, and a message id. */ public class PubsubMessage { private byte[] message; private Map attributes; + private String messageId; public PubsubMessage(byte[] payload, Map attributes) { this.message = payload; this.attributes = attributes; + this.messageId = null; + } + + public PubsubMessage(byte[] payload, Map attributes, String messageId) { + this.message = payload; + this.attributes = attributes; + this.messageId = messageId; } /** Returns the main PubSub message. */ @@ -52,4 +60,9 @@ public String getAttribute(String attribute) { public Map getAttributeMap() { return attributes; } + + /** Returns the messageId of the message. */ + public String getMessageId() { + return messageId; + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java new file mode 100644 index 000000000000..8ee8045ad576 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A coder for PubsubMessage including attributes and the message id from the PubSub server. */ +public class PubsubMessageWithAttributesAndMessageIdCoder extends CustomCoder { + // A message's payload cannot be null + private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); + // A message's attributes can be null. + private static final Coder> ATTRIBUTES_CODER = + NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + // A message's messageId cannot be null + private static final Coder MESSAGE_ID_CODER = StringUtf8Coder.of(); + + public static Coder of(TypeDescriptor ignored) { + return of(); + } + + public static PubsubMessageWithAttributesAndMessageIdCoder of() { + return new PubsubMessageWithAttributesAndMessageIdCoder(); + } + + @Override + public void encode(PubsubMessage value, OutputStream outStream) throws IOException { + encode(value, outStream, Context.NESTED); + } + + public void encode(PubsubMessage value, OutputStream outStream, Context context) + throws IOException { + PAYLOAD_CODER.encode(value.getPayload(), outStream); + ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream); + MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); + } + + @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override + public PubsubMessage decode(InputStream inStream, Context context) throws IOException { + byte[] payload = PAYLOAD_CODER.decode(inStream); + Map attributes = ATTRIBUTES_CODER.decode(inStream); + String messageId = MESSAGE_ID_CODER.decode(inStream); + return new PubsubMessage(payload, attributes, messageId); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java new file mode 100644 index 000000000000..f772809f2ac5 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; + +/** + * A coder for PubsubMessage treating the raw bytes being decoded as the message's payload, with the + * message id from the PubSub server. + */ +public class PubsubMessageWithMessageIdCoder extends CustomCoder { + private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); + // A message's messageId cannot be null + private static final Coder MESSAGE_ID_CODER = StringUtf8Coder.of(); + + public static PubsubMessageWithMessageIdCoder of() { + return new PubsubMessageWithMessageIdCoder(); + } + + @Override + public void encode(PubsubMessage value, OutputStream outStream) throws IOException { + encode(value, outStream, Context.NESTED); + } + + @Override + public void encode(PubsubMessage value, OutputStream outStream, Context context) + throws IOException { + PAYLOAD_CODER.encode(value.getPayload(), outStream, context); + MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); + } + + @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + return decode(inStream, Context.NESTED); + } + + @Override + public PubsubMessage decode(InputStream inStream, Context context) throws IOException { + byte[] payload = PAYLOAD_CODER.decode(inStream); + String messageId = MESSAGE_ID_CODER.decode(inStream); + return new PubsubMessage(payload, ImmutableMap.of(), messageId); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 89452a2339d1..253e4a545244 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -826,9 +826,9 @@ public boolean start() throws IOException { } /** - * BLOCKING Return {@literal true} if a Pubsub messaage is available, {@literal false} if none - * is available at this time or we are over-subscribed. May BLOCK while extending ACKs or - * fetching available messages. Will not block waiting for messages. + * BLOCKING Return {@literal true} if a Pubsub message is available, {@literal false} if none is + * available at this time or we are over-subscribed. May BLOCK while extending ACKs or fetching + * available messages. Will not block waiting for messages. */ @Override public boolean advance() throws IOException { @@ -884,7 +884,7 @@ public PubsubMessage getCurrent() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - return new PubsubMessage(current.elementBytes, current.attributes); + return new PubsubMessage(current.elementBytes, current.attributes, current.recordId); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java index e2bd88e67b14..bc6eeb8c76e9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; import org.joda.time.Duration; import org.junit.Rule; @@ -61,4 +62,35 @@ public void testReadPublicData() throws Exception { // noop } } + + @Test + public void testReadPubsubMessageId() throws Exception { + // The pipeline will never terminate on its own + pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false); + + PCollection messages = + pipeline.apply( + PubsubIO.readMessagesWithMessageId() + .fromTopic("projects/pubsub-public-data/topics/taxirides-realtime")); + + messages.apply( + "isMessageIdNonNull", + signal.signalSuccessWhen( + messages.getCoder(), + pubsubMessages -> + pubsubMessages.stream().noneMatch(m -> Strings.isNullOrEmpty(m.getMessageId())))); + + Supplier start = signal.waitForStart(Duration.standardMinutes(5)); + pipeline.apply(signal.signalStart()); + PipelineResult job = pipeline.run(); + start.get(); + + signal.waitForSuccess(Duration.standardSeconds(30)); + // A runner may not support cancel + try { + job.cancel(); + } catch (UnsupportedOperationException exc) { + // noop + } + } } From 71f6ddff86bed238ee26d2578baba0817e079d78 Mon Sep 17 00:00:00 2001 From: Thinh Ha Date: Sun, 23 Jun 2019 23:26:41 +0100 Subject: [PATCH 2/4] [BEAM-3489] add Coder test --- .../pubsub/PubsubCoderProviderRegistrar.java | 7 ++- .../beam/sdk/io/gcp/pubsub/PubsubMessage.java | 1 + ...essageWithAttributesAndMessageIdCoder.java | 10 --- .../PubsubMessageWithMessageIdCoder.java | 13 +--- .../PubsubMessagePayloadOnlyCoderTest.java | 56 +++++++++++++++++ ...geWithAttributesAndMessageIdCoderTest.java | 62 +++++++++++++++++++ .../PubsubMessageWithAttributesCoderTest.java | 60 ++++++++++++++++++ .../PubsubMessageWithMessageIdCoderTest.java | 57 +++++++++++++++++ .../beam/sdk/io/gcp/pubsub/PubsubReadIT.java | 10 +-- 9 files changed, 249 insertions(+), 27 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoderTest.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoderTest.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java index 4be3c8ed38bd..c10c60d717a5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java @@ -32,6 +32,11 @@ public class PubsubCoderProviderRegistrar implements CoderProviderRegistrar { public List getCoderProviders() { return ImmutableList.of( CoderProviders.forCoder( - TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithAttributesCoder.of())); + TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithAttributesCoder.of()), + CoderProviders.forCoder( + TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithMessageIdCoder.of()), + CoderProviders.forCoder( + TypeDescriptor.of(PubsubMessage.class), + PubsubMessageWithAttributesAndMessageIdCoder.of())); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java index a4da89c62e7e..34db3efbb08a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -62,6 +62,7 @@ public Map getAttributeMap() { } /** Returns the messageId of the message. */ + @Nullable public String getMessageId() { return messageId; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java index 8ee8045ad576..377bfc6a481b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java @@ -49,11 +49,6 @@ public static PubsubMessageWithAttributesAndMessageIdCoder of() { @Override public void encode(PubsubMessage value, OutputStream outStream) throws IOException { - encode(value, outStream, Context.NESTED); - } - - public void encode(PubsubMessage value, OutputStream outStream, Context context) - throws IOException { PAYLOAD_CODER.encode(value.getPayload(), outStream); ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream); MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); @@ -61,11 +56,6 @@ public void encode(PubsubMessage value, OutputStream outStream, Context context) @Override public PubsubMessage decode(InputStream inStream) throws IOException { - return decode(inStream, Context.NESTED); - } - - @Override - public PubsubMessage decode(InputStream inStream, Context context) throws IOException { byte[] payload = PAYLOAD_CODER.decode(inStream); Map attributes = ATTRIBUTES_CODER.decode(inStream); String messageId = MESSAGE_ID_CODER.decode(inStream); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java index f772809f2ac5..1633d18d8cf8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java @@ -41,23 +41,12 @@ public static PubsubMessageWithMessageIdCoder of() { @Override public void encode(PubsubMessage value, OutputStream outStream) throws IOException { - encode(value, outStream, Context.NESTED); - } - - @Override - public void encode(PubsubMessage value, OutputStream outStream, Context context) - throws IOException { - PAYLOAD_CODER.encode(value.getPayload(), outStream, context); + PAYLOAD_CODER.encode(value.getPayload(), outStream); MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); } @Override public PubsubMessage decode(InputStream inStream) throws IOException { - return decode(inStream, Context.NESTED); - } - - @Override - public PubsubMessage decode(InputStream inStream, Context context) throws IOException { byte[] payload = PAYLOAD_CODER.decode(inStream); String messageId = MESSAGE_ID_CODER.decode(inStream); return new PubsubMessage(payload, ImmutableMap.of(), messageId); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoderTest.java new file mode 100644 index 000000000000..b132814921de --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoderTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessagePayloadOnlyCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessagePayloadOnlyCoderTest { + + private static final String DATA = "testData"; + private static final Coder TEST_CODER = PubsubMessagePayloadOnlyCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), null); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java new file mode 100644 index 000000000000..282e62d56500 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessageWithAttributesAndMessageIdCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessageWithAttributesAndMessageIdCoderTest { + + private static final String DATA = "testData"; + private static final String MESSAGE_ID = "testMessageId"; + private static final Map ATTRIBUTES = + new ImmutableMap.Builder().put("1", "hello").build(); + private static final Coder TEST_CODER = + PubsubMessageWithAttributesAndMessageIdCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_ID); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java new file mode 100644 index 000000000000..2e65f4a8282b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessageWithAttributesCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessageWithAttributesCoderTest { + + private static final String DATA = "testData"; + private static final Map ATTRIBUTES = + new ImmutableMap.Builder().put("1", "hello").build(); + private static final Coder TEST_CODER = PubsubMessageWithAttributesCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoderTest.java new file mode 100644 index 000000000000..586b53687f39 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoderTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.nio.charset.StandardCharsets; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link PubsubMessageWithMessageIdCoder}. */ +@RunWith(JUnit4.class) +public class PubsubMessageWithMessageIdCoderTest { + + private static final String DATA = "testData"; + private static final String MESSAGE_ID = "testMessageId"; + private static final Coder TEST_CODER = PubsubMessageWithMessageIdCoder.of(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), null, MESSAGE_ID); + + @Test + public void testValueEncodable() throws Exception { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java index bc6eeb8c76e9..75ecd185f6a0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java @@ -70,7 +70,7 @@ public void testReadPubsubMessageId() throws Exception { PCollection messages = pipeline.apply( - PubsubIO.readMessagesWithMessageId() + PubsubIO.readMessagesWithAttributesAndMessageId() .fromTopic("projects/pubsub-public-data/topics/taxirides-realtime")); messages.apply( @@ -78,14 +78,16 @@ public void testReadPubsubMessageId() throws Exception { signal.signalSuccessWhen( messages.getCoder(), pubsubMessages -> - pubsubMessages.stream().noneMatch(m -> Strings.isNullOrEmpty(m.getMessageId())))); + pubsubMessages + .parallelStream() + .noneMatch(m -> Strings.isNullOrEmpty(m.getMessageId())))); - Supplier start = signal.waitForStart(Duration.standardMinutes(5)); + Supplier start = signal.waitForStart(Duration.standardMinutes(1)); pipeline.apply(signal.signalStart()); PipelineResult job = pipeline.run(); start.get(); - signal.waitForSuccess(Duration.standardSeconds(30)); + signal.waitForSuccess(Duration.standardMinutes(3)); // A runner may not support cancel try { job.cancel(); From d0b9caccfacdc09bba1893758821bcdb488448c6 Mon Sep 17 00:00:00 2001 From: Thinh Ha Date: Wed, 26 Jun 2019 08:17:07 +0100 Subject: [PATCH 3/4] [BEAM-3489] add messageId coders to PubsubUnboundedSource.getOutputCoder() --- .../beam/runners/dataflow/DataflowRunner.java | 10 +-- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 17 ++++- .../PubsubMessageWithMessageIdCoder.java | 2 +- .../io/gcp/pubsub/PubsubUnboundedSource.java | 70 +++++++++++++++++-- .../sdk/io/gcp/pubsub/TestPubsubSignal.java | 2 +- ...geWithAttributesAndMessageIdCoderTest.java | 2 +- .../PubsubMessageWithAttributesCoderTest.java | 2 +- .../beam/sdk/io/gcp/pubsub/PubsubReadIT.java | 29 +++++--- 8 files changed, 112 insertions(+), 22 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9128a1f6b58b..43f38c7dae30 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -100,6 +100,7 @@ import org.apache.beam.sdk.io.WriteFilesResult; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; @@ -1161,11 +1162,12 @@ PubsubUnboundedSource getOverriddenTransform() { @Override public PCollection expand(PBegin input) { + Coder coder = + transform.getNeedsMessageId() + ? new PubsubMessageWithAttributesAndMessageIdCoder() + : new PubsubMessageWithAttributesCoder(); return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - IsBounded.UNBOUNDED, - new PubsubMessageWithAttributesCoder()); + input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, coder); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 509fb6dbb904..25240496fe3a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -441,6 +441,7 @@ public String toString() { private static Read read() { return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .build(); } @@ -456,6 +457,7 @@ public static Read readMessages() { .setCoder(PubsubMessagePayloadOnlyCoder.of()) .setParseFn(new IdentityMessageFn()) .setNeedsAttributes(false) + .setNeedsMessageId(false) .build(); } @@ -470,6 +472,7 @@ public static Read readMessagesWithMessageId() { .setCoder(PubsubMessageWithMessageIdCoder.of()) .setParseFn(new IdentityMessageFn()) .setNeedsAttributes(false) + .setNeedsMessageId(true) .build(); } @@ -484,6 +487,7 @@ public static Read readMessagesWithAttributes() { .setCoder(PubsubMessageWithAttributesCoder.of()) .setParseFn(new IdentityMessageFn()) .setNeedsAttributes(true) + .setNeedsMessageId(false) .build(); } @@ -498,6 +502,7 @@ public static Read readMessagesWithAttributesAndMessageId() { .setCoder(PubsubMessageWithAttributesAndMessageIdCoder.of()) .setParseFn(new IdentityMessageFn()) .setNeedsAttributes(true) + .setNeedsMessageId(true) .build(); } @@ -508,6 +513,7 @@ public static Read readMessagesWithAttributesAndMessageId() { public static Read readStrings() { return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setCoder(StringUtf8Coder.of()) .setParseFn(new ParsePayloadAsUtf8()) @@ -525,6 +531,7 @@ public static Read readProtos(Class messageClass) { ProtoCoder coder = ProtoCoder.of(messageClass); return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setCoder(coder) .setParseFn(new ParsePayloadUsingCoder<>(coder)) @@ -542,6 +549,7 @@ public static Read readAvros(Class clazz) { AvroCoder coder = AvroCoder.of(clazz); return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setCoder(coder) .setParseFn(new ParsePayloadUsingCoder<>(coder)) @@ -561,6 +569,7 @@ public static Read readAvroGenericRecords(org.apache.avro.Schema AvroCoder coder = AvroCoder.of(GenericRecord.class, avroSchema); return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setBeamSchema(schema) .setToRowFn(AvroUtils.getToRowFunction(GenericRecord.class, avroSchema)) @@ -586,6 +595,7 @@ public static Read readAvrosWithBeamSchema(Class clazz) { Schema schema = AvroUtils.getSchema(clazz, null); return new AutoValue_PubsubIO_Read.Builder() .setNeedsAttributes(false) + .setNeedsMessageId(false) .setPubsubClientFactory(FACTORY) .setBeamSchema(schema) .setToRowFn(AvroUtils.getToRowFunction(clazz, avroSchema)) @@ -673,6 +683,8 @@ public abstract static class Read extends PTransform> abstract boolean getNeedsAttributes(); + abstract boolean getNeedsMessageId(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -699,6 +711,8 @@ abstract static class Builder { abstract Builder setNeedsAttributes(boolean needsAttributes); + abstract Builder setNeedsMessageId(boolean needsMessageId); + abstract Builder setClock(@Nullable Clock clock); abstract Read build(); @@ -860,7 +874,8 @@ public PCollection expand(PBegin input) { subscriptionPath, getTimestampAttribute(), getIdAttribute(), - getNeedsAttributes()); + getNeedsAttributes(), + getNeedsMessageId()); PCollection read = input.apply(source).apply(MapElements.via(getParseFn())); return (getBeamSchema() != null) ? read.setSchema(getBeamSchema(), getToRowFn(), getFromRowFn()) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java index 1633d18d8cf8..f38e14d9a3cc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** * A coder for PubsubMessage treating the raw bytes being decoded as the message's payload, with the diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 253e4a545244..d8abfe17c407 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -1083,9 +1083,15 @@ public Coder getCheckpointMarkCoder() { @Override public Coder getOutputCoder() { - return outer.getNeedsAttributes() - ? PubsubMessageWithAttributesCoder.of() - : PubsubMessagePayloadOnlyCoder.of(); + if (outer.getNeedsMessageId()) { + return outer.getNeedsAttributes() + ? PubsubMessageWithAttributesAndMessageIdCoder.of() + : PubsubMessageWithMessageIdCoder.of(); + } else { + return outer.getNeedsAttributes() + ? PubsubMessageWithAttributesCoder.of() + : PubsubMessagePayloadOnlyCoder.of(); + } } @Override @@ -1188,6 +1194,9 @@ public void populateDisplayData(Builder builder) { /** Whether this source should load the attributes of the PubsubMessage, or only the payload. */ private final boolean needsAttributes; + /** Whether this source should include the messageId from PubSub. */ + private final boolean needsMessageId; + @VisibleForTesting PubsubUnboundedSource( Clock clock, @@ -1197,7 +1206,8 @@ public void populateDisplayData(Builder builder) { @Nullable ValueProvider subscription, @Nullable String timestampAttribute, @Nullable String idAttribute, - boolean needsAttributes) { + boolean needsAttributes, + boolean needsMessageId) { checkArgument( (topic == null) != (subscription == null), "Exactly one of topic and subscription must be given"); @@ -1209,6 +1219,7 @@ public void populateDisplayData(Builder builder) { this.timestampAttribute = timestampAttribute; this.idAttribute = idAttribute; this.needsAttributes = needsAttributes; + this.needsMessageId = needsMessageId; } /** Construct an unbounded source to consume from the Pubsub {@code subscription}. */ @@ -1228,7 +1239,52 @@ public PubsubUnboundedSource( subscription, timestampAttribute, idAttribute, - needsAttributes); + needsAttributes, + false); + } + + /** Construct an unbounded source to consume from the Pubsub {@code subscription}. */ + public PubsubUnboundedSource( + Clock clock, + PubsubClientFactory pubsubFactory, + @Nullable ValueProvider project, + @Nullable ValueProvider topic, + @Nullable ValueProvider subscription, + @Nullable String timestampAttribute, + @Nullable String idAttribute, + boolean needsAttributes) { + this( + clock, + pubsubFactory, + project, + topic, + subscription, + timestampAttribute, + idAttribute, + needsAttributes, + false); + } + + /** Construct an unbounded source to consume from the Pubsub {@code subscription}. */ + public PubsubUnboundedSource( + PubsubClientFactory pubsubFactory, + @Nullable ValueProvider project, + @Nullable ValueProvider topic, + @Nullable ValueProvider subscription, + @Nullable String timestampAttribute, + @Nullable String idAttribute, + boolean needsAttributes, + boolean needsMessageId) { + this( + null, + pubsubFactory, + project, + topic, + subscription, + timestampAttribute, + idAttribute, + needsAttributes, + needsMessageId); } /** Get the project path. */ @@ -1277,6 +1333,10 @@ public boolean getNeedsAttributes() { return needsAttributes; } + public boolean getNeedsMessageId() { + return needsMessageId; + } + @Override public PCollection expand(PBegin input) { return input diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java index 0eb5bf768b80..f4f8b18ffc54 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java @@ -336,7 +336,7 @@ public POutput expand(PCollection input) { * Stateful {@link DoFn} which caches the elements it sees and checks whether they satisfy the * predicate. * - *

When predicate is satisfied outputs "SUCCESS". If predicate throws execption, outputs + *

When predicate is satisfied outputs "SUCCESS". If predicate throws exception, outputs * "FAILURE". */ static class StatefulPredicateCheck extends DoFn, String> { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java index 282e62d56500..5a061ad2cd56 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoderTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java index 2e65f4a8282b..eb33fd3895a4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoderTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java index 75ecd185f6a0..9856606be2a0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java @@ -17,9 +17,11 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; +import java.util.Set; import org.apache.beam.runners.direct.DirectOptions; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; @@ -28,10 +30,13 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Integration test for PubsubIO. */ @RunWith(JUnit4.class) public class PubsubReadIT { + private static final Logger LOG = LoggerFactory.getLogger(PubsubReadIT.class); @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create(); @Rule public transient TestPipeline pipeline = TestPipeline.create(); @@ -75,19 +80,14 @@ public void testReadPubsubMessageId() throws Exception { messages.apply( "isMessageIdNonNull", - signal.signalSuccessWhen( - messages.getCoder(), - pubsubMessages -> - pubsubMessages - .parallelStream() - .noneMatch(m -> Strings.isNullOrEmpty(m.getMessageId())))); + signal.signalSuccessWhen(messages.getCoder(), new NonEmptyMessageIdCheck())); - Supplier start = signal.waitForStart(Duration.standardMinutes(1)); + Supplier start = signal.waitForStart(Duration.standardMinutes(5)); pipeline.apply(signal.signalStart()); PipelineResult job = pipeline.run(); start.get(); - signal.waitForSuccess(Duration.standardMinutes(3)); + signal.waitForSuccess(Duration.standardMinutes(1)); // A runner may not support cancel try { job.cancel(); @@ -95,4 +95,17 @@ public void testReadPubsubMessageId() throws Exception { // noop } } + + private static class NonEmptyMessageIdCheck + implements SerializableFunction, Boolean> { + @Override + public Boolean apply(Set input) { + for (PubsubMessage message : input) { + if (Strings.isNullOrEmpty(message.getMessageId())) { + return false; + } + } + return true; + } + } } From b8313b0067f68d3cac95abe39cdf29ddd22e4575 Mon Sep 17 00:00:00 2001 From: Thinh Ha Date: Sun, 18 Aug 2019 23:06:20 +0100 Subject: [PATCH 4/4] [BEAM-3489] update docs --- .../java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 8 +++++--- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 25240496fe3a..afb0f5c07b30 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -463,8 +463,9 @@ public static Read readMessages() { /** * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The - * messages will only contain a {@link PubsubMessage#getPayload() payload} with the messageId from - * PubSub, but no {@link PubsubMessage#getAttributeMap() attributes}. + * messages will only contain a {@link PubsubMessage#getPayload() payload} with the {@link + * PubsubMessage#getMessageId() messageId} from PubSub, but no {@link + * PubsubMessage#getAttributeMap() attributes}. */ public static Read readMessagesWithMessageId() { return new AutoValue_PubsubIO_Read.Builder() @@ -494,7 +495,8 @@ public static Read readMessagesWithAttributes() { /** * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The * messages will contain both a {@link PubsubMessage#getPayload() payload} and {@link - * PubsubMessage#getAttributeMap() attributes}, along with the messageId from PubSub. + * PubsubMessage#getAttributeMap() attributes}, along with the {@link PubsubMessage#getMessageId() + * messageId} from PubSub. */ public static Read readMessagesWithAttributesAndMessageId() { return new AutoValue_PubsubIO_Read.Builder() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java index 34db3efbb08a..b437c0ad58d3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -61,7 +61,7 @@ public Map getAttributeMap() { return attributes; } - /** Returns the messageId of the message. */ + /** Returns the messageId of the message populated by Cloud Pub/Sub. */ @Nullable public String getMessageId() { return messageId;