Conversation

@xuanyuanking
Member

xuanyuanking commented Dec 4, 2016

What changes were proposed in this pull request?

As described in the SPARK-18700 scenario, when cachedDataSourceTables is invalidated, the next few queries each fetch all FileStatus objects in the listLeafFiles function. For a table with many partitions, these jobs occupy a large amount of driver memory and may eventually cause a driver OOM.

This patch adds a striped lock for each table's relation in the cache, rather than a single lock for the whole cachedDataSourceTables, so each table's cache-load operation is protected independently.
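A minimal sketch of the idea, assuming Guava's Striped utility (the names tableCreationLocks and withTableCreationLock follow the shape the patch converges on later in this thread; this is an illustration, not the exact diff):

import com.google.common.util.concurrent.Striped

object TableCreationLockSketch {
  // A fixed pool of locks keyed by table name: concurrent loads of the same
  // table serialize on one stripe, while unrelated tables proceed in parallel.
  private val tableCreationLocks = Striped.lazyWeakLock(100)

  def withTableCreationLock[A](tableName: String)(body: => A): A = {
    val lock = tableCreationLocks.get(tableName)
    lock.lock()
    try body finally lock.unlock()
  }
}

With lazyWeakLock the stripes are created on demand and weakly referenced, so unused locks can be garbage-collected and the memory overhead stays bounded.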

How was this patch tested?

Added a multi-threaded table-access test in PartitionedTablePerfStatsSuite and verified, using the metrics in HiveCatalogMetrics, that the table is loaded only once.

@xuanyuanking xuanyuanking changed the title SPARK-18700: add ReadWriteLock for each table's relation in cache [SPARK-18700][SQL] Add ReadWriteLock for each table's relation in cache Dec 4, 2016
@SparkQA

SparkQA commented Dec 4, 2016

Test build #69634 has finished for PR 16135 at commit 8718ec3.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 4, 2016

Test build #69635 has finished for PR 16135 at commit a1d9a3c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 5, 2016

Test build #69660 has finished for PR 16135 at commit 95aabb8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

@rxin @liancheng

@rxin
Contributor

rxin commented Dec 7, 2016

cc @ericl can you take a look at this?

@ericl
Contributor

ericl commented Dec 7, 2016

Is it sufficient to lock around the catalog.filterPartitions(Nil)? Why do we need reader locks?


/** ReadWriteLock for each tables, protect the read and write cached */
protected[hive] val tableLockMap =
  new ConcurrentHashMap[QualifiedTableName, ReentrantReadWriteLock]
Contributor

@ericl ericl Dec 7, 2016

Member Author

Yes, I considered Striped.lazyWeakReadWriteLock here, but I need to handle invalidateAllCache(), which needs access to all <K, V> entries, so I ended up creating a HashMap myself.
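For context, a rough sketch of the map-based variant described above (illustrative only; the key type parameter and helper names are assumptions, not the patch itself):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantReadWriteLock

class TableLockMapSketch[K] {
  // One ReadWriteLock per table. Unlike Striped, the map keeps every
  // <key, lock> entry enumerable, which invalidateAllCache() needs.
  private val tableLockMap = new ConcurrentHashMap[K, ReentrantReadWriteLock]

  def lockFor(key: K): ReentrantReadWriteLock =
    tableLockMap.computeIfAbsent(key, _ => new ReentrantReadWriteLock)

  def allKeys(): java.util.Set[K] = tableLockMap.keySet()
}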

@xuanyuanking
Member Author

xuanyuanking commented Dec 8, 2016

@ericl Thanks for your review.

Is it sufficient to lock around the catalog.filterPartitions(Nil)?

Yes, this patch was ported from 1.6.2 and I missed the diff here. Fixed in the next patch.

Why do we need reader locks?

Writes to and invalidations of the table cache happen far less often than reads, so readers only wait while the same table's cache is being written. Do you mean we just need a plain lock here, not a ReadWriteLock?

@ericl
Contributor

ericl commented Dec 8, 2016

I guess the large number of lock sites is confusing me. We only want to prevent concurrent instantiation of a single table, so shouldn't you only need 1 lock for that site?

Also, we should have a unit test that tries to concurrently read from a table from many threads, and verifies via the catalog metrics that it is only loaded once (see TablePerfStatsSuite for how to access the metrics).
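A rough sketch of the kind of test being suggested (the spark handle, table name, and thread count are illustrative; the real test landed in PartitionedTablePerfStatsSuite):

import java.util.concurrent.{Executors, TimeUnit}

val executorPool = Executors.newFixedThreadPool(10)
(0 until 10).foreach { _ =>
  executorPool.execute(new Runnable {
    // every thread resolves the same partitioned table concurrently
    override def run(): Unit = spark.sql("select * from test").count()
  })
}
executorPool.shutdown()
executorPool.awaitTermination(30, TimeUnit.SECONDS)
// with the per-table lock in place, the catalog metrics should show the
// relation was instantiated exactly once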

val key = metastoreCatalog.getQualifiedTableName(table)
metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
metastoreCatalog.readLock(key,
  metastoreCatalog.cachedDataSourceTables.getIfPresent(key))
Contributor

@ericl ericl Dec 8, 2016

Why does this need locking?


def invalidateCache(): Unit = {
  metastoreCatalog.cachedDataSourceTables.invalidateAll()
  metastoreCatalog.invalidateAllCache()
Contributor

Why this change?

@xuanyuanking xuanyuanking changed the title [SPARK-18700][SQL] Add ReadWriteLock for each table's relation in cache [SPARK-18700][SQL] Add StripedLock for each table's relation in cache Dec 9, 2016
@xuanyuanking
Member Author

xuanyuanking commented Dec 9, 2016

Hi @ericl,
This commit does the three things below; thanks for your review:

  1. Deleted the unnecessary lock usage and simplified the locking
  2. Added a unit test in PartitionedTablePerfStatsSuite
  3. Added a cache-hit metric to HiveCatalogMetrics

Also updated the description of this PR.

@SparkQA

SparkQA commented Dec 9, 2016

Test build #69903 has finished for PR 16135 at commit 82cf00e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
 * Tracks the total number of cachedDataSourceTables hits.
 */
val METRIC_DATASOUCE_TABLE_CACHE_HITS = metricRegistry.counter(
Contributor

Could we use one of the other metrics, rather than add a new one?

Member Author

Maybe we can't; only the cache-hit count lets us verify the load count.
I ran the test below: I added a Thread.sleep(1000) before cachedDataSourceTables.put(tableIdentifier, created) at HiveMetastoreCatalog.scala:265 to make building the table relation slow, and printed all the metrics with and without the lock:

println(HiveCatalogMetrics.METRIC_DATASOUCE_TABLE_CACHE_HITS.getCount())
println(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount())
println(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount())
println(HiveCatalogMetrics.METRIC_HIVE_CLIENT_CALLS.getCount())
println(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
println(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount())

The result without the lock:

0    (dataSourceTableCacheHits)
0    (fileCacheHits)
5    (filesDiscovered)
70   (hiveClientCalls)
0    (parallelListingJobCount)
0    (partitionsFetched)

and the result with the lock:

9    (dataSourceTableCacheHits)
0    (fileCacheHits)
5    (filesDiscovered)
70   (hiveClientCalls)
0    (parallelListingJobCount)
0    (partitionsFetched)

Contributor

That's kind of odd, I'd expect the duplicate table building to cause more file accesses or at least cache hits since we are scanning the filesystem multiple times. Is that not the case?

Member Author

I may have found the reason for the odd scenario; please check my conclusion:
2.0 added a new config, lazyPruningEnabled, whose default value is true; with it enabled, multiple threads doing the build at the same time will not call listLeafFiles.
So I set HIVE_MANAGE_FILESOURCE_PARTITIONS=false and made the partition count larger than PARALLEL_PARTITION_DISCOVERY_THRESHOLD (this increments METRIC_PARALLEL_LISTING_JOB_COUNT); the test results are listed below.
Without the lock:

0     (dataSourceTableCacheHits)
0     (fileCacheHits)
550   (filesDiscovered: 50 files * 11, 1 pass from cache.load() and 10 from the 10 threads)
90    (hiveClientCalls)
11    (parallelListingJobCount: likewise 1 * 11)
1000  (partitionsFetched)

and with the lock:

9     (dataSourceTableCacheHits)
0     (fileCacheHits)
100   (filesDiscovered: 50 files * 2, 1 pass from cache.load() and 1 from the first thread)
54    (hiveClientCalls)
2     (parallelListingJobCount: likewise 1 * 2)
550   (partitionsFetched)

So I can delete the added dataSourceTableCacheHits metric and use parallelListingJobCount and filesDiscovered instead. I'll do this in the next patch.
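The setup described above corresponds roughly to this sketch; the withSQLConf helper and the exact config keys are assumptions drawn from Spark's test utilities, not copied from the suite:

import org.apache.spark.sql.internal.SQLConf

withSQLConf(
    SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false",
    SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") {
  // create a table with more partitions than the threshold, resolve it from
  // several threads, then compare METRIC_FILES_DISCOVERED and
  // METRIC_PARALLEL_LISTING_JOB_COUNT with and without the lock
}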

import org.apache.spark.sql.types._



Contributor

nit: extra newline

Member Author

Fixed.

tableIdent.table.toLowerCase)
}

/** ReadWriteLock for each tables, protect the read and write cached */
Contributor

Could you update this comment to say that the reason we lock is to prevent concurrent table instantiation?

Member Author

Updated, and added more comments at HiveMetastoreCatalog.scala:226.

private val tableLockStripes = Striped.lazyWeakLock(10)

/** Acquires a lock on the table cache for the duration of `f`. */
private def cacheLock[A](tableName: QualifiedTableName, f: => A): A = {
Contributor

withTableCreationLock

Member Author

Updated.

executorPool.shutdown()
executorPool.awaitTermination(30, TimeUnit.SECONDS)
// check the cache hit, the cache only load once
assert(HiveCatalogMetrics.METRIC_DATASOUCE_TABLE_CACHE_HITS.getCount() == 9)
Contributor

Does this test fail without the lock?

Member Author

It may fail sometimes without the lock, but when I add a Thread.sleep(1000) before cachedDataSourceTables.put(tableIdentifier, created) at HiveMetastoreCatalog.scala:265 to make building the table relation slow, it fails every time. How can I add such a hook in the unit test? Or how can I make the cache-build operation slow without actually creating a big table? :)

Contributor

Yeah that's fine, as long as it fails some fraction of the time it will eventually show up as a flaky test.

}
}

test("SPARK-18700: add lock for each table's realation in cache") {
Contributor

"table loaded only once even when resolved concurrently"

Member Author

Updated.

}

/** ReadWriteLock for each tables, protect the read and write cached */
private val tableLockStripes = Striped.lazyWeakLock(10)
Contributor

@ericl ericl Dec 10, 2016

Hm, may as well make it 100 if it's a lazy weak lock.

Contributor

nit: tableCreationLocks.

Member Author

Fixed.

@SparkQA

SparkQA commented Dec 11, 2016

Test build #69979 has started for PR 16135 at commit 276656e.

METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
METRIC_DATASOUCE_TABLE_CACHE_HITS.dec(METRIC_DATASOUCE_TABLE_CACHE_HITS.getCount())
Contributor

s/souce/source

Member Author

Ah, sorry; this newly added metric will be deleted in the next patch, per the earlier comment.

@SparkQA

SparkQA commented Dec 12, 2016

Test build #69998 has finished for PR 16135 at commit 16c47c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@ericl ericl left a comment

Thanks for making these changes, this lgtm with some nits.

catalog.filterPartitions(Nil) // materialize all the partitions in memory
// Here we should protect all relation get and create operation with lock while big
// table's CatalogFileIndex will take some time, only lock cachedDataSourceTables.put
// will still cause driver memory waste. More detail see SPARK-18700.
Contributor

nit: probably don't need this comment

Member Author

Done, deleted it.

}

/** Locks for preventing driver mem waste when concurrent table instantiation */
private val tableCreationLocks = Striped.lazyWeakLock(100)
Contributor

nit: "These locks guard against multiple attempts to instantiate a table, which wastes memory."

Member Author

Fixed.

// check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
// METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
// only one thread can really do the build, so the listing job count is 2, the other
// one is cahce.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
Contributor

s/cahce/cache

Member Author

...sorry, fixed.
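For reference, the expectation in the quoted comment maps roughly onto assertions like these (partitionNum is an assumed variable name, not the suite's actual code):

// only the first thread and cache.load() perform a parallel listing
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
// each of those two passes discovers every partition's files once
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == partitionNum * 2)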

@xuanyuanking
Member Author

Thanks for the review, @ericl!

@SparkQA

SparkQA commented Dec 12, 2016

Test build #70012 has finished for PR 16135 at commit 5beccaa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl
Contributor

ericl commented Dec 13, 2016

cc @rxin, this lgtm

@xuanyuanking
Member Author

xuanyuanking commented Dec 16, 2016

cc @rxin, thanks for checking. :)

@hvanhovell
Contributor

retest this please

@hvanhovell
Contributor

I am merging this one after a successful test run. Ping me if you object.

@SparkQA

SparkQA commented Dec 19, 2016

Test build #70357 has finished for PR 16135 at commit 5beccaa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Dec 19, 2016

Go for it.

asfgit pushed a commit that referenced this pull request Dec 19, 2016
## What changes were proposed in this pull request?

As described in the scenario in [SPARK-18700](https://issues.apache.org/jira/browse/SPARK-18700), when cachedDataSourceTables is invalidated, the next few queries each fetch all FileStatus objects in the listLeafFiles function. For a table with many partitions, these jobs occupy a large amount of driver memory and may eventually cause a driver OOM.

This patch adds a striped lock for each table's relation in the cache, rather than a single lock for the whole cachedDataSourceTables, so each table's cache-load operation is protected independently.

## How was this patch tested?

Added a multi-threaded table-access test in `PartitionedTablePerfStatsSuite` and verified, using the metrics in `HiveCatalogMetrics`, that the table is loaded only once.

Author: xuanyuanking <xyliyuanjian@gmail.com>

Closes #16135 from xuanyuanking/SPARK-18700.

(cherry picked from commit 2448285)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
@asfgit asfgit closed this in 2448285 Dec 19, 2016
@hvanhovell
Contributor

Merging to master/2.1. @xuanyuanking, can you open a backport for 2.0 if we also need to merge this to that branch?

@xuanyuanking
Member Author

xuanyuanking commented Dec 20, 2016

@hvanhovell Sure, I opened a new BACKPORT-2.0 PR.
There's a small diff in branch-2.0: this patch's unit test relies on HiveCatalogMetrics, which does not exist in 2.0, so I added the metric the patch uses. Thanks for checking.
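A hedged sketch of what adding such a metric looks like with the Codahale registry that HiveCatalogMetrics uses (the metric name here is illustrative; the backport's actual metric set may differ):

import com.codahale.metrics.MetricRegistry

// counter backing the load-once check in the backported test
val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
  MetricRegistry.name("parallelListingJobCount"))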

asfgit pushed a commit that referenced this pull request Dec 21, 2016
…ation in cache

## What changes were proposed in this pull request?

Backport of #16135 to branch-2.0

## How was this patch tested?

Because of the diff between branch-2.0 and master/2.1, this adds a multi-threaded table-access test in `HiveMetadataCacheSuite` and verifies, using the metrics in `HiveCatalogMetrics`, that the table is loaded only once.

Author: xuanyuanking <xyliyuanjian@gmail.com>

Closes #16350 from xuanyuanking/SPARK-18700-2.0.
// check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
// METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
// only one thread can really do the build, so the listing job count is 2, the other
// one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
Member

This comment is wrong. The extra counts are from the DataFrameWriter's save() API.

Member

Working on a fix to avoid the useless filesystem scan caused by the save() API.

Member Author

@gatorsmile (Xiao) fixed this in #16481.

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017