Skip to content

Commit

Permalink
Report File Lineage on directory (#32662) (#32706)
Browse files Browse the repository at this point in the history
* Report File Lineage on directory

* added comments, restore lineage assert in TextIOIT

* Report bucket level Lineage for files larger than 100

* fix lint
  • Loading branch information
Abacn authored Oct 9, 2024
1 parent 64854bb commit 96a96da
Show file tree
Hide file tree
Showing 24 changed files with 393 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,25 @@ protected final List<KV<FileResult<DestinationT>, ResourceId>> finalizeDestinati
distinctFilenames.get(finalFilename));
distinctFilenames.put(finalFilename, result);
outputFilenames.add(KV.of(result, finalFilename));
FileSystems.reportSinkLineage(finalFilename);
}
reportSinkLineage(outputFilenames);
return outputFilenames;
}

/**
* Report sink Lineage. Report every file if number of files no more than 100, otherwise only
* report at directory level.
*/
private void reportSinkLineage(List<KV<FileResult<DestinationT>, ResourceId>> outputFilenames) {
if (outputFilenames.size() <= 100) {
for (KV<FileResult<DestinationT>, ResourceId> kv : outputFilenames) {
FileSystems.reportSinkLineage(kv.getValue());
}
} else {
FileSystems.reportSinkLineage(outputFilenames.get(0).getValue().getCurrentDirectory());
}
}

private Collection<FileResult<DestinationT>> createMissingEmptyShards(
@Nullable DestinationT dest,
@Nullable Integer numShards,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.FileSystem.LineageLevel;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
Expand Down Expand Up @@ -297,9 +299,10 @@ public final List<? extends FileBasedSource<T>> split(
System.currentTimeMillis() - startTime,
expandedFiles.size(),
splitResults.size());

reportSourceLineage(expandedFiles);
return splitResults;
} else {
FileSystems.reportSourceLineage(getSingleFileMetadata().resourceId());
if (isSplittable()) {
@SuppressWarnings("unchecked")
List<FileBasedSource<T>> splits =
Expand All @@ -315,6 +318,37 @@ public final List<? extends FileBasedSource<T>> split(
}
}

/**
* Report source Lineage. Due to the size limit of Beam metrics, report full file name or only dir
* depend on the number of files.
*
* <p>- Number of files<=100, report full file paths;
*
* <p>- Number of directory<=100, report directory names (one level up);
*
* <p>- Otherwise, report top level only.
*/
private static void reportSourceLineage(List<Metadata> expandedFiles) {
if (expandedFiles.size() <= 100) {
for (Metadata metadata : expandedFiles) {
FileSystems.reportSourceLineage(metadata.resourceId());
}
} else {
HashSet<ResourceId> uniqueDirs = new HashSet<>();
for (Metadata metadata : expandedFiles) {
ResourceId dir = metadata.resourceId().getCurrentDirectory();
uniqueDirs.add(dir);
if (uniqueDirs.size() > 100) {
FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL);
return;
}
}
for (ResourceId uniqueDir : uniqueDirs) {
FileSystems.reportSourceLineage(uniqueDir);
}
}
}

/**
* Determines whether a file represented by this source is can be split into bundles.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,20 @@ protected abstract void rename(
*/
protected abstract String getScheme();

public enum LineageLevel {
FILE,
TOP_LEVEL
}

/** Report {@link Lineage} metrics for resource id at file level. */
protected void reportLineage(ResourceIdT resourceId, Lineage lineage) {
reportLineage(resourceId, lineage, LineageLevel.FILE);
}

/**
* Report {@link Lineage} metrics for resource id.
* Report {@link Lineage} metrics for resource id to a given level.
*
* <p>Unless override by FileSystem implementations, default to no-op.
*/
protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage) {}
protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage, LineageLevel level) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.FileSystem.LineageLevel;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
Expand Down Expand Up @@ -398,12 +399,36 @@ public ResourceId apply(@Nonnull Metadata input) {

/** Report source {@link Lineage} metrics for resource id. */
public static void reportSourceLineage(ResourceId resourceId) {
getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSources());
reportSourceLineage(resourceId, LineageLevel.FILE);
}

/** Report sink {@link Lineage} metrics for resource id. */
public static void reportSinkLineage(ResourceId resourceId) {
getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSinks());
reportSinkLineage(resourceId, LineageLevel.FILE);
}

/**
* Report source {@link Lineage} metrics for resource id at given level.
*
* <p>Internal API, no backward compatibility guaranteed.
*/
public static void reportSourceLineage(ResourceId resourceId, LineageLevel level) {
reportLineage(resourceId, Lineage.getSources(), level);
}

/**
* Report source {@link Lineage} metrics for resource id at given level.
*
* <p>Internal API, no backward compatibility guaranteed.
*/
public static void reportSinkLineage(ResourceId resourceId, LineageLevel level) {
reportLineage(resourceId, Lineage.getSinks(), level);
}

/** Report {@link Lineage} metrics for resource id at given level to given Lineage container. */
private static void reportLineage(ResourceId resourceId, Lineage lineage, LineageLevel level) {
FileSystem fileSystem = getFileSystemInternal(resourceId.getScheme());
fileSystem.reportLineage(resourceId, lineage, level);
}

private static class FilterResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.HashSet;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.FileSystem.LineageLevel;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.range.OffsetRange;
Expand All @@ -30,6 +32,7 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.nullness.qual.Nullable;

public abstract class ReadAllViaFileBasedSourceTransform<InT, T>
extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
Expand Down Expand Up @@ -81,13 +84,17 @@ public static class SplitIntoRangesFn
extends DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>> {
private final long desiredBundleSizeBytes;

// track unique resourceId met. Access it only inside reportSourceLineage
private transient @Nullable HashSet<ResourceId> uniqueIds;

public SplitIntoRangesFn(long desiredBundleSizeBytes) {
this.desiredBundleSizeBytes = desiredBundleSizeBytes;
}

@ProcessElement
public void process(ProcessContext c) {
MatchResult.Metadata metadata = c.element().getMetadata();
reportSourceLineage(metadata.resourceId());
if (!metadata.isReadSeekEfficient()) {
c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes())));
return;
Expand All @@ -97,6 +104,31 @@ public void process(ProcessContext c) {
c.output(KV.of(c.element(), range));
}
}

/**
* Report source Lineage. Due to the size limit of Beam metrics, report full file name or only
* top level depend on the number of files.
*
* <p>- Number of files<=100, report full file paths;
*
* <p>- Otherwise, report top level only.
*/
@SuppressWarnings("nullness") // only called in processElement, guaranteed to be non-null
private void reportSourceLineage(ResourceId resourceId) {
if (uniqueIds == null) {
uniqueIds = new HashSet<>();
} else if (uniqueIds.isEmpty()) {
// already at capacity
FileSystems.reportSourceLineage(resourceId, LineageLevel.TOP_LEVEL);
return;
}
uniqueIds.add(resourceId);
FileSystems.reportSourceLineage(resourceId, LineageLevel.FILE);
if (uniqueIds.size() >= 100) {
// avoid reference leak
uniqueIds.clear();
}
}
}

public abstract static class AbstractReadFileRangesFn<InT, T>
Expand Down Expand Up @@ -140,7 +172,6 @@ public void process(ProcessContext c) throws IOException {
throw e;
}
}
FileSystems.reportSourceLineage(resourceId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,19 @@ protected String getScheme() {

@Override
protected void reportLineage(GcsResourceId resourceId, Lineage lineage) {
reportLineage(resourceId, lineage, LineageLevel.FILE);
}

@Override
protected void reportLineage(GcsResourceId resourceId, Lineage lineage, LineageLevel level) {
GcsPath path = resourceId.getGcsPath();
if (!path.getBucket().isEmpty()) {
lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject()));
ImmutableList.Builder<String> segments =
ImmutableList.<String>builder().add(path.getBucket());
if (level != LineageLevel.TOP_LEVEL && !path.getObject().isEmpty()) {
segments.add(path.getObject());
}
lineage.add("gcs", segments.build());
} else {
LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.services.storage.model.Objects;
Expand All @@ -38,6 +41,7 @@
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -235,6 +239,20 @@ public void testMatchNonGlobs() throws Exception {
contains(toFilenames(matchResults.get(4)).toArray()));
}

@Test
public void testReportLineageOnBucket() {
verifyLineage("gs://testbucket", ImmutableList.of("testbucket"));
verifyLineage("gs://testbucket/", ImmutableList.of("testbucket"));
verifyLineage("gs://testbucket/foo/bar.txt", ImmutableList.of("testbucket", "foo/bar.txt"));
}

private void verifyLineage(String uri, List<String> expected) {
GcsResourceId path = GcsResourceId.fromGcsPath(GcsPath.fromUri(uri));
Lineage mockLineage = mock(Lineage.class);
gcsFileSystem.reportLineage(path, mockLineage);
verify(mockLineage, times(1)).add("gcs", expected);
}

private StorageObject createStorageObject(String gcsFilename, long fileSize) {
GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
// Google APIs will use null for empty files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,17 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey()));
reportLineage(resourceId, lineage, LineageLevel.FILE);
}

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage lineage, LineageLevel level) {
ImmutableList.Builder<String> segments =
ImmutableList.<String>builder().add(resourceId.getBucket());
if (level != LineageLevel.TOP_LEVEL && !resourceId.getKey().isEmpty()) {
segments.add(resourceId.getKey());
}
lineage.add("s3", segments.build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -74,6 +75,7 @@
import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -1209,6 +1211,21 @@ public void testWriteAndReadWithS3Options() throws IOException {
open.close();
}

@Test
public void testReportLineageOnBucket() {
verifyLineage("s3://testbucket", ImmutableList.of("testbucket"));
verifyLineage("s3://testbucket/", ImmutableList.of("testbucket"));
verifyLineage("s3://testbucket/foo/bar.txt", ImmutableList.of("testbucket", "foo/bar.txt"));
}

private void verifyLineage(String uri, List<String> expected) {
S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Config("mys3"), client);
S3ResourceId path = S3ResourceId.fromUri(uri);
Lineage mockLineage = mock(Lineage.class);
s3FileSystem.reportLineage(path, mockLineage);
verify(mockLineage, times(1)).add("s3", expected);
}

/** A mockito argument matcher to implement equality on GetObjectMetadataRequest. */
private static class GetObjectMetadataRequestMatcher
implements ArgumentMatcher<GetObjectMetadataRequest> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,17 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey()));
reportLineage(resourceId, lineage, LineageLevel.FILE);
}

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage lineage, LineageLevel level) {
ImmutableList.Builder<String> segments =
ImmutableList.<String>builder().add(resourceId.getBucket());
if (level != LineageLevel.TOP_LEVEL && !resourceId.getKey().isEmpty()) {
segments.add(resourceId.getKey());
}
lineage.add("s3", segments.build());
}

/**
Expand Down
Loading

0 comments on commit 96a96da

Please sign in to comment.