Support SQL statement INSERT INTO with Qbeast format #116

Merged · 14 commits · Jul 28, 2022
Changes from all commits
15 changes: 15 additions & 0 deletions README.md
@@ -151,6 +151,21 @@ qbeastTable.getIndexMetrics()
qbeastTable.analyze()
```

The format supports **Spark SQL** syntax. It also updates the index **dynamically** when new data is inserted.

```scala
import spark.implicits._

val newData = Seq(1, 2, 3, 4).toDF("value")

newData.createOrReplaceTempView("newTable")

// Insert the contents of another table or view
spark.sql("insert into table myTable select * from newTable")

// Insert explicit values into the given columns
spark.sql("insert into table myTable (value) values (4)")
```
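
These statements assume that `myTable` already resolves to a table stored in the qbeast format. A minimal sketch of one way to set that up (the path, the indexed column, and the initial data are placeholders):

```scala
import spark.implicits._

// Write an initial dataset in qbeast format (placeholder path and column).
Seq(1, 2, 3).toDF("value")
  .write
  .format("qbeast")
  .option("columnsToIndex", "value")
  .save("/tmp/qbeast-table")

// Expose it under a SQL name so the INSERT INTO statements above resolve.
spark.read
  .format("qbeast")
  .load("/tmp/qbeast-table")
  .createOrReplaceTempView("myTable")
```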

Go to [QbeastTable documentation](./docs/QbeastTable.md) for more detailed information.

# Dependencies and Version Compatibility
LinearTransformation.scala

@@ -62,7 +62,8 @@ case class LinearTransformation(

  /**
   * Merges two transformations. The domain of the resulting transformation is the union of this
   * and the other transformation. The range of the resulting transformation is the intersection of
   * this and the other transformation, which can be a LinearTransformation or IdentityTransformation.
   * @param other
   * @return a new Transformation that contains both this and other.
   */
@@ -76,13 +77,25 @@
          otherNullValue,
          orderedDataType)
          .asInstanceOf[Transformation]
      case IdentityTransformation(newVal) =>
        val otherNullValue =
          LinearTransformationUtils.generateRandomNumber(
            min(minNumber, newVal),
            max(maxNumber, newVal),
            Option(42.toLong))
        val orderedDataType = this.orderedDataType
        LinearTransformation(
          min(minNumber, newVal),
          max(maxNumber, newVal),
          otherNullValue,
          orderedDataType)
          .asInstanceOf[Transformation]

    }
  }

  /**
   * This method should determine if the new data will cause the creation of a new revision.
   *
   * @param newTransformation the new transformation created with statistics over the new data
   * @return true if the domain of the newTransformation is not fully contained in this one.
   */
@@ -91,6 +104,8 @@
      case LinearTransformation(newMin, newMax, _, otherOrdering)
          if orderedDataType == otherOrdering =>
        gt(minNumber, newMin) || lt(maxNumber, newMax)
      case IdentityTransformation(newVal) =>
        gt(minNumber, newVal) || lt(maxNumber, newVal)
    }

}
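
To make the merge and supersede semantics concrete, a small sketch (package paths are assumed from this repository's layout; the values mirror the test cases further down):

```scala
import io.qbeast.core.model.IntegerDataType
import io.qbeast.core.transform.{IdentityTransformation, LinearTransformation}

val linearT = LinearTransformation(0, 10000, IntegerDataType, Option(42L))

// 90000 lies outside [0, 10000], so the merged domain widens to [0, 90000]
// and the null placeholder is re-drawn with the fixed seed 42.
val widened = linearT.merge(IdentityTransformation(90000))

// 10 is already inside the domain, so the bounds stay [0, 10000].
val unchanged = linearT.merge(IdentityTransformation(10))

// isSupersededBy answers whether the new data would force a new revision.
linearT.isSupersededBy(IdentityTransformation(90000)) // true
linearT.isSupersededBy(IdentityTransformation(10)) // false
```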
@@ -139,22 +154,6 @@ class LinearTransformationSerializer

object LinearTransformation {

-  private def generateRandomNumber(min: Any, max: Any, seed: Option[Long]): Any = {
-    val r = if (seed.isDefined) new Random(seed.get) else new Random()
-    val random = r.nextDouble()
-
-    (min, max) match {
-      case (min: Double, max: Double) => min + (random * (max - min))
-      case (min: Long, max: Long) => min + (random * (max - min)).toLong
-      case (min: Int, max: Int) => min + (random * (max - min)).toInt
-      case (min: Float, max: Float) => min + (random * (max - min)).toFloat
-      case (min, max) =>
-        throw new IllegalArgumentException(
-          s"Cannot generate random number for type ${min.getClass.getName}")
-    }
-  }

  /**
   * Creates a LinearTransformation that has a random value for the nulls
   * within the [minNumber, maxNumber] range
@@ -164,12 +163,13 @@ object LinearTransformation {
   * @param seed
   * @return
   */

  def apply(
      minNumber: Any,
      maxNumber: Any,
      orderedDataType: OrderedDataType,
      seed: Option[Long] = None): LinearTransformation = {
-    val randomNull = generateRandomNumber(minNumber, maxNumber, seed)
+    val randomNull = LinearTransformationUtils.generateRandomNumber(minNumber, maxNumber, seed)
    LinearTransformation(minNumber, maxNumber, randomNull, orderedDataType)
  }

@@ -215,3 +215,33 @@ class LinearTransformationDeserializer
}

}

object LinearTransformationUtils {

  /**
   * Generates a random number between min and max, using the given seed
   * when one is provided. It lives in a utils object so that it can be
   * used outside of the LinearTransformation class.
   * @param min the lower bound
   * @param max the upper bound
   * @param seed the optional seed for a deterministic draw
   * @return a value between min and max, with the numeric type of the bounds
   */
  def generateRandomNumber(min: Any, max: Any, seed: Option[Long]): Any = {
    val r = if (seed.isDefined) new Random(seed.get) else new Random()
    val random = r.nextDouble()

    (min, max) match {
      case (min: Double, max: Double) => min + (random * (max - min))
      case (min: Long, max: Long) => min + (random * (max - min)).toLong
      case (min: Int, max: Int) => min + (random * (max - min)).toInt
      case (min: Float, max: Float) => min + (random * (max - min)).toFloat
      case (min, max) =>
        throw new IllegalArgumentException(
          s"Cannot generate random number for type ${min.getClass.getName}")
    }
  }

}
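
A quick illustration of the seeded behaviour that the merge logic relies on (the package path is assumed; any `min`/`max` pair of a supported numeric type works):

```scala
import io.qbeast.core.transform.LinearTransformationUtils

// A fixed seed makes the draw deterministic, so two calls with the same
// arguments agree; this is how merge reproduces the null placeholder.
val a = LinearTransformationUtils.generateRandomNumber(0, 10000, Option(42L))
val b = LinearTransformationUtils.generateRandomNumber(0, 10000, Option(42L))
assert(a == b)

// Without a seed, each call draws a fresh value between the bounds.
val c = LinearTransformationUtils.generateRandomNumber(0.0, 1.0, None)
```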
LinearTransformer.scala

@@ -39,8 +39,8 @@ case class LinearTransformer(columnName: String, dataType: QDataType) extends Tr
      // we return a Transformation where null values are transformed to 0
      NullToZeroTransformation
    } else if (minAux == maxAux) {
-      // If all values are equal we return an IdentityTransformation
-      IdentityTransformation
+      // If both values are equal we return an IdentityTransformation
+      IdentityTransformation(minAux)
    } else { // otherwise we pick the min and max
      val min = getValue(minAux)
      val max = getValue(maxAux)
Transformation.scala

@@ -47,7 +47,7 @@ trait OrdinalTransformation extends Transformation {
/**
 * Identity transformation.
 */
-object IdentityTransformation extends Transformation {
+case class IdentityTransformation(newVal: Any) extends Transformation {

  @inline
  override def transform(value: Any): Double = value match {
1 change: 1 addition & 0 deletions core/src/main/scala/io/qbeast/package.scala
@@ -3,4 +3,5 @@ package io
package object qbeast {
  type IISeq[T] = scala.collection.immutable.Seq[T]
  type SerializedCubeID = String
+  type IndexedColumns = Seq[String]
}
LinearTransformationTest.scala

@@ -76,4 +76,43 @@ class LinearTransformationTest extends AnyFlatSpec with Matchers {

}

it should "merge IdentityTransformations correctly" in {
val nullValue = 5000
val linearT = LinearTransformation(0, 10000, nullValue, IntegerDataType)

var otherNullValue =
LinearTransformationUtils.generateRandomNumber(0, 90000, Option(42.toLong))
linearT.merge(IdentityTransformation(90000)) shouldBe LinearTransformation(
0,
90000,
otherNullValue,
IntegerDataType)

otherNullValue =
LinearTransformationUtils.generateRandomNumber(-100, 10000, Option(42.toLong))
linearT.merge(IdentityTransformation(-100)) shouldBe LinearTransformation(
-100,
10000,
otherNullValue,
IntegerDataType)

otherNullValue = LinearTransformationUtils.generateRandomNumber(0, 10000, Option(42.toLong))
linearT.merge(IdentityTransformation(10)) shouldBe LinearTransformation(
      0,
      10000,
      otherNullValue,
      IntegerDataType)
  }

it should "detect new transformations that superseed" in {
val nullValue = 5000
val linearT = LinearTransformation(0, 10000, nullValue, IntegerDataType)

linearT.isSupersededBy(IdentityTransformation(90000)) shouldBe true

linearT.isSupersededBy(IdentityTransformation(-100)) shouldBe true

linearT.isSupersededBy(IdentityTransformation(10)) shouldBe false

}
}
18 changes: 18 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala
@@ -11,8 +11,10 @@ import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.SparkSession

import java.net.URI
+import org.apache.spark.sql.delta.DeltaLog

/**
* FileIndex to prune files
@@ -77,3 +79,19 @@

  override def partitionSchema: StructType = index.partitionSchema
}

object OTreeIndex {

  /**
   * Creates a new OTreeIndex for the table at the given path.
   * @param spark the SparkSession
   * @param path the path to the delta log
   * @return the OTreeIndex
   */
  def apply(spark: SparkSession, path: Path): OTreeIndex = {
    val deltaLog = DeltaLog.forTable(spark, path)
    val tahoe = TahoeLogFileIndex(spark, deltaLog, path, deltaLog.snapshot, Seq.empty, false)
    OTreeIndex(tahoe)
  }

}
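
A sketch of how the new factory might be used (the path is a placeholder for an existing qbeast table backed by a delta log):

```scala
import io.qbeast.spark.delta.OTreeIndex
import org.apache.hadoop.fs.Path

// Builds the TahoeLogFileIndex from the table's DeltaLog and wraps it.
val index = OTreeIndex(spark, new Path("/tmp/qbeast-table"))

// OTreeIndex implements Spark's FileIndex API, e.g. listing tracked files.
index.inputFiles.take(5).foreach(println)
```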
QbeastSparkSessionExtension.scala

@@ -4,7 +4,7 @@
package io.qbeast.spark.internal

import io.delta.sql.DeltaSparkSessionExtension
-import io.qbeast.spark.internal.rules.{ReplaceFileIndex, SampleRule}
+import io.qbeast.spark.internal.rules.{SampleRule}
import org.apache.spark.sql.SparkSessionExtensions

/**
@@ -19,10 +19,6 @@ class QbeastSparkSessionExtension extends DeltaSparkSessionExtension {
    extensions.injectOptimizerRule { session =>
      new SampleRule(session)
    }
-
-    extensions.injectOptimizerRule { session =>
-      new ReplaceFileIndex(session)
-    }
  }

}

ReplaceFileIndex.scala: this file was deleted.
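
Since `QbeastSparkSessionExtension` extends `DeltaSparkSessionExtension`, registering the qbeast extension alone should bring in both the Delta and the qbeast optimizer rules. A minimal sketch of the wiring (the master and app name are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("qbeast-insert-into")
  .config(
    "spark.sql.extensions",
    "io.qbeast.spark.internal.QbeastSparkSessionExtension")
  .getOrCreate()
```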

22 changes: 15 additions & 7 deletions src/main/scala/io/qbeast/spark/internal/rules/SampleRule.scala
@@ -5,14 +5,16 @@ package io.qbeast.spark.internal.rules

import io.qbeast.core.model.Weight
import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash
-import io.qbeast.spark.internal.sources.QbeastBaseRelation
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{And, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Sample}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.SparkSession
import io.qbeast.core.model.WeightRange
+import io.qbeast.IndexedColumns
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import io.qbeast.spark.delta.OTreeIndex

/**
* Rule class that transforms a Sample operator over a QbeastRelation
@@ -36,19 +38,18 @@
   * Transforms the Sample Operator to a Filter
   * @param sample the Sample Operator
   * @param logicalRelation the LogicalRelation underneath
-   * @param qbeastBaseRelation the wrapped QbeastBaseRelation
+   * @param indexedColumns the IndexedColumns of the LogicalRelation
   * @return the new Filter
   */
  private def transformSampleToFilter(
      sample: Sample,
      logicalRelation: LogicalRelation,
-      qbeastBaseRelation: QbeastBaseRelation): Filter = {
+      indexedColumns: IndexedColumns): Filter = {

    val weightRange = extractWeightRange(sample)

    val columns =
-      qbeastBaseRelation.columnTransformers.map(c =>
-        logicalRelation.output.find(_.name == c.columnName).get)
+      indexedColumns.map(c => logicalRelation.output.find(_.name == c).get)
    val qbeastHash = new QbeastMurmur3Hash(columns)

Filter(
@@ -86,8 +87,15 @@
 */
object QbeastRelation {

-  def unapply(plan: LogicalPlan): Option[(LogicalRelation, QbeastBaseRelation)] = plan match {
-    case l @ LogicalRelation(q: QbeastBaseRelation, _, _, _) => Some((l, q))
+  def unapply(plan: LogicalPlan): Option[(LogicalRelation, IndexedColumns)] = plan match {
+    case l @ LogicalRelation(
+          q @ HadoopFsRelation(o: OTreeIndex, _, _, _, _, parameters),
+          _,
+          _,
+          _) =>
+      val columnsToIndex = parameters("columnsToIndex")
+      Some((l, columnsToIndex.split(",")))
    case _ => None
  }
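
With `QbeastRelation` now matching any `HadoopFsRelation` whose location is an `OTreeIndex`, a `Sample` over a qbeast table can be rewritten into a hash-range `Filter` and served by file pruning rather than a full scan. A usage sketch (the path is a placeholder for a table written with `columnsToIndex` set):

```scala
// Read a qbeast table; its relation carries the columnsToIndex parameter
// that the unapply above extracts.
val df = spark.read.format("qbeast").load("/tmp/qbeast-table")

// SampleRule turns this Sample operator into a Filter on QbeastMurmur3Hash
// over the indexed columns, bounded by the sample's weight range.
df.sample(withReplacement = false, fraction = 0.1).count()
```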