Conversation

@cloud-fan (Contributor)

What changes were proposed in this pull request?

We should follow Hive tables and also store the partition spec in the metastore for data source tables. This brings two benefits:

  1. It is more flexible to manage a table's data files, as users can use ADD PARTITION, DROP PARTITION and RENAME PARTITION (see the sketch after this list).
  2. We no longer need to cache all file statuses for a data source table.
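
For illustration, a minimal sketch of the partition DDL this enables on a data source table (the table and column names are invented for the example):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example: once a data source table's partitions are tracked
// in the metastore, the usual partition DDL applies to it.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

spark.sql("CREATE TABLE logs (msg STRING, day STRING) USING parquet PARTITIONED BY (day)")
spark.sql("ALTER TABLE logs ADD PARTITION (day='2016-10-27')")
spark.sql("ALTER TABLE logs PARTITION (day='2016-10-27') RENAME TO PARTITION (day='2016-10-28')")
spark.sql("ALTER TABLE logs DROP PARTITION (day='2016-10-28')")
```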

How was this patch tested?

Existing tests.

Michael Allman and others added 30 commits October 13, 2016 16:58
* [SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query

* Add a new catalyst optimizer rule to SQL core for pruning unneeded partitions' files from a table file catalog

* Include the type of file catalog in the FileSourceScanExec metadata

* TODO: Consider renaming FileCatalog to better differentiate it from BasicFileCatalog (or vice-versa)

* try out parquet case insensitive fallback

* Refactor the FileSourceScanExec.metadata val to make it prettier

* fix and add test for input files

* rename test

* Refactor `TableFileCatalog.listFiles` to call `listDataLeafFiles` once instead of once per partition

* fix it

* more test cases

* also fix a bug with zero partitions selected

* feature flag

* add comments

* extend and fix flakiness in test

* Enhance `ParquetMetastoreSuite` with mixed-case partition columns

* Tidy up a little by removing some unused imports, an unused method and moving a protected method down and making it private

* Put partition count in `FileSourceScanExec.metadata` for partitioned tables

* Fix some errors in my revision of `ParquetSourceSuite`

* Thu Oct 13 17:18:14 PDT 2016

* more generic

* Thu Oct 13 18:09:42 PDT 2016

* Thu Oct 13 18:09:55 PDT 2016

* Thu Oct 13 18:22:31 PDT 2016

* Thu Oct 13 19:02:36 PDT 2016

* Thu Oct 13 19:03:06 PDT 2016
@SparkQA

SparkQA commented Oct 27, 2016

Test build #67604 has finished for PR 15515 at commit 9a6fff6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 27, 2016

Test build #67610 has finished for PR 15515 at commit 8c80555.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 27, 2016

Test build #67615 has finished for PR 15515 at commit b6776cc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl (Contributor)

ericl commented Oct 27, 2016

[info] - SPARK-10562: partition by column with mixed case name *** FAILED *** (605 milliseconds)
[info]   java.lang.reflect.InvocationTargetException:
[info]   at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
[info]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info]   at java.lang.reflect.Method.invoke(Method.java:497)
[info]   at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:588)

Odd that the last commit could cause this; maybe it's a flake? jenkins test this please

@cloud-fan (Contributor, Author)

retest this please

@SparkQA

SparkQA commented Oct 27, 2016

Test build #67625 has finished for PR 15515 at commit b6776cc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor, Author)

retest this please

@SparkQA

SparkQA commented Oct 27, 2016

Test build #67632 has finished for PR 15515 at commit b6776cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
// Need to recover partitions into the metastore so our saved data is visible.
val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier)
Union(createCmd, recoverPartitionCmd)

Let's add a special node for running a sequence of commands. We are relying on the implementation of Union here. Let's address this in a follow-up PR.
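
A rough sketch of what such a node could look like; the name `CommandSequence` is invented for this example and is not part of the PR:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.RunnableCommand

// Hypothetical sketch: run a sequence of commands in order, instead of
// relying on Union happening to execute its children left to right.
case class CommandSequence(commands: Seq[RunnableCommand]) extends RunnableCommand {
  override def run(sparkSession: SparkSession): Seq[Row] = {
    commands.foreach(_.run(sparkSession))  // discard intermediate results
    Seq.empty[Row]
  }
}
```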

case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
updateStats(logicalRel.catalogTable.get,
AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))

What's the cost of `AnalyzeTableCommand.calculateTotalSize`? Also, why is `sizeInBytes` not the latest size?

sparkSession.sqlContext.conf.manageFilesourcePartitions =>
// Need to recover partitions into the metastore so our saved data is visible.
sparkSession.sessionState.executePlan(
AlterTableRecoverPartitionsCommand(table.identifier)).toRdd

It seems we have similar logic at https://github.com/apache/spark/pull/15515/files#diff-94fbd986b04087223f53697d4b6cab24R396. Are we recovering partitions twice?


I just double-checked with `create table test_sel2 USING parquet PARTITIONED BY (fieldone, fieldtwo) AS SELECT id as fieldzero, id as fieldone, id as fieldtwo from range(100)` and it uses a different path, so the recovery is not duplicated.

// This is always the case for Hive format tables, but is not true for Datasource tables created
// before Spark 2.1 unless they are converted via `msck repair table`.
spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
catalog.refreshTable(tableName)

Let's also update the doc.


Yeah, this is a pretty major change for 2.1. Shall we do it in a follow-up once the patches for 2.1 are finalized?
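
As background, a sketch of the conversion the code comment above refers to (table name invented; I believe both spellings parse to `AlterTableRecoverPartitionsCommand`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Hypothetical example: scan the table location and register the discovered
// partitions in the metastore, making the metastore the partition provider.
spark.sql("MSCK REPAIR TABLE old_logs")
// Equivalent spelling:
spark.sql("ALTER TABLE old_logs RECOVER PARTITIONS")
```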

@yhuai (Contributor)

yhuai commented Oct 27, 2016

Looks good. I left a few questions. Let me know if you want to address them in follow-up PRs.

df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
// Need to recover partitions into the metastore so our saved data is visible.
val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier)
Union(createCmd, recoverPartitionCmd)

Makes sense. I'll file a ticket after this is merged.

case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
updateStats(logicalRel.catalogTable.get,
AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))

The reason is that the relation's size is no longer computed when it is resolved, so we have to force a table scan here to get an updated size.

Weird that GitHub reordered my comment above, actually ^
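
For illustration, the manual way to refresh size statistics with a scan (table name invented):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Scans the data and stores updated statistics (including a row count);
// the NOSCAN variant only refreshes the total size.
spark.sql("ANALYZE TABLE logs COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE logs COMPUTE STATISTICS NOSCAN")
```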


if (l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
l.catalogTable.get.partitionProviderIsHive) {
// TODO(ekl) we should be more efficient here and only recover the newly added partitions

I have a follow-up PR for this. Will cc you.
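
A rough sketch of what the TODO might look like; `registerNewPartitions` and its arguments are invented for this example:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: register only the partitions an insert produced,
// instead of rescanning the whole table location.
def registerNewPartitions(
    spark: SparkSession,
    table: String,
    newPartitions: Seq[Map[String, String]]): Unit = {
  newPartitions.foreach { spec =>
    val specSql = spec.map { case (k, v) => s"$k='$v'" }.mkString(", ")
    // IF NOT EXISTS keeps this idempotent when a partition is already known.
    spark.sql(s"ALTER TABLE $table ADD IF NOT EXISTS PARTITION ($specSql)")
  }
}
```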

@yhuai (Contributor)

yhuai commented Oct 27, 2016

Cool. I am merging this PR to unblock other tasks.


test("when partition management is disabled, we preserve the old behavior even for new tables") {

I just checked the old behavior. It is different from the existing behavior in our Spark 2.0 build. Let me do a quick fix to resolve it.
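
For reference, a sketch of opting out, assuming the feature flag from the snippets above is surfaced as the SQL conf key `spark.sql.hive.manageFilesourcePartitions`:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: `conf.manageFilesourcePartitions` is backed by this key;
// setting it to false keeps the pre-2.1 behavior for new tables.
val spark = SparkSession.builder()
  .config("spark.sql.hive.manageFilesourcePartitions", "false")
  .enableHiveSupport()
  .getOrCreate()
```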
