diff --git a/README.md b/README.md index a68ea12d9..e6b1308f0 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,21 @@ qbeastTable.getIndexMetrics() qbeastTable.analyze() ``` +The format supports **Spark SQL** `INSERT INTO` syntax. +The index is also updated **dynamically** when new data is inserted. + +```scala +val newData = Seq(1, 2, 3, 4).toDF("value") + +newData.createOrReplaceTempView("newTable") + +spark.sql("insert into table myTable select * from newTable") + +spark.sql("insert into table myTable (value) values (4)") + + +``` + Go to [QbeastTable documentation](./docs/QbeastTable.md) for more detailed information. # Dependencies and Version Compatibility diff --git a/core/src/main/scala/io/qbeast/core/transform/LinearTransformation.scala b/core/src/main/scala/io/qbeast/core/transform/LinearTransformation.scala index 706cad39c..5787fba3a 100644 --- a/core/src/main/scala/io/qbeast/core/transform/LinearTransformation.scala +++ b/core/src/main/scala/io/qbeast/core/transform/LinearTransformation.scala @@ -62,7 +62,8 @@ case class LinearTransformation( /** * Merges two transformations. The domain of the resulting transformation is the union of this - * + * and the other transformation's domains. The other transformation can be either a + * LinearTransformation or an IdentityTransformation. * @param other * @return a new Transformation that contains both this and other. */ @@ -76,13 +77,25 @@ case class LinearTransformation( otherNullValue, orderedDataType) .asInstanceOf[Transformation] + case IdentityTransformation(newVal) => + val otherNullValue = + LinearTransformationUtils.generateRandomNumber( + min(minNumber, newVal), + max(maxNumber, newVal), + Option(42.toLong)) + val orderedDataType = this.orderedDataType + LinearTransformation( + min(minNumber, newVal), + max(maxNumber, newVal), + otherNullValue, + orderedDataType) + .asInstanceOf[Transformation] } } /** * This method should determine if the new data will cause the creation of a new revision. - * * @param newTransformation the new transformation created with statistics over the new data * @return true if the domain of the newTransformation is not fully contained in this one. 
*/ @@ -91,6 +104,8 @@ case class LinearTransformation( case LinearTransformation(newMin, newMax, _, otherOrdering) if orderedDataType == otherOrdering => gt(minNumber, newMin) || lt(maxNumber, newMax) + case IdentityTransformation(newVal) => + gt(minNumber, newVal) || lt(maxNumber, newVal) } } @@ -139,22 +154,6 @@ class LinearTransformationSerializer object LinearTransformation { - private def generateRandomNumber(min: Any, max: Any, seed: Option[Long]): Any = { - val r = if (seed.isDefined) new Random(seed.get) else new Random() - val random = r.nextDouble() - - (min, max) match { - case (min: Double, max: Double) => min + (random * (max - min)) - case (min: Long, max: Long) => min + (random * (max - min)).toLong - case (min: Int, max: Int) => min + (random * (max - min)).toInt - case (min: Float, max: Float) => min + (random * (max - min)).toFloat - case (min, max) => - throw new IllegalArgumentException( - s"Cannot generate random number for type ${min.getClass.getName}") - - } - } - /** * Creates a LinearTransformation that has random value for the nulls * within the [minNumber, maxNumber] range @@ -164,12 +163,13 @@ object LinearTransformation { * @param seed * @return */ + def apply( minNumber: Any, maxNumber: Any, orderedDataType: OrderedDataType, seed: Option[Long] = None): LinearTransformation = { - val randomNull = generateRandomNumber(minNumber, maxNumber, seed) + val randomNull = LinearTransformationUtils.generateRandomNumber(minNumber, maxNumber, seed) LinearTransformation(minNumber, maxNumber, randomNull, orderedDataType) } @@ -215,3 +215,33 @@ class LinearTransformationDeserializer } } + +object LinearTransformationUtils { + + /** + * Generates a random number within the [min, max] range, + * optionally using a fixed seed so that the result is reproducible. 
+ * @param min the minimum value of the range + * @param max the maximum value of the range + * @param seed the optional seed used to generate the number + * @return a random number within the [min, max] range, + * of the same numeric type as min and max + */ + + def generateRandomNumber(min: Any, max: Any, seed: Option[Long]): Any = { + val r = if (seed.isDefined) new Random(seed.get) else new Random() + val random = r.nextDouble() + + (min, max) match { + case (min: Double, max: Double) => min + (random * (max - min)) + case (min: Long, max: Long) => min + (random * (max - min)).toLong + case (min: Int, max: Int) => min + (random * (max - min)).toInt + case (min: Float, max: Float) => min + (random * (max - min)).toFloat + case (min, max) => + throw new IllegalArgumentException( + s"Cannot generate random number for type ${min.getClass.getName}") + + } + } + +} diff --git a/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala b/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala index 3e8a01781..7258b26d9 100644 --- a/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala +++ b/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala @@ -39,8 +39,8 @@ case class LinearTransformer(columnName: String, dataType: QDataType) extends Tr // we return a Transformation where null values are transformed to 0 NullToZeroTransformation } else if (minAux == maxAux) { - // If all values are equal we return an IdentityTransformation - IdentityTransformation + // If min and max are equal we return an IdentityTransformation + IdentityTransformation(minAux) } else { // otherwhise we pick the min and max val min = getValue(minAux) val max = getValue(maxAux) diff --git a/core/src/main/scala/io/qbeast/core/transform/Transformation.scala b/core/src/main/scala/io/qbeast/core/transform/Transformation.scala index f5269e42b..faf37ccfb 100644 --- a/core/src/main/scala/io/qbeast/core/transform/Transformation.scala +++ b/core/src/main/scala/io/qbeast/core/transform/Transformation.scala @@ -47,7 +47,7 @@ trait OrdinalTransformation extends Transformation { /** * Identity transformation. 
*/ -object IdentityTransformation extends Transformation { +case class IdentityTransformation(newVal: Any) extends Transformation { @inline override def transform(value: Any): Double = value match { diff --git a/core/src/main/scala/io/qbeast/package.scala b/core/src/main/scala/io/qbeast/package.scala index b8bb253ec..ead306c2e 100644 --- a/core/src/main/scala/io/qbeast/package.scala +++ b/core/src/main/scala/io/qbeast/package.scala @@ -3,4 +3,5 @@ package io package object qbeast { type IISeq[T] = scala.collection.immutable.Seq[T] type SerializedCubeID = String + type IndexedColumns = Seq[String] } diff --git a/core/src/test/scala/io/qbeast/core/transform/LinearTransformationTest.scala b/core/src/test/scala/io/qbeast/core/transform/LinearTransformationTest.scala index 49de4baa5..3cb6f7d93 100644 --- a/core/src/test/scala/io/qbeast/core/transform/LinearTransformationTest.scala +++ b/core/src/test/scala/io/qbeast/core/transform/LinearTransformationTest.scala @@ -76,4 +76,43 @@ class LinearTransformationTest extends AnyFlatSpec with Matchers { } + it should "merge IdentityTransformations correctly" in { + val nullValue = 5000 + val linearT = LinearTransformation(0, 10000, nullValue, IntegerDataType) + + var otherNullValue = + LinearTransformationUtils.generateRandomNumber(0, 90000, Option(42.toLong)) + linearT.merge(IdentityTransformation(90000)) shouldBe LinearTransformation( + 0, + 90000, + otherNullValue, + IntegerDataType) + + otherNullValue = + LinearTransformationUtils.generateRandomNumber(-100, 10000, Option(42.toLong)) + linearT.merge(IdentityTransformation(-100)) shouldBe LinearTransformation( + -100, + 10000, + otherNullValue, + IntegerDataType) + + otherNullValue = LinearTransformationUtils.generateRandomNumber(0, 10000, Option(42.toLong)) + linearT.merge(IdentityTransformation(10)) shouldBe LinearTransformation( + 0, + 10000, + otherNullValue, + IntegerDataType) + } + + it should "detect new transformations that supersede" in { + val nullValue = 5000 + val linearT = LinearTransformation(0, 10000, nullValue, IntegerDataType) + + linearT.isSupersededBy(IdentityTransformation(90000)) shouldBe true + + linearT.isSupersededBy(IdentityTransformation(-100)) shouldBe true + + linearT.isSupersededBy(IdentityTransformation(10)) shouldBe false + + } } diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index 8879ccb6c..d6fe79c79 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -11,8 +11,10 @@ import org.apache.spark.sql.delta.Snapshot import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.SparkSession import java.net.URI +import org.apache.spark.sql.delta.DeltaLog /** * FileIndex to prune files @@ -77,3 +79,19 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex { override def partitionSchema: StructType = index.partitionSchema } + +/** + * Companion object of OTreeIndex that builds an OTreeIndex from a table path + * @param spark the active SparkSession + * @param path the path to the table's delta log + * @return the OTreeIndex + */ +object OTreeIndex { + + def apply(spark: SparkSession, path: Path): OTreeIndex = { + val deltaLog = DeltaLog.forTable(spark, path) + val tahoe = TahoeLogFileIndex(spark, deltaLog, path, deltaLog.snapshot, Seq.empty, false) + OTreeIndex(tahoe) + } + +} diff --git 
a/src/main/scala/io/qbeast/spark/internal/QbeastSparkSessionExtension.scala b/src/main/scala/io/qbeast/spark/internal/QbeastSparkSessionExtension.scala index c08b3f7bd..829da0df6 100644 --- a/src/main/scala/io/qbeast/spark/internal/QbeastSparkSessionExtension.scala +++ b/src/main/scala/io/qbeast/spark/internal/QbeastSparkSessionExtension.scala @@ -4,7 +4,7 @@ package io.qbeast.spark.internal import io.delta.sql.DeltaSparkSessionExtension -import io.qbeast.spark.internal.rules.{ReplaceFileIndex, SampleRule} +import io.qbeast.spark.internal.rules.{SampleRule} import org.apache.spark.sql.SparkSessionExtensions /** @@ -19,10 +19,6 @@ class QbeastSparkSessionExtension extends DeltaSparkSessionExtension { extensions.injectOptimizerRule { session => new SampleRule(session) } - - extensions.injectOptimizerRule { session => - new ReplaceFileIndex(session) - } } } diff --git a/src/main/scala/io/qbeast/spark/internal/rules/ReplaceFileIndex.scala b/src/main/scala/io/qbeast/spark/internal/rules/ReplaceFileIndex.scala deleted file mode 100644 index 7d66a091d..000000000 --- a/src/main/scala/io/qbeast/spark/internal/rules/ReplaceFileIndex.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2021 Qbeast Analytics, S.L. - */ -package io.qbeast.spark.internal.rules - -import io.qbeast.spark.delta.OTreeIndex -import io.qbeast.spark.internal.sources.QbeastBaseRelation -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.delta.files.TahoeLogFileIndex -import org.apache.spark.sql.delta.{DeltaTable, DeltaTableUtils} -import org.apache.spark.sql.execution.datasources.LogicalRelation - -/** - * Rule class that changes the file index of the DeltaRelation - * underneath the QbeastRelation for an OTreeIndex - * @param spark The SparkSession to extend - */ -class ReplaceFileIndex(spark: SparkSession) extends Rule[LogicalPlan] { - - override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case r @ LogicalRelation(QbeastBaseRelation(delta, _), _, _, _) => - r.copy(relation = delta) match { - case p @ DeltaTable(fileIndex: TahoeLogFileIndex) => - DeltaTableUtils.replaceFileIndex(p, OTreeIndex(fileIndex)) - } - } - -} diff --git a/src/main/scala/io/qbeast/spark/internal/rules/SampleRule.scala b/src/main/scala/io/qbeast/spark/internal/rules/SampleRule.scala index a9e5c6d62..37a7f638a 100644 --- a/src/main/scala/io/qbeast/spark/internal/rules/SampleRule.scala +++ b/src/main/scala/io/qbeast/spark/internal/rules/SampleRule.scala @@ -5,7 +5,6 @@ package io.qbeast.spark.internal.rules import io.qbeast.core.model.Weight import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash -import io.qbeast.spark.internal.sources.QbeastBaseRelation import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{And, GreaterThanOrEqual, LessThan, Literal} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Sample} @@ -13,6 +12,9 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.SparkSession import io.qbeast.core.model.WeightRange +import io.qbeast.IndexedColumns +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import io.qbeast.spark.delta.OTreeIndex /** * Rule class that transforms a Sample operator over a QbeastRelation @@ -36,19 +38,18 @@ class SampleRule(spark: SparkSession) extends Rule[LogicalPlan] with Logging { * 
Transforms the Sample Operator to a Filter * @param sample the Sample Operator * @param logicalRelation the LogicalRelation underneath - * @param qbeastBaseRelation the wrapped QbeastBaseRelation + * @param indexedColumns the IndexedColumns of the LogicalRelation * @return the new Filter */ private def transformSampleToFilter( sample: Sample, logicalRelation: LogicalRelation, - qbeastBaseRelation: QbeastBaseRelation): Filter = { + indexedColumns: IndexedColumns): Filter = { val weightRange = extractWeightRange(sample) val columns = - qbeastBaseRelation.columnTransformers.map(c => - logicalRelation.output.find(_.name == c.columnName).get) + indexedColumns.map(c => logicalRelation.output.find(_.name == c).get) val qbeastHash = new QbeastMurmur3Hash(columns) Filter( @@ -86,8 +87,15 @@ class SampleRule(spark: SparkSession) extends Rule[LogicalPlan] with Logging { */ object QbeastRelation { - def unapply(plan: LogicalPlan): Option[(LogicalRelation, QbeastBaseRelation)] = plan match { - case l @ LogicalRelation(q: QbeastBaseRelation, _, _, _) => Some((l, q)) + def unapply(plan: LogicalPlan): Option[(LogicalRelation, IndexedColumns)] = plan match { + + case l @ LogicalRelation( + q @ HadoopFsRelation(o: OTreeIndex, _, _, _, _, parameters), + _, + _, + _) => + val columnsToIndex = parameters("columnsToIndex") + Some((l, columnsToIndex.split(","))) case _ => None } diff --git a/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala b/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala index e2dc50000..5cf3b2c71 100644 --- a/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala +++ b/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala @@ -3,29 +3,21 @@ */ package io.qbeast.spark.internal.sources -import io.qbeast.IISeq -import io.qbeast.core.model.{QTableID, Revision} -import io.qbeast.spark.delta.SparkDeltaMetadataManager -import io.qbeast.core.transform.Transformer -import org.apache.spark.sql.SQLContext +import io.qbeast.core.model.{QTableID} import org.apache.spark.sql.sources.BaseRelation -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.sources.InsertableRelation -/** - * Implementation of BaseRelation which wraps the Delta relation. 
- * - * @param relation the wrapped instance - * @param revision the revision to use - */ -case class QbeastBaseRelation(relation: BaseRelation, private val revision: Revision) - extends BaseRelation { - override def sqlContext: SQLContext = relation.sqlContext - - override def schema: StructType = relation.schema - - def columnTransformers: IISeq[Transformer] = revision.columnTransformers - -} +import org.apache.spark.sql.{SQLContext} +import org.apache.spark.sql.types.{StructType, StructField} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.SparkSession +import io.qbeast.spark.delta.OTreeIndex +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import io.qbeast.spark.table.IndexedTable +import io.qbeast.context.QbeastContext +import org.apache.hadoop.fs.{Path} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat /** * Companion object for QbeastBaseRelation @@ -38,9 +30,57 @@ object QbeastBaseRelation { * @return the QbeastBaseRelation */ - def forDeltaTable(tableID: QTableID): QbeastBaseRelation = { - val log = SparkDeltaMetadataManager.loadDeltaQbeastLog(tableID) - QbeastBaseRelation(log.createRelation, log.qbeastSnapshot.loadLatestRevision) + /** + * Returns a HadoopFsRelation that contains all of the data present + * in the table. This relation will be continually updated + * as files are added or removed from the table. However, a new HadoopFsRelation + * must be requested in order to see changes to the schema. + * @param sqlContext the SQLContext + * @param table the IndexedTable that backs the relation + * @return the HadoopFsRelation + */ + def createRelation(sqlContext: SQLContext, table: IndexedTable): BaseRelation = { + + val spark = SparkSession.active + val tableID = table.tableID + val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID) + val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID) + val revision = snapshot.loadLatestRevision + val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",") + val cubeSize = revision.desiredCubeSize + val parameters = + Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString()) + + val path = new Path(tableID.id) + val fileIndex = OTreeIndex(spark, path) + val bucketSpec: Option[BucketSpec] = None + val file = new ParquetFileFormat() + + new HadoopFsRelation( + fileIndex, + partitionSchema = StructType(Seq.empty[StructField]), + dataSchema = schema, + bucketSpec = bucketSpec, + file, + parameters)(spark) with InsertableRelation { + def insert(data: DataFrame, overwrite: Boolean): Unit = { + table.save(data, parameters, append = !overwrite) + } + } + } + + /** + * Creates a BaseRelation in Qbeast format for the given table identifier + * and its associated IndexedTable. 
+ * @param tableID the identifier of the table + * @param indexedTable the indexed table + * @return the BaseRelation for the table in Qbeast format + */ + def forQbeastTable(tableID: QTableID, indexedTable: IndexedTable): BaseRelation = { + + val spark = SparkSession.active + createRelation(spark.sqlContext, indexedTable) + } } diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala index e1cbfe025..77446a2d5 100644 --- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala +++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala @@ -190,8 +190,13 @@ private[table] class IndexedTableImpl( } } - private def createQbeastBaseRelation(): QbeastBaseRelation = { - QbeastBaseRelation.forDeltaTable(tableID) + /** + * Creates a BaseRelation in Qbeast format for this table, + * using the table identifier of this instance. + * @return the BaseRelation + */ + private def createQbeastBaseRelation(): BaseRelation = { + QbeastBaseRelation.forQbeastTable(tableID, this) } private def write(data: DataFrame, indexStatus: IndexStatus, append: Boolean): BaseRelation = { diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastInsertToTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastInsertToTest.scala new file mode 100644 index 000000000..964ad98c1 --- /dev/null +++ b/src/test/scala/io/qbeast/spark/utils/QbeastInsertToTest.scala @@ -0,0 +1,119 @@ +package io.qbeast.spark.utils + +import io.qbeast.spark.{QbeastIntegrationTestSpec} + +class QbeastInsertToTest extends QbeastIntegrationTestSpec { + + "Qbeast" should + "support insert into using select " + + "statement" in withQbeastContextSparkAndTmpDir { (spark, tmpDir) => + { + val cubeSize = 10000 + import spark.implicits._ + val initialData = Seq(5, 6, 7, 8).toDF("value") + val insertDataLower = Seq(2).toDF("value") + + initialData.write + .mode("append") + .format("qbeast") + .options(Map("columnsToIndex" -> "value", "cubeSize" -> cubeSize.toString)) + .save(tmpDir) + + val df = spark.read.format("qbeast").load(tmpDir) + + df.createOrReplaceTempView("t") + insertDataLower.createOrReplaceTempView("t_lower") + + // Insert using a SELECT statement + spark.sql("insert into table t select * from t_lower") + spark.sql("select * from t").collect() shouldBe initialData + .union(insertDataLower) + .collect() + + } + } + + it should + "support insert into using from " + + "statement" in withQbeastContextSparkAndTmpDir { (spark, tmpDir) => + { + val cubeSize = 10000 + import spark.implicits._ + val initialData = Seq(5, 6, 7, 8).toDF("value") + val insertDataLower = Seq(2).toDF("value") + + initialData.write + .mode("append") + .format("qbeast") + .options(Map("columnsToIndex" -> "value", "cubeSize" -> cubeSize.toString)) + .save(tmpDir) + + val df = spark.read.format("qbeast").load(tmpDir) + + df.createOrReplaceTempView("t") + insertDataLower.createOrReplaceTempView("t_lower") + + // Insert using a FROM statement + spark.sql("insert into table t from t_lower select *") + spark.sql("select * from t").collect() shouldBe initialData + .union(insertDataLower) + .collect() + } + } + + it should + "support insert into using multi-row values " + + "statement" in withQbeastContextSparkAndTmpDir { (spark, tmpDir) => + { + val cubeSize = 10000 + import spark.implicits._ + val initialData = Seq(5, 6, 7, 8).toDF("value") + + initialData.write + .mode("append") + .format("qbeast") + .options(Map("columnsToIndex" -> "value", "cubeSize" -> cubeSize.toString)) + .save(tmpDir) + + val df = 
spark.read.format("qbeast").load(tmpDir) + + df.createOrReplaceTempView("t") + + // Multi-Row Insert Using a VALUES Clause + spark.sql("insert into table t (value) values (4),(5)") + spark.sql("select * from t").collect() shouldBe initialData + .union(Seq(4, 5).toDF()) + .collect() + } + } + + it should + "support insert into using single-row values " + + "statement" in withQbeastContextSparkAndTmpDir { (spark, tmpDir) => + { + val cubeSize = 10000 + import spark.implicits._ + val initialData = Seq(5, 6, 7, 8).toDF("value") + + initialData.write + .mode("append") + .format("qbeast") + .options(Map("columnsToIndex" -> "value", "cubeSize" -> cubeSize.toString)) + .save(tmpDir) + + val df = spark.read.format("qbeast").load(tmpDir) + + df.createOrReplaceTempView("t") + + // Single Row Insert Using a VALUES Clause + spark.sql("insert into table t (value) values (4)") + spark.sql("select * from t").collect() shouldBe initialData + .union(Seq(4).toDF()) + .collect() + + // TODO there might be more types of insert statements, but the most important + // ones are the ones above + } + } + +}
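
A reviewer-oriented sketch, not part of the patch: it ties the new `IdentityTransformation(newVal)` case class to `LinearTransformation.merge` and `isSupersededBy` as implemented above. The import path of `IntegerDataType` and the standalone `App` object are assumptions for illustration only; the seeded null value mirrors the hard-coded seed 42 used inside `merge`.

```scala
import io.qbeast.core.model.IntegerDataType // assumed package for IntegerDataType
import io.qbeast.core.transform.{
  IdentityTransformation,
  LinearTransformation,
  LinearTransformationUtils
}

object MergeBehaviourSketch extends App {

  // Existing revision: the indexed column was observed in [0, 10000]
  val existing = LinearTransformation(0, 10000, 5000, IntegerDataType)

  // A new batch where the column holds a single constant value (90000):
  // LinearTransformer now produces IdentityTransformation(90000)
  val incoming = IdentityTransformation(90000)

  // 90000 falls outside [0, 10000], so the current revision is superseded...
  assert(existing.isSupersededBy(incoming))

  // ...and merging widens the domain to [0, 90000]; the null value is drawn
  // with the same fixed seed (42) that merge uses internally
  val merged = existing.merge(incoming)
  val expectedNull = LinearTransformationUtils.generateRandomNumber(0, 90000, Option(42L))
  assert(merged == LinearTransformation(0, 90000, expectedNull, IntegerDataType))

  // A constant value already inside the existing domain does not supersede it
  assert(!existing.isSupersededBy(IdentityTransformation(10)))
}
```

Because `generateRandomNumber` is deterministic for a given seed, the new tests recompute the expected null value with `LinearTransformationUtils.generateRandomNumber` instead of storing it as a constant.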