@@ -24,8 +24,10 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, ResolveDefaultColumns}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, SupportsRead, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.IdentityTransform
+import org.apache.spark.sql.connector.read.SupportsReportStatistics
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
 case class DescribeTableExec(
@@ -40,6 +42,7 @@ case class DescribeTableExec(
     if (isExtended) {
       addMetadataColumns(rows)
       addTableDetails(rows)
+      addTableStats(rows)
     }
     rows.toSeq
   }
@@ -96,6 +99,23 @@ case class DescribeTableExec(
     case _ =>
  }
 
+  private def addTableStats(rows: ArrayBuffer[InternalRow]): Unit = table match {
+    case read: SupportsRead =>
+      read.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
+        case s: SupportsReportStatistics =>
+          val stats = s.estimateStatistics()
+          val statsComponents = Seq(
+            Option.when(stats.sizeInBytes().isPresent)(s"${stats.sizeInBytes().getAsLong} bytes"),
+            Option.when(stats.numRows().isPresent)(s"${stats.numRows().getAsLong} rows")
+          ).flatten
+          if (statsComponents.nonEmpty) {
+            rows += toCatalystRow("Statistics", statsComponents.mkString(", "), null)
+          }
+        case _ =>
+      }
+    case _ =>
+  }
+
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
     if (table.partitioning.nonEmpty) {
       val partitionColumnsOnly = table.partitioning.forall(t => t.isInstanceOf[IdentityTransform])
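The new addTableStats hook renders only the components the scan actually reports: an absent size or row count is dropped, and no "Statistics" row is emitted when both are absent. A standalone sketch of that formatting logic (the formatStats helper is made up for illustration; the real code builds the row inline):

import java.util.OptionalLong

// Mirrors the Option.when(...).flatten pattern in addTableStats above.
def formatStats(sizeInBytes: OptionalLong, numRows: OptionalLong): Option[String] = {
  val components = Seq(
    Option.when(sizeInBytes.isPresent)(s"${sizeInBytes.getAsLong} bytes"),
    Option.when(numRows.isPresent)(s"${numRows.getAsLong} rows")
  ).flatten
  Option.when(components.nonEmpty)(components.mkString(", "))
}

assert(formatStats(OptionalLong.of(0L), OptionalLong.of(0L)).contains("0 bytes, 0 rows"))
assert(formatStats(OptionalLong.empty(), OptionalLong.of(4L)).contains("4 rows"))
assert(formatStats(OptionalLong.empty(), OptionalLong.empty()).isEmpty)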
@@ -3342,7 +3342,8 @@ class DataSourceV2SQLSuiteV1Filter
       Row("# Column Default Values", "", ""),
       Row("# Metadata Columns", "", ""),
       Row("id", "bigint", "42"),
-      Row("id", "bigint", null)
+      Row("id", "bigint", null),
+      Row("Statistics", "0 bytes, 0 rows", null)
     ))
   }
 }
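The "0 bytes, 0 rows" value in the expected answer comes from whatever the table's scan reports through SupportsReportStatistics; the in-memory test catalog reports zeros for an empty table. For context, a minimal scan that opts into statistics reporting might look like the following sketch (the class and the reported values are hypothetical, not the test catalog's actual implementation):

import java.util.OptionalLong

import org.apache.spark.sql.connector.read.{Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

// Hypothetical connector scan; with this table, DESCRIBE TABLE EXTENDED
// would render the row as "Statistics  1024 bytes, 42 rows".
class ExampleScan(schema: StructType) extends Scan with SupportsReportStatistics {
  override def readSchema(): StructType = schema

  override def estimateStatistics(): Statistics = new Statistics {
    // Either value may be OptionalLong.empty(); only present ones are printed.
    override def sizeInBytes(): OptionalLong = OptionalLong.of(1024L)
    override def numRows(): OptionalLong = OptionalLong.of(42L)
  }
}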
@@ -90,7 +90,8 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
       Row("Location", "file:/tmp/testcat/table_name", ""),
       Row("Provider", "_", ""),
       Row(TableCatalog.PROP_OWNER.capitalize, Utils.getCurrentUserName(), ""),
-      Row("Table Properties", "[bar=baz]", "")))
+      Row("Table Properties", "[bar=baz]", ""),
+      Row("Statistics", "0 bytes, 0 rows", null)))
   }
 }
@@ -196,4 +197,20 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
       Row("comment", "column_comment")))
   }
 }
+
+  test("describe extended table with stats") {
+    withNamespaceAndTable("ns", "tbl") { tbl =>
+      sql(
+        s"""
+           |CREATE TABLE $tbl
+           |(key INT, col STRING)
+           |$defaultUsing""".stripMargin)
+
+      sql(s"INSERT INTO $tbl values (1, 'aaa'), (2, 'bbb'), (3, 'ccc'), (null, 'ddd')")
+      val descriptionDf = sql(s"DESCRIBE TABLE EXTENDED $tbl")
+      val stats = descriptionDf.filter("col_name == 'Statistics'").head()
+        .getAs[String]("data_type")
+      assert("""\d+\s+bytes,\s+4\s+rows""".r.matches(stats))
+    }
+  }
 }
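A note on the assertion: the test pins the row count exactly but matches the byte size only by shape, presumably because the on-disk size can vary with the data source's physical layout. A quick check of the pattern (the sample strings are made up):

val pattern = """\d+\s+bytes,\s+4\s+rows""".r
assert(pattern.matches("1024 bytes, 4 rows"))
assert(!pattern.matches("1024 bytes, 3 rows"))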