Merge pull request #143 from osopardo1/unified-catalog
Unified Catalog for Delta and Qbeast Tables.
osopardo1 authored Dec 14, 2022
2 parents d503833 + 8d3354c commit 0bdee8e
Showing 7 changed files with 173 additions and 35 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -125,7 +125,7 @@ For example:
sbt assembly

$SPARK_HOME/bin/spark-shell \
--jars ./target/scala-2.12/qbeast-spark-assembly-0.3.0-alpha.jar \
--jars ./target/scala-2.12/qbeast-spark-assembly-0.3.0.jar \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--packages io.delta:delta-core_2.12:1.2.0
```
13 changes: 7 additions & 6 deletions README.md
@@ -91,9 +91,10 @@ export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2

```bash
$SPARK_HOME/bin/spark-shell \
--repositories https://s01.oss.sonatype.org/content/repositories/snapshots \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--packages io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0
--packages io.qbeast:qbeast-spark_2.12:0.3.0-SNAPSHOT,io.delta:delta-core_2.12:1.2.0
```

### 2. Indexing a dataset
@@ -173,11 +174,11 @@ qbeastTable.analyze()
Go to [QbeastTable documentation](./docs/QbeastTable.md) for more detailed information.

# Dependencies and Version Compatibility
| Version | Spark | Hadoop | Delta Lake |
|-------------|:-----:|:------:|:----------:|
| 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
| 0.3.0-alpha | 3.2.x | 3.3.x | 1.2.x |
| Version | Spark | Hadoop | Delta Lake |
|------------|:-----:|:------:|:----------:|
| 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
| 0.3.0 | 3.2.x | 3.3.x | 1.2.x |

Check [here](https://docs.delta.io/latest/releases.html) for **Delta Lake** and **Apache Spark** version compatibility.

2 changes: 1 addition & 1 deletion build.sbt
@@ -1,7 +1,7 @@
import Dependencies._
import xerial.sbt.Sonatype._

val mainVersion = "0.3.0-alpha"
val mainVersion = "0.3.0"

lazy val qbeastCore = (project in file("core"))
.settings(
50 changes: 50 additions & 0 deletions docs/AdvancedConfiguration.md
@@ -2,6 +2,56 @@

There are different configurations for the index that can affect read performance or the writing process. Here is a summary of some of them.

## Catalogs

We designed the `QbeastCatalog` to work as an **entry point for other formats' catalogs** as well.

However, you can also handle different catalogs simultaneously.

### 1. Unified Catalog

```bash
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
```

Using the `spark_catalog` configuration, you can write **qbeast** and **delta** (or upcoming formats ;)) tables into the `default` namespace.

```scala
df.write
.format("qbeast")
.option("columnsToIndex", "user_id,product_id")
.saveAsTable("qbeast_table")

df.write
.format("delta")
.saveAsTable("delta_table")
```
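
Both tables then live side by side in the `default` namespace. As a minimal sketch (assuming the two tables above were created in the same session), you can list and query them with plain SQL:

```scala
// Both tables are registered under the default namespace
spark.sql("SHOW TABLES IN default").show()

// The catalog resolves the right format for each table
spark.sql("SELECT COUNT(*) FROM qbeast_table").show()
spark.sql("SELECT COUNT(*) FROM delta_table").show()
```
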
### 2. Secondary catalog

To use **more than one catalog in the same session**, you can set it up under a different name.

```bash
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
```

Notice that the `QbeastCatalog` conf parameter is no longer `spark_catalog`; it has a custom name such as `qbeast_catalog`. Each table written using the **qbeast** implementation should be prefixed with `qbeast_catalog`.

For example:

```scala
// DataFrame API
df.write
.format("qbeast")
.option("columnsToIndex", "user_id,product_id")
.saveAsTable("qbeast_catalog.default.qbeast_table")

// SQL
spark.sql("CREATE TABLE qbeast_catalog.default.qbeast_table USING qbeast AS SELECT * FROM ecommerce")
```
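
Reading the table back goes through the same prefix. A minimal sketch, assuming the `qbeast_table` created above:

```scala
// DataFrame API: resolve the table through the secondary catalog
val qbeastDf = spark.read.table("qbeast_catalog.default.qbeast_table")

// SQL: the same three-part identifier works in queries
spark.sql("SELECT COUNT(*) FROM qbeast_catalog.default.qbeast_table").show()
```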



## ColumnsToIndex

These are the columns you want to index. Try to find those that are relevant for your queries or your data pipelines.
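
For instance, a minimal sketch, assuming `df` is the DataFrame to be written (column names are illustrative):

```scala
df.write
  .format("qbeast")
  .option("columnsToIndex", "user_id,price") // comma-separated list of column names
  .save("/tmp/qbeast-table")
```
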
58 changes: 42 additions & 16 deletions docs/Quickstart.md
@@ -16,13 +16,8 @@ Inside the project folder, launch a spark-shell with the required **dependencies
```bash
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
--packages io.qbeast:qbeast-spark_2.12:0.2.0,\
io.delta:delta-core_2.12:1.0.0,\
com.amazonaws:aws-java-sdk:1.12.20,\
org.apache.hadoop:hadoop-common:3.2.0,\
org.apache.hadoop:hadoop-client:3.2.0,\
org.apache.hadoop:hadoop-aws:3.2.0
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
--packages io.qbeast:qbeast-spark_2.12:0.3.0,io.delta:delta-core_2.12:1.2.0
```
As an **_extra configuration_**, you can also change two global parameters of the index:

@@ -37,26 +32,28 @@ As an **_extra configuration_**, you can also change two global parameters of th
```
Consult the [Qbeast-Spark advanced configuration](AdvancedConfiguration.md) for more information.

Read the ***store_sales*** public dataset from `TPC-DS`, the table has **23** columns in total and was generated with a `scaleFactor` of 1. Check [The Making of TPC-DS](http://www.tpc.org/tpcds/presentations/the_making_of_tpcds.pdf) for more details on the dataset.

Read the ***ecommerce*** test dataset from [Kaggle](https://www.kaggle.com/code/adilemrebilgic/e-commerce-analytics/data).
```scala
val parquetTablePath = "s3a://qbeast-public-datasets/store_sales"

val parquetDf = spark.read.format("parquet").load(parquetTablePath).na.drop()
val ecommerce = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("src/test/resources/ecommerce100K_2019_Oct.csv")
```

Indexing the data with the desired columns, in this case `ss_cdemo_sk` and `ss_cdemo_sk`.
Indexing the data with the desired columns, in this case `user_id` and `product_id`.
```scala
val qbeastTablePath = "/tmp/qbeast-test-data/qtable"

(parquetDf.write
(ecommerce.write
.mode("overwrite")
.format("qbeast") // Saving the dataframe in a qbeast datasource
.option("columnsToIndex", "ss_cdemo_sk,ss_cdemo_sk") // Indexing the table
.option("cubeSize", 300000) // The desired number of records of the resulting files/cubes. Default is 100000
.option("columnsToIndex", "user_id,product_id") // Indexing the table
.option("cubeSize", "500") // The desired number of records of the resulting files/cubes. Default is 5M
.save(qbeastTablePath))
```


## Sampling

Allow the sample operator to be pushed down to the source when sampling, reducing I/O and computational cost.
@@ -80,6 +77,35 @@ qbeastDf.sample(0.1).explain()

Notice that the sample operator is no longer present in the physical plan. It is converted into a `Filter (qbeast_hash)` instead, which is used to select files during data scanning (`DataFilters` from `FileScan`). This way we skip reading many files, which means less I/O.
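
Any aggregation downstream of `sample` benefits from this pushdown. A minimal sketch, assuming `qbeastDf` is the dataframe loaded from `qbeastTablePath` above and that the ecommerce `price` column is present:

```scala
import org.apache.spark.sql.functions.avg

// Only the files needed for roughly 10% of the data are scanned
qbeastDf.sample(0.1).agg(avg("price")).show()
```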

## SQL

Thanks to the `QbeastCatalog`, you can use plain SQL such as `CREATE TABLE` or `INSERT INTO` with the qbeast format.

To check the different catalog configurations, please go to [Advanced Configuration](AdvancedConfiguration.md).

```scala
ecommerce.createOrReplaceTempView("ecommerce_october")

spark.sql("CREATE OR REPLACE TABLE ecommerce_qbeast USING qbeast AS SELECT * FROM ecommerce_october")

//OR

val ecommerceNovember = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./src/test/resources/ecommerce100K_2019_Nov.csv")

ecommerceNovember.createOrReplaceTempView("ecommerce_november")

spark.sql("INSERT INTO ecommerce_qbeast SELECT * FROM ecommerce_november")
```
Sampling also has a SQL operator called `TABLESAMPLE`, which can be expressed in terms of either rows or a percentage.

```scala
spark.sql("SELECT avg(price) FROM ecommerce_qbeast TABLESAMPLE(10 PERCENT)").show()
```
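
`TABLESAMPLE` can also take an absolute row count; a minimal sketch:

```scala
spark.sql("SELECT avg(price) FROM ecommerce_qbeast TABLESAMPLE(1000 ROWS)").show()
```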


## Analyze and Optimize

@@ -16,6 +16,7 @@ import org.apache.spark.sql.catalyst.analysis.{
import org.apache.spark.sql.{SparkCatalogUtils, SparkSession}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -37,21 +38,44 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]

private val tableFactory = QbeastContext.indexedTableFactory

private val deltaCatalog: DeltaCatalog = new DeltaCatalog()

private var delegatedCatalog: CatalogPlugin = null

private var catalogName: String = null

private def getSessionCatalog(): T = {
/**
* Gets the delegated catalog of the session
* @return
*/
private def getDelegatedCatalog(): T = {
val sessionCatalog = delegatedCatalog match {
case null =>
// In this case, no catalog has been delegated, so we need to search for the default
SparkCatalogUtils.getV2SessionCatalog(SparkSession.active)
case o => o
}

sessionCatalog.asInstanceOf[T]
}

/**
* Gets the session catalog depending on provider properties, if any
*
* The intention is to include the different catalog providers
* as we add integrations with other formats.
* For example, for the "delta" provider it will return a DeltaCatalog instance.
*
* In this way, users may only need to instantiate one single unified catalog.
* @param properties the properties with the provider parameter
* @return
*/
private def getSessionCatalog(properties: Map[String, String] = Map.empty): T = {
properties.get("provider") match {
case Some("delta") => deltaCatalog.asInstanceOf[T]
case _ => getDelegatedCatalog()
}
}

override def loadTable(ident: Identifier): Table = {
try {
getSessionCatalog().loadTable(ident) match {
@@ -93,7 +117,11 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
// Load the table
loadTable(ident)
} else {
getSessionCatalog().createTable(ident, schema, partitions, properties)
getSessionCatalog(properties.asScala.toMap).createTable(
ident,
schema,
partitions,
properties)
}

}
@@ -119,12 +147,13 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
properties,
tableFactory)
} else {
if (getSessionCatalog().tableExists(ident)) {
getSessionCatalog().dropTable(ident)
val sessionCatalog = getSessionCatalog(properties.asScala.toMap)
if (sessionCatalog.tableExists(ident)) {
sessionCatalog.dropTable(ident)
}
DefaultStagedTable(
ident,
getSessionCatalog().createTable(ident, schema, partitions, properties),
sessionCatalog.createTable(ident, schema, partitions, properties),
this)
}
}
@@ -143,12 +172,13 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
properties,
tableFactory)
} else {
if (getSessionCatalog().tableExists(ident)) {
getSessionCatalog().dropTable(ident)
val sessionCatalog = getSessionCatalog(properties.asScala.toMap)
if (sessionCatalog.tableExists(ident)) {
sessionCatalog.dropTable(ident)
}
DefaultStagedTable(
ident,
getSessionCatalog().createTable(ident, schema, partitions, properties),
sessionCatalog.createTable(ident, schema, partitions, properties),
this)

}
@@ -170,7 +200,8 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
} else {
DefaultStagedTable(
ident,
getSessionCatalog().createTable(ident, schema, partitions, properties),
getSessionCatalog(properties.asScala.toMap)
.createTable(ident, schema, partitions, properties),
this)
}
}
@@ -208,6 +239,8 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
// Initialize the catalog with the corresponding name
this.catalogName = name
// Initialize the catalog in any other provider that we can integrate with
this.deltaCatalog.initialize(name, options)
}

override def name(): String = catalogName
@@ -216,6 +249,8 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
// Check if the delegating catalog has Table and SupportsNamespace properties
if (delegate.isInstanceOf[TableCatalog] && delegate.isInstanceOf[SupportsNamespaces]) {
this.delegatedCatalog = delegate
// Set delegated catalog in any other provider that we can integrate with
this.deltaCatalog.setDelegateCatalog(delegate)
} else throw new IllegalArgumentException("Invalid session catalog: " + delegate)
}

@@ -21,7 +21,6 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo

data.write.format("delta").saveAsTable("delta_table") // delta catalog

// spark.sql("USE CATALOG qbeast_catalog")
data.write
.format("qbeast")
.option("columnsToIndex", "id")
@@ -41,6 +40,33 @@

}))

it should
"coexist with Delta tables in the same catalog" in withQbeastContextSparkAndTmpWarehouse(
(spark, _) => {

val data = createTestData(spark)

data.write.format("delta").saveAsTable("delta_table") // delta catalog

data.write
.format("qbeast")
.option("columnsToIndex", "id")
.saveAsTable("qbeast_table") // qbeast catalog

val tables = spark.sessionState.catalog.listTables("default")
tables.size shouldBe 2

val deltaTable = spark.read.table("delta_table")
val qbeastTable = spark.read.table("qbeast_table")

assertSmallDatasetEquality(
deltaTable,
qbeastTable,
orderedComparison = false,
ignoreNullable = true)

})

it should "crate table" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {

spark.sql(
