
SHOW PARTITIONS is not allowed on a table that is not partitioned when it is in fact a partitioned Delta table #681

Closed
cequencer opened this issue May 23, 2021 · 13 comments
@cequencer

Using spark-shell from the precompiled OSS Apache Spark 3.0.2 (without Hadoop) build, plus io.delta:delta-core_2.12:0.8.0.

High-level summary of my complete test program, describing the issue and the debugging information:

  1. I created an unmanaged Delta table at a fully qualified path, using sample data.
  2. Separately, I created a Spark DataFrame that reads directly from that full path, to simulate a reader program.
  3. I created a (managed) table reference to the above Delta table.
    a. This is done explicitly using the logic outlined here
  4. I successfully created an EXTERNAL table reference as a managed table.
  5. I execute Spark SQL: SHOW PARTITIONS, and it fails saying the table is not partitioned.
  6. I have provided extra debugging information via Spark SQL: DESCRIBE TABLE EXTENDED and SHOW CREATE TABLE.
    a. The SHOW CREATE TABLE output does not match the original DDL statements I provided, which appears to be why SHOW PARTITIONS fails.

My objective is to run SHOW PARTITIONS on my EXTERNAL managed reference to a Delta table that was created by another program and is not managed to begin with. Is there a more direct approach, or am I running into a bug here?

spark-shell --master yarn --deploy-mode client --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.warehouse.dir=/user/myuserid/warehouse --packages io.delta:delta-core_2.12:0.8.0 --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 3
Spark context Web UI available at http://FQDN:port
Spark context available as 'sc' (master = yarn, app id = application_1234567890123_111111).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.2
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_2xx)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.types.DateType

scala> import io.delta.tables._
import io.delta.tables._

scala> import spark.implicits._
import spark.implicits._

scala> val columns=Array("col1", "eventdate")
columns: Array[String] = Array(col1, eventdate)

scala> val sample_df = sc.parallelize(Seq(
     |                       ("rec1_data", "2021-05-23"),
     |                       ("rec2_data", "2021-05-22"),
     |                       ("rec3_data", "2021-05-21")
     |                     )).toDF(columns: _*)
21/05/23 22:20:39 WARN SharedState: Not allowing to set spark.sql.warehouse.dir or hive.metastore.warehouse.dir in SparkSession's options, it should be set statically for cross-session usages
sample_df: org.apache.spark.sql.DataFrame = [col1: string, eventdate: string]

scala> val event_df = sample_df.withColumn("eventdate", to_date(col("eventdate"), "yyyy-MM-dd"))
event_df: org.apache.spark.sql.DataFrame = [col1: string, eventdate: date]

scala> event_df.printSchema
root
 |-- col1: string (nullable = true)
 |-- eventdate: date (nullable = true)


scala> event_df.show(false)
+---------+----------+
|col1     |eventdate |
+---------+----------+
|rec1_data|2021-05-23|
|rec2_data|2021-05-22|
|rec3_data|2021-05-21|
+---------+----------+


scala> spark.catalog.listDatabases().show(false)
+-------+----------------+------------------------+
|name   |description     |locationUri             |
+-------+----------------+------------------------+
|default|default database|/user/myuserid/warehouse|
+-------+----------------+------------------------+


scala> spark.catalog.listTables().show(false)
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+

scala> val hdfsDeltaPathPrefix = "/user/myuserid/delta"
hdfsDeltaPathPrefix: String = /user/myuserid/delta

scala> val qualifiedDeltaHdfsPath = hdfsDeltaPathPrefix + "/events"
qualifiedDeltaHdfsPath: String = /user/myuserid/delta/events

scala> event_df.write.format("delta").partitionBy("eventdate").save(qualifiedDeltaHdfsPath)

scala>

scala> val readDeltaEvents_df = spark.read.format("delta").load(qualifiedDeltaHdfsPath)
readDeltaEvents_df: org.apache.spark.sql.DataFrame = [col1: string, eventdate: date]

scala> readDeltaEvents_df.printSchema
root
 |-- col1: string (nullable = true)
 |-- eventdate: date (nullable = true)


scala> readDeltaEvents_df.show(false)
+---------+----------+
|col1     |eventdate |
+---------+----------+
|rec1_data|2021-05-23|
|rec2_data|2021-05-22|
|rec3_data|2021-05-21|
+---------+----------+

scala> val create_tbl_def_event_full_schema = spark.sql(s"""CREATE TABLE events (col1 STRING, eventdate DATE) USING DELTA PARTITIONED BY (eventdate) LOCATION '${qualifiedDeltaHdfsPath}'""")
create_tbl_def_event_full_schema: org.apache.spark.sql.DataFrame = []

scala> spark.catalog.listTables().show(false)
+------+--------+-----------+---------+-----------+
|name  |database|description|tableType|isTemporary|
+------+--------+-----------+---------+-----------+
|events|default |null       |EXTERNAL |false      |
+------+--------+-----------+---------+-----------+


scala> spark.sql("SHOW PARTITIONS default.events")
org.apache.spark.sql.AnalysisException: SHOW PARTITIONS is not allowed on a table that is not partitioned: `default`.`events`;
  at org.apache.spark.sql.execution.command.ShowPartitionsCommand.run(tables.scala:1011)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
  ... 51 elided

scala>

scala> val describe_tbl_ext_events = spark.sql("DESCRIBE TABLE EXTENDED default.events")
describe_tbl_ext_events: org.apache.spark.sql.DataFrame = [col_name: string, data_type: string ... 1 more field]

scala> describe_tbl_ext_events.show(100, false)
+----------------------------+-----------------------------------------------------------------+-------+
|col_name                    |data_type                                                        |comment|
+----------------------------+-----------------------------------------------------------------+-------+
|col1                        |string                                                           |       |
|eventdate                   |date                                                             |       |
|                            |                                                                 |       |
|# Partitioning              |                                                                 |       |
|Part 0                      |eventdate                                                        |       |
|                            |                                                                 |       |
|# Detailed Table Information|                                                                 |       |
|Name                        |default.events                                                   |       |
|Location                    |hdfs://HDFSNN/user/myuserid/delta/events                         |       |
|Provider                    |delta                                                            |       |
|Table Properties            |[Type=EXTERNAL,delta.minReaderVersion=1,delta.minWriterVersion=2]|       |
+----------------------------+-----------------------------------------------------------------+-------+


scala> val show_create_tbl_events = spark.sql("SHOW CREATE TABLE default.events")
show_create_tbl_events: org.apache.spark.sql.DataFrame = [createtab_stmt: string]

scala> show_create_tbl_events.show(100, false)
+------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                    |
+------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE `default`.`events` (
  )
USING delta
LOCATION 'hdfs://HDFSNN/user/myuserid/delta/events'
|
+------------------------------------------------------------------------------------------------------------------+

@rahulsmahadev added the acknowledged (This issue has been read and acknowledged by Delta admins) label May 27, 2021
@Kimahriman
Contributor

I think the SupportsPartitionManagement interface is what enables this functionality. Would need to add that support to DeltaTableV2. There are several required functions in it, but we can probably throw unsupported exceptions for everything other than listPartitionIdentifiers.

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowPartitionsExec.scala
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java#L141

@jaceklaskowski
Contributor

@Kimahriman is right - more at CheckAnalysis

@cequencer
Author

cequencer commented Jun 6, 2021

Thank you @Kimahriman and @jaceklaskowski for locating the references to the code that manages this logic.

Using the CheckAnalysis reference provided by @jaceklaskowski , I have located the case statement and the conditions that triggered my issue.

Even though (bullet point 3 in my high-level summary above) I executed CREATE TABLE with DDL matching the external table (to populate the matching schema and property information), this information seems to be intentionally dropped, or at least not recorded in the metastore. There was no WARNING message when I executed that CREATE TABLE, so a small improvement would be to inform the user that the DDL was validated as conforming to the Delta table but the metadata was still dropped.

This raises another question for me: are there other SQL commands besides SHOW PARTITIONS that currently have no-op (no operation) behavior for Delta tables that I should be aware of?

My current naïve understanding: if all the other SQL commands that read and process queries against Delta tables already work, what complexity keeps SHOW PARTITIONS from following the same logic and retrieving the metadata directly from the Delta metadata path?

As a new Delta user, my first guess at the source of that complexity is schema evolution, where adding columns is allowed and supported (Spark 3.0+). However, it is unclear to me whether the support for adding columns differs between "adding non-partition columns" and "adding partition columns".

I am relatively new to the Delta OSS project; is the scope of the suggested implementation to fix this issue suitable for a beginner?

Thanks again for the constructive feedback.

@Kimahriman
Contributor

Oh you're using 3.0.2, so that's using the data source V1 code path I think. The error you're hitting is actually here: https://github.com/apache/spark/blob/v3.0.2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala#L1012

Show partitions support was only added to V2 in spark 3.1 I think. I assume support would only be added to the V2 code path for delta going forward?

@jaceklaskowski
Contributor

@cequencer That should really be discussed on the Delta Lake Users and Developers forum.

@Kimahriman
Contributor

Should this be closed as won't fix? It's easier to get and deal with the partitions using the DeltaLog API directly (and you can hack your way to it through py4j from python as well)

@scottsand-db
Collaborator

Closing this since using the DeltaLog API seems like an easier route to take. Please reopen if this issue is still relevant.

@cristianoperez

@Kimahriman is there an easy way to access the DeltaLog from pyspark?

@Kimahriman
Contributor

Depends on your definition of "easy". We just do:

jdf = spark._jvm.org.apache.spark.sql.delta.DeltaLog.forTable(spark._jsparkSession, path).snapshot().allFiles().toDF()
delta_log = DataFrame(jdf, spark._wrapped)
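
For context on what that gives you: each row of delta_log is an AddFile entry from the table's current snapshot, and the partition information lives in its partitionValues map column. A minimal sketch along the same lines (the path and the eventdate partition column are the ones from the example table in this issue; adjust for your own table):

from pyspark.sql import DataFrame

path = "/user/myuserid/delta/events"  # table root from the example above

# Same py4j call as above: the AddFile entries of the current snapshot, as a DataFrame
jdf = spark._jvm.org.apache.spark.sql.delta.DeltaLog.forTable(spark._jsparkSession, path) \
    .snapshot().allFiles().toDF()
delta_log = DataFrame(jdf, spark._wrapped)

# Partition information is carried in the partitionValues map column (one key per partition column)
delta_log.select("path", "partitionValues").show(truncate=False)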

@zsxwing self-assigned this Mar 15, 2022
@SandeepBallu

SandeepBallu commented Mar 16, 2022

I am also facing the same issue as @cequencer. My Spark version is 3.1.1, but SHOW PARTITIONS on the Delta table still throws an error: pyspark.sql.utils.AnalysisException: SHOW PARTITIONS cannot run for a table which does not support partitioning;

I also tried to get the partitioned columns from the catalog with spark.catalog.listColumns(<Table_Name>, <DB_Name>), but this also returns an empty list.

Similar to SHOW PARTITIONS, does Delta not support listColumns either?

@nivassrinivas

Depends on your definition of "easy". We just do:

jdf = spark._jvm.org.apache.spark.sql.delta.DeltaLog.forTable(spark._jsparkSession, path).snapshot().allFiles().toDF()
delta_log = DataFrame(jdf, spark._wrapped)

This is very useful sir, but it gives all the partitions. How can we get the valid latest partitions?
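
A note on that (based on how the snapshot API behaves, not on anything stated earlier in this thread): snapshot().allFiles() only lists files that are live in the latest snapshot at the time DeltaLog.forTable is called, so partitions whose files have all been removed do not show up; the repetition you see is simply one row per data file. Deduplicating on the partition column gives one row per current partition. A sketch, again assuming the eventdate partition column from this issue's example:

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

path = "/user/myuserid/delta/events"  # adjust to your table root

# AddFile entries that are live in the latest snapshot (removed files are excluded)
jdf = spark._jvm.org.apache.spark.sql.delta.DeltaLog.forTable(spark._jsparkSession, path) \
    .snapshot().allFiles().toDF()
files_df = DataFrame(jdf, spark._wrapped)

# One row per current partition value. Pull the key out of the partitionValues map
# before calling distinct(), since Spark does not allow distinct() on a map-typed column.
current_partitions = (files_df
    .select(F.col("partitionValues")["eventdate"].alias("eventdate"))
    .distinct())
current_partitions.show()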

@Krulvis

Krulvis commented Feb 10, 2023

Depends on your definition of "easy". We just do:

jdf = spark._jvm.org.apache.spark.sql.delta.DeltaLog.forTable(spark._jsparkSession, path).snapshot().allFiles().toDF()
delta_log = DataFrame(jdf, spark._wrapped)

How are you getting partition information with this? delta_log.columns just returns all the columns as List[str].

@kuksag

kuksag commented Feb 21, 2023

How are you getting partition information with this? delta_log.columns just returns all the columns as List[str].

@Krulvis, you can access the 'path' column. Every record contains information about some partition, e.g. attribute1=value1/attribute2=value2/filename.
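
A small sketch of that approach, assuming the delta_log DataFrame from the snippet earlier in this thread and the single eventdate partition column from this issue's example (with multiple partition columns you would extract each attribute=value pair):

import pyspark.sql.functions as F

# Pull the partition value out of each file path, e.g. "eventdate=2021-05-23/part-...parquet",
# then deduplicate to get one row per partition.
partitions_from_path = (delta_log
    .select(F.regexp_extract("path", r"eventdate=([^/]+)", 1).alias("eventdate"))
    .distinct())
partitions_from_path.show()

# The same information is also available directly from the partitionValues map column:
delta_log.select(F.col("partitionValues")["eventdate"].alias("eventdate")).distinct().show()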
