diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index b7523ee12b56a..7eb04519555b2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -687,11 +687,25 @@ protected final List, ResourceId>> finalizeDestinati distinctFilenames.get(finalFilename)); distinctFilenames.put(finalFilename, result); outputFilenames.add(KV.of(result, finalFilename)); - FileSystems.reportSinkLineage(finalFilename); } + reportSinkLineage(outputFilenames); return outputFilenames; } + /** + * Report sink Lineage. Report every file if number of files no more than 100, otherwise only + * report at directory level. + */ + private void reportSinkLineage(List, ResourceId>> outputFilenames) { + if (outputFilenames.size() <= 100) { + for (KV, ResourceId> kv : outputFilenames) { + FileSystems.reportSinkLineage(kv.getValue()); + } + } else { + FileSystems.reportSinkLineage(outputFilenames.get(0).getValue().getCurrentDirectory()); + } + } + private Collection> createMissingEmptyShards( @Nullable DestinationT dest, @Nullable Integer numShards, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 7ddfde441aedc..8d6e52c64a527 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -26,10 +26,12 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.io.FileSystem.LineageLevel; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; @@ -297,9 +299,10 @@ public final List> split( System.currentTimeMillis() - startTime, expandedFiles.size(), splitResults.size()); + + reportSourceLineage(expandedFiles); return splitResults; } else { - FileSystems.reportSourceLineage(getSingleFileMetadata().resourceId()); if (isSplittable()) { @SuppressWarnings("unchecked") List> splits = @@ -315,6 +318,37 @@ public final List> split( } } + /** + * Report source Lineage. Due to the size limit of Beam metrics, report full file name or only dir + * depend on the number of files. + * + *

- Number of files<=100, report full file paths; + * + *

- Number of directory<=100, report directory names (one level up); + * + *

- Otherwise, report top level only. + */ + private static void reportSourceLineage(List expandedFiles) { + if (expandedFiles.size() <= 100) { + for (Metadata metadata : expandedFiles) { + FileSystems.reportSourceLineage(metadata.resourceId()); + } + } else { + HashSet uniqueDirs = new HashSet<>(); + for (Metadata metadata : expandedFiles) { + ResourceId dir = metadata.resourceId().getCurrentDirectory(); + uniqueDirs.add(dir); + if (uniqueDirs.size() > 100) { + FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL); + return; + } + } + for (ResourceId uniqueDir : uniqueDirs) { + FileSystems.reportSourceLineage(uniqueDir); + } + } + } + /** * Determines whether a file represented by this source is can be split into bundles. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java index 11314a318b256..73caa7284e986 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -157,10 +157,20 @@ protected abstract void rename( */ protected abstract String getScheme(); + public enum LineageLevel { + FILE, + TOP_LEVEL + } + + /** Report {@link Lineage} metrics for resource id at file level. */ + protected void reportLineage(ResourceIdT resourceId, Lineage lineage) { + reportLineage(resourceId, lineage, LineageLevel.FILE); + } + /** - * Report {@link Lineage} metrics for resource id. + * Report {@link Lineage} metrics for resource id to a given level. * *

Unless override by FileSystem implementations, default to no-op. */ - protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage) {} + protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage, LineageLevel level) {} } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index a4ca9b80dce37..fb25cac6262f9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -39,6 +39,7 @@ import java.util.regex.Pattern; import javax.annotation.Nonnull; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.FileSystem.LineageLevel; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; @@ -398,12 +399,36 @@ public ResourceId apply(@Nonnull Metadata input) { /** Report source {@link Lineage} metrics for resource id. */ public static void reportSourceLineage(ResourceId resourceId) { - getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSources()); + reportSourceLineage(resourceId, LineageLevel.FILE); } /** Report sink {@link Lineage} metrics for resource id. */ public static void reportSinkLineage(ResourceId resourceId) { - getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSinks()); + reportSinkLineage(resourceId, LineageLevel.FILE); + } + + /** + * Report source {@link Lineage} metrics for resource id at given level. + * + *

Internal API, no backward compatibility guaranteed. + */ + public static void reportSourceLineage(ResourceId resourceId, LineageLevel level) { + reportLineage(resourceId, Lineage.getSources(), level); + } + + /** + * Report source {@link Lineage} metrics for resource id at given level. + * + *

Internal API, no backward compatibility guaranteed. + */ + public static void reportSinkLineage(ResourceId resourceId, LineageLevel level) { + reportLineage(resourceId, Lineage.getSinks(), level); + } + + /** Report {@link Lineage} metrics for resource id at given level to given Lineage container. */ + private static void reportLineage(ResourceId resourceId, Lineage lineage, LineageLevel level) { + FileSystem fileSystem = getFileSystemInternal(resourceId.getScheme()); + fileSystem.reportLineage(resourceId, lineage, level); } private static class FilterResult { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java index bbac337f2d0fe..843deb5cab320 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java @@ -19,7 +19,9 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; +import java.util.HashSet; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.FileSystem.LineageLevel; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.range.OffsetRange; @@ -30,6 +32,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.checkerframework.checker.nullness.qual.Nullable; public abstract class ReadAllViaFileBasedSourceTransform extends PTransform, PCollection> { @@ -81,6 +84,9 @@ public static class SplitIntoRangesFn extends DoFn> { private final long desiredBundleSizeBytes; + // track unique resourceId met. Access it only inside reportSourceLineage + private transient @Nullable HashSet uniqueIds; + public SplitIntoRangesFn(long desiredBundleSizeBytes) { this.desiredBundleSizeBytes = desiredBundleSizeBytes; } @@ -88,6 +94,7 @@ public SplitIntoRangesFn(long desiredBundleSizeBytes) { @ProcessElement public void process(ProcessContext c) { MatchResult.Metadata metadata = c.element().getMetadata(); + reportSourceLineage(metadata.resourceId()); if (!metadata.isReadSeekEfficient()) { c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes()))); return; @@ -97,6 +104,31 @@ public void process(ProcessContext c) { c.output(KV.of(c.element(), range)); } } + + /** + * Report source Lineage. Due to the size limit of Beam metrics, report full file name or only + * top level depend on the number of files. + * + *

- Number of files<=100, report full file paths; + * + *

- Otherwise, report top level only. + */ + @SuppressWarnings("nullness") // only called in processElement, guaranteed to be non-null + private void reportSourceLineage(ResourceId resourceId) { + if (uniqueIds == null) { + uniqueIds = new HashSet<>(); + } else if (uniqueIds.isEmpty()) { + // already at capacity + FileSystems.reportSourceLineage(resourceId, LineageLevel.TOP_LEVEL); + return; + } + uniqueIds.add(resourceId); + FileSystems.reportSourceLineage(resourceId, LineageLevel.FILE); + if (uniqueIds.size() >= 100) { + // avoid reference leak + uniqueIds.clear(); + } + } } public abstract static class AbstractReadFileRangesFn @@ -140,7 +172,6 @@ public void process(ProcessContext c) throws IOException { throw e; } } - FileSystems.reportSourceLineage(resourceId); } } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java index 6332051c0ddc7..32079ebf55a38 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java @@ -217,9 +217,19 @@ protected String getScheme() { @Override protected void reportLineage(GcsResourceId resourceId, Lineage lineage) { + reportLineage(resourceId, lineage, LineageLevel.FILE); + } + + @Override + protected void reportLineage(GcsResourceId resourceId, Lineage lineage, LineageLevel level) { GcsPath path = resourceId.getGcsPath(); if (!path.getBucket().isEmpty()) { - lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject())); + ImmutableList.Builder segments = + ImmutableList.builder().add(path.getBucket()); + if (level != LineageLevel.TOP_LEVEL && !path.getObject().isEmpty()) { + segments.add(path.getObject()); + } + lineage.add("gcs", segments.build()); } else { LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject()); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java index 0b79cde1f187d..f2ff7118f95de 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java @@ -23,6 +23,9 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.api.services.storage.model.Objects; @@ -38,6 +41,7 @@ import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -235,6 +239,20 @@ public void testMatchNonGlobs() throws Exception { contains(toFilenames(matchResults.get(4)).toArray())); } + @Test + public void testReportLineageOnBucket() { + verifyLineage("gs://testbucket", ImmutableList.of("testbucket")); + verifyLineage("gs://testbucket/", ImmutableList.of("testbucket")); + verifyLineage("gs://testbucket/foo/bar.txt", ImmutableList.of("testbucket", "foo/bar.txt")); + } + + private void verifyLineage(String uri, List expected) { + GcsResourceId path = GcsResourceId.fromGcsPath(GcsPath.fromUri(uri)); + Lineage mockLineage = mock(Lineage.class); + gcsFileSystem.reportLineage(path, mockLineage); + verify(mockLineage, times(1)).add("gcs", expected); + } + private StorageObject createStorageObject(String gcsFilename, long fileSize) { GcsPath gcsPath = GcsPath.fromUri(gcsFilename); // Google APIs will use null for empty files. diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java index 7ed56efa44bda..75d66c46478a7 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java @@ -627,7 +627,17 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir @Override protected void reportLineage(S3ResourceId resourceId, Lineage lineage) { - lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey())); + reportLineage(resourceId, lineage, LineageLevel.FILE); + } + + @Override + protected void reportLineage(S3ResourceId resourceId, Lineage lineage, LineageLevel level) { + ImmutableList.Builder segments = + ImmutableList.builder().add(resourceId.getBucket()); + if (level != LineageLevel.TOP_LEVEL && !resourceId.getKey().isEmpty()) { + segments.add(resourceId.getKey()); + } + lineage.add("s3", segments.build()); } /** diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java index fbef40f4b5c04..db749d7080e2c 100644 --- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java @@ -34,6 +34,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.notNull; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -74,6 +75,7 @@ import org.apache.beam.sdk.io.aws.options.S3Options; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -1209,6 +1211,21 @@ public void testWriteAndReadWithS3Options() throws IOException { open.close(); } + @Test + public void testReportLineageOnBucket() { + verifyLineage("s3://testbucket", ImmutableList.of("testbucket")); + verifyLineage("s3://testbucket/", ImmutableList.of("testbucket")); + verifyLineage("s3://testbucket/foo/bar.txt", ImmutableList.of("testbucket", "foo/bar.txt")); + } + + private void verifyLineage(String uri, List expected) { + S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Config("mys3"), client); + S3ResourceId path = S3ResourceId.fromUri(uri); + Lineage mockLineage = mock(Lineage.class); + s3FileSystem.reportLineage(path, mockLineage); + verify(mockLineage, times(1)).add("s3", expected); + } + /** A mockito argument matcher to implement equality on GetObjectMetadataRequest. */ private static class GetObjectMetadataRequestMatcher implements ArgumentMatcher { diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java index 384c8c627ee7f..e851f8333d0b2 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java @@ -658,7 +658,17 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir @Override protected void reportLineage(S3ResourceId resourceId, Lineage lineage) { - lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey())); + reportLineage(resourceId, lineage, LineageLevel.FILE); + } + + @Override + protected void reportLineage(S3ResourceId resourceId, Lineage lineage, LineageLevel level) { + ImmutableList.Builder segments = + ImmutableList.builder().add(resourceId.getBucket()); + if (level != LineageLevel.TOP_LEVEL && !resourceId.getKey().isEmpty()) { + segments.add(resourceId.getKey()); + } + lineage.add("s3", segments.build()); } /** diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemTest.java index 423176e52a75f..39995b8b31670 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemTest.java @@ -34,6 +34,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -55,6 +56,7 @@ import org.apache.beam.sdk.io.aws2.options.S3Options; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -1068,6 +1070,21 @@ public void testWriteAndRead() throws IOException { open.close(); } + @Test + public void testReportLineageOnBucket() { + verifyLineage("s3://testbucket", ImmutableList.of("testbucket")); + verifyLineage("s3://testbucket/", ImmutableList.of("testbucket")); + verifyLineage("s3://testbucket/foo/bar.txt", ImmutableList.of("testbucket", "foo/bar.txt")); + } + + private void verifyLineage(String uri, List expected) { + S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Config("mys3"), client); + S3ResourceId path = S3ResourceId.fromUri(uri); + Lineage mockLineage = mock(Lineage.class); + s3FileSystem.reportLineage(path, mockLineage); + verify(mockLineage, times(1)).add("s3", expected); + } + /** A mockito argument matcher to implement equality on GetHeadObjectRequest. */ private static class GetHeadObjectRequestMatcher implements ArgumentMatcher { diff --git a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java index 5137eaf9bb2dc..bbb2e22d94ce6 100644 --- a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java +++ b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java @@ -453,7 +453,12 @@ protected AzfsResourceId matchNewResource(String singleResourceSpec, boolean isD @Override protected void reportLineage(AzfsResourceId resourceId, Lineage lineage) { - if (!Strings.isNullOrEmpty(resourceId.getBlob())) { + reportLineage(resourceId, lineage, LineageLevel.FILE); + } + + @Override + protected void reportLineage(AzfsResourceId resourceId, Lineage lineage, LineageLevel level) { + if (level != LineageLevel.TOP_LEVEL && !Strings.isNullOrEmpty(resourceId.getBlob())) { lineage.add( "abs", ImmutableList.of( diff --git a/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemTest.java b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemTest.java index 545f314688c3c..27a2220c2e447 100644 --- a/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemTest.java +++ b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemTest.java @@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -51,6 +52,7 @@ import org.apache.beam.sdk.io.azure.options.BlobstoreOptions; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -338,4 +340,20 @@ public void testMatchNonGlobs() throws Exception { blobContainerClient.delete(); } + + @Test + public void testReportLineageOnBucket() { + verifyLineage("azfs://account/container", ImmutableList.of("account", "container")); + verifyLineage("azfs://account/container/", ImmutableList.of("account", "container")); + verifyLineage( + "azfs://account/container/foo/bar.txt", + ImmutableList.of("account", "container", "foo/bar.txt")); + } + + private void verifyLineage(String uri, List expected) { + AzfsResourceId path = AzfsResourceId.fromUri(uri); + Lineage mockLineage = mock(Lineage.class); + azureBlobStoreFileSystem.reportLineage(path, mockLineage); + verify(mockLineage, times(1)).add("abs", expected); + } } diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java index 859c03ed7750d..ecdde5cbc8fe7 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java @@ -154,9 +154,16 @@ public void writeThenReadAll() { PipelineResult result = pipeline.run(); PipelineResult.State pipelineState = result.waitUntilFinish(); - assertEquals( - Lineage.query(result.metrics(), Lineage.Type.SOURCE), - Lineage.query(result.metrics(), Lineage.Type.SINK)); + + Set sources = Lineage.query(result.metrics(), Lineage.Type.SOURCE); + Set sinks = Lineage.query(result.metrics(), Lineage.Type.SINK); + if (numShards <= 100) { + // both should be the full files, if supported by the runner + assertEquals(sources, sinks); + } else { + // if supported by runner, both should be non-empty + assertEquals(sources.isEmpty(), sinks.isEmpty()); + } collectAndPublishMetrics(result); // Fail the test if pipeline failed. diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py index e181beac4a584..ffbce5893a969 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem.py @@ -315,10 +315,14 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage): + def report_lineage(self, path, lineage, level=None): try: - components = s3io.parse_s3_path(path, get_account=True) + components = s3io.parse_s3_path(path, object_optional=True) except ValueError: # report lineage is fail-safe return + if level == FileSystem.LineageLevel.TOP_LEVEL or \ + (len(components) > 1 and components[-1] == ''): + # bucket only + components = components[:-1] lineage.add('s3', *components) diff --git a/sdks/python/apache_beam/io/aws/s3filesystem_test.py b/sdks/python/apache_beam/io/aws/s3filesystem_test.py index 60e6f319b2c96..87403f482bd25 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem_test.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem_test.py @@ -265,6 +265,15 @@ def test_rename(self, unused_mock_arg): src_dest_pairs = list(zip(sources, destinations)) s3io_mock.rename_files.assert_called_once_with(src_dest_pairs) + def test_lineage(self): + self._verify_lineage("s3://bucket/", ("bucket", )) + self._verify_lineage("s3://bucket/foo/bar.txt", ("bucket", "foo/bar.txt")) + + def _verify_lineage(self, uri, expected_segments): + lineage_mock = mock.MagicMock() + self.fs.report_lineage(uri, lineage_mock) + lineage_mock.add.assert_called_once_with("s3", *expected_segments) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py index bb56fa09d3703..4495245dc54a3 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py @@ -317,10 +317,15 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage): + def report_lineage(self, path, lineage, level=None): try: - components = blobstorageio.parse_azfs_path(path, get_account=True) + components = blobstorageio.parse_azfs_path( + path, blob_optional=True, get_account=True) except ValueError: # report lineage is fail-safe return + if level == FileSystem.LineageLevel.TOP_LEVEL \ + or(len(components) > 1 and components[-1] == ''): + # bucket only + components = components[:-1] lineage.add('abs', *components) diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py index cee459f5b8a20..138fe5f78b20c 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py @@ -320,6 +320,18 @@ def test_rename(self, unused_mock_blobstorageio): src_dest_pairs = list(zip(sources, destinations)) blobstorageio_mock.rename_files.assert_called_once_with(src_dest_pairs) + def test_lineage(self): + self._verify_lineage( + "azfs://storageaccount/container/", ("storageaccount", "container")) + self._verify_lineage( + "azfs://storageaccount/container/foo/bar.txt", + ("storageaccount", "container", "foo/bar.txt")) + + def _verify_lineage(self, uri, expected_segments): + lineage_mock = mock.MagicMock() + self.fs.report_lineage(uri, lineage_mock) + lineage_mock.add.assert_called_once_with("abs", *expected_segments) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py index c708e117c3a1d..f9d4303c8c785 100644 --- a/sdks/python/apache_beam/io/filebasedsink.py +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -280,9 +280,31 @@ def _check_state_for_finalize_write(self, writer_results, num_shards): src_files.append(src) dst_files.append(dst) - FileSystems.report_sink_lineage(dst) + + self._report_sink_lineage(dst_glob, dst_files) return src_files, dst_files, delete_files, num_skipped + def _report_sink_lineage(self, dst_glob, dst_files): + """ + Report sink Lineage. Report every file if number of files no more than 100, + otherwise only report at directory level. + """ + if len(dst_files) <= 100: + for dst in dst_files: + FileSystems.report_sink_lineage(dst) + else: + dst = dst_glob + # dst_glob has a wildcard for shard number (see _shard_name_template) + sep = dst_glob.find('*') + if sep > 0: + dst = dst[:sep] + try: + dst, _ = FileSystems.split(dst) + except ValueError: + return # lineage report is fail-safe + + FileSystems.report_sink_lineage(dst) + @check_accessible(['file_path_prefix']) def finalize_write( self, init_result, writer_results, unused_pre_finalize_results): diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index efd863810ed75..a02bc6de32c73 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -39,6 +39,7 @@ from apache_beam.io import range_trackers from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata +from apache_beam.io.filesystem import FileSystem from apache_beam.io.filesystems import FileSystems from apache_beam.io.restriction_trackers import OffsetRange from apache_beam.options.value_provider import StaticValueProvider @@ -168,10 +169,38 @@ def _get_concat_source(self) -> concat_source.ConcatSource: min_bundle_size=self._min_bundle_size, splittable=splittable) single_file_sources.append(single_file_source) - FileSystems.report_source_lineage(file_name) + + self._report_source_lineage(files_metadata) self._concat_source = concat_source.ConcatSource(single_file_sources) + return self._concat_source + def _report_source_lineage(self, files_metadata): + """ + Report source Lineage. depend on the number of files, report full file + name, only dir, or only top level + """ + if len(files_metadata) <= 100: + for file_metadata in files_metadata: + FileSystems.report_source_lineage(file_metadata.path) + else: + size_track = set() + for file_metadata in files_metadata: + if len(size_track) >= 100: + FileSystems.report_source_lineage( + file_metadata.path, level=FileSystem.LineageLevel.TOP_LEVEL) + return + + try: + base, _ = FileSystems.split(file_metadata.path) + except ValueError: + pass + else: + size_track.add(base) + + for base in size_track: + FileSystems.report_source_lineage(base) + def open_file(self, file_name): return FileSystems.open( file_name, @@ -343,6 +372,7 @@ def __init__( self._min_bundle_size = min_bundle_size self._splittable = splittable self._compression_type = compression_type + self._size_track = None def process(self, element: Union[str, FileMetadata], *args, **kwargs) -> Iterable[Tuple[FileMetadata, OffsetRange]]: @@ -352,7 +382,8 @@ def process(self, element: Union[str, FileMetadata], *args, match_results = FileSystems.match([element]) metadata_list = match_results[0].metadata_list for metadata in metadata_list: - FileSystems.report_source_lineage(metadata.path) + self._report_source_lineage(metadata.path) + splittable = ( self._splittable and _determine_splittability_from_compression_type( metadata.path, self._compression_type)) @@ -366,6 +397,28 @@ def process(self, element: Union[str, FileMetadata], *args, metadata, OffsetRange(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY)) + def _report_source_lineage(self, path): + """ + Report source Lineage. Due to the size limit of Beam metrics, report full + file name or only top level depend on the number of files. + + * Number of files<=100, report full file paths; + + * Otherwise, report top level only. + """ + if self._size_track is None: + self._size_track = set() + elif len(self._size_track) == 0: + FileSystems.report_source_lineage( + path, level=FileSystem.LineageLevel.TOP_LEVEL) + return + + self._size_track.add(path) + FileSystems.report_source_lineage(path) + + if len(self._size_track) >= 100: + self._size_track.clear() + class _ReadRange(DoFn): def __init__( diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index bdc25dcf0fe54..840fdf3309e7b 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -934,7 +934,11 @@ def delete(self, paths): """ raise NotImplementedError - def report_lineage(self, path, unused_lineage): + class LineageLevel: + FILE = 'FILE' + TOP_LEVEL = 'TOP_LEVEL' + + def report_lineage(self, path, unused_lineage, level=None): """ Report Lineage metrics for path. diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py index ccbeac640765c..a32b85332b608 100644 --- a/sdks/python/apache_beam/io/filesystems.py +++ b/sdks/python/apache_beam/io/filesystems.py @@ -391,13 +391,27 @@ def get_chunk_size(path): return filesystem.CHUNK_SIZE @staticmethod - def report_source_lineage(path): - """Report source :class:`~apache_beam.metrics.metric.Lineage`.""" + def report_source_lineage(path, level=None): + """ + Report source :class:`~apache_beam.metrics.metric.Lineage`. + + Args: + path: string path to be reported. + level: the level of file path. default to + :class:`~apache_beam.io.filesystem.FileSystem.Lineage`.FILE. + """ filesystem = FileSystems.get_filesystem(path) - filesystem.report_lineage(path, Lineage.sources()) + filesystem.report_lineage(path, Lineage.sources(), level=level) @staticmethod - def report_sink_lineage(path): - """Report sink :class:`~apache_beam.metrics.metric.Lineage`.""" + def report_sink_lineage(path, level=None): + """ + Report sink :class:`~apache_beam.metrics.metric.Lineage`. + + Args: + path: string path to be reported. + level: the level of file path. default to + :class:`~apache_beam.io.filesystem.FileSystem.Lineage`.FILE. + """ filesystem = FileSystems.get_filesystem(path) - filesystem.report_lineage(path, Lineage.sinks()) + filesystem.report_lineage(path, Lineage.sinks(), level=level) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 053b02d325a5c..325f70ddfd96d 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -366,10 +366,14 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage): + def report_lineage(self, path, lineage, level=None): try: - bucket, blob = gcsio.parse_gcs_path(path) + components = gcsio.parse_gcs_path(path, object_optional=True) except ValueError: # report lineage is fail-safe return - lineage.add('gcs', bucket, blob) + if level == FileSystem.LineageLevel.TOP_LEVEL \ + or(len(components) > 1 and components[-1] == ''): + # bucket only + components = components[:-1] + lineage.add('gcs', *components) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 1206529faf01c..ec7fa94b05fd4 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -375,6 +375,15 @@ def test_delete_error(self, mock_gcsio): self.fs.delete(files) gcsio_mock.delete_batch.assert_called() + def test_lineage(self): + self._verify_lineage("gs://bucket/", ("bucket", )) + self._verify_lineage("gs://bucket/foo/bar.txt", ("bucket", "foo/bar.txt")) + + def _verify_lineage(self, uri, expected_segments): + lineage_mock = mock.MagicMock() + self.fs.report_lineage(uri, lineage_mock) + lineage_mock.add.assert_called_once_with("gcs", *expected_segments) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)