Releases: Qbeast-io/qbeast-spark
v0.7.0
We are glad to announce the availability of our next release, 0.7.0, with new performance gains, new APIs, and some bug fixes 🐛
What's changed
Reduce metadata memory footprint in #335
Processing the Qbeast metadata in the Delta Log through the driver's memory had a very high impact compared to using DataFrames. Now we ensure the Log is read as few times as possible and processed in a distributed manner, leading to 10x faster queries with Sampling and more efficient writes (less information broadcasted and collected).
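For reference, a sampling read that benefits from this change could look like the following sketch (the table path and column name are illustrative):
import org.apache.spark.sql.functions.avg

// Read a Qbeast table and sample it: the sample() fraction is pushed down
// so that only the matching files are loaded from the Delta Log metadata
val qbeastDf = spark.read.format("qbeast").load("/tmp/qbeast_table")
qbeastDf.sample(0.1).agg(avg("price")).show()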
Updated packages: removal of qbeast-core
As a side effect of #335, and since we need the DataFrame and Spark APIs to process the Qbeast metadata efficiently, we removed the overhead of keeping the interfaces in a separate module.
Everything is now unified into a single artifact: qbeast-spark. This does not change anything in your Spark configuration, but it simplifies the software architecture and dependency tree. 😄
Pre-commit hook feature #319
A good way to ensure exactly-once transactions with extra metadata is to use pre-commit hooks.
We added an interface for running your own code between writing the files and committing the transaction. The mechanism will also add a reference in the CommitInfo.
Example:
- Create your custom hook
import io.qbeast.spark.delta.hook.PreCommitHook
import io.qbeast.spark.delta.hook.PreCommitHook.PreCommitHookOutput
import org.apache.spark.sql.delta.actions.Action
class SimpleHook extends PreCommitHook {

  override val name: String = "SimpleHook"

  override def run(actions: Seq[Action]): PreCommitHookOutput = {
    Map("clsName" -> "SimpleHook")
  }
}
- Write data with a SimpleHook
import io.test.hook.SimpleHook
import spark.implicits._
val tmpDir = "/tmp/test"
val df = spark.sparkContext.range(0, 100).toDF()
(df
.write
.mode("append")
.format("qbeast")
.option("columnsToIndex", "value")
.option("qbeastPreCommitHook.hook", classOf[SimpleHook].getCanonicalName)
.save(tmpDir)
)
- Check results
cat /tmp/test/_delta_log/00000000000000000000.json | jq
{
"commitInfo": {
...,
"tags": {
"clsName": "SimpleHook"
},
...
}
}
You can find all the details in Advanced Configuration.
New APIs
- Merged #330: New IndexMetrics statistics to account for multi-block files (a retrieval sketch follows the sample output below).
OTree Index Metrics:
revisionId: 1
elementCount: 309008
dimensionCount: 2
desiredCubeSize: 3000
indexingColumns: price:linear,user_id:linear
height: 8 (4)
avgFanout: 3.94 (4.0)
cubeCount: 230
blockCount: 848
fileCount: 61
bytes: 11605372
Multi-block files stats:
cubeElementCountStats: (count: 230, avg: 1343, std: 1186, quartiles: (1,292,858,2966,3215))
blockElementCountStats: (count: 848, avg: 364, std: 829, quartiles: (1,6,21,42,3120))
fileBytesStats: (count: 61, avg: 190252, std: 57583, quartiles: (113168,139261,182180,215136,332851))
blockCountPerCubeStats: (count: 230, avg: 3, std: 1, quartiles: (1,4,4,4,4))
blockCountPerFileStats: (count: 61, avg: 13, std: 43, quartiles: (1,3,5,5,207))
...
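To print the metrics above, a minimal sketch (the table path is illustrative; the method name follows the QbeastTable API listed in the 0.2.0 notes below):
import io.qbeast.spark.QbeastTable

// Retrieve and print the OTree index metrics of a Qbeast table
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")
println(qbeastTable.indexMetrics())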
- Merged #356: Retrieve Revision information from QbeastTable
import io.qbeast.spark.QbeastTable
// Init QbeastTable
val qbeastTable = QbeastTable.forPath(spark, "/tmp/dir")
qbeastTable.allRevisions() // List of Revision
qbeastTable.revision(1) // Information of Revision with ID=1
qbeastTable.latestRevision // Information of latest available revision
- Merged #332: New Compute Histogram utility method.
import io.qbeast.spark.utils.QbeastUtils
val brandStats = QbeastUtils.computeHistogramForColumn(df, "brand", 50)
val statsStr = s"""{"brand_histogram":$brandStats}"""
(df
.write
.mode("overwrite")
.format("qbeast")
.option("columnsToIndex", "brand:histogram")
.option("columnStats", statsStr)
.save(targetPath))
Bug fixes
- Issue #337: Remove the compact() operation. Use optimize() instead (see the sketch after this list).
- Issue #333: Broadcast cube weights during optimization file writing
- Issue #339: Persist property changes from SET TBLPROPERTIES operations in the _delta_log
- Issue #340: ConvertToQbeast should work for table paths containing namespaces
- Issue #344: Log sampling filtering stats only in debug mode.
- Issue #352: Sampling Error when explicit type is set in columnsToIndex
- Update qbeast-spark markdown (.md) files:
- Issue #360: Update README.md
- Issue #363: Update markdown file links
- Issue #365: Fix broken link in README
- Issue #366: TBLPROPERTIES consistency between log, catalog, and Qbeast internals
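As mentioned in #337, compact() has been removed; a minimal sketch of the replacement call (the table path is illustrative):
import io.qbeast.spark.QbeastTable

// optimize() replaces the removed compact() operation for rewriting small files
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")
qbeastTable.optimize()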
New contributors
- @alinagrebenkina made their first contribution in #353
- @JosepSampe made their first contribution in #362
- @jorgeMarin1 made their first contribution in #358
Full changelog: v0.6.0...v0.7.0
v0.6.0
WARNING: This release includes breaking changes to the Format. If you have tables written prior to the 0.6.0 version, you can convert them following the documentation.
What's Changed?
1. New Qbeast Metadata to solve small files problem
Fixes the small file problem in incremental appends by adding support for multiple-block files. This change reduces the amount of files loaded when executing a query, improving the overall reading performance.
Before 0.6.0, each file would only contain information about one single cube. This caused the data to be spread amongst many small files, creating bigger overheads when reading from a specific area.
New AddFile tags schema (v0.6.0 and later)
"tags": {
"revision": "1",
"blocks": [
{
"cube": "w",
"minWeight": 2,
"maxWeight": 3,
"replicated": false,
"elementCount": 4
},
{
"cube": "wg",
"minWeight": 5,
"maxWeight": 6,
"replicated": false,
"elementCount": 7
}
]
}
The multi-block file approach allows each file to contain multiple blocks from different cubes. This means that the metadata in each AddFile is modified, and such a change can break compatibility with old tables.
Make sure to follow the guides to transform an old table (< 0.6.0) to the new format.
2. Balanced file layout with Domain-Driven Appends
Another upgrade in the new code is the Cube Domains strategy for appending data incrementally. It uses the existing index during partition-level domain estimation to reduce the number of cubes with outdated maximum weights from 45% to 0.16%, producing a more stable and balanced file layout.
Fixes #226. Full details in #227
3. AutoIndexing Feature
Say goodbye to .option("columnsToIndex", "a,b"). The new AutoIndexing feature chooses the best columns to organize the data automatically.
It is NOT enabled by default. To use it, add the necessary configuration:
spark.qbeast.index.columnsToIndex.auto=true
spark.qbeast.index.columnsToIndex.auto.max=10
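With that configuration in place, a minimal write sketch (the DataFrame and target path are illustrative):
// No columnsToIndex option: the indexing columns are selected automatically
val df = spark.range(0, 1000).toDF("value")
df.write
  .mode("overwrite")
  .format("qbeast")
  .save("/tmp/qbeast_auto_indexed")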
4. Support for Spark 3.5.x and Delta 3.1.x
Upgrade to the latest versions of the dependencies. Read everything about the new libraries on the Apache Spark page and in the Delta Lake release notes.
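If you manage the dependency with sbt, here is a build.sbt sketch; the artifact coordinates and versions below are assumptions, so verify them against Maven Central:
// Assumed coordinates/versions: check Maven Central for the exact ones
libraryDependencies ++= Seq(
  "io.qbeast" %% "qbeast-spark" % "0.6.0",
  "org.apache.spark" %% "spark-sql" % "3.5.0" % Provided,
  "io.delta" %% "delta-spark" % "3.1.0" % Provided
)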
Other Features
- Adds #288: More log messages in critical parts of the code, making it easier to debug and to understand what is happening.
- Adds #261: Block filtering during Sampling. Fewer files to read, faster results.
- Adds #253: File Skipping with Delta. Initial results show an improvement of 10x by applying Delta's file skipping on Delta Log's entries.
- Adds #243: txnVersion and txnAppId are included in QbeastOptions to write streaming data (see the sketch after this list).
- Adds #236: Update the SBT / scalastyle frameworks.
- Fixed #312: dataChange on Optimization is set to false.
- Fixed #315: Solve the roll-up cube count.
- Fixed #317: No overhead during optimization.
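For #243, a sketch of an idempotent streaming append; the rate source, option keys, and table path are illustrative assumptions:
import org.apache.spark.sql.DataFrame

// txnAppId/txnVersion are carried through QbeastOptions to make batch writes idempotent
val streamingDf = spark.readStream.format("rate").load()

def writeBatch(batch: DataFrame, batchId: Long): Unit = {
  batch.write
    .mode("append")
    .format("qbeast")
    .option("columnsToIndex", "value")
    .option("txnAppId", "rate_ingestion")   // assumed option key
    .option("txnVersion", batchId.toString) // assumed option key
    .save("/tmp/qbeast_stream_table")
}

streamingDf.writeStream
  .foreachBatch(writeBatch _)
  .start()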
Bug Fixes
- Fix #246: Creating an External Table with a Location loads the existing configuration instead of throwing errors.
- Fix #281: Schema Merge and Schema Overwrite mimic Delta Lake's behavior.
- Fix #228: Correct implementation of CubeId hash equals.
Contributors
@Jiaweihu08 @fpj @cdelfosse @alexeiakimov @osopardo1
Full Changelog: v0.5.0...v0.6.0
v0.5.0
Happy New Year, Qbeasters!! 🤶 🤶 🤶 A lot of things have happened in the past 2023... and we'd like to thank you for your support with this release.
What we have accomplished in the past months:
- String Histogram Indexing. Partitioning on lexicographic variables is a big pain in storage, but thanks to a few lines of pre-processing and a lot of backend engineering, we can group similar text values in the same file, so fine-grained data skipping on String IDs becomes an efficient task.
You need to:
1. Compute Histogram
import org.apache.spark.sql.delta.skipping.MultiDimClusteringFunctions
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, min}
def getStringHistogramStr(df: DataFrame, columnName: String, numBins: Int): String = {
val binStarts = "__bin_starts"
val stringPartitionColumn = MultiDimClusteringFunctions.range_partition_id(col(columnName), numBins)
df
.select(columnName)
.distinct()
.na.drop
.groupBy(stringPartitionColumn)
.agg(min(columnName).alias(binStarts))
.select(binStarts)
.orderBy(binStarts)
.collect()
.map { r =>
val s = r.getAs[String](0)
s"'$s'"
}
.mkString("[", ",", "]")
}
val histogram = getStringHistogramStr(df, "test_col_name", 100)
2. Configure the Qbeast table as follows:
val columnStats = s"""{"test_col_name_histogram":$histogram}"""
df
.write
.format("qbeast")
.option("columnsToIndex", s"test_col_name:histogram")
.option("columnStats", columnStats)
.save(targetPath)
- Upgrade to Spark 3.4.1 and Delta 2.4.0. Read what is new in: #211
- Add Delta File Skipping (#235). To enable full compatibility with other formats, we allow non-indexed files to be written in the same table. These files should be skipped efficiently even when no multidimensional indexing is applied, so we rely on the underlying file format for this task: we add Delta's file-skipping feature.
- Fixed #246: If we create an existing Qbeast table in the Spark Catalog, there's no need to add options such as columnsToIndex or cubeSize.
- Fixed #213: Now we can append data correctly with an INSERT INTO query (see the sketch after this list).
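A sketch combining both fixes, assuming the QbeastCatalog is configured and a Qbeast table already exists at the (illustrative) location:
// The existing table's schema and index configuration are loaded from the location (#246)
spark.sql(
  """CREATE EXTERNAL TABLE students
    |USING qbeast
    |LOCATION '/tmp/qbeast_students'""".stripMargin)

// Appends now work correctly through SQL (#213); the values follow the table's (assumed) schema
spark.sql("INSERT INTO students VALUES (1, 'Alice', 21)")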
Contributors
@Jiaweihu08 @alexeiakimov @cdelfosse @osopardo1
Full Changelog: v0.4.0...v0.5.0
v0.4.0
Version 0.4.0 is finally here!
The main changes in this version include:
- Support for Spark 3.3.0 and Delta 2.1.0. It's about time to catch up with the latest versions!
- Including OR and IN operators in the Query Filters. Merged in #186. This is a strong use case in which we can filter the files that belong to a certain space separately in an OR predicate (see the sketch after this list).
- New hints in the Spark Log for Data Skipping. Merged in #179. More precisely, how many files have been skipped and the percentage read. Now, your Spark trace may contain some text like:
OTreeIndex data filters (exec id 1: (id is not null))
Qbeast filtered files (exec id 1): 0 of 4
- Merged #182. Inner Cubes State is printed to facilitate debugging.
- Solved several bugs that were influencing the read of Timestamps and the Transformation process. Those include #195 and #202.
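A sketch of such a query; the table path and column names are illustrative:
// Both sides of the OR, and the IN list, can be used to filter files separately
val qbeastDf = spark.read.format("qbeast").load("/tmp/qbeast_table")
qbeastDf.filter("(price > 100 AND user_id < 500) OR user_id IN (1, 2, 3)").show()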
Contributors
@Jiaweihu08 @osopardo1 @Adricu8
Full Changelog: v0.3.1...v0.4.0
v0.3.5
A new version of Qbeast-Spark for Spark 3.2.2 and Delta 1.2.0 is ready!
Changes in this version include:
- Including OR and IN operators in the Query Filters. Merged in #186. This is a strong use case in which we can filter the files that belong to a certain space separately in an OR predicate.
- New hints in the Spark Log for Data Skipping. Merged in #179. More precisely, how many files have been skipped and the percentage read. Now, your Spark trace may contain some text like:
OTreeIndex data filters (exec id 1: (id is not null))
Qbeast filtered files (exec id 1): 0 of 4
- Merged #182. Inner Cubes State is printed to facilitate debugging.
- Solved several bugs that were influencing the read of Timestamps and the Transformation process. Those include #195 and #202.
Contributors
@Jiaweihu08 @cugni @osopardo1 @Adricu8
Full Changelog: v0.3.3...v0.3.5
v0.3.3
We are back with a new release before upgrading versions of Spark and Delta.
What's changed
In v0.3.3 we include two major changes:
- Data Staging Feature #173. It accumulates small appends to the table without indexing them until a predefined amount of data is reached. Doing so prevents the index from having many small files caused by small appends.
To use Data Staging, you can configure the size through SparkConf:
--conf spark.qbeast.index.stagingSizeInBytes=1000000000
This value is optional and defaults to None, in which case writes ignore the staging area. We empty the staging area during any write by setting:
--conf spark.qbeast.index.stagingSizeInBytes=0
- Fixed #176.
- Clearer parameter naming: changes qbeast.spark.compaction.minFileSize to qbeast.spark.compaction.minFileSizeInBytes and qbeast.spark.compaction.maxFileSize to qbeast.spark.compaction.maxFileSizeInBytes
Contributors
@Jiaweihu08 @osopardo1
Full Changelog: v0.3.2...v0.3.3
v0.3.2
Bip bip! A new version of qbeast-spark with some awesome algorithm improvements 🥳
What's changed
- Better file sizes! Now the final size of the cubes corresponds to the cubeSize used to write the data. You can find more information about the algorithm changes and performance numbers in the merged PR #156.
- Register Operation Metrics and Per-file Statistics [Delta]. Statistical information of the columns (min, max, nullCount) is gathered in order to perform better data skipping.
- Option for specifying min/max values of the indexed columns. It allows a more flexible creation of a Revision, in order to include values that might not be in the newly indexed DataFrame.
df.write
  .format("qbeast")
  .option("columnsToIndex", "a,b")
  .option("columnStats", """{"a_min":0,"a_max":10,"b_min":20.0,"b_max":70.0}""")
  .save("/tmp/table")
The enforced structure of the JSON is:
{ "columnName_min": value, "columnName_max": value }
Minor changes
Contributors
Special thanks to @Jiaweihu08, who took the Qbeast Format files to the next level with the Domain-Driven algorithm!
@cugni @osopardo1
Full Changelog: v0.3.1...v0.3.2
v0.3.1
Hi everyone :) Excited to announce the new version of qbeast-spark. Your Christmas gift came early this year!
Not many things changed in terms of API. The metadata used to query Qbeast is still the same, but we updated the versions of Delta and Spark so you can benefit from their significant upgrades too. We also added Table Properties, so now you can load your Data Lake tables into any query engine or BI tool that uses Catalogs.
Here is the list of features and bugs solved since v0.2.0 was released.
What's changed
- Table Properties. Now we can use qbeast-formatted data as Tables in Spark SQL. Thanks to the QbeastCatalog, you will be able to manipulate tables and insert data into them.
The class can be used as the default catalog through the spark_catalog configuration:
spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
Or as an external catalog:
spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
spark.sql.catalog.qbeast_catalog.warehouse=/tmp/dir
- Compaction of small files. Having many small files can turn into a performance issue. To address this, the Compact operation, based on Delta Lake's OPTIMIZE, compacts them into a single bigger file. Merged on #110.
You can use the command through the QbeastTable interface:
val qbeastTable = QbeastTable.forPath(spark, "path")
qbeastTable.compact()
It compacts the files whose sizes fall within [MIN_FILE_SIZE_COMPACTION, MAX_FILE_SIZE_COMPACTION].
The default configuration is set to 1024 * 1024 * 1024 bytes, as in Delta's DELTA_OPTIMIZE_MIN_FILE_SIZE.
To change the default values, use SparkConf:
--conf spark.qbeast.compact.minFileSize=1 \
--conf spark.qbeast.compact.maxFileSize=10000
- Support for the INSERT INTO operation. Merged on #116.
- Support for indexing the Date and Timestamp types. Merged on #129 (see the sketch after this list).
- Enrich Index Metrics. New fanout calculation for each level to better understand the tree structure and its correctness. Merged on #118.
- Merged #107. Add a lower bound to the group cube size.
- Merged #77. Add a client protocol for optimizing and writing concurrently.
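A minimal sketch of indexing a Timestamp column; the data and target path are illustrative:
import java.sql.Timestamp
import spark.implicits._

// Date and Timestamp columns can be listed in columnsToIndex since #129
val events = Seq(
  (1, Timestamp.valueOf("2022-01-01 10:00:00")),
  (2, Timestamp.valueOf("2022-01-02 11:30:00"))
).toDF("id", "event_time")

events.write
  .format("qbeast")
  .option("columnsToIndex", "id,event_time")
  .save("/tmp/qbeast_events")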
Minor changes
Contributors
Special thanks to @Jiaweihu08 and @Adricu8, who made great contributions this year!
@cugni @alexeiakimov @osopardo1
Full Changelog: v0.2.0...v0.3.0
qbeast-spark 0.2.0
Hooray! We are releasing qbeast-spark 0.2.0! 🥳 Coming with plenty of new functionalities, fixed bugs, and some enhancements, this version of qbeast-spark features some API changes, which you can see in the detailed list below.
In these release notes, you can find:
- A complete list of merged pull requests since the previous version (0.1.0).
- Major API changes that affect the usage of qbeast-spark
What's Changed
- Better way to store index metadata (#26): Changes on protocol.
- New Revision metadata and Snapshot (#29): Including new structures for Revision and Snapshot.
- Better code organization (#39): Including changes in packaging and new API extensions.
- Using Spark Config to manage Qbeast configuration (#55): Enabling --conf.
- Filter cubes by query range (#35): Filter Pushdown.
- Avoid missing cube files on reading (#57): Avoid information losses on reading.
- Avoid repeated reading of the DeltaLog (#65): Delta Log read optimization.
- Solved NullPointerException when indexing strings (#69).
- Easier information API (#72): Access information about indexed data.
- Add API to get Index Metrics (#74): Index metrics.
- Replace result with resultInternal for CubeWeights (#85): Improve memory usage when indexing.
- Add support for null values (#67).
- Keeper Integration API (#83).
Minor changes
- Block file tag minWeight equal to the minimum weight of the block's elements (#30): Add more information about the index structure in tags.
- Initialize QueryRange to AllSpace (#38): Avoid information losses.
- Avoiding Out of memory when running the Quick start (#51): Memory usage improvements.
- Match error on filtering string indexed columns (#59).
- Add check to publish only when repository is "Qbeast-io/qbeast-spark" (#60): Fix CI workflows.
- Replace udf/udaf with Spark SQL (#70): Improve query speed.
- Ensure that the TableChange is always broadcast (#73).
- New Qbeast-spark logo 😄 (#75).
- Add a visual example demo to the README (#76).
- Releases on Sonatype and Maven Central (#82).
- Add Codecov to the CI - Tests coverage (#86).
- Generate scaladocs for qbeastCore and qbeastSpark projects (#88, #90): Available at https://docs.qbeast.io/
- Documentation Improvements (#87, #91, #93): Better documentation, README files and developer guides.
API Changes
- New packaging. Now SQL extensions can be found in io.qbeast.spark.internal:
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension
- Support for indexing different types. By default, qbeast-spark chooses the transformation based on the data type:
  - Linear: Numeric
  - Hashing: String
df.write.format("qbeast").option("columnsToIndex", "column:type,column2:type2...").save(tmpDir)
Example:
// Index the columns directly and let qbeast understand the types
df.write.format("qbeast").option("columnsToIndex", "a,b,c").save(tmpDir)
// specify the columns where you want to use hashing
df.write.format("qbeast").option("columnsToIndex", "a,b:hashing,c").save(tmpDir)
- Support for changing the cube size with DataFrame API.
df.write.format("qbeast").option("columnsToIndex", "your,columns,to,index").option("cubeSize", "1000000").save(tmpDir)
- Configuration can be set through SparkConf:
new SparkConf()
  .set("spark.qbeast.index.defaultCubeSize", "1000")
  .set("spark.qbeast.index.cubeWeightsBufferCapacity", "1000")
QbeastTable
Get insights into the index and tune parameters to boost the benefits.
val qbeastTable = QbeastTable.forPath(spark, path)
qbeastTable.indexedColumns() // Outputs the actual indexed columns
qbeastTable.cubeSize() // Outputs the actual cube size
qbeastTable.revisionsID() // Outputs the list of revision identifiers available for a table
qbeastTable.latestRevisionID() // Outputs the latest revision identifier available for a table
qbeastTable.indexMetrics() //Index Metrics of the table
Protocol changes
v0.2.0 includes protocol changes for writing information in the commit log.
Check out the updates at QbeastFormat
Contributors
Special thanks to @osopardo1, @eavilaes, @Jiaweihu08 @cugni and @alexeiakimov.
Full Changelog: v0.1.0...v0.2.0
v0.1.0
Welcome to the very first release of Qbeast Open Source Format! 😄
Qbeast-Spark is the reference implementation of Qbeast Format. Currently built on top of Delta Lake, qbeast-spark adds an extra layer for indexing multiple columns of your dataset and extracting valuable information through a sampling pushdown operator.
This is the first alpha release, presenting all the main functionalities:
- Documentation for users and developers
- Writing to your favorite data storage through Spark.
- Extension of Delta Commit Log for adding custom metadata to the file log information.
- Reading the dataset and performing sampling pushdown on qbeast format
API
Write your data on Qbeast Format:
df.write.mode("overwrite").format("qbeast").option("columnsToIndex", "col_a,col_b").save(tmp_dir)
Load the newly indexed dataset.
val qbeast_df = spark.read.format("qbeast").load(tmp_dir)
Notice how the sampler is converted into filters and pushed down to the source:
qbeast_df.sample(0.1).explain(true)
Protocol
What you can expect to find in the Delta Commit Log for this first version is the following AddFile information:
{
"add" : {
"..." : {},
"tags" : {
"cube" : "A",
"indexedColumns" : "ss_sales_price,ss_ticket_number",
"maxWeight" : "462168771",
"minWeight" : "-2147483648",
"rowCount" : "508765",
"space" : "{\"timestamp\":1631692406506,\"transformations\":[{\"min\":-99.76,\"max\":299.28000000000003,\"scale\":0.0025060144346431435},{\"min\":-119998.5,\"max\":359999.5,\"scale\":2.083342013925058E-6}]}",
"state" : "FLOODED"
}
}
}