diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/DataFilesSummary.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/DataFilesSummary.java index 62ad398b..7d3c8cc1 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/DataFilesSummary.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/DataFilesSummary.java @@ -6,6 +6,11 @@ import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; +/** + * Represents a summary of data files, including the content, total size, and total count of files. + * This class can be used to store and manipulate summary data for collections of files, and + * provides an encoder for serializing Spark typed datasets. + */ @Data @AllArgsConstructor @NoArgsConstructor @@ -16,11 +21,16 @@ public class DataFilesSummary { private Long totalFileCount; /** - * Returns the Spark Encoder for this class. + * Returns the Spark Encoder for this class using a shared object and the thread-safe + * initialization-on-demand holder idiom.
 * - * @return Encoder for ExampleData + * @return Encoder for DataFilesSummary */ + private static class EncoderSingleton { + public static final Encoder instance = Encoders.bean(DataFilesSummary.class); + } + public static Encoder getEncoder() { - return Encoders.bean(DataFilesSummary.class); + return EncoderSingleton.instance; } } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java index 6a9d0a1d..e7d5c9ea 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java @@ -115,9 +115,6 @@ public static IcebergTableStats populateStatsOfAllReferencedFiles( public static IcebergTableStats populateStatsForSnapshots( String fqtn, Table table, SparkSession spark, IcebergTableStats stats) { - // TODO: turn these longs into attributes of a class and use the class as a schema which spark - // will use as an - // encoding when serializing the table into the spark dataframe Map currentSnapshotDataFilesSummary = getFileMetadataTable(table, spark, MetadataTableType.FILES); @@ -263,12 +260,17 @@ private static long getManifestFilesCount( .count(); } - /** Get all data files count depending on metadata type to query. */ + /** + * Returns a summary of table file content, either from all snapshots or from the current + * snapshot, depending on metadataTableType.
+ */ private static Map getFileMetadataTable( Table table, SparkSession spark, MetadataTableType metadataTableType) { Encoder dataFilesSummaryEncoder = DataFilesSummary.getEncoder(); Map result = new HashMap<>(); SparkTableUtil.loadMetadataTable(spark, table, metadataTableType) + .select("content", "file_path", "file_size_in_bytes") + .dropDuplicates() .groupBy("content") .agg(count("*").as("totalFileCount"), sum("file_size_in_bytes").as("sumOfFileSizeBytes")) .as(dataFilesSummaryEncoder)