Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark Action to Analyze table #10288

Merged
merged 25 commits into from
Aug 22, 2024
Merged

Conversation

karuppayya
Copy link
Contributor

This change adds a Spark action to Analyze tables.
As part of analysis, the action generates Apache data - sketch for NDV stats and writes it as puffins.

@karuppayya
Copy link
Contributor Author

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Computes the statistic of the given columns and stores it as Puffin files. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AnalyzeTableSparkAction is a generic name, I see that in future we want to compute the partition stats too. Which may not be written as puffin files.

Either we can change the change the naming to computeNDVSketches or make it generic such that any kind of stats can be computed from this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking more on this, I think we should just call it computeNDVSketches and not mix it with partition stats.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to follow the model of RDMS and Engines like Trino using ANALYZE TABLE <tblName> to collect all table level stats.
With a procedure per stats model, the user have to invoke procedure/action for every stats and
also with any new stats addition, the user need to ensure to update his code to call the new procedure/action.

not mix it with partition stats.

I think we could have partition stats as a separate action since it per partition, whereas this procedure can collect top level table stats.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karuppayya
I can see the tests in TestAnalyzeTableAction, it's working fine.
But have we tested in Spark, whether its working with a query like -
"Analyze table table1 compute statistics" ?

Because generally it gives the error
"[NOT_SUPPORTED_COMMAND_FOR_V2_TABLE] ANALYZE TABLE is not supported for v2 tables."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark doesnot have the grammar for Analyzing tables.
This PR introduces a Spark action. In subsequent PR, i plan to introduce a iceberg procedure to invoke the Spark action.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @karuppayya are we planning to induce the other column-level stats in this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeesou , This PR is only for the NDV stats. (PR to propagate the stats in scan)

Copy link
Contributor

@guykhazma guykhazma Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karuppayya @huaxingao following up on this discussion and issue #10791.
Wouldn't it make sense to extend the current method which extracts the theta sketches to also extract the rest of the statistics by enhancing the select expression here.
My thinking is that instead of having a custom code for generating the min/max we would rely on spark pushing it down to iceberg. Also, since we already pay the cost of generating the NDV stats it doesn't seem like the overhead will be much. To be more concrete, here is a roughly how it can be done (a lot of refactoring is needed, this is just to show the combination of operations in the same dataframe call).

Any thoughts? This can be still done in a separate PR that will follow this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karuppayya I would appreciate your input on the above.
If that make sense we can create a separate PR for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guykhazma , here is the related thread discussing about some aspects of storing min/max/null counts.

spark(), table, columnsToBeAnalyzed.toArray(new String[0]));
table
.updateStatistics()
.setStatistics(table.currentSnapshot().snapshotId(), statisticsFile)
Copy link
Member

@ajantha-bhat ajantha-bhat May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if table's current snapshot has modified concurrently by another client between like 117 to line 120?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good question. we should do #6442


public static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches(
SparkSession spark, String tableName, String... columns) {
String sql = String.format("select %s from %s", String.join(",", columns), tableName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also think about incremental update and update sketches from previous checkpoint. Querying whole table maybe not efficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, incremental need to be wired into the ends of write paths.
This procedure could exist in parallel, which could get stats of the whole table on demand.

assumeTrue(catalogName.equals("spark_catalog"));
sql(
"CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+ "('format-version'='2')",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default format version itself v2 now. So, specifying it again is redundant.

String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
OutputFile outputFile = fileIO.newOutputFile(path);
try (PuffinWriter writer =
Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this name instead of "analyze table procedure".

@ajantha-bhat
Copy link
Member

there was an old PR on the same: #6582

@huaxingao
Copy link
Contributor

there was an old PR on the same: #6582

I don't have time to work on this, so karuppayya will take over. Thanks a lot @karuppayya for continuing the work.

Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @karuppayya @huaxingao @szehon-ho this is aewsome to see! I left a review of the API/implementation, still have yet to review the tests which look to be a WIP

* @param statsToBeCollected set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these stats be a Set<StandardBlobType> instead of arbitrary Strings? I feel like the API becomes more well defined in this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, StandardBlobType defines string constants not enums

Comment on lines 89 to 117
private void validateColumns() {
validateEmptyColumns();
validateTypes();
}

private void validateEmptyColumns() {
if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.isEmpty()) {
throw new ValidationException("No columns to analyze for the table", table.name());
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think this validation should just happen at the time of setting these on the action rather than at the execcution time.

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think this interface should have a snapshot API to allow users to pass in a snapshot to generate the statistics for. If it's not specified then we can generate the statistics for the latest snapshot.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support branch/tag as well? (I guess in subsequent pr)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the snapshot(String snapshotId) has been added

for branch/tag -- is it existing pattern to support this first class in APIs, or require the caller to convert the information they have (branch/tag) into a snapshot ID?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, snapshot should be fine.

Comment on lines 104 to 106
if (field == null) {
throw new ValidationException("No column with %s name in the table", columnName);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: new line after the if

SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
throws IOException {
Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
NDVSketchGenerator.computeNDVSketches(spark, table.name(), snapshotId, columnsToBeAnalyzed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does computeDVSketches need to be public? Seems like it can just be package private. Also nit, either way don't think you need the full qualified method name

import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;

public class ThetaSketchJavaSerializable implements Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public?

Comment on lines 46 to 54
if (sketch == null) {
return null;
}
if (sketch instanceof UpdateSketch) {
return sketch.compact();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: new line after if

null,
ImmutableMap.of()));
}
writer.finish();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Don't think you need the writer.finish() because the try with resources will close, and close will already finish

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you need to finish() to get the final fileSize() etc

table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()),
null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null means that the file will be uncompressed. I think it makes sense not to compress these files by default since the sketch will be a single long per column, so it'll be quite small already and not worth paying the price of compression/decompression.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the sketch will be a single long per column

the sketch should be more than that, small number of kb iirc
Trino uses ZSTD for the blobs, and no compression for the footer.

Comment on lines 157 to 168
if (sketch1.getSketch() == null && sketch2.getSketch() == null) {
return emptySketchWrapped;
}
if (sketch1.getSketch() == null) {
return sketch2;
}
if (sketch2.getSketch() == null) {
return sketch1;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: new line after if

@karuppayya karuppayya force-pushed the analyze_action branch 3 times, most recently from 5538f6e to de520fc Compare June 4, 2024 17:55
Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @karuppayya thanks for the patch, I left a first round of comments.

* @param columns a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(Set<String> columns);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, how about String... columns (see RewriteDataFiles). same for the others

* @param statsToBeCollected set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call statistics? Like StatisticsFile. https://iceberg.apache.org/contribute/#java-style-guidelines I think it can interpreted differently but I think point 3 implies we should make it have the full spelling if possible, and we dont have abbreviations for API methods in most of code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also statsToBeCollected => types ?

AnalyzeTable columns(Set<String> columns);

/**
* A set of statistics to be collected on the given columns of the given table
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The set of statistics to be collected? (given columns, given tables is specified elsewhere)

*/
AnalyzeTable snapshot(String snapshotId);

/** The action result that contains a summary of the Analysis. */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plural? contains summaries of the analysis?

Also if capital, it can be a a javadoc link.

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support branch/tag as well? (I guess in subsequent pr)

(PairFlatMapFunction<Iterator<Row>, String, String>)
input -> {
final List<Tuple2<String, String>> list = Lists.newArrayList();
while (input.hasNext()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use flatmap and mapToPair to make this more concise?

data.javaRDD().flatMap(r -> {
          List<Tuple2<String, String>> list =
            Lists.newArrayListWithExpectedSize(columns.size());
          for (int i = 0; i < r.size(); i++) {
            list.add(new Tuple2<>(columns.get(i), r.get(i).toString());
          }
          return list.iterator();
          }).mapToPair(t -> t);

return ImmutableAnalyzeTable.Result.builder().analysisResults(analysisResults).build();
}

private boolean analyzeableTypes(Set<String> columns) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to intellij, there is a typo (analyzable)

final JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
pairs.aggregateByKey(
new ThetaSketchJavaSerializable(),
1, // number of partitions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why limit to 1 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was just copied from datasketches example.
This value is used in the HashPartitioner behind the scenes.
Should we set it to spark.sql.shuffle.partitions?

return sketches.toLocalIterator();
}

static class Add
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use lambdas here for cleaner code? like

 (sketch, val) -> {
              sketch.update(val;);
              return sketch;
          },

The next one may be too complex to inline but maybe we can reduce the ugly java boilerplate

final Row row = input.next();
int size = row.size();
for (int i = 0; i < size; i++) {
list.add(new Tuple2<>(columns.get(i), row.get(i).toString()));
Copy link
Collaborator

@szehon-ho szehon-ho Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question, does forcing string type affect anything? I see the sketch library takes in other types.

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Started a second round


@Override
public AnalyzeTable snapshot(String snapshotIdStr) {
this.snapshotId = Long.parseLong(snapshotIdStr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should just make this take long


StatisticsFile statisticsFile =
NDVSketchGenerator.generateNDV(
spark(), table, snapshotId, columns.toArray(new String[0]));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do a similar thing with a columns() method as I suggested with snapshots, that checks if the user list is null/empty and sets it to all the table columns

  private Set<String> columns() {
    return (columns != null) && (columns.size() > 0) ?
      table.schema().columns().stream()
        .map(Types.NestedField::name)
        .collect(Collectors.toSet()) : 
      columns;
  }

Then the logic is centralized here if we have more stats, rather than in NDV class.

.type(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)
.build();
} catch (IOException ioe) {
List<String> errors = Lists.newArrayList();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we only reporting error if we have IOException? (looks like from writing puffin file). It seems a bit strange just to catch this specific case, and not other exceptions.

It seems more natural to either catch all exceptions and report error, or else just throw all exceptions, what do you think?

private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class);

private final Table table;
private Set<String> columns = ImmutableSet.of();
Copy link
Collaborator

@szehon-ho szehon-ho Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we convert this from array to set and back a few times (its passed in as array, stored as set, and then passed as array to NDVSketchGenerator.generateNDV function). Can we just keep this as array the whole time (store this as array here)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arrays are mutable, so if we decide to switch to arrays, please make sure to defensive-copy whenever passing to another class

return sketches.toLocalIterator();
}

static class Combine
Copy link
Collaborator

@szehon-ho szehon-ho Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do this to get rid of ugly Function2 definition, and also make the main method a bit cleaner.

    JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
        pairs.aggregateByKey(
            new ThetaSketchJavaSerializable(),
            Integer.parseInt(
                SQLConf.SHUFFLE_PARTITIONS().defaultValueString()), // number of partitions
            NDVSketchGenerator::update,
            NDVSketchGenerator::combine);

    return sketches.toLocalIterator();
  }

  public static ThetaSketchJavaSerializable update(ThetaSketchJavaSerializable sketch, String val) {
    sketch.update(val);
    return sketch;
  }

  public static ThetaSketchJavaSerializable combine(
        final ThetaSketchJavaSerializable sketch1, final ThetaSketchJavaSerializable sketch2) {
      if (sketch1.getSketch() == null && sketch2.getSketch() == null) {
        return emptySketchWrapped;
      }
      if (sketch1.getSketch() == null) {
        return sketch2;
      }
      if (sketch2.getSketch() == null) {
        return sketch1;
      }

      final CompactSketch compactSketch1 = sketch1.getCompactSketch();
      final CompactSketch compactSketch2 = sketch2.getCompactSketch();
      return new ThetaSketchJavaSerializable(
          new SetOperationBuilder().buildUnion().union(compactSketch1, compactSketch2));
    }

JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
pairs.aggregateByKey(
new ThetaSketchJavaSerializable(),
Integer.parseInt(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we may be able to skip passing this , and rely on Spark defaults?

Can you do a bit of research to verify?

Puffin.write(outputFile).createdBy("Iceberg Analyze action").build()) {
for (String columnName : columns) {
writer.add(
new Blob(
Copy link
Collaborator

@szehon-ho szehon-ho Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi @marton-bod would be good if you can take a look as well, to verify interop between Spark and Trino here?

Here it seems we are storing the serialized sketch, as is specified in the spec. Should we store ndv as well in 'metadata', as is specified in spec: https://github.com/apache/iceberg/blob/main/format/puffin-spec.md#apache-datasketches-theta-v1-blob-type (does this mean properties?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @szehon-ho for the ping
there are couple potential issues

  • the blob needs to have ndv property
  • the sketch needs to be updated with the standard byte[] representation of values (Conversions.toByteBuffer)
  • there should be inter-op test Spark Action to Analyze table #10288 (comment)
  • i am not exactly sure how what's the lifecycle of ThetaSketchJavaSerializable & whether this can impact the final results. need to re-read this portion

* @param types set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable types(Set<String> types);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are allowed values for the types parameter? How can someone interacting with the javadoc learn this?
is it "stats types", "blob types" or something else?
if "blob types", we could link to https://iceberg.apache.org/puffin-spec/#blob-types , but i don't think we can assume that all known blob types will be supported by the code at all times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the blob type in Action and stats type in the Procedure(from where we could map stats to its blob type(s)?)
For eg: if NDV supports 2 blob types and user wants to generate only one of those, that would still be possible from the actions.

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the snapshot(String snapshotId) has been added

for branch/tag -- is it existing pattern to support this first class in APIs, or require the caller to convert the information they have (branch/tag) into a snapshot ID?

@@ -26,4 +29,8 @@ private StandardBlobTypes() {}
* href="https://datasketches.apache.org/">Apache DataSketches</a> library
*/
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static Set<String> blobTypes() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it supposed to return "all standard blob types"?
should the name reflect that?

if we did #8202, would this new blob type be added to this method?

@@ -33,7 +33,7 @@ azuresdk-bom = "1.2.23"
awssdk-s3accessgrants = "2.0.0"
caffeine = "2.9.3"
calcite = "1.10.0"
datasketches = "6.0.0"
datasketches="6.0.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please revert

private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class);

private final Table table;
private Set<String> columns = ImmutableSet.of();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arrays are mutable, so if we decide to switch to arrays, please make sure to defensive-copy whenever passing to another class

table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()),
null,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the sketch will be a single long per column

the sketch should be more than that, small number of kb iirc
Trino uses ZSTD for the blobs, and no compression for the footer.

null,
ImmutableMap.of()));
}
writer.finish();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you need to finish() to get the final fileSize() etc

ImmutableList.of(table.schema().findField(columnName).fieldId()),
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columnName).getSketch().toByteArray()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think there should be compact() (perhaps not here, but inside computeNDVSketches

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW it would be good to have a cross-engine compatibility test to ensure the value we write here can indeed be used correctly by other engines. for trino, you can use https://java.testcontainers.org/modules/databases/trino/

Trino already has such tests, but that doesn't cover Iceberg Spark features that are being implemented.

}
final byte[] serializedSketchBytes = new byte[length];
in.readFully(serializedSketchBytes);
sketch = Sketches.wrapSketch(Memory.wrap(serializedSketchBytes));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we wrote a compact sketch, so we can use CompactSketch.wrap here

List<Tuple2<String, String>> columnsList =
Lists.newArrayListWithExpectedSize(columns.size());
for (int i = 0; i < row.size(); i++) {
columnsList.add(new Tuple2<>(columns.get(i), row.get(i).toString()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't use toString
this should use Conversions.toByteBuffer
see https://iceberg.apache.org/puffin-spec/#apache-datasketches-theta-v1-blob-type

@karuppayya karuppayya force-pushed the analyze_action branch 2 times, most recently from c189b28 to 7b2cbce Compare June 17, 2024 23:41
Copy link
Member

@findepi findepi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a lot changed, skimming for now. will have to re-read this

@@ -26,4 +29,8 @@ private StandardBlobTypes() {}
* href="https://datasketches.apache.org/">Apache DataSketches</a> library
*/
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static Set<String> allStandardBlobTypes() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

Comment on lines 58 to 59
private final Set<String> supportedBlobTypes =
ImmutableSet.of(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be static?

Comment on lines 60 to 61
private Set<String> columns;
private Set<String> blobTypesToAnalyze = supportedBlobTypes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about combining these into a list of (blob type, columns) pairs?
This might be necessary when we add support for new blob types.
see https://github.com/apache/iceberg/pull/10288/files#r1639547902

private Long snapshotId;

AnalyzeTableSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
Snapshot snapshot = table.currentSnapshot();
ValidationException.check(snapshot != null, "Cannot analyze a table that has no snapshots");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to handle this case gracefully.
Table without snapshots is an empty table (no data).
Also, stats are assigned to snapshots, and there is no snapshot, so there cannot be a stats file created.
Thus there is only one way to handle this gracefully -- just no-op.
I believe this would be better from user-perspective than just throwing.

.type(statsName)
.addAllErrors(Lists.newArrayList("Stats type not supported"))
.build();
throw new UnsupportedOperationException();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case this exception is thrown (due to some code modifications in the future), it could be helpful to include type in the exception message.

Comment on lines 171 to 172
public AnalyzeTable snapshot(long snapId) {
this.snapshotId = snapId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: snapId -> snapshotId

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

Sets.newHashSet(StandardBlobTypes.blobTypes()).containsAll(statisticTypes),
"type not supported");
this.types = statisticTypes;
public AnalyzeTable blobTypes(Set<String> types) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: types -> blobTypes (just like in the interface method declaration)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed

Copy link
Contributor Author

@karuppayya karuppayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @findepi for your review.
I addressed the comments, please take a look when you get a chance.
One pending item is the interoperability test.

* @param columnNames a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(String... columnNames);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see few databases that allow collecting all the stats for given columns.
PostgreSQL
ANALYZE table_name (column1, column2);

Oracle
EXEC DBMS_STATS.GATHER_TABLE_STATS('schema_name', 'table_name', 'method_opt' => 'FOR COLUMNS column1, column2');

Also looks like most databases dont allow specifying types of stats to be collected.
Though we take type as input in the API, we can restrict the usage by not exposing it in the procedure?

correlation blob type for {B,C} together

This is the stats on the combined value of B and C. Is my understanding right?
Is this a common usecase since i didnt find databases supporting this by default

* @param types set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable types(Set<String> types);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the blob type in Action and stats type in the Procedure(from where we could map stats to its blob type(s)?)
For eg: if NDV supports 2 blob types and user wants to generate only one of those, that would still be possible from the actions.

@@ -26,4 +29,8 @@ private StandardBlobTypes() {}
* href="https://datasketches.apache.org/">Apache DataSketches</a> library
*/
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static Set<String> allStandardBlobTypes() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

Sets.newHashSet(StandardBlobTypes.blobTypes()).containsAll(statisticTypes),
"type not supported");
this.types = statisticTypes;
public AnalyzeTable blobTypes(Set<String> types) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed

Comment on lines 171 to 172
public AnalyzeTable snapshot(long snapId) {
this.snapshotId = snapId;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

Comment on lines 103 to 106
return type == Type.TypeID.INTEGER
|| type == Type.TypeID.LONG
|| type == Type.TypeID.STRING
|| type == Type.TypeID.DOUBLE;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were the datatypes supported by the sketch lib. I think this no longer relevant with Conversions.toByteBuffer

AnalyzeTable blobTypes(Set<String> blobTypes);

/**
* id of the snapshot for which stats need to be collected
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: capitalize id (at least first character, or even ID works)

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, snapshot should be fine.

* @param columnNames a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(String... columnNames);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi Is this resolved? Is there a strong use case to support specification of (type, column) pair?

How about: columnStats(Set types, String ... columns).

If types is not specified, then we can set it to all the supported types.

try {
return generateNDVBlobs().stream();
} catch (Exception e) {
LOG.error(
Copy link
Collaborator

@szehon-ho szehon-ho Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question, it seems simpler to throw an exception, what is the motivation for this error handling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there are can be more than one type of statistic to be collected, I was think about sending the errors with respect to each stat type in the results.
But looks like this is not something very common atleast in RDBMS land.
I changed the Result to not send error message and instead throwing an Exception now

throw new UnsupportedOperationException(
String.format("%s is not supported", type));
}
return Stream.empty();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved up to the catch code, as it doesnt make too much sense here.

}

private static ThetaSketchJavaSerializable combine(
final ThetaSketchJavaSerializable sketch1, final ThetaSketchJavaSerializable sketch2) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove the finals here

return sketch1;
}

final CompactSketch compactSketch1 = sketch1.getCompactSketch();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove final

return sketch;
}

private static ThetaSketchJavaSerializable combine(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realize, why cant we move this logic to ThetaSketchJavaSerializable itself?

Then update/combine can just be oneliners, and probably able to be inlined ie :

        colNameAndSketchPair.aggregateByKey(
            new ThetaSketchJavaSerializable(),
            ThetaSketchJavaSerializable::update,
            ThetaSketchJavaSerializable::combine,

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, looks a lot better , some more comments.

* @param columnNames a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(String... columnNames);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: columnNames => columns for simplicity, there is also precedent in RewriteDataFiles for example

super(spark);
this.table = table;
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what you think, would it be simpler to do the error check here? (instead of doExecute)

Preconditions.checkNotNull(snapshot, "Cannot analyze an empty table")

Then this.snapshotToAnalyze can just be a primitive long (no need to worry about nulls?) The argument for the setter is already "long" type.

this.table = table;
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
snapshotToAnalyze = snapshot.snapshotId();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we typically use this when setting member variables, ie this.snapshotToAnalyze


private final Table table;
private Set<String> columns;
private Long snapshotToAnalyze;
Copy link
Collaborator

@szehon-ho szehon-ho Jun 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: snapshotToAnalyze => snapshotId? (I feel 'toAnalyze' is apparent, also as its not actually Snapshot object). If its to not hide a variable, maybe we can change the other one as this one is used in more places and it would save more unnecessary chars.

if (snapshot != null) {
snapshotToAnalyze = snapshot.snapshotId();
}
columns =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, this.columns = ...

Schema schema = table.schema();
List<Types.NestedField> nestedFields =
columns.stream().map(schema::findField).collect(Collectors.toList());
final JavaPairRDD<String, ByteBuffer> colNameAndSketchPairRDD =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit; remove final

return (CompactSketch) sketch;
}

void update(final ByteBuffer value) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove final?

return sketch.getEstimate();
}

private void writeObject(final ObjectOutputStream out) throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same remove final (and next method too)

.collect(Collectors.toList());
}

static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • any reason this method is not private?
  • how about this method return the Map directly to let the main method be cleaner?

@aokolnychyi aokolnychyi merged commit 2f6e7e6 into apache:main Aug 22, 2024
47 checks passed
@aokolnychyi
Copy link
Contributor

Great work, @karuppayya! Thanks everyone for reviewing!

@karuppayya
Copy link
Contributor Author

Thanks @aokolnychyi and everyone for the reviews.

@sfc-gh-mrojas
Copy link

@karuppayya sorry to bother. I am able to get the action and called from Java liek this:

actions.computeTableStats(table).columns(columns.toArray(new String[0])).execute();

What what will be the Spark SQL / Pypsark ways to use this.

@karuppayya
Copy link
Contributor Author

karuppayya commented Sep 19, 2024

@sfc-gh-mrojas
I have PR open for introducing a procedure to invoke the action.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.