Auto Indexing #247

Merged: 36 commits, merged on Dec 15, 2023

Changes from 34 commits

Commits (36)
- `fa69b4f` Merge pull request #239 from osopardo1/235-add-delta-data-skipping (osopardo1, Nov 30, 2023)
- `75f1128` Bootstrap AutoIndexing (Dec 5, 2023)
- `ba29a0e` change names (Dec 5, 2023)
- `03b737d` Add headers and optional parameters (Dec 5, 2023)
- `e7012b4` fix (Dec 5, 2023)
- `338d061` Add SparkAutoIndexerTest (Dec 5, 2023)
- `c63093b` scalafix (Dec 5, 2023)
- `485bccd` add AUTO_INDEXING_ENABLED check in QbeastDataSource (Dec 5, 2023)
- `9f14557` (WIP) defining method to autoselect best columns to index. Tests in (Dec 5, 2023)
- `8db0a51` Code cleaning (Dec 5, 2023)
- `838a225` remove cs (Dec 5, 2023)
- `9e9801d` Minor changes (SrTangente, Dec 5, 2023)
- `a45c99f` reorganization (Dec 7, 2023)
- `e051e47` Change AutoIndexer trait (Dec 7, 2023)
- `80629c5` fix (Dec 7, 2023)
- `b48074e` 2 methods AutoIndexer (Dec 7, 2023)
- `ea11e90` Delete checks (Dec 7, 2023)
- `85a7ac9` remove coma (Dec 7, 2023)
- `cf93459` Add checking again (Dec 11, 2023)
- `304ed4f` Merge pull request #245 from osopardo1/backport-delta-file-skipping (osopardo1, Dec 11, 2023)
- `3c8c918` Added some documentation (SrTangente, Dec 11, 2023)
- `a4b0731` Merge branch 'main' into 244-autoindexing (SrTangente, Dec 11, 2023)
- `963e04e` Revert "Added some documentation" (SrTangente, Dec 11, 2023)
- `3caa4d6` Revert "Merge branch 'main' into 244-autoindexing" (SrTangente, Dec 11, 2023)
- `13be525` Reverted merge (SrTangente, Dec 12, 2023)
- `255c70c` fix tests (Dec 12, 2023)
- `f84f1f9` Add more docs (Dec 12, 2023)
- `78a8fef` Refomat Dependencies.scala (Dec 13, 2023)
- `5c51bf8` Move implementation to the trait (Dec 13, 2023)
- `d5be51a` Rename AutoIndexer, separate methods (Dec 14, 2023)
- `91d4c06` Remove whiole package reference (Dec 14, 2023)
- `e4ab8b0` small naming change (Dec 14, 2023)
- `eeeaf71` Change documentation (Dec 14, 2023)
- `7138a57` Change config AUTO_INDEXING_ENABLED name (Dec 14, 2023)
- `0a9d9cf` Change config variable on tests (Dec 14, 2023)
- `e423a66` fix scalafmt (Dec 14, 2023)
1 change: 1 addition & 0 deletions build.sbt
@@ -20,6 +20,7 @@ lazy val qbeastSpark = (project in file("."))
sparkSql % Provided,
hadoopClient % Provided,
deltaCore % Provided,
sparkml % Provided,
amazonAws % Test,
hadoopCommons % Test,
hadoopAws % Test),
@@ -0,0 +1,36 @@
package io.qbeast.core.model

/**
* ColumnsToIndexSelector interface to automatically select which columns to index.
* @tparam DATA
* the data to index
*/
trait ColumnsToIndexSelector[DATA] {

/**
* The maximum number of columns to index.
* @return
*/
def MAX_COLUMNS_TO_INDEX: Int

/**
* Selects the columns to index given a DataFrame
* @param data
* the data to index
* @return
*/
def selectColumnsToIndex(data: DATA): Seq[String] =
selectColumnsToIndex(data, MAX_COLUMNS_TO_INDEX)

/**
* Selects the columns to index with a given number of columns to index
* @param data
* the data to index
* @param numColumnsToIndex
* the number of columns to index
* @return
* A sequence with the names of the columns to index
*/
def selectColumnsToIndex(data: DATA, numColumnsToIndex: Int): Seq[String]

}
@@ -21,6 +21,7 @@ trait QbeastCoreContext[DATA, DataSchema, QbeastOptions, FileDescriptor] {
def indexManager: IndexManager[DATA]
def queryManager[QUERY: ClassTag]: QueryManager[QUERY, DATA]
def revisionBuilder: RevisionFactory[DataSchema, QbeastOptions]
def columnSelector: ColumnsToIndexSelector[DATA]
def keeper: Keeper

}
16 changes: 16 additions & 0 deletions docs/AdvancedConfiguration.md
@@ -65,6 +65,22 @@ You can specify different advanced options to the columns to index:
df.write.format("qbeast").option("columnsToIndex", "column:type,column2:type...")
```

## Automatic Column Selection

To **avoid specifying the `columnsToIndex`** option, you can enable the auto indexer through the Spark configuration:

```shell
--conf spark.qbeast.index.columnsToIndex.auto=true \
--conf spark.qbeast.index.columnsToIndex.auto.max=10
```
And write the DataFrame without any extra options:

```scala
df.write.format("qbeast").save("path/to/table")
```
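
To verify which columns were selected, you can inspect the table metadata after writing. A minimal sketch, assuming the `QbeastTable` utility and its `indexedColumns()` method are available in your version:

```scala
import io.qbeast.spark.QbeastTable

// Load the Qbeast table metadata and print the automatically selected columns
val qbeastTable = QbeastTable.forPath(spark, "path/to/table")
println(qbeastTable.indexedColumns())
```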

Read more about it in the [Columns To Index Selector](ColumnsToIndexSelector.md) section.

## CubeSize

CubeSize option lets you specify the maximum size of the cube, in number of records. By default, it's set to 5M.
73 changes: 73 additions & 0 deletions docs/ColumnsToIndexSelector.md
@@ -0,0 +1,73 @@
## Columns To Index Selector

Qbeast Format organizes the records using a multidimensional index built on a subset of the columns in the table. Starting with version `1.0.0`, **the columns can be selected automatically, by enabling the automatic column selector, or manually by the user**.

If you prefer not to reason about the data distribution and want to let Qbeast handle the indexing pre-processing, there is no need to specify the `columnsToIndex` option when writing the **DataFrame**.

You only need to **enable the Columns To Index Selector in the `SparkConf`**:

```shell
--conf spark.qbeast.index.columnsToIndex.auto=true \
--conf spark.qbeast.index.columnsToIndex.auto.max=10
```
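
Equivalently, a sketch of setting the same properties programmatically when building the Spark session (the property names are the ones shown above; the application name is just an example):

```scala
import org.apache.spark.sql.SparkSession

// Enable automatic column selection through the SparkSession configuration
val spark = SparkSession
  .builder()
  .appName("qbeast-auto-indexing")
  .config("spark.qbeast.index.columnsToIndex.auto", "true")
  .config("spark.qbeast.index.columnsToIndex.auto.max", "10")
  .getOrCreate()
```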

And **write the DataFrame as usual**:

```scala
df.write.format("qbeast").save("path/to/table")
```

Or use SQL:

```scala
spark.sql("CREATE TABLE table_name USING qbeast LOCATION 'path/to/table'")
```
### Interface

The `ColumnsToIndexSelector` is an interface that can be implemented by different classes. The interface is defined as follows:

```scala
trait ColumnsToIndexSelector[DATA] {

/**
* The maximum number of columns to index.
* @return
*/
def MAX_COLUMNS_TO_INDEX: Int

/**
* Selects the columns to index given a DataFrame
* @param data
* the data to index
* @return
*/
def selectColumnsToIndex(data: DATA): Seq[String] =
selectColumnsToIndex(data, MAX_COLUMNS_TO_INDEX)

/**
* Selects the columns to index with a given number of columns to index
* @param data
* the data to index
* @param numColumnsToIndex
* the number of columns to index
* @return
* A sequence with the names of the columns to index
*/
def selectColumnsToIndex(data: DATA, numColumnsToIndex: Int): Seq[String]

}

```
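
For illustration, here is a minimal sketch of a custom implementation. The `NumericColumnsSelector` name and its selection strategy (pick the first N numeric columns, ignoring correlations) are hypothetical and not part of the library:

```scala
import io.qbeast.core.model.ColumnsToIndexSelector
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.NumericType

// Hypothetical selector: index the first N numeric columns of the DataFrame
object NumericColumnsSelector extends ColumnsToIndexSelector[DataFrame] {

  override def MAX_COLUMNS_TO_INDEX: Int = 3

  override def selectColumnsToIndex(data: DataFrame, numColumnsToIndex: Int): Seq[String] =
    data.schema.fields
      .collect { case field if field.dataType.isInstanceOf[NumericType] => field.name }
      .take(numColumnsToIndex)
      .toSeq
}
```

Only the two abstract members need to be provided; the one-argument `selectColumnsToIndex(data)` overload falls back to `MAX_COLUMNS_TO_INDEX` through the default implementation shown above.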

### SparkColumnsToIndexSelector

`SparkColumnsToIndexSelector` is the first implementation of the `ColumnsToIndexSelector` interface. It is designed to work with Apache Spark DataFrames and **automatically selects columns for indexing based on a correlation analysis of the data**.

The steps are the following (a usage sketch follows the list):

1. **Convert Timestamp columns** to Unix timestamps and update the DataFrame.
2. **Initialize a VectorAssembler** for each column. String columns are first turned into numeric features with a StringIndexer and a OneHotEncoder.
3. **Combine the features** from the VectorAssembler stages into a single vector column.
4. Calculate the **correlation matrix**.
5. Calculate the **average absolute correlation** for each column.
6. Select the **top N columns with the lowest average correlation**.
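
To make the steps above concrete, here is a small usage sketch. The selector object and its `selectColumnsToIndex` method come from this PR; the toy column names and the local session setup are made up for illustration:

```scala
import io.qbeast.spark.index.SparkColumnsToIndexSelector
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Toy DataFrame: column names are hypothetical
val df = Seq(
  (1, 10.0, "a", 100L),
  (2, 20.0, "b", 200L),
  (3, 35.0, "a", 150L)).toDF("id", "price", "category", "size")

// Ask for the 2 least-correlated columns
val columnsToIndex = SparkColumnsToIndexSelector.selectColumnsToIndex(df, 2)
println(columnsToIndex)

// The selection can also be passed explicitly at write time
df.write
  .format("qbeast")
  .option("columnsToIndex", columnsToIndex.mkString(","))
  .save("/tmp/qbeast_table")
```
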
1 change: 1 addition & 0 deletions project/Dependencies.scala
@@ -20,4 +20,5 @@ object Dependencies {
val hadoopCommons = "org.apache.hadoop" % "hadoop-common" % hadoopVersion
val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
val fasterxml = "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.12.0"
val sparkml = "org.apache.spark" %% "spark-mllib" % sparkVersion
}
6 changes: 5 additions & 1 deletion src/main/scala/io/qbeast/context/QbeastContext.scala
@@ -8,6 +8,7 @@ import io.qbeast.core.keeper.LocalKeeper
import io.qbeast.core.model._
import io.qbeast.spark.delta.writer.RollupDataWriter
import io.qbeast.spark.delta.SparkDeltaMetadataManager
import io.qbeast.spark.index.SparkColumnsToIndexSelector
import io.qbeast.spark.index.SparkOTreeManager
import io.qbeast.spark.index.SparkRevisionFactory
import io.qbeast.spark.internal.QbeastOptions
@@ -92,6 +93,8 @@ object QbeastContext
override def revisionBuilder: RevisionFactory[StructType, QbeastOptions] =
SparkRevisionFactory

override def columnSelector: ColumnsToIndexSelector[DataFrame] = SparkColumnsToIndexSelector

/**
* Sets the unmanaged context. The specified context will not be disposed automatically at the
* end of the Spark session.
@@ -146,7 +149,8 @@ object QbeastContext
indexManager,
metadataManager,
dataWriter,
revisionBuilder)
revisionBuilder,
columnSelector)

private def destroyManaged(): Unit = this.synchronized {
managedOption.foreach(_.keeper.stop())
146 changes: 146 additions & 0 deletions src/main/scala/io/qbeast/spark/index/SparkColumnsToIndexSelector.scala
@@ -0,0 +1,146 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.spark.index

import io.qbeast.core.model.ColumnsToIndexSelector
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.ml.Pipeline
import org.apache.spark.qbeast.config.MAX_NUM_COLUMNS_TO_INDEX
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.DataFrame

object SparkColumnsToIndexSelector extends ColumnsToIndexSelector[DataFrame] with Serializable {

/**
* The maximum number of columns to index.
*
* @return
*/
override def MAX_COLUMNS_TO_INDEX: Int = MAX_NUM_COLUMNS_TO_INDEX

/**
* Adds unix timestamp columns to the DataFrame for the columns specified
* @param data
* the DataFrame
* @param inputCols
* the columns to inspect; TimestampType columns are converted to unix timestamps
* @return
* the DataFrame with the converted timestamp columns
*/
private def withUnixTimestamp(data: DataFrame, inputCols: Seq[StructField]): DataFrame = {
val timestampColsTransformation = inputCols
.filter(_.dataType == TimestampType)
.map(c => (c.name, unix_timestamp(col(c.name))))
.toMap

data.withColumns(timestampColsTransformation)
}

/**
* Adds preprocessing transformers to the DataFrame for the columns specified
* @param data
* the DataFrame
* @param inputCols
* the columns to preprocess
* @return
*/
protected def withPreprocessedPipeline(
data: DataFrame,
inputCols: Seq[StructField]): DataFrame = {

val transformers = inputCols
.collect {
case column if column.dataType == StringType =>
val colName = column.name
val indexer = new StringIndexer().setInputCol(colName).setOutputCol(s"${colName}_Index")
val encoder =
new OneHotEncoder().setInputCol(s"${colName}_Index").setOutputCol(s"${colName}_Vec")
Seq(indexer, encoder)

case column =>
val colName = column.name
Seq(
new VectorAssembler()
.setInputCols(Array(colName))
.setOutputCol(s"${colName}_Vec")
.setHandleInvalid("keep"))
}
.flatten
.toArray

val preprocessingPipeline = new Pipeline().setStages(transformers)
val preprocessingModel = preprocessingPipeline.fit(data)
val preprocessedData = preprocessingModel.transform(data)

preprocessedData
}

/**
* Selects the N columns with the lowest average absolute correlation
* @param data
* the DataFrame
* @param inputCols
* the columns to preprocess
* @param numCols
* the number of columns to return
* @return
*/
protected def selectTopNCorrelatedColumns(
data: DataFrame,
inputCols: Seq[StructField],
numCols: Int): Array[String] = {

val inputVecCols = inputCols.map(_.name + "_Vec").toArray

val assembler = new VectorAssembler()
.setInputCols(inputVecCols)
.setOutputCol("features")
.setHandleInvalid("keep")

val vectorDf = assembler.transform(data)

// Calculate the correlation matrix
val correlationMatrix: DataFrame = Correlation.corr(vectorDf, "features")
// Extract the correlation matrix as a Matrix
val corrArray = correlationMatrix.select("pearson(features)").head.getAs[Matrix](0)

// Calculate the average absolute correlation for each column
val averageCorrelation =
corrArray.toArray.map(Math.abs).grouped(inputVecCols.length).toArray.head

// Get the indices of columns with the lowest average correlation
val sortedIndices = averageCorrelation.zipWithIndex.sortBy { case (corr, _) => corr }
val selectedIndices = sortedIndices.take(numCols).map(_._2)

val selectedCols = selectedIndices.map(inputCols(_).name)
selectedCols

}

override def selectColumnsToIndex(data: DataFrame, numColumnsToIndex: Int): Seq[String] = {

// If there is no data to write, return the first numColumnsToIndex columns
if (data.isEmpty) {
return data.columns.take(numColumnsToIndex)
}

val inputCols = data.schema
// Add unix timestamp columns
val updatedData = withUnixTimestamp(data, inputCols)
// Add column transformers
val preprocessedPipeline = withPreprocessedPipeline(updatedData, inputCols)
// Select the N columns with the lowest average absolute correlation
val selectedColumns =
selectTopNCorrelatedColumns(preprocessedPipeline, inputCols, numColumnsToIndex)

selectedColumns

}

}
@@ -10,6 +10,7 @@ import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.table.IndexedTableFactory
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
import org.apache.spark.qbeast.config.COLUMN_SELECTOR_ENABLED
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
@@ -94,7 +95,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
data: DataFrame): BaseRelation = {

require(
parameters.contains("columnsToIndex") || mode == SaveMode.Append,
parameters.contains("columnsToIndex") || mode == SaveMode.Append || COLUMN_SELECTOR_ENABLED,
throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))

val tableId = QbeastOptions.loadTableIDFromParameters(parameters)
@@ -6,7 +6,6 @@ package io.qbeast.spark.internal.sources.catalog
import io.qbeast.context.QbeastContext
import io.qbeast.spark.internal.sources.v2.QbeastStagedTableImpl
import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
@@ -109,7 +108,6 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
properties: util.Map[String, String]): Table = {

if (QbeastCatalogUtils.isQbeastProvider(properties)) {
checkQbeastProperties(properties.asScala.toMap)
// Create the table
QbeastCatalogUtils.createQbeastTable(
ident,
@@ -5,7 +5,6 @@ package io.qbeast.spark.internal.sources.v2

import io.qbeast.spark.internal.sources.catalog.CreationMode
import io.qbeast.spark.internal.sources.catalog.QbeastCatalogUtils
import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
import io.qbeast.spark.table.IndexedTableFactory
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.connector.catalog.Identifier
@@ -72,9 +71,6 @@ private[sources] class QbeastStagedTableImpl(
// we pass all the writeOptions to the properties as well
writeOptions.foreach { case (k, v) => props.put(k, v) }

// Check all the Qbeast properties are correctly specified
checkQbeastProperties(props.asScala.toMap)

// Creates the corresponding table on the Catalog and executes
// the writing of the dataFrame (if any)
QbeastCatalogUtils.createQbeastTable(