
Conversation

@petermaxlee
Contributor

What changes were proposed in this pull request?

This patch implements the 3 things specified in SPARK-16311:

(1) Append a message to the FileNotFoundException saying that a workaround is to do an explicit metadata refresh.
(2) Make metadata refresh work on temporary tables/views.
(3) Make metadata refresh work on Datasets/DataFrames, by introducing a Dataset.refresh() method.

And one additional small change:
(4) Merge invalidateTable and refreshTable.
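
A rough usage sketch of changes (2) and (3), in Scala; the SparkSession setup, path, and view name are illustrative, and the final API may differ from what is shown here:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()

// (2) Metadata refresh on a temporary view backed by files (path is illustrative).
spark.read.parquet("/tmp/events").createOrReplaceTempView("events")
spark.catalog.refreshTable("events")

// (3) The Dataset-level refresh proposed in this PR.
val df = spark.table("events")
df.refresh()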

How was this patch tested?

Created a new test suite that creates a temporary directory and then deletes a file from it, to verify that Spark can still read the directory once refresh is called.
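
A minimal sketch of that test pattern, assuming a local SparkSession; the view name, paths, and assertions are illustrative rather than copied from the actual suite:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("refresh-test").getOrCreate()
val dir = java.nio.file.Files.createTempDirectory("refresh-test").toFile

// Write some data and register the directory as a temporary view.
spark.range(10).write.parquet(dir.getAbsolutePath)
spark.read.parquet(dir.getAbsolutePath).createOrReplaceTempView("tmp_tbl")
assert(spark.table("tmp_tbl").count() == 10)

// Delete one part file behind Spark's back; the cached file listing is now stale
// and a scan can fail with FileNotFoundException.
dir.listFiles().filter(_.getName.endsWith(".parquet")).headOption.foreach(_.delete())

// After refreshing the view's metadata, the read succeeds again with fewer rows.
spark.catalog.refreshTable("tmp_tbl")
assert(spark.table("tmp_tbl").count() < 10)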

@petermaxlee
Contributor Author

cc @rxin

@rxin
Contributor

rxin commented Jun 30, 2016

cc @cloud-fan / @liancheng

@gatorsmile
Member

Previously, I tried to merge invalidateTable and refreshTable. @yhuai left the following comment:
#13156 (comment)

I think maybe we should keep them separate?

 * @group action
 * @since 2.0.0
 */
def refresh(): Unit = {
Member

It will remove the cached data. This is different from what the JIRA describes. CC @rxin

Contributor Author

Other refresh methods also remove cached data, so I thought this was better.

Member

This new API behaves differently from the refreshTable API and the REFRESH TABLE SQL statement. See the following code:

/**
 * Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
 * is refreshed.
 *
 * @group cachemgmt
 * @since 2.0.0
 */
override def refreshTable(tableName: String): Unit = {
  val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
  sessionCatalog.refreshTable(tableIdent)
  // If this table is cached as an InMemoryRelation, drop the original
  // cached version and make the new version cached lazily.
  val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
  // Use lookupCachedData directly since RefreshTable also takes databaseName.
  val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
  if (isCached) {
    // Create a data frame to represent the table.
    // TODO: Use uncacheTable once it supports database name.
    val df = Dataset.ofRows(sparkSession, logicalPlan)
    // Uncache the logicalPlan.
    sparkSession.sharedState.cacheManager.uncacheQuery(df, blocking = true)
    // Cache it again.
    sparkSession.sharedState.cacheManager.cacheQuery(df, Some(tableIdent.table))
  }
}

IMO, if we are using the word refresh, we have to make them consistent.

Contributor Author

Ah, I see - we can't unpersist.

Contributor

We can unpersist, but should persist it again immediately.

Contributor

Actually we can and should call unpersist, but we should also call persist()/cache() again so that the Dataset will be cached lazily again with correct data when it gets executed next time. I guess that's also what @gatorsmile meant.
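
A minimal sketch of the unpersist-then-cache-again pattern being suggested here; the SparkSession setup and path are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
val ds = spark.read.parquet("/tmp/tableA")  // illustrative path

ds.cache()
ds.count()                     // materializes the InMemoryRelation

// ... files under /tmp/tableA change on disk ...

ds.unpersist(blocking = true)  // drop the stale cached data
ds.cache()                     // mark it for caching again; it is rebuilt
                               // lazily with fresh data on the next action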

@gatorsmile
Member

The test cases are not enough to cover metadata refreshing. The current metadata cache is only used for data source tables, but Hive tables can still be converted to data source tables (for example, Parquet and ORC), so we also need to check the behavior in those cases.

Try to design more test cases for metadata refreshing, including both positive and negative cases.
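
A hedged sketch of the kind of extra test being asked for here: a Hive Parquet table that gets converted to a data source relation, whose cached metadata a refresh should invalidate. The config key reflects how the conversion is normally toggled; the table name and values are illustrative:

val spark = org.apache.spark.sql.SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .getOrCreate()

spark.sql("CREATE TABLE hive_parquet (id INT) STORED AS PARQUET")
spark.sql("INSERT INTO hive_parquet VALUES (1), (2)")

// The first read converts the MetastoreRelation to a LogicalRelation and
// caches its file metadata.
assert(spark.table("hive_parquet").count() == 2)

// After the underlying data changes, a refresh should invalidate that cache.
spark.sql("INSERT INTO hive_parquet VALUES (3)")
spark.catalog.refreshTable("hive_parquet")
assert(spark.table("hive_parquet").count() == 3)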

@petermaxlee
Contributor Author

What do you mean by both positive and negative cases?

@gatorsmile
Member

gatorsmile commented Jun 30, 2016

For example, suppose I try to refresh the metadata of a DataFrame that has multiple MetastoreRelation leaf nodes that have already been converted to LogicalRelation. I think we expect the metadata stored in cachedDataSourceTables to be invalidated. The positive case is when the table is still cached; the negative case is when the table is already uncached.

Update: just corrected the contents.

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61524 has finished for PR 13989 at commit 82f9bec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/**
 * Invalidates any metadata cached in the plan recursively.
Contributor

"Refreshes" instead of "Invalidates"?

@petermaxlee
Contributor Author

petermaxlee commented Jun 30, 2016

Would this work? Traverse the logical plan to find whether it references any catalog relation, and if it does, call catalog.refreshTable("...")?

For example

scala> spark.table("test1").queryExecution.optimizedPlan.asInstanceOf[org.apache.spark.sql.execution.datasources.LogicalRelation].metastoreTableIdentifier
res20: Option[org.apache.spark.sql.catalyst.TableIdentifier] = Some(`default`.`test1`)
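
A rough sketch of that traversal idea, using the LogicalRelation and metastoreTableIdentifier fields shown above; this is illustrative, not the final implementation:

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Walk the analyzed plan, collect the table identifier of every file-based
// relation, and refresh each one through the catalog.
def refreshReferencedTables(spark: SparkSession, ds: Dataset[_]): Unit = {
  ds.queryExecution.analyzed.collect {
    case r: LogicalRelation => r.metastoreTableIdentifier
  }.flatten.foreach(ident => spark.catalog.refreshTable(ident.unquotedString))
}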

@liancheng
Contributor

One concern of mine is that the analyzed plan, optimized plan, and executed (physical) plan stored in QueryExecution are all lazy vals, which means they won't be re-analyzed/re-optimized/re-planned after the metadata of the corresponding logical plan is refreshed.

Say we constructed a DataFrame df to join a small table A and a large table B. After calling df.write.parquet(...), the analyzed, optimized, and executed plans of df have all been computed. Since A is small, the planner may decide to broadcast it, and this decision is reflected in the physical plan.

Next, we add a bunch of files to the directory where table A lives to make it super large, then call df.refresh() to refresh the logical plan. Now, if we call df.write.parquet(...) again, the query will probably crash, since the physical plan is not refreshed and still assumes that A should be broadcast.
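
A sketch of that scenario; paths are illustrative, and it assumes an existing SparkSession named spark:

// A join between a small table A and a large table B.
val a = spark.read.parquet("/data/small_A")
val b = spark.read.parquet("/data/large_B")
val df = a.join(b, "key")

df.write.parquet("/out/run1")  // planning happens here: A may be chosen for broadcast

// ... many files are added under /data/small_A, making it huge ...

// Per the concern above, the lazily computed plans cached inside
// df.queryExecution are not recomputed by a metadata refresh, so the
// second write can still act on the stale broadcast decision.
df.write.parquet("/out/run2")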

@liancheng
Contributor

liancheng commented Jun 30, 2016

In general, I think reconstructing the DataFrame/Dataset or using REFRESH TABLE at the application level may be a better approach to the problem this PR tries to solve. Did I miss any context here?

@rxin
Contributor

rxin commented Jun 30, 2016

I think @liancheng has a good point. Why don't we take out Dataset.refresh() for now?

@petermaxlee
Contributor Author

petermaxlee commented Jun 30, 2016

Alright I will do that and submit a new pull request.

Note that I think refreshing a DataFrame is already possible via table refresh, if the DataFrame references a table, or if some view references the DataFrame.
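
A small sketch of the table-refresh route described in that note; the view name, path, and column are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()

spark.read.parquet("/tmp/events").createOrReplaceTempView("events")  // illustrative
val df = spark.table("events").where("id > 10")

// ... files under /tmp/events change ...

// Per the note above, refreshing the table the DataFrame references also
// refreshes the DataFrame, since the cached file metadata lives with the
// relation rather than with the DataFrame itself.
spark.catalog.refreshTable("events")
df.count()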

@SparkQA

SparkQA commented Jul 1, 2016

Test build #3159 has finished for PR 13989 at commit 82f9bec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Jul 5, 2016
## What changes were proposed in this pull request?
This patch fixes a bug where the refresh command does not work on temporary views. It is based on #13989, but removes the public Dataset.refresh() API and improves test coverage.

Note that I actually think the public refresh() API is very useful. We can implement it in the future by also invalidating the lazy vals in QueryExecution (or alternatively just creating a new QueryExecution).

## How was this patch tested?
Re-enabled a previously ignored test, and added a new Hive test suite that checks the behavior of temporary views against MetastoreRelation.

Author: Reynold Xin <rxin@databricks.com>
Author: petermaxlee <petermaxlee@gmail.com>

Closes #14009 from rxin/SPARK-16311.
asfgit pushed a commit that referenced this pull request Jul 5, 2016

(Same commit message as above; cherry picked from commit 16a2a7d)
Signed-off-by: Reynold Xin <rxin@databricks.com>
@petermaxlee petermaxlee closed this Jul 6, 2016
