Skip to content

Commit

Permalink
Move PubsubIO source Lineage report to MapElements
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Aug 30, 2024
1 parent cfe8fee commit ac8adfb
Showing 1 changed file with 33 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -1164,17 +1163,48 @@ private PCollection<T> expandReadContinued(
@Nullable ValueProvider<SubscriptionPath> subscriptionPath) {

TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
// Wraps the configured parse function so that source Lineage is reported from inside the
// parsing step: the first time a given instance of the wrapper is applied to a message.
SerializableFunction<PubsubMessage, T> parseFnWrapped =
    new SerializableFunction<PubsubMessage, T>() {
      // The actual parse function. Resolved when the wrapper is constructed, so a null
      // parse function fails immediately with a NullPointerException.
      private final SerializableFunction<PubsubMessage, T> underlying =
          Objects.requireNonNull(getParseFn());

      // Whether this instance has already reported Lineage. Transient fields are not part of
      // the serialized form, so a deserialized copy starts with the flag unset and reports
      // again on its first call.
      private transient boolean reportedMetrics = false;

      @Override
      public T apply(PubsubMessage input) {
        if (!reportedMetrics) {
          LOG.info("reporting lineage...");
          reportLineage();
          // Only marked as done after reporting returned normally.
          reportedMetrics = true;
        }
        return underlying.apply(input);
      }

      // Adds the topic and/or the subscription as Lineage sources. Each one is skipped when
      // its provider is null or when the provider yields a null value.
      private void reportLineage() {
        if (topicPath != null) {
          TopicPath topic = topicPath.get();
          if (topic != null) {
            Lineage.getSources().add("pubsub", "topic", topic.getDataCatalogSegments());
          }
        }
        if (subscriptionPath != null) {
          SubscriptionPath sub = subscriptionPath.get();
          if (sub != null) {
            Lineage.getSources()
                .add("pubsub", "subscription", sub.getDataCatalogSegments());
          }
        }
      }
    };
PCollection<T> read;
if (getDeadLetterTopicProvider() == null
&& (getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
read = preParse.apply(MapElements.into(typeDescriptor).via(parseFnWrapped));
} else {
// parse PubSub messages, separating out exceptions
Result<PCollection<T>, KV<PubsubMessage, EncodableThrowable>> result =
preParse.apply(
"PubsubIO.Read/Map/Parse-Incoming-Messages",
MapElements.into(typeDescriptor)
.via(getParseFn())
.via(parseFnWrapped)
.exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {}));

// Emit parsed records
Expand Down Expand Up @@ -1230,31 +1260,6 @@ private PCollection<T> expandReadContinued(
.withClientFactory(getPubsubClientFactory()));
}
}
// report Lineage once
// Adds a separate branch to the pipeline that owns preParse: an Impulse followed by a ParDo.
// The DoFn's process method takes no arguments (it does not look at its input element) and
// only registers the Pub/Sub source(s) with Lineage.
preParse
.getPipeline()
.apply(Impulse.create())
.apply(
ParDo.of(
new DoFn<byte[], Void>() {
@ProcessElement
public void process() {
// Register the topic only when a provider was supplied and it yields a non-null value.
if (topicPath != null) {
TopicPath topic = topicPath.get();
if (topic != null) {
Lineage.getSources()
.add("pubsub", "topic", topic.getDataCatalogSegments());
}
}
// Same null-tolerant handling for the subscription.
if (subscriptionPath != null) {
SubscriptionPath sub = subscriptionPath.get();
if (sub != null) {
Lineage.getSources()
.add("pubsub", "subscription", sub.getDataCatalogSegments());
}
}
}
}));
return read.setCoder(getCoder());
}

Expand Down

0 comments on commit ac8adfb

Please sign in to comment.