diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index b561b4711d521..1e7052f025371 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -56,7 +56,6 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -1164,17 +1163,48 @@ private PCollection expandReadContinued(
         @Nullable ValueProvider subscriptionPath) {
 
       TypeDescriptor typeDescriptor = new TypeDescriptor() {};
+      SerializableFunction parseFnWrapped =
+          new SerializableFunction() {
+            // parse function that this wrapper delegates to after reporting lineage
+            private final SerializableFunction underlying =
+                Objects.requireNonNull(getParseFn());
+            private transient boolean reportedLineage = false;
+
+            // reports lineage on the first call, then delegates every call to underlying
+            @Override
+            public T apply(PubsubMessage input) {
+              if (!reportedLineage) {
+                LOG.info("reporting lineage...");
+                // report Lineage once per instance of this wrapper
+                if (topicPath != null) {
+                  TopicPath topic = topicPath.get();
+                  if (topic != null) {
+                    Lineage.getSources().add("pubsub", "topic", topic.getDataCatalogSegments());
+                  }
+                }
+                if (subscriptionPath != null) {
+                  SubscriptionPath sub = subscriptionPath.get();
+                  if (sub != null) {
+                    Lineage.getSources()
+                        .add("pubsub", "subscription", sub.getDataCatalogSegments());
+                  }
+                }
+                reportedLineage = true;
+              }
+              return underlying.apply(input);
+            }
+          };
       PCollection read;
       if (getDeadLetterTopicProvider() == null
           && (getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
-        read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
+        read = preParse.apply(MapElements.into(typeDescriptor).via(parseFnWrapped));
       } else {
         // parse PubSub messages, separating out exceptions
         Result, KV> result =
             preParse.apply(
                 "PubsubIO.Read/Map/Parse-Incoming-Messages",
                 MapElements.into(typeDescriptor)
-                    .via(getParseFn())
+                    .via(parseFnWrapped)
                     .exceptionsVia(new WithFailures.ThrowableHandler() {}));
 
         // Emit parsed records
@@ -1230,31 +1260,6 @@ private PCollection expandReadContinued(
                       .withClientFactory(getPubsubClientFactory()));
         }
       }
-      // report Lineage once
-      preParse
-          .getPipeline()
-          .apply(Impulse.create())
-          .apply(
-              ParDo.of(
-                  new DoFn() {
-                    @ProcessElement
-                    public void process() {
-                      if (topicPath != null) {
-                        TopicPath topic = topicPath.get();
-                        if (topic != null) {
-                          Lineage.getSources()
-                              .add("pubsub", "topic", topic.getDataCatalogSegments());
-                        }
-                      }
-                      if (subscriptionPath != null) {
-                        SubscriptionPath sub = subscriptionPath.get();
-                        if (sub != null) {
-                          Lineage.getSources()
-                              .add("pubsub", "subscription", sub.getDataCatalogSegments());
-                        }
-                      }
-                    }
-                  }));
       return read.setCoder(getCoder());
     }
 