[SPARK-33449][SQL] Support File Metadata Cache for Parquet #30483
Conversation
@wangyum WIP now, missing some configuration entries, test suites, and ORC file support.

Test build #131655 has finished for PR 30483 at commit
dongjoon-hyun left a comment
Since your fix is merged here, could you fix the Scala style and rebase onto master, @LuciferYang?
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
OK, will do it later.

Test build #131754 has finished for PR 30483 at commit

@LuciferYang It would be great if we had some benchmark numbers.

@wangyum This is a very good suggestion.
...java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
Outdated
Test build #131760 has finished for PR 30483 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala
Outdated
Test build #131771 has finished for PR 30483 at commit

Test build #131775 has finished for PR 30483 at commit

Test build #142494 has finished for PR 30483 at commit
@dongjoon-hyun Because #33748 provides an ORC-only PR under a new JIRA (SPARK-36516), I'll change this PR to be Parquet-only.

There will be some duplicate code between the two PRs, and that part of the code will be synchronized after one of them is merged.
Kubernetes integration test starting

Test build #142498 has finished for PR 30483 at commit
Got it. Thank you for moving this effort forward, @LuciferYang.
Kubernetes integration test status success

Kubernetes integration test unable to build dist. exiting with code: 1

Kubernetes integration test starting

Kubernetes integration test status success

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #142500 has finished for PR 30483 at commit

Test build #142505 has finished for PR 30483 at commit
-  lazy val footerFileMetaData =
+  lazy val footerFileMetaData = if (parquetMetaCacheEnabled) {
+    ParquetFileMeta.readFooterFromCache(filePath, conf).getFileMetaData
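For context, a minimal sketch of how the complete branch might look. The non-cached fallback below is an assumption about the surrounding call site, not code from this PR, and it reuses the names (parquetMetaCacheEnabled, filePath, conf, ParquetFileMeta) that appear in the excerpt above:

    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader

    lazy val footerFileMetaData = if (parquetMetaCacheEnabled) {
      // Serve the footer from the process-wide metadata cache, keyed by file path.
      ParquetFileMeta.readFooterFromCache(filePath, conf).getFileMetaData
    } else {
      // Assumed fallback: read only the file-level metadata, skipping row-group details.
      ParquetFileReader.readFooter(conf, filePath, ParquetMetadataConverter.SKIP_ROW_GROUPS)
        .getFileMetaData
    }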
What happens if the file is removed and replaced?

We can discuss it in #33748 first. I'll set this PR to draft for now.
sunchao left a comment
Thanks @LuciferYang for the PR. Correct me if I'm wrong, but I feel this is only really useful in a cluster where Spark executors are reused across different queries, and even in that case we need to be very careful about cache invalidation, since the same file can be overwritten with different content (e.g., in a Hive insert overwrite).
I noticed that Spark currently needs to read the footer twice: once in SpecificParquetRecordReaderBase and again in ParquetFileFormat or ParquetPartitionReaderFactory. This could be fixed separately with a much simpler approach.
    .createWithDefault(false)

  val FILE_META_CACHE_PARQUET_ENABLED = buildConf("spark.sql.fileMetaCache.parquet.enabled")
    .doc("To indicate if enable parquet file meta cache, it is recommended to enabled " +
Hmm, curious whether this can help if your Spark queries run as separate Spark jobs, where each of them may use different executors.

Yes, this feature does have limitations; NODE_LOCAL plus a Thrift server doing interactive analysis should be the best scenario. If the architecture separates storage and compute, we need to consider task scheduling.
In fact, in the OAP project, the fileMetaCache relies on the dataCache (PROCESS_LOCAL).
    .booleanConf
    .createWithDefault(false)

  val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS =
nit: maybe FILE_META_CACHE_TTL_SINCE_LAST_ACCESS_SEC and spark.sql.fileMetaCache.ttlSinceLastAccessSec, so it's easier to know that the unit is seconds?

Good suggestion.
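For illustration, a sketch of what the renamed entry could look like using SQLConf's existing builder DSL (assuming it lives in SQLConf, where buildConf is in scope); the config name follows the suggestion above, while the default value and doc text are assumptions, not what the PR finally ships:

    import java.util.concurrent.TimeUnit

    val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS_SEC =
      buildConf("spark.sql.fileMetaCache.ttlSinceLastAccessSec")
        .doc("Time-to-live in seconds for a file metadata cache entry after its last access.")
        .timeConf(TimeUnit.SECONDS)
        .checkValue(_ > 0, "The TTL must be positive.")
        .createWithDefault(3600L)

Encoding the unit both in the key name and via timeConf(TimeUnit.SECONDS) keeps the setting self-describing even when it is set from spark-defaults.conf.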
    this.fileSchema = footer.getFileMetaData().getSchema();
    FilterCompat.Filter filter = ParquetInputFormat.getFilter(configuration);
    List<BlockMetaData> blocks =
        RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema);
Does this apply all the filter levels? E.g., stats, dictionary, and bloom filter.

I need to investigate it again.
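For reference: if memory serves, the three-argument overload used above can only apply statistics-level pruning, while a newer parquet-mr overload takes an explicit list of filter levels plus a ParquetFileReader. A hedged Scala sketch of that variant; treat the exact signature and available levels as assumptions to verify during that investigation:

    import java.util.Arrays
    import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.metadata.BlockMetaData

    // Ask parquet-mr to apply every row-group filter level: stats, dictionary, bloom filter.
    def filterAllLevels(
        filter: FilterCompat.Filter,
        blocks: java.util.List[BlockMetaData],
        reader: ParquetFileReader): java.util.List[BlockMetaData] = {
      RowGroupFilter.filterRowGroups(
        Arrays.asList(
          RowGroupFilter.FilterLevel.STATISTICS,
          RowGroupFilter.FilterLevel.DICTIONARY,
          RowGroupFilter.FilterLevel.BLOOMFILTER),
        filter,
        blocks,
        reader)
    }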
  def getFileMeta: FileMeta
  override def hashCode(): Int = path.hashCode
  override def equals(other: Any): Boolean = other match {
    case df: FileMetaKey => path.equals(df.path)
What if the same file gets replaced? How do we invalidate the cache? This is very common in my experience, e.g., Hive overwriting a partition.

This is a very good question; we discussed it in #33748 (comment).
If the file name contains a timestamp, I think we don't have to worry too much: the names of the new file and the old file are different, so we won't read the wrong data.
If a file is manually replaced by one with the same name and the corresponding file meta exists in the cache, an incorrect file meta will be used to read the data. If the read fails, the job fails; but if the read happens to succeed, the job will read the wrong data.
In fact, even without `FileMetaCache`, there is a similar risk when manually replacing files with the same name, because the offset and length of a PartitionedFile may no longer match after the replacement for a running job.
At the same time, I added a warning about this feature in SQLConf.
The Parquet part is a draft now because of the deprecated API; we are focusing on ORC (SPARK-36516).
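One hypothetical mitigation for the replaced-file case (not part of this PR): fold the file's length and modification time into the cache key, so that a file overwritten in place under the same path misses the cache instead of returning stale metadata. A minimal Scala sketch, with FileMeta standing in for the PR's cache value type:

    import org.apache.hadoop.fs.Path

    trait FileMeta

    // Assumed variant of the key above: equality covers path, length, and mtime.
    abstract class FileMetaKey(
        val path: Path,
        val length: Long,
        val modificationTime: Long) {
      def getFileMeta: FileMeta

      override def hashCode(): Int =
        java.util.Objects.hash(path, Long.box(length), Long.box(modificationTime))

      override def equals(other: Any): Boolean = other match {
        case k: FileMetaKey =>
          path.equals(k.path) && length == k.length && modificationTime == k.modificationTime
        case _ => false
      }
    }

The trade-off is an extra file-status lookup when building the key, unless the listing already done for PartitionedFile can be reused.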
Refer to apache#30483 (cherry picked from commit cb7852c)
1. Implement LocalDataCacheManager
2. Base on xiaoxiang's PR
3. Implement CacheFileScanRDD
4. Implement AbstractCacheFileSystem
5. Optimize performance
6. Support soft affinity for HDFS
7. Support ByteBuffer reads and avoid reading data one byte at a time
8. Add File Metadata cache support for Parquet: refer to apache#30483
9. Support caching small files in memory: ByteBufferPageStore extends PageStore to cache data in memory
What changes were proposed in this pull request?
The main purpose of this PR is to introduce a File Meta Cache mechanism for Spark SQL; a basic File Meta Cache implementation for Parquet is provided at the same time.
The main changes of this PR are as follows:
Defined a FileMetaCacheManager to cache the mapping from FileMetaKey to FileMeta. The FileMetaKey is the cache key; its equals is determined by the file path by default. The FileMeta represents the cache value and is generated by the FileMetaKey#getFileMeta method.
Currently, FileMetaCacheManager supports a simple cache-expiration eviction mechanism, and the expiration time is determined by the new config FILE_META_CACHE_TTL_SINCE_LAST_ACCESS.
For the Parquet file format, this PR adds ParquetFileMetaKey and ParquetFileMeta to cache the Parquet file footer. The footer cache can be used by the vectorized read path in both DS API V1 and V2, and the feature is enabled when FILE_META_CACHE_PARQUET_ENABLED is true.
Currently, the file meta cache mechanism cannot be used by RowBasedReader; it needs the completion of PARQUET-1965 for further support.
Why are the changes needed?
Support the Parquet datasource using the File Meta Cache mechanism to reduce the number of metadata reads when multiple queries are performed on the same dataset.
Does this PR introduce any user-facing change?
Add new configs:
FILE_META_CACHE_PARQUET_ENABLED (spark.sql.fileMetaCache.parquet.enabled): indicates whether to enable the Parquet file meta cache mechanism.
FILE_META_CACHE_TTL_SINCE_LAST_ACCESS (spark.sql.fileMetaCache.ttlSinceLastAccess): the time-to-live for a file metadata cache entry after its last access, in seconds.
How was this patch tested?
ParquetQuerySuite
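To make the caching pattern described above more concrete, here is a minimal, hypothetical sketch built on Guava's LoadingCache with expire-after-access eviction. The class and method names are assumptions, and the PR's actual FileMetaCacheManager may be implemented differently:

    import java.util.concurrent.TimeUnit
    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    object FileMetaCacheManagerSketch {
      // Stand-ins for the PR's key/value types.
      trait FileMeta
      abstract class FileMetaKey {
        def path: String
        def getFileMeta: FileMeta
        override def hashCode(): Int = path.hashCode
        override def equals(other: Any): Boolean = other match {
          case k: FileMetaKey => path == k.path
          case _ => false
        }
      }

      // Would be driven by spark.sql.fileMetaCache.ttlSinceLastAccess in the real code.
      private val ttlSinceLastAccessSec = 3600L

      private val cache: LoadingCache[FileMetaKey, FileMeta] =
        CacheBuilder.newBuilder()
          .expireAfterAccess(ttlSinceLastAccessSec, TimeUnit.SECONDS)
          .build(new CacheLoader[FileMetaKey, FileMeta] {
            // On a miss, the key materializes its own metadata (e.g. reads the Parquet footer).
            override def load(key: FileMetaKey): FileMeta = key.getFileMeta
          })

      def get(key: FileMetaKey): FileMeta = cache.get(key)
    }

With this shape, a reader path only calls get with a Parquet-specific key, and entries disappear on their own once they have not been touched for the configured TTL.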