Skip to content

Commit

Permalink
responding to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cbb330 committed Dec 10, 2024
1 parent 2bd29f5 commit f725479
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

/**
* Represents a summary of data files, including the content, total size, and total count of files.
* This class can be used to store and manipulate summary data for collections of files, and
* provides an encoder for serializing Spark typed datasets.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
Expand All @@ -16,11 +21,16 @@ public class DataFilesSummary {
private Long totalFileCount;

/**
 * Lazy holder for the shared {@link Encoder} instance, using the thread-safe
 * initialization-on-demand holder idiom: the JVM defers initialization of this
 * nested class (and therefore creation of the encoder) until first access, and
 * class initialization itself provides the synchronization guarantee.
 */
private static class EncoderSingleton {
  public static final Encoder<DataFilesSummary> instance = Encoders.bean(DataFilesSummary.class);
}

/**
 * Returns the shared Spark {@link Encoder} for this class.
 *
 * @return Encoder for DataFilesSummary
 */
public static Encoder<DataFilesSummary> getEncoder() {
  return EncoderSingleton.instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ public static IcebergTableStats populateStatsOfAllReferencedFiles(
public static IcebergTableStats populateStatsForSnapshots(
String fqtn, Table table, SparkSession spark, IcebergTableStats stats) {

// TODO: turn these longs into attributes of a class and use the class as a schema which spark
// will use as an
// encoding when serializing the table into the spark dataframe
Map<Integer, DataFilesSummary> currentSnapshotDataFilesSummary =
getFileMetadataTable(table, spark, MetadataTableType.FILES);

Expand Down Expand Up @@ -263,12 +260,17 @@ private static long getManifestFilesCount(
.count();
}

/** Get all data files count depending on metadata type to query. */
/**
 * Returns a summary of table file content, either from all snapshots or from the current
 * snapshot, depending on {@code metadataTableType}.
 */
private static Map<Integer, DataFilesSummary> getFileMetadataTable(
Table table, SparkSession spark, MetadataTableType metadataTableType) {
Encoder<DataFilesSummary> dataFilesSummaryEncoder = DataFilesSummary.getEncoder();
Map<Integer, DataFilesSummary> result = new HashMap<>();
SparkTableUtil.loadMetadataTable(spark, table, metadataTableType)
.select("content", "file_path", "file_size_in_bytes")
.dropDuplicates()
.groupBy("content")
.agg(count("*").as("totalFileCount"), sum("file_size_in_bytes").as("sumOfFileSizeBytes"))
.as(dataFilesSummaryEncoder)
Expand Down

0 comments on commit f725479

Please sign in to comment.