Upgrade to Spark 3.4.1 and Delta 2.4.0 #211

Merged · 7 commits · Sep 26, 2023
Changes from all commits
5 changes: 3 additions & 2 deletions README.md
@@ -18,9 +18,9 @@

**Qbeast Spark** is an extension for [**Data Lakehouses**](http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf) that enables **multi-dimensional filtering** and **sampling** directly on the storage

[![apache-spark](https://img.shields.io/badge/apache--spark-3.3.x-blue)](https://spark.apache.org/releases/spark-release-3-2-2.html)
[![apache-spark](https://img.shields.io/badge/apache--spark-3.4.x-blue)](https://spark.apache.org/releases/spark-release-3-4-1.html)
[![apache-hadoop](https://img.shields.io/badge/apache--hadoop-3.3.x-blue)](https://hadoop.apache.org/release/3.3.1.html)
[![delta-core](https://img.shields.io/badge/delta--core-2.1.0-blue)](https://github.com/delta-io/delta/releases/tag/v1.2.0)
[![delta-core](https://img.shields.io/badge/delta--core-2.4.0-blue)](https://github.com/delta-io/delta/releases/tag/v2.4.0)
[![codecov](https://codecov.io/gh/Qbeast-io/qbeast-spark/branch/main/graph/badge.svg?token=8WO7HGZ4MW)](https://codecov.io/gh/Qbeast-io/qbeast-spark)

</div>
@@ -170,6 +170,7 @@ Use [Python index visualizer](./utils/visualizer/README.md) for your indexed tab
| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
| 0.3.x | 3.2.x | 3.3.x | 1.2.x |
| 0.4.x | 3.3.x | 3.3.x | 2.1.x |
| 0.5.x | 3.4.x | 3.3.x | 2.4.x |

Check [here](https://docs.delta.io/latest/releases.html) for **Delta Lake** and **Apache Spark** version compatibility.
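For a project consuming this release, the new compatibility row translates into dependency settings roughly like the following. This is a minimal sketch only; the io.qbeast coordinate and the 0.5.0 version are assumptions inferred from the mainVersion bump in this PR, not part of the diff.

// Assumed build.sbt fragment for the Spark 3.4.x / Delta 2.4.x row above
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"    % "3.4.1" % Provided,
  "io.delta"         %% "delta-core"   % "2.4.0",
  "io.qbeast"        %% "qbeast-spark" % "0.5.0"
)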

5 changes: 2 additions & 3 deletions build.sbt
@@ -1,7 +1,7 @@
import Dependencies._
import xerial.sbt.Sonatype._

val mainVersion = "0.4.0"
val mainVersion = "0.5.0-SNAPSHOT"

lazy val qbeastCore = (project in file("core"))
.settings(
@@ -104,8 +104,7 @@ ThisBuild / publishTo := {
}

// Sonatype settings
//ThisBuild / publishMavenStyle := true
ThisBuild / coverageEnabled := true
ThisBuild / publishMavenStyle := true
ThisBuild / sonatypeProfileName := "io.qbeast"
ThisBuild / sonatypeProjectHosting := Some(
GitHubHosting(user = "Qbeast-io", repository = "qbeast-spark", email = "info@qbeast.io"))
@@ -69,8 +69,9 @@ case class LinearTransformation(

/**
* Merges two transformations. The domain of the resulting transformation is the union of this
* and the other transformation. The range of the resulting transformation is the intersection of
* this and the other transformation, which can be a LinearTransformation or IdentityTransformation
* and the other transformation. The range of the resulting transformation
* is the intersection of this and the other transformation,
* which can be a LinearTransformation or IdentityTransformation
* @param other
* @return a new Transformation that contains both this and other.
*/
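A toy illustration of the merge semantics the comment describes, using a hypothetical simplified type rather than the real LinearTransformation API:

// Hypothetical one-dimensional stand-in: merging widens the domain to cover both inputs.
case class ToyLinearTransformation(minValue: Double, maxValue: Double) {
  def merge(other: ToyLinearTransformation): ToyLinearTransformation =
    ToyLinearTransformation(
      math.min(minValue, other.minValue),
      math.max(maxValue, other.maxValue))
}

// ToyLinearTransformation(0.0, 10.0).merge(ToyLinearTransformation(5.0, 20.0))
// == ToyLinearTransformation(0.0, 20.0)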
4 changes: 2 additions & 2 deletions project/Dependencies.scala
@@ -4,9 +4,9 @@ import sbt._
* External libraries used in the project with versions.
*/
object Dependencies {
lazy val sparkVersion: String = sys.props.get("spark.version").getOrElse("3.3.0")
lazy val sparkVersion: String = sys.props.get("spark.version").getOrElse("3.4.1")
lazy val hadoopVersion: String = sys.props.get("hadoop.version").getOrElse("3.3.4")
lazy val deltaVersion: String = "2.1.0"
lazy val deltaVersion: String = "2.4.0"

val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion
val sparkSql = "org.apache.spark" %% "spark-sql" % sparkVersion
2 changes: 1 addition & 1 deletion src/main/scala/io/qbeast/spark/QbeastTable.scala
@@ -34,7 +34,7 @@ class QbeastTable private (
private def deltaLog: DeltaLog = DeltaLog.forTable(sparkSession, tableID.id)

private def qbeastSnapshot: DeltaQbeastSnapshot =
delta.DeltaQbeastSnapshot(deltaLog.snapshot)
delta.DeltaQbeastSnapshot(deltaLog.update())

private def indexedTable: IndexedTable = indexedTableFactory.getIndexedTable(tableID)
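The Delta 2.4 upgrade replaces deltaLog.snapshot here and in the files below with one of two accessors. A minimal sketch of the difference, assuming an existing Delta table at the given path:

// update() reloads the transaction log and returns the latest snapshot;
// unsafeVolatileSnapshot returns the currently cached snapshot without refreshing it.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog

def snapshots(spark: SparkSession, tablePath: String): Unit = {
  val deltaLog = DeltaLog.forTable(spark, tablePath)
  val fresh = deltaLog.update()                // up-to-date view, incurs a log refresh
  val cached = deltaLog.unsafeVolatileSnapshot // cheap, but may be stale
  println(s"fresh version = ${fresh.version}, cached version = ${cached.version}")
}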

2 changes: 1 addition & 1 deletion src/main/scala/io/qbeast/spark/delta/CubeDataLoader.scala
@@ -18,7 +18,7 @@ case class CubeDataLoader(tableID: QTableID) {

private val spark = SparkSession.active

private val snapshot = DeltaLog.forTable(SparkSession.active, tableID.id).snapshot
private val snapshot = DeltaLog.forTable(SparkSession.active, tableID.id).unsafeVolatileSnapshot

/**
* Loads the data from a set of cubes in a specific revision
@@ -104,7 +104,9 @@ private[delta] case class DeltaMetadataWriter(

val cubeStrings = deltaReplicatedSet.map(_.string)
val cubeBlocks =
deltaLog.snapshot.allFiles
deltaLog
.update()
.allFiles
.where(TagColumns.revision === lit(revision.revisionID.toString) &&
TagColumns.cube.isInCollection(cubeStrings))
.collect()
@@ -145,7 +147,7 @@
} else if (mode == SaveMode.Ignore) {
return Nil
} else if (mode == SaveMode.Overwrite) {
deltaLog.assertRemovable()
DeltaLog.assertRemovable(txn.snapshot)
}
}
val rearrangeOnly = options.rearrangeOnly
3 changes: 2 additions & 1 deletion src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala
@@ -118,7 +118,8 @@ object OTreeIndex {

def apply(spark: SparkSession, path: Path): OTreeIndex = {
val deltaLog = DeltaLog.forTable(spark, path)
val tahoe = TahoeLogFileIndex(spark, deltaLog, path, deltaLog.snapshot, Seq.empty, false)
val unsafeVolatileSnapshot = deltaLog.update()
val tahoe = TahoeLogFileIndex(spark, deltaLog, path, unsafeVolatileSnapshot, Seq.empty, false)
OTreeIndex(tahoe)
}

@@ -17,7 +17,8 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
private[spark] class StagingDataManager(tableID: QTableID) extends DeltaStagingUtils {
private val spark = SparkSession.active

protected override val snapshot: Snapshot = DeltaLog.forTable(spark, tableID.id).snapshot
protected override val snapshot: Snapshot =
DeltaLog.forTable(spark, tableID.id).unsafeVolatileSnapshot

private def stagingRemoveFiles: Seq[RemoveFile] = {
import spark.implicits._
@@ -54,7 +54,8 @@ case class ConvertToQbeastCommand(
val (fileFormat, tableId) = resolveTableFormat(spark)

val deltaLog = DeltaLog.forTable(spark, tableId.table)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.snapshot)
val unsafeVolatileSnapshot = deltaLog.update()
val qbeastSnapshot = DeltaQbeastSnapshot(unsafeVolatileSnapshot)
val isQbeast = qbeastSnapshot.loadAllRevisions.nonEmpty

if (isQbeast) {
@@ -78,7 +79,7 @@

// Convert delta to qbeast through metadata modification
val tableID = QTableID(tableId.table)
val schema = deltaLog.snapshot.schema
val schema = deltaLog.update().schema

SparkDeltaMetadataManager.updateMetadataWithTransaction(tableID, schema) {
val convRevision = stagingRevision(tableID, cubeSize, columnsToIndex)
@@ -3,24 +3,30 @@
*/
package io.qbeast.spark.internal.rules

import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
import org.apache.spark.sql.{AnalysisExceptionFactory, SchemaUtils}
import org.apache.spark.sql.catalyst.expressions.{
Alias,
AnsiCast,
ArrayTransform,
Attribute,
Cast,
CreateStruct,
Expression,
GetArrayItem,
GetStructField,
LambdaFunction,
NamedExpression,
NamedLambdaVariable,
UpCast
}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, StructField, StructType}

private[rules] object QbeastAnalysisUtils {

private lazy val conf = SQLConf.get

/**
* Checks if the schema of the Table corresponds to the schema of the Query
* From Delta Lake OSS Project code in DeltaAnalysis
@@ -72,24 +78,27 @@ private[rules] object QbeastAnalysisUtils {
Project(project, query)
}

type CastFunction = (Expression, DataType) => Expression
type CastFunction = (Expression, DataType, String) => Expression

/**
* From DeltaAnalysis code in spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
* Get cast operation for the level of strictness in the schema a user asked for
* @return
*/
def getCastFunction: CastFunction = {
val conf = SQLConf.get
val timeZone = conf.sessionLocalTimeZone
conf.storeAssignmentPolicy match {
case SQLConf.StoreAssignmentPolicy.LEGACY =>
Cast(_, _, Option(timeZone), ansiEnabled = false)
(input: Expression, dt: DataType, _) =>
Cast(input, dt, Option(timeZone), ansiEnabled = false)
case SQLConf.StoreAssignmentPolicy.ANSI =>
(input: Expression, dt: DataType) => {
AnsiCast(input, dt, Option(timeZone))
(input: Expression, dt: DataType, name: String) => {
val cast = Cast(input, dt, Option(timeZone), ansiEnabled = true)
cast.setTagValue(Cast.BY_TABLE_INSERTION, ())
TableOutputResolver.checkCastOverflowInTableInsert(cast, name)
}
case SQLConf.StoreAssignmentPolicy.STRICT => UpCast(_, _)
case SQLConf.StoreAssignmentPolicy.STRICT =>
(input: Expression, dt: DataType, _) => UpCast(input, dt)
}
}
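A hedged summary of how the three store-assignment policies behave at write time, with an illustrative overflow case; the table and column names below are invented for the example:

import org.apache.spark.sql.SparkSession

// LEGACY -> Cast(ansiEnabled = false): the value silently overflows.
// ANSI   -> Cast(ansiEnabled = true) wrapped by checkCastOverflowInTableInsert,
//           so the runtime error mentions the target column (hence the new name argument).
// STRICT -> UpCast: the plan fails analysis because Int -> Byte is a narrowing cast.
def demo(spark: SparkSession): Unit = {
  spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
  spark.sql("CREATE TABLE bytes_tbl (b BYTE) USING delta")
  spark.sql("INSERT INTO bytes_tbl VALUES (300)") // fails with an ANSI overflow error
}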

@@ -131,7 +140,10 @@
case (other, i) if i < target.length =>
val targetAttr = target(i)
Alias(
getCastFunction(GetStructField(parent, i, Option(other.name)), targetAttr.dataType),
getCastFunction(
GetStructField(parent, i, Option(other.name)),
targetAttr.dataType,
targetAttr.name),
targetAttr.name)(explicitMetadata = Option(targetAttr.metadata))

case (other, i) =>
@@ -146,6 +158,34 @@
Option(parent.metadata))
}

/**
* From DeltaAnalysis code in spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
*
* Recursively add casts to Array[Struct]
* @param tableName the name of the table
* @param parent the parent expression
* @param source the source Struct
* @param target the final target Struct
* @param sourceNullable if source is nullable
* @return
*/

private def addCastsToArrayStructs(
tableName: String,
parent: NamedExpression,
source: StructType,
target: StructType,
sourceNullable: Boolean): Expression = {
val structConverter: (Expression, Expression) => Expression = (_, i) =>
addCastsToStructs(tableName, Alias(GetArrayItem(parent, i), i.toString)(), source, target)
val transformLambdaFunc = {
val elementVar = NamedLambdaVariable("elementVar", source, sourceNullable)
val indexVar = NamedLambdaVariable("indexVar", IntegerType, false)
LambdaFunction(structConverter(elementVar, indexVar), Seq(elementVar, indexVar))
}
ArrayTransform(parent, transformLambdaFunc)
}
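Roughly what the generated plan does for an array-of-struct column — a hedged sketch of an equivalent DataFrame expression, with the column and field names invented for illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// For a query column `items: array<struct<a:int>>` written into a target typed
// array<struct<a:bigint>>, the rule above effectively rewrites each element,
// much like this SQL higher-order function:
def castItems(df: DataFrame): DataFrame =
  df.withColumn("items", expr("transform(items, x -> named_struct('a', CAST(x.a AS BIGINT)))"))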

/**
* From DeltaAnalysis code in spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
* Adds cast to input/query column from the target table
@@ -163,8 +203,11 @@
attr
case (s: StructType, t: StructType) if s != t =>
addCastsToStructs(tblName, attr, s, t)
case (ArrayType(s: StructType, sNull: Boolean), ArrayType(t: StructType, tNull: Boolean))
if s != t && sNull == tNull =>
addCastsToArrayStructs(tblName, attr, s, t, sNull)
case _ =>
getCastFunction(attr, targetAttr.dataType)
getCastFunction(attr, targetAttr.dataType, targetAttr.name)
}
Alias(expr, targetAttr.name)(explicitMetadata = Option(targetAttr.metadata))
}
@@ -5,7 +5,9 @@ package io.qbeast.spark.internal.sources.catalog

import org.apache.spark.sql.AnalysisExceptionFactory
import org.apache.spark.sql.connector.catalog.{
Column,
Identifier,
SparkCatalogV2Util,
StagedTable,
SupportsWrite,
Table,
@@ -44,7 +46,9 @@ private[catalog] case class DefaultStagedTable(

override def name(): String = ident.name()

override def schema(): StructType = table.schema()
override def schema(): StructType = SparkCatalogV2Util.v2ColumnsToStructType(columns())

override def columns(): Array[Column] = table.columns()

override def partitioning(): Array[Transform] = table.partitioning()
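Spark 3.4 describes v2 tables with Array[Column] instead of StructType, which is why this PR converts between the two here and in QbeastCatalog below. A small sketch of the round trip using the same helper the PR imports; the field names are illustrative:

import org.apache.spark.sql.connector.catalog.{Column, SparkCatalogV2Util}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Round-trip between the legacy StructType schema and Spark 3.4's v2 Column array.
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val v2Columns: Array[Column] = SparkCatalogV2Util.structTypeToV2Columns(schema)
val roundTripped: StructType = SparkCatalogV2Util.v2ColumnsToStructType(v2Columns)
// roundTripped describes the same columns as the original schema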

@@ -96,6 +96,13 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
}
}

override def createTable(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]): Table =
createTable(ident, SparkCatalogV2Util.v2ColumnsToStructType(columns), partitions, properties)

override def createTable(
ident: Identifier,
schema: StructType,
@@ -120,7 +127,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
} else {
getSessionCatalog(properties.asScala.toMap).createTable(
ident,
schema,
SparkCatalogV2Util.structTypeToV2Columns(schema),
partitions,
properties)
}
@@ -154,7 +161,11 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
}
DefaultStagedTable(
ident,
sessionCatalog.createTable(ident, schema, partitions, properties),
sessionCatalog.createTable(
ident,
SparkCatalogV2Util.structTypeToV2Columns(schema),
partitions,
properties),
this)
}
}
@@ -179,12 +190,26 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
}
DefaultStagedTable(
ident,
sessionCatalog.createTable(ident, schema, partitions, properties),
sessionCatalog.createTable(
ident,
SparkCatalogV2Util.structTypeToV2Columns(schema),
partitions,
properties),
this)

}
}

override def stageCreate(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {

stageCreate(ident, SparkCatalogV2Util.v2ColumnsToStructType(columns), partitions, properties)

}

override def stageCreate(
ident: Identifier,
schema: StructType,
@@ -202,7 +227,11 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
DefaultStagedTable(
ident,
getSessionCatalog(properties.asScala.toMap)
.createTable(ident, schema, partitions, properties),
.createTable(
ident,
SparkCatalogV2Util.structTypeToV2Columns(schema),
partitions,
properties),
this)
}
}