
Conversation

@LuciferYang
Contributor

@LuciferYang LuciferYang commented Aug 16, 2021

What changes were proposed in this pull request?

The main purpose of this pr is to introduce a File Meta Cache mechanism for Spark SQL; a basic File Meta Cache implementation for ORC is provided at the same time. There was originally a PR that supported the file meta cache for both Parquet and ORC, but Parquet has no non-deprecated API that can be used to pass a footer when creating a new ParquetFileReader, and both the Apache Spark and Parquet communities are reluctant to advertise the deprecated API, so this pr spins off the ORC-only part.

The main changes of this pr are as follows:

  • Defined a FileMetaCacheManager to cache the mapping from FileMetaKey to FileMeta. FileMetaKey is the cache key; its equality is determined by the file path by default. FileMeta represents the cache value and is generated by the FileMetaKey#getFileMeta method.

  • Currently, the FileMetaCacheManager supports a simple cache expiration mechanism: the expiration time is determined by the new config FILE_META_CACHE_TTL_SINCE_LAST_ACCESS, and the maximum number of file meta entries the cache holds per executor is determined by the new config FILE_META_CACHE_MAXIMUM_SIZE.

  • For the ORC file format, this pr adds OrcFileMetaKey and OrcFileMeta to cache the ORC file Tail. The Tail cache can be used by the vectorized read path in both DS API V1 and V2; the feature is enabled when FILE_META_CACHE_ENABLED_SOURCE_LIST is configured with orc. A minimal sketch of the cache manager follows this list.
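
A minimal sketch of what the cache manager described above could look like, assuming Guava's CacheBuilder (the discussion below mentions switching to Guava Cache). The class shapes and the hard-coded TTL/size values are simplified stand-ins for illustration, not the actual patch:

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.Path

// Simplified stand-ins for the classes described above (not the actual patch).
trait FileMeta

abstract class FileMetaKey {
  def path: Path
  // Loads the metadata on a cache miss, e.g. the ORC file Tail.
  def getFileMeta: FileMeta
  // Cache equality is determined by the file path by default.
  override def hashCode(): Int = path.hashCode
  override def equals(other: Any): Boolean = other match {
    case that: FileMetaKey => path == that.path
    case _ => false
  }
}

object FileMetaCacheManager {
  // In the real change these would come from the new SQL configs
  // (FILE_META_CACHE_TTL_SINCE_LAST_ACCESS / FILE_META_CACHE_MAXIMUM_SIZE).
  private val ttlSinceLastAccessSec = 3600L
  private val maximumSize = 1000L

  private val cache: LoadingCache[FileMetaKey, FileMeta] = CacheBuilder.newBuilder()
    .expireAfterAccess(ttlSinceLastAccessSec, TimeUnit.SECONDS)
    .maximumSize(maximumSize)
    .build(new CacheLoader[FileMetaKey, FileMeta] {
      override def load(key: FileMetaKey): FileMeta = key.getFileMeta
    })

  // Returns the cached meta for the key, computing and caching it on a miss.
  def get(key: FileMetaKey): FileMeta = cache.get(key)
}
```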

Currently, the file meta cache mechanism cannot be used by the row-based reader; it needs the completion of ORC-746 for further support.

The fileMetaCache requires users to pay special attention to the following situation:

If the fileMetaCache is enabled, existing data files should not be replaced by new files with the same name; otherwise there is a risk of job failures or wrong query results until the cache entry expires.

Why are the changes needed?

Support the ORC datasource using the File Meta Cache mechanism to reduce the number of metadata reads when multiple queries are performed on the same dataset.

Does this PR introduce any user-facing change?

Adds 3 new configs (a usage sketch follows the list):

  • FILE_META_CACHE_ENABLED_SOURCE_LIST (spark.sql.fileMetaCache.enabledSourceList): a comma-separated list of data source short names for which the file meta cache is enabled. Currently only ORC is supported. It is recommended to enable this config when multiple queries are performed on the same dataset; the cache is disabled by default.

  • FILE_META_CACHE_TTL_SINCE_LAST_ACCESS (spark.sql.fileMetaCache.ttlSinceLastAccess): the time-to-live of a file metadata cache entry after its last access, in seconds.

  • FILE_META_CACHE_MAXIMUM_SIZE (spark.sql.fileMetaCache.maximumSize): the maximum number of file meta entries the file meta cache contains.
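
For illustration, enabling the feature for a session might look like the following (only the config keys come from this PR; the values, path, and column name are arbitrary examples):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("file-meta-cache-example")
  // Enable the file meta cache for the ORC data source.
  .config("spark.sql.fileMetaCache.enabledSourceList", "orc")
  // Keep entries for 10 minutes after the last access (seconds).
  .config("spark.sql.fileMetaCache.ttlSinceLastAccess", "600")
  // Keep at most 1000 file meta entries per executor.
  .config("spark.sql.fileMetaCache.maximumSize", "1000")
  .getOrCreate()

// Repeated reads of the same ORC dataset can then reuse the cached file tails.
val df = spark.read.orc("/path/to/some/orc/table")  // hypothetical path
df.count()
df.where("id > 0").count()  // hypothetical column
```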

How was this patch tested?

  • Pass the Jenkins or GitHub Action
  • Add new test suites to FileMetaCacheSuite and SQLConfSuite
  • Add FileMetaCacheReadBenchmark to measure benefits

@github-actions github-actions bot added the SQL label Aug 16, 2021
@SparkQA

SparkQA commented Aug 16, 2021

Test build #142495 has finished for PR 33748 at commit 45f4827.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 16, 2021

Test build #142497 has started for PR 33748 at commit 1a9bd3b.

@SparkQA
Copy link

SparkQA commented Aug 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46993/

@SparkQA

SparkQA commented Aug 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46997/

@SparkQA

SparkQA commented Aug 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46998/

@SparkQA

SparkQA commented Aug 16, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46993/

@SparkQA

SparkQA commented Aug 16, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46997/

@SparkQA

SparkQA commented Aug 16, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46998/

@SparkQA

SparkQA commented Aug 16, 2021

Test build #142503 has finished for PR 33748 at commit 30df269.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class FileMetaKey
  • case class OrcFileMetaKey(path: Path, configuration: Configuration)
  • case class OrcFileMeta(tail: OrcTail) extends FileMeta

@SparkQA

SparkQA commented Aug 16, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47004/

@SparkQA

SparkQA commented Aug 16, 2021

Test build #142496 has finished for PR 33748 at commit a74c793.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47014/

@SparkQA

SparkQA commented Aug 19, 2021

Test build #142634 has finished for PR 33748 at commit 4adeb62.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 19, 2021

Test build #142639 has finished for PR 33748 at commit e5f9497.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 19, 2021

Test build #142632 has finished for PR 33748 at commit c3838e6.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 19, 2021

Test build #142640 has finished for PR 33748 at commit ec8fa1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Ya, it's a little blocked by the on-going discussion because it's related. Thank you for waiting for it, @LuciferYang .

@dbtsai
Member

dbtsai commented Aug 20, 2021

Since the metadata is cached in the executor, does it mean the task reading the same ORC file has to be scheduled on the same executor? How can we guarantee this?

@sunchao please correct me if I'm wrong, but if we overwrite the table, the same file names will be reused, which could potentially cause inconsistency issues. Shouldn't we have some safeguard such as checking the file sizes?

Thanks,

@viirya
Member

viirya commented Aug 20, 2021

@sunchao please correct me if I'm wrong, but if we overwrite the table, the same file names will be reused, which could potentially cause inconsistency issues. Shouldn't we have some safeguard such as checking the file sizes?

Even when overwriting the table, will we use the same file names? I remember the file names include a unique task id/attempt id.

@sunchao
Member

sunchao commented Aug 20, 2021

In Hive it's common that the same file name (e.g., 000000_0) gets used when doing insert overwrite. Even if we check file size and other stuff, it can't completely prevent us from hitting a stale cache.

@dongjoon-hyun
Member

@dbtsai and @sunchao .

Everything depends on the data lifecycle. For safety, we can control it by reducing spark.sql.fileMetaCache.ttlSinceLastAccessSec to 10 seconds or less, which is still effective at avoiding the duplicate footer lookup within a single task.

@LuciferYang
Contributor Author

LuciferYang commented Aug 20, 2021

Since the metadata is cached in the executor, does it mean the task reading the same ORC file has to be scheduled on the same executor? How can we guarantee this?

At present, I think it is best effort because there is no scheduling guarantee; a workload with a lot of NODE_LOCAL scheduling is probably the best scenario.

We may need to collect information about the fileMetaCache distribution on the driver and take it into account during task scheduling, but this may make the driver use more memory.

@LuciferYang
Contributor Author

LuciferYang commented Aug 20, 2021

In Hive it's common that the same file name (e.g., 000000_0) gets used when doing insert overwrite. Even if we check file size and other stuff, it can't completely prevent us from hitting a stale cache.

Can we add the ctime or mtime of the file to PartitionedFile and use this information for the check?
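
A rough, hypothetical sketch of that check (not part of this patch): extend the cache key so that equality also covers the modification time, so a file overwritten in place with the same name no longer matches a stale entry.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical variant of OrcFileMetaKey that also keys on the file's mtime.
case class OrcFileMetaKeyWithMTime(
    path: Path,
    modificationTime: Long,
    configuration: Configuration) {
  // Keys match only if both the path and the modification time match, so an
  // overwritten file produces a cache miss instead of returning stale meta.
  override def hashCode(): Int = (path, modificationTime).hashCode()
  override def equals(other: Any): Boolean = other match {
    case that: OrcFileMetaKeyWithMTime =>
      path == that.path && modificationTime == that.modificationTime
    case _ => false
  }
}
```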

Similarly, how do we ensure that the FileStatus cache (SharedInMemoryCache) is correct when the user overwrites files and does not send the refreshTable command to the current Spark app? It has the same file-name-reuse problem. I feel that solving this consistency problem may become an independent topic :)

@sunchao
Member

sunchao commented Aug 20, 2021

Can we add the ctime or mtime of the file to PartitionedFile and use this information for the check?

Yea, file path + modification time seems like a good way to validate the cache.

Similarly, how do we ensure that the FileStatus cache (SharedInMemoryCache) is correct when the user overwrites files and does not send the refreshTable command to the current Spark app? It has the same file-name-reuse problem. I feel that solving this consistency problem may become an independent topic :)

I think they are not quite the same: FileStatus cache operates on a per-table basis so you'll only get stale data in the worst case. However, here the cache is on a per-file basis, so one could end up having some files in a partition that are cached while the rest are not. In addition, at least in Parquet, metadata, including row count, stats, bloom filter, etc, are used to filter row groups or data pages. Task failure or correctness issue could happen if we apply stale metadata on a newer file, or if the metadata is used in aggregation pushdown (ongoing work).

Everything depends on the data lifecycle. For safety, we can control it by reducing spark.sql.fileMetaCache.ttlSinceLastAccessSec to 10 seconds or less, which is still effective at avoiding the duplicate footer lookup within a single task.

I understand you want to avoid the duplicate footer lookup. In Parquet at least we can just pass the footer from either ParquetFileFormat or ParquetPartitionReaderFactory to SpecificParquetRecordReaderBase for reuse, which I think is much simpler than using a cache.

@LuciferYang
Contributor Author

LuciferYang commented Aug 20, 2021

I understand you want to avoid the duplicate footer lookup. In Parquet at least we can just pass the footer from either ParquetFileFormat or ParquetPartitionReaderFactory to SpecificParquetRecordReaderBase for reuse, which I think is much simpler than using a cache.

If we can add some strategies to Spark in the future to ensure that tasks reading the same ORC file are scheduled on the same executor, the benefits of fileMetaCache will be more obvious. In fact, in our production environment, about 100,000 SQL queries use this feature (for Parquet files) every day.

@sunchao
Member

sunchao commented Aug 20, 2021

Yea, but it adds complexity and more memory consumption like you mentioned earlier, and you'd need the driver to be a long-running process like a Presto coordinator, which I'm not sure how many people are using Spark this way.

@LuciferYang
Contributor Author

which I'm not sure how many people are using Spark this way.

There should be many. We can do some survey, haha ~

@LuciferYang
Contributor Author

Yea, but it adds complexity and more memory consumption like you mentioned earlier, and you'd need the driver to be a long-running process like a Presto coordinator

Yes, in our production environment, we did change a lot of code for similar optimizations.

@dbtsai
Member

dbtsai commented Aug 20, 2021

Everything depends on the data lifecycle. For safety, we can control it by reducing spark.sql.fileMetaCache.ttlSinceLastAccessSec to 10 seconds or less, which is still effective at avoiding the duplicate footer lookup within a single task.

@dongjoon-hyun Could you elaborate on what the duplicate footer lookup within a single task is here? If it's within a single task, then the files should be the same, so the lifetime of the cache only needs to cover a single task, right?

I thought the purpose of this PR is that when the same ORC file is used by multiple tasks, the fileMetaCache can avoid reading the footer multiple times, with the caveat that those tasks have to be scheduled on the same executor.

As @LuciferYang and @sunchao mentioned above, this requires adding something like a Presto coordinator to ensure the footer cache can be reused. I feel it's fairly complicated, and I don't know if it's worth it. For this use case, we might just use Iceberg, which stores the metadata in separate manifests.

@LuciferYang
Contributor Author

@dongjoon-hyun db90daf and 7327fdb change to use Guava Cache and merge with master because #33784 reverted the code about Caffeine.

@SparkQA

SparkQA commented Aug 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47186/

@SparkQA

SparkQA commented Aug 22, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47186/

@SparkQA

SparkQA commented Aug 22, 2021

Test build #142684 has finished for PR 33748 at commit 7327fdb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class NoSuchDatabaseException(db: String)
  • case class NoSuchNamespaceException(
  • case class NoSuchTableException(
  • case class NoSuchPartitionException(
  • case class NoSuchPermanentFunctionException(db: String, func: String)
  • case class NoSuchFunctionException(
  • case class NoSuchPartitionsException(override val message: String)
  • case class NoSuchTempFunctionException(func: String)
  • class DefaultDateFormatter(
  • class DefaultTimestampFormatter(
  • case class DayTimeIntervalType(startField: Byte, endField: Byte) extends AnsiIntervalType
  • case class YearMonthIntervalType(startField: Byte, endField: Byte) extends AnsiIntervalType

@SparkQA

SparkQA commented Sep 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47760/

@SparkQA

SparkQA commented Sep 14, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47760/

@SparkQA

SparkQA commented Sep 14, 2021

Test build #143257 has finished for PR 33748 at commit a7eff43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LuciferYang LuciferYang deleted the SPARK-36516 branch October 22, 2023 07:43