diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
index 07fda807bd45b..ab64389fe1987 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
@@ -72,7 +72,7 @@ private static String wrapSegment(String value) {
*
*
* - {@code system:segment1.segment2}
- *
- {@code system:routine:segment1.segment2}
+ *
- {@code system:subtype:segment1.segment2}
*
- {@code system:`segment1.with.dots:clons`.segment2}
*
*
@@ -80,10 +80,10 @@ private static String wrapSegment(String value) {
*/
@Internal
public static String getFqName(
- String system, @Nullable String routine, Iterable segments) {
+ String system, @Nullable String subtype, Iterable segments) {
StringBuilder builder = new StringBuilder(system);
- if (!Strings.isNullOrEmpty(routine)) {
- builder.append(":").append(routine);
+ if (!Strings.isNullOrEmpty(subtype)) {
+ builder.append(":").append(subtype);
}
int idx = 0;
for (String segment : segments) {
@@ -111,8 +111,8 @@ public static String getFqName(String system, Iterable segments) {
/**
* Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}.
*/
- public void add(String system, @Nullable String routine, Iterable segments) {
- metric.add(getFqName(system, routine, segments));
+ public void add(String system, @Nullable String subtype, Iterable segments) {
+ metric.add(getFqName(system, subtype, segments));
}
/**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index b561b4711d521..1e7052f025371 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -56,7 +56,6 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -1164,17 +1163,48 @@ private PCollection expandReadContinued(
@Nullable ValueProvider subscriptionPath) {
TypeDescriptor typeDescriptor = new TypeDescriptor() {};
+ SerializableFunction parseFnWrapped =
+ new SerializableFunction() {
+ // flag that reported metrics
+ private final SerializableFunction underlying =
+ Objects.requireNonNull(getParseFn());
+ private transient boolean reportedMetrics = false;
+
+ // public
+ @Override
+ public T apply(PubsubMessage input) {
+ if (!reportedMetrics) {
+ LOG.info("reportling lineage...");
+ // report Lineage once
+ if (topicPath != null) {
+ TopicPath topic = topicPath.get();
+ if (topic != null) {
+ Lineage.getSources().add("pubsub", "topic", topic.getDataCatalogSegments());
+ }
+ }
+ if (subscriptionPath != null) {
+ SubscriptionPath sub = subscriptionPath.get();
+ if (sub != null) {
+ Lineage.getSources()
+ .add("pubsub", "subscription", sub.getDataCatalogSegments());
+ }
+ }
+ reportedMetrics = true;
+ }
+ return underlying.apply(input);
+ }
+ };
PCollection read;
if (getDeadLetterTopicProvider() == null
&& (getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
- read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
+ read = preParse.apply(MapElements.into(typeDescriptor).via(parseFnWrapped));
} else {
// parse PubSub messages, separating out exceptions
Result, KV> result =
preParse.apply(
"PubsubIO.Read/Map/Parse-Incoming-Messages",
MapElements.into(typeDescriptor)
- .via(getParseFn())
+ .via(parseFnWrapped)
.exceptionsVia(new WithFailures.ThrowableHandler() {}));
// Emit parsed records
@@ -1230,31 +1260,6 @@ private PCollection expandReadContinued(
.withClientFactory(getPubsubClientFactory()));
}
}
- // report Lineage once
- preParse
- .getPipeline()
- .apply(Impulse.create())
- .apply(
- ParDo.of(
- new DoFn() {
- @ProcessElement
- public void process() {
- if (topicPath != null) {
- TopicPath topic = topicPath.get();
- if (topic != null) {
- Lineage.getSources()
- .add("pubsub", "topic", topic.getDataCatalogSegments());
- }
- }
- if (subscriptionPath != null) {
- SubscriptionPath sub = subscriptionPath.get();
- if (sub != null) {
- Lineage.getSources()
- .add("pubsub", "subscription", sub.getDataCatalogSegments());
- }
- }
- }
- }));
return read.setCoder(getCoder());
}
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index 6b8e4754a79ce..065a497786cbf 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -352,25 +352,25 @@ def wrap_segment(segment: str) -> str:
@staticmethod
def get_fq_name(
- system: str, *segments: str, route: Optional[str] = None) -> str:
+ system: str, *segments: str, subtype: Optional[str] = None) -> str:
"""Assemble fully qualified name
(`FQN `_).
Format:
- `system:segment1.segment2`
- - `system:routine:segment1.segment2`
- - `system:`segment1.with.dots:clons`.segment2`
+ - `system:subtype:segment1.segment2`
+ - `system:`segment1.with.dots:colons`.segment2`
This helper method is for internal and testing usage only.
"""
segs = '.'.join(map(Lineage.wrap_segment, segments))
- if route:
- return ':'.join((system, route, segs))
+ if subtype:
+ return ':'.join((system, subtype, segs))
return ':'.join((system, segs))
def add(
- self, system: str, *segments: str, route: Optional[str] = None) -> None:
- self.metric.add(self.get_fq_name(system, *segments, route=route))
+ self, system: str, *segments: str, subtype: Optional[str] = None) -> None:
+ self.metric.add(self.get_fq_name(system, *segments, subtype=subtype))
@staticmethod
def query(results: MetricResults, label: str) -> Set[str]:
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index 3a8da021101e5..1c6a11e200a54 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -264,10 +264,10 @@ def test_fq_name(self):
for k, v in test_cases.items():
self.assertEqual("apache:" + v, Lineage.get_fq_name("apache", k))
self.assertEqual(
- "apache:beam:" + v, Lineage.get_fq_name("apache", k, route="beam"))
+ "apache:beam:" + v, Lineage.get_fq_name("apache", k, subtype="beam"))
self.assertEqual(
"apache:beam:" + v + '.' + v,
- Lineage.get_fq_name("apache", k, k, route="beam"))
+ Lineage.get_fq_name("apache", k, k, subtype="beam"))
if __name__ == '__main__':