diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 23299a962ce5..89aac72da43d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -27,6 +27,7 @@ import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.metrics.LoggingMetricsReporter; import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.metrics.MetricsReporters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -41,7 +42,7 @@ public class BaseTable implements Table, HasTableOperations, Serializable { private final TableOperations ops; private final String name; - private final MetricsReporter reporter; + private MetricsReporter reporter; public BaseTable(TableOperations ops, String name) { this(ops, name, LoggingMetricsReporter.instance()); @@ -58,6 +59,10 @@ public MetricsReporter reporter() { return reporter; } + public void combineMetricsReporter(MetricsReporter metricsReporter) { + this.reporter = MetricsReporters.combine(this.reporter, metricsReporter); + } + @Override public TableOperations operations() { return ops; diff --git a/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java index 79b446c0ddbf..0ba12876b4eb 100644 --- a/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java +++ b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java @@ -35,4 +35,11 @@ public ScanReport scanReport() { "Metrics report is not a scan report"); return (ScanReport) metricsReport; } + + public CommitReport commitReport() { + Preconditions.checkArgument( + metricsReport == null || metricsReport instanceof CommitReport, + "Metrics report is not a commit report"); + return (CommitReport) metricsReport; + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index f491d9d37073..66fc5ea161cb 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -65,6 +66,7 @@ import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.PositionDeltaWriter; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.metrics.InMemoryMetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.CommitMetadata; @@ -77,10 +79,12 @@ import org.apache.iceberg.util.StructProjection; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.MetricsUtils; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.distributions.Distribution; import org.apache.spark.sql.connector.expressions.SortOrder; +import org.apache.spark.sql.connector.metric.CustomMetric; import org.apache.spark.sql.connector.write.DeltaBatchWrite; import org.apache.spark.sql.connector.write.DeltaWrite; import org.apache.spark.sql.connector.write.DeltaWriter; @@ -113,6 +117,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde private final Map writeProperties; private boolean cleanupOnAbort = false; + private InMemoryMetricsReporter metricsReporter; SparkPositionDeltaWrite( SparkSession spark, @@ -136,6 +141,11 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde this.writeRequirements = writeConf.positionDeltaRequirements(command); this.context = new Context(dataSchema, writeConf, info, writeRequirements); this.writeProperties = writeConf.writeProperties(); + + if (this.table instanceof BaseTable) { + this.metricsReporter = new InMemoryMetricsReporter(); + ((BaseTable) this.table).combineMetricsReporter(metricsReporter); + } } @Override @@ -169,6 +179,11 @@ public DeltaBatchWrite toBatch() { return new PositionDeltaBatchWrite(); } + @Override + public CustomMetric[] supportedCustomMetrics() { + return MetricsUtils.supportedCustomMetrics(); + } + private class PositionDeltaBatchWrite implements DeltaBatchWrite { @Override @@ -334,6 +349,7 @@ private void commitOperation(SnapshotUpdate operation, String description) { try { long start = System.currentTimeMillis(); operation.commit(); // abort is automatically called if this fails + MetricsUtils.postWriteMetrics(metricsReporter, sparkContext.sc()); long duration = System.currentTimeMillis() - start; LOG.info("Committed in {} ms", duration); } catch (Exception e) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index b15fe2576384..cc9be5153f2a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; @@ -51,6 +52,7 @@ import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.RollingDataWriter; +import org.apache.iceberg.metrics.InMemoryMetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.CommitMetadata; @@ -63,6 +65,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.executor.OutputMetrics; +import org.apache.spark.sql.MetricsUtils; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.distributions.Distribution; @@ -103,6 +106,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { private final Map writeProperties; private boolean cleanupOnAbort = false; + private InMemoryMetricsReporter metricsReporter; SparkWrite( SparkSession spark, @@ -130,6 +134,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { this.writeRequirements = writeRequirements; this.outputSpecId = writeConf.outputSpecId(); this.writeProperties = writeConf.writeProperties(); + + if (this.table instanceof BaseTable) { + this.metricsReporter = new InMemoryMetricsReporter(); + ((BaseTable) this.table).combineMetricsReporter(metricsReporter); + } } @Override @@ -231,6 +240,7 @@ private void commitOperation(SnapshotUpdate operation, String description) { try { long start = System.currentTimeMillis(); operation.commit(); // abort is automatically called if this fails + MetricsUtils.postWriteMetrics(metricsReporter, sparkContext.sc()); long duration = System.currentTimeMillis() - start; LOG.info("Committed in {} ms", duration); } catch (Exception e) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java index ece0a5e6ff91..0d5b8cff0758 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java @@ -33,7 +33,9 @@ import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.spark.SparkWriteRequirements; import org.apache.iceberg.types.TypeUtil; +import org.apache.spark.sql.MetricsUtils; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.metric.CustomMetric; import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.write.BatchWrite; import org.apache.spark.sql.connector.write.LogicalWriteInfo; @@ -166,6 +168,11 @@ public StreamingWrite toStreaming() { return asStreamingAppend(); } } + + @Override + public CustomMetric[] supportedCustomMetrics() { + return MetricsUtils.supportedCustomMetrics(); + } }; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java new file mode 100644 index 000000000000..6d8c72bb0e95 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedDataFiles extends CustomSumMetric { + + static final String NAME = "addedDataFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added data files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java new file mode 100644 index 000000000000..727eb0042ca2 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedDeleteFiles extends CustomSumMetric { + + static final String NAME = "addedDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added delete files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java new file mode 100644 index 000000000000..442f60365443 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedEqualityDeleteFiles extends CustomSumMetric { + + static final String NAME = "addedEqualityDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added equality delete files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java new file mode 100644 index 000000000000..4723176c88f7 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedEqualityDeletes extends CustomSumMetric { + + static final String NAME = "addedEqualityDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added equality deletes"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java new file mode 100644 index 000000000000..06353ea80f13 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedFileSizeInBytes extends CustomSumMetric { + + static final String NAME = "addedFileSizeInBytes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added file size (bytes)"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java new file mode 100644 index 000000000000..93ea3acde925 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedPositionalDeleteFiles extends CustomSumMetric { + + static final String NAME = "addedPositionalDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added positional delete files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java new file mode 100644 index 000000000000..897f55f3dc2b --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedPositionalDeletes extends CustomSumMetric { + + static final String NAME = "addedPositionalDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added positional deletes"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java new file mode 100644 index 000000000000..fecdb28be39d --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class AddedRecords extends CustomSumMetric { + + static final String NAME = "addedRecords"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of added records"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java new file mode 100644 index 000000000000..f9ecadd9b8fe --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedDataFiles extends CustomSumMetric { + + static final String NAME = "removedDataFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed data files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java new file mode 100644 index 000000000000..6543abd934e5 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedDeleteFiles extends CustomSumMetric { + + static final String NAME = "removedDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed delete files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java new file mode 100644 index 000000000000..31928a293357 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedEqualityDeleteFiles extends CustomSumMetric { + + static final String NAME = "removedEqualityDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed equality delete files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java new file mode 100644 index 000000000000..0838a288fe75 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedEqualityDeletes extends CustomSumMetric { + + static final String NAME = "removedEqualityDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed equality deletes"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java new file mode 100644 index 000000000000..666e33743ead --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedFileSizeInBytes extends CustomSumMetric { + + static final String NAME = "removedFileSizeInBytes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed file size (bytes)"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java new file mode 100644 index 000000000000..85d191f9edcc --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedPositionalDeleteFiles extends CustomSumMetric { + + static final String NAME = "removedPositionalDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed positional delete files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java new file mode 100644 index 000000000000..1a59348745df --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedPositionalDeletes extends CustomSumMetric { + + static final String NAME = "removedPositionalDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed positional deletes"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java new file mode 100644 index 000000000000..6c0a8eaae052 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class RemovedRecords extends CustomSumMetric { + + static final String NAME = "removedRecords"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of removed records"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java new file mode 100644 index 000000000000..61d87be65bb1 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalDataFiles extends CustomSumMetric { + + static final String NAME = "totalDataFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of data files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java new file mode 100644 index 000000000000..5f6d8d7dea1c --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalDeleteFiles extends CustomSumMetric { + + static final String NAME = "totalDeleteFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of delete files"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java new file mode 100644 index 000000000000..ea4e3afc7d78 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalEqualityDeletes extends CustomSumMetric { + + static final String NAME = "totalEqualityDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of equality deletes"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java new file mode 100644 index 000000000000..2277b10cd364 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalFileSizeInBytes extends CustomSumMetric { + + static final String NAME = "totalFileSizeInBytes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total data file size (bytes)"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java new file mode 100644 index 000000000000..f143b63fbf2a --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalPositionalDeletes extends CustomSumMetric { + + static final String NAME = "totalPositionalDeletes"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of positional deletes"; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java new file mode 100644 index 000000000000..14c2262ae308 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalRecords extends CustomSumMetric { + + static final String NAME = "totalRecords"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total number of records"; + } +} diff --git a/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/MetricsUtils.scala b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/MetricsUtils.scala new file mode 100644 index 000000000000..9768ab79d424 --- /dev/null +++ b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/MetricsUtils.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql + +import org.apache.iceberg.metrics.CounterResult +import org.apache.iceberg.metrics.InMemoryMetricsReporter +import org.apache.iceberg.spark.source.metrics.AddedDataFiles +import org.apache.iceberg.spark.source.metrics.AddedDeleteFiles +import org.apache.iceberg.spark.source.metrics.AddedEqualityDeleteFiles +import org.apache.iceberg.spark.source.metrics.AddedEqualityDeletes +import org.apache.iceberg.spark.source.metrics.AddedFileSizeInBytes +import org.apache.iceberg.spark.source.metrics.AddedPositionalDeleteFiles +import org.apache.iceberg.spark.source.metrics.AddedPositionalDeletes +import org.apache.iceberg.spark.source.metrics.AddedRecords +import org.apache.iceberg.spark.source.metrics.RemovedDataFiles +import org.apache.iceberg.spark.source.metrics.RemovedDeleteFiles +import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeleteFiles +import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeletes +import org.apache.iceberg.spark.source.metrics.RemovedFileSizeInBytes +import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeleteFiles +import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeletes +import org.apache.iceberg.spark.source.metrics.RemovedRecords +import org.apache.iceberg.spark.source.metrics.TotalDataFiles +import org.apache.iceberg.spark.source.metrics.TotalDeleteFiles +import org.apache.iceberg.spark.source.metrics.TotalEqualityDeletes +import org.apache.iceberg.spark.source.metrics.TotalFileSizeInBytes +import org.apache.iceberg.spark.source.metrics.TotalPositionalDeletes +import org.apache.iceberg.spark.source.metrics.TotalRecords +import org.apache.spark.SparkContext +import org.apache.spark.sql.connector.metric.CustomMetric +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.metric.SQLMetrics +import scala.collection.mutable + +object MetricsUtils { + + def supportedCustomMetrics(): Array[CustomMetric] = { + Array( + new AddedDataFiles, + new AddedDeleteFiles, + new AddedEqualityDeletes, + new AddedEqualityDeleteFiles, + new AddedFileSizeInBytes, + new AddedPositionalDeletes, + new AddedPositionalDeleteFiles, + new AddedRecords, + new RemovedDataFiles, + new RemovedDeleteFiles, + new RemovedRecords, + new RemovedEqualityDeleteFiles, + new RemovedEqualityDeletes, + new RemovedFileSizeInBytes, + new RemovedPositionalDeleteFiles, + new RemovedPositionalDeletes, + new TotalDataFiles, + new TotalDeleteFiles, + new TotalEqualityDeletes, + new TotalFileSizeInBytes, + new TotalPositionalDeletes, + new TotalRecords + ) + } + + def postWriteMetrics(metricsReporter: InMemoryMetricsReporter, sparkContext: SparkContext): Unit = { + if (metricsReporter != null) { + val commitReport = metricsReporter.commitReport + if (commitReport != null) { + val metricsResult = commitReport.commitMetrics + val metricValues = mutable.Map.empty[CustomMetric, Long] + addMetricValue(new AddedDataFiles, metricsResult.addedDataFiles, metricValues) + addMetricValue(new AddedDeleteFiles, metricsResult.addedDeleteFiles, metricValues) + addMetricValue(new AddedEqualityDeletes, metricsResult.addedEqualityDeletes, metricValues) + addMetricValue(new AddedEqualityDeleteFiles, metricsResult.addedEqualityDeleteFiles, metricValues) + addMetricValue(new AddedFileSizeInBytes, metricsResult.addedFilesSizeInBytes, metricValues) + addMetricValue(new AddedPositionalDeletes, metricsResult.addedPositionalDeletes, metricValues) + addMetricValue(new AddedPositionalDeleteFiles, metricsResult.addedPositionalDeleteFiles, metricValues) + addMetricValue(new AddedRecords, metricsResult.addedRecords, metricValues) + addMetricValue(new RemovedDataFiles, metricsResult.removedDataFiles, metricValues) + addMetricValue(new RemovedDeleteFiles, metricsResult.removedDeleteFiles, metricValues) + addMetricValue(new RemovedRecords, metricsResult.removedRecords, metricValues) + addMetricValue(new RemovedEqualityDeleteFiles, metricsResult.removedEqualityDeleteFiles, metricValues) + addMetricValue(new RemovedEqualityDeletes, metricsResult.removedEqualityDeletes, metricValues) + addMetricValue(new RemovedFileSizeInBytes, metricsResult.removedFilesSizeInBytes, metricValues) + addMetricValue(new RemovedPositionalDeleteFiles, metricsResult.removedPositionalDeleteFiles, metricValues) + addMetricValue(new RemovedPositionalDeletes, metricsResult.removedPositionalDeletes, metricValues) + addMetricValue(new TotalDataFiles, metricsResult.totalDataFiles, metricValues) + addMetricValue(new TotalDeleteFiles, metricsResult.totalDeleteFiles, metricValues) + addMetricValue(new TotalEqualityDeletes, metricsResult.totalEqualityDeletes, metricValues) + addMetricValue(new TotalFileSizeInBytes, metricsResult.totalFilesSizeInBytes, metricValues) + addMetricValue(new TotalPositionalDeletes, metricsResult.totalPositionalDeletes, metricValues) + addMetricValue(new TotalRecords, metricsResult.totalRecords, metricValues) + postDriverMetrics(sparkContext, metricValues.toMap) + } + } + } + + private def addMetricValue(metric: CustomMetric, result: CounterResult, + metricValues: mutable.Map[CustomMetric, Long]): Unit = { + if (result != null) { + metricValues.put(metric, result.value) + } + } + + private def postDriverMetrics(sparkContext: SparkContext, + metricValues: Map[CustomMetric, Long]): Unit = { + val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + val metrics = SQLExecution.getQueryExecution(executionId.toLong).executedPlan.metrics + val sqlMetrics = metricValues.map { case (metric, value) => + val sqlMetric = metrics(metric.name) + sqlMetric.set(value) + sqlMetric + }.toSeq + SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, sqlMetrics) + } +}