diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 0aa51a940160e..b6d64c79b1df4 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -22,7 +22,7 @@ import java.{util => ju}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.connector.CustomTaskMetric
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index c34c435630143..5c772abfe808d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,13 +30,12 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.CustomMetric
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
-import org.apache.spark.sql.execution.metric.CustomSumMetric
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
 import org.apache.spark.sql.sources._
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java
new file mode 100644
index 0000000000000..71e83002dda07
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.metric;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Arrays;
+import java.text.DecimalFormat;
+
+/**
+ * Built-in `CustomMetric` that computes average of metric values. Note that please extend this
+ * class and override `name` and `description` to create your custom metric for real usage.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public abstract class CustomAvgMetric implements CustomMetric {
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    if (taskMetrics.length > 0) {
+      double average = ((double)Arrays.stream(taskMetrics).sum()) / taskMetrics.length;
+      return new DecimalFormat("#0.000").format(average);
+    } else {
+      return "0";
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java
similarity index 73%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java
index bbd35ac946773..4c4151ad96975 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
 
 import org.apache.spark.annotation.Evolving;
 
@@ -29,23 +29,23 @@
  */
 @Evolving
 public interface CustomMetric {
-    /**
-     * Returns the name of custom metric.
-     */
-    String name();
+  /**
+   * Returns the name of custom metric.
+   */
+  String name();
 
-    /**
-     * Returns the description of custom metric.
-     */
-    String description();
+  /**
+   * Returns the description of custom metric.
+   */
+  String description();
 
-    /**
-     * The initial value of this metric.
-     */
-    long initialValue = 0L;
+  /**
+   * The initial value of this metric.
+   */
+  long initialValue = 0L;
 
-    /**
-     * Given an array of task metric values, returns aggregated final metric value.
-     */
-    String aggregateTaskMetrics(long[] taskMetrics);
+  /**
+   * Given an array of task metric values, returns aggregated final metric value.
+   */
+  String aggregateTaskMetrics(long[] taskMetrics);
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java
new file mode 100644
index 0000000000000..ba28e9b9187ee
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.metric;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Arrays;
+
+/**
+ * Built-in `CustomMetric` that sums up metric values. Note that please extend this class
+ * and override `name` and `description` to create your custom metric for real usage.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public abstract class CustomSumMetric implements CustomMetric {
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    return String.valueOf(Arrays.stream(taskMetrics).sum());
+  }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
similarity index 88%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
index 99ea8f6b4a113..1b6f04d927913 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.read.PartitionReader;
@@ -34,13 +34,13 @@
  */
 @Evolving
 public interface CustomTaskMetric {
-    /**
-     * Returns the name of custom task metric.
-     */
-    String name();
+  /**
+   * Returns the name of custom task metric.
+   */
+  String name();
 
-    /**
-     * Returns the long value of custom task metric.
-     */
-    long value();
+  /**
+   * Returns the long value of custom task metric.
+   */
+  long value();
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
index d6cf070cf4c85..5286bbf9f85a1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
@@ -21,7 +21,7 @@
 import java.io.IOException;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomTaskMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 
 /**
  * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index b70a656c492a8..0c009f5c56d06 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector.read;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomMetric;
 import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
index cc28be3ca8ed7..f2449a1ec58f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.sql.execution.metric
 
-import java.text.NumberFormat
-import java.util.Locale
-
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
 
 object CustomMetrics {
   private[spark] val V2_CUSTOM = "v2Custom"
@@ -45,31 +42,3 @@ object CustomMetrics {
     }
   }
 }
-
-/**
- * Built-in `CustomMetric` that sums up metric values. Note that please extend this class
- * and override `name` and `description` to create your custom metric for real usage.
- */
-abstract class CustomSumMetric extends CustomMetric {
-
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    taskMetrics.sum.toString
-  }
-}
-
-/**
- * Built-in `CustomMetric` that computes average of metric values. Note that please extend this
- * class and override `name` and `description` to create your custom metric for real usage.
- */
-abstract class CustomAvgMetric extends CustomMetric {
-
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    val average = if (taskMetrics.isEmpty) {
-      0.0
-    } else {
-      taskMetrics.sum.toDouble / taskMetrics.length
-    }
-    val numberFormat = NumberFormat.getNumberInstance(Locale.US)
-    numberFormat.format(average)
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 0e48e6efeee33..959144bab33f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index a3238551b2fd2..e7ab4a184b07b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
index 020f3f494a2e5..440b0dc08ecbd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.metric
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.connector.metric.{CustomAvgMetric, CustomSumMetric}
 
 class CustomMetricsSuite extends SparkFunSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index a58265124d708..612b74a661d39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -37,7 +37,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.connector.{CustomMetric, CustomTaskMetric, RangeInputPartition, SimpleScanBuilder}
+import org.apache.spark.sql.connector.{RangeInputPartition, SimpleScanBuilder}
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
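
A usage sketch, outside the patch itself: a DSv2 connector plugs into the relocated API roughly as follows. The names RowCountMetric and RowCountTaskMetric are hypothetical; the packages and override points are the ones introduced above. The driver-side metric is advertised through Scan#supportedCustomMetrics(), each PartitionReader reports task-level values through currentMetricsValues(), and Spark matches the two by name() before feeding the collected values to aggregateTaskMetrics(), which CustomSumMetric implements as a plain sum.

import org.apache.spark.sql.connector.metric.CustomSumMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

// Hypothetical driver-side metric: extends the built-in CustomSumMetric added
// above, overriding only name() and description() as its javadoc asks.
class RowCountMetric extends CustomSumMetric {
  @Override
  public String name() { return "rowCount"; }

  @Override
  public String description() { return "number of rows read"; }
}

// Hypothetical task-side counterpart: a PartitionReader returns instances of
// this from currentMetricsValues(); name() must match the driver-side metric.
class RowCountTaskMetric implements CustomTaskMetric {
  private final long rows;

  RowCountTaskMetric(long rows) { this.rows = rows; }

  @Override
  public String name() { return "rowCount"; }

  @Override
  public long value() { return rows; }
}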