Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Revision metadata and Snapshot #29

Merged
merged 14 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 48 additions & 62 deletions src/main/scala/io/qbeast/spark/index/OTreeAlgorithm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@
*/
package io.qbeast.spark.index

import io.qbeast.spark.index.QbeastColumns.{
cubeColumnName,
cubeToReplicateColumnName,
stateColumnName,
weightColumnName
}
import io.qbeast.spark.index.QbeastColumns._
import io.qbeast.spark.model._
import io.qbeast.spark.sql.qbeast.QbeastSnapshot
import io.qbeast.spark.sql.qbeast.RevisionSnapshot
import io.qbeast.spark.sql.rules.Functions.qbeastHash
import io.qbeast.spark.sql.utils.State._
import org.apache.spark.sql.functions._
Expand All @@ -33,30 +28,30 @@ trait OTreeAlgorithm {
*/
def indexFirst(
dataFrame: DataFrame,
columnsToIndex: Seq[String]): (DataFrame, SpaceRevision, Map[CubeId, Weight])
columnsToIndex: Seq[String]): (DataFrame, Revision, Map[CubeId, Weight])

/**
* Indexes a given non-first data frame using the current snapshot
* Indexes a given non-first data frame using the current revision snapshot
* of the index state and the announced set.
*
* @param dataFrame the data frame to append
* @param snapshot the index state snapshot
* @param announcedSet the announced set
* @return the indexed data frame, the space revision and the weightMap
* @param revisionSnapshot the index state snapshot of the revision
* @param announcedSet the announced set of the revision
* @return the indexed data frame, the revision and the weightMap
*/
def indexNext(
dataFrame: DataFrame,
snapshot: QbeastSnapshot,
announcedSet: Set[CubeId]): (DataFrame, SpaceRevision, Map[CubeId, Weight])
revisionSnapshot: RevisionSnapshot,
announcedSet: Set[CubeId]): (DataFrame, Revision, Map[CubeId, Weight])

/**
* Returns the columns contributing to the pseudo random weight generation.
*
* @param schema the schema
* @param columnsToIndex the columns to index
* @param dimensionColumns the columns to index
* @return the columns
*/
def getWeightContributorColumns(schema: StructType, columnsToIndex: Seq[String]): Seq[String]
def getWeightContributorColumns(schema: StructType, dimensionColumns: Seq[String]): Seq[String]

/**
* The desired size of the cube.
Expand All @@ -69,25 +64,22 @@ trait OTreeAlgorithm {
* Takes the data from different cubes and replicates it to their children
*
* @param dataFrame data to be replicated
* @param spaceRevision current space revision to index
* @param qbeastSnapshot current snapshot of the index
* @param revisionSnapshot current revision snapshot to index
* @param cubesToReplicate set of cubes to replicate
* @return the modified dataFrame with replicated data
*/
def replicateCubes(
dataFrame: DataFrame,
spaceRevision: SpaceRevision,
qbeastSnapshot: QbeastSnapshot,
revisionSnapshot: RevisionSnapshot,
cubesToReplicate: Set[CubeId]): (DataFrame, Map[CubeId, Weight])

/**
* Analyzes the index structure and returns the cubes that need to be optimized
*
* @param qbeastSnapshot snapshot
* @param spaceRevision space revision to review
* @param revisionSnapshot snapshot of a single Revision
* @return the sequence of cubes that need optimization
*/
def analyzeIndex(qbeastSnapshot: QbeastSnapshot, spaceRevision: SpaceRevision): Seq[CubeId]
def analyzeIndex(revisionSnapshot: RevisionSnapshot): Seq[CubeId]

}

Expand All @@ -107,34 +99,32 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)

override def indexFirst(
dataFrame: DataFrame,
columnsToIndex: Seq[String]): (DataFrame, SpaceRevision, Map[CubeId, Weight]) = {
columnsToIndex: Seq[String]): (DataFrame, Revision, Map[CubeId, Weight]) = {
// build the revision from the data frame, then index it against an empty index state.
val spaceRevision = SpaceRevision(dataFrame, columnsToIndex)
val revision = Revision(dataFrame, columnsToIndex, desiredCubeSize)
val (indexedDataFrame, cubeWeights: Map[CubeId, Weight]) = index(
dataFrame = dataFrame,
columnsToIndex = columnsToIndex,
spaceRevision = spaceRevision,
revision = revision,
cubeNormalizedWeights = Map.empty,
announcedSet = Set.empty,
replicatedSet = Set.empty,
isReplication = false)
(indexedDataFrame, spaceRevision, cubeWeights)
(indexedDataFrame, revision, cubeWeights)
}

override def indexNext(
dataFrame: DataFrame,
snapshot: QbeastSnapshot,
announcedSet: Set[CubeId]): (DataFrame, SpaceRevision, Map[CubeId, Weight]) = {
val spaceRevision = snapshot.lastSpaceRevision
revisionSnapshot: RevisionSnapshot,
announcedSet: Set[CubeId]): (DataFrame, Revision, Map[CubeId, Weight]) = {
val revision = revisionSnapshot.revision
val (indexedDataFrame, cubeWeights: Map[CubeId, Weight]) = index(
dataFrame = dataFrame,
columnsToIndex = snapshot.indexedCols,
spaceRevision,
cubeNormalizedWeights = snapshot.cubeNormalizedWeights(spaceRevision),
revision,
cubeNormalizedWeights = revisionSnapshot.cubeNormalizedWeights,
announcedSet = announcedSet,
replicatedSet = snapshot.replicatedSet(spaceRevision),
replicatedSet = revisionSnapshot.replicatedSet,
isReplication = false)
(indexedDataFrame, spaceRevision, cubeWeights)
(indexedDataFrame, revision, cubeWeights)
}

override def getWeightContributorColumns(
Expand All @@ -150,13 +140,11 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)
}
}.map(_.name)

override def analyzeIndex(
qbeastSnapshot: QbeastSnapshot,
spaceRevision: SpaceRevision): Seq[CubeId] = {
override def analyzeIndex(revisionSnapshot: RevisionSnapshot): Seq[CubeId] = {

val dimensionCount = qbeastSnapshot.indexedCols.length
val overflowedSet = qbeastSnapshot.overflowedSet(spaceRevision)
val replicatedSet = qbeastSnapshot.replicatedSet(spaceRevision)
val dimensionCount = revisionSnapshot.revision.dimensionCount
val overflowedSet = revisionSnapshot.overflowedSet
val replicatedSet = revisionSnapshot.replicatedSet

val cubesToOptimize = overflowedSet
.filter(cube => {
Expand All @@ -173,18 +161,15 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)

override def replicateCubes(
dataFrame: DataFrame,
spaceRevision: SpaceRevision,
qbeastSnapshot: QbeastSnapshot,
revisionSnapshot: RevisionSnapshot,
announcedSet: Set[CubeId]): (DataFrame, Map[CubeId, Weight]) = {

val columnsToIndex = qbeastSnapshot.indexedCols
val cubeWeights = qbeastSnapshot.cubeNormalizedWeights(spaceRevision)
val replicatedSet = qbeastSnapshot.replicatedSet(spaceRevision)
val cubeWeights = revisionSnapshot.cubeNormalizedWeights
val replicatedSet = revisionSnapshot.replicatedSet

index(
dataFrame = dataFrame,
columnsToIndex = columnsToIndex,
spaceRevision = spaceRevision,
revision = revisionSnapshot.revision,
cubeNormalizedWeights = cubeWeights,
announcedSet = announcedSet,
replicatedSet = replicatedSet,
Expand All @@ -194,8 +179,7 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)

private def index(
dataFrame: DataFrame,
columnsToIndex: Seq[String],
spaceRevision: SpaceRevision,
revision: Revision,
cubeNormalizedWeights: Map[CubeId, NormalizedWeight],
announcedSet: Set[CubeId],
replicatedSet: Set[CubeId],
Expand All @@ -204,7 +188,9 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)
val sqlContext = SparkSession.active.sqlContext
import sqlContext.implicits._

val weightedDataFrame = dataFrame.transform(df => addRandomWeight(df, columnsToIndex))
val dimensionColumns = revision.dimensionColumns
val weightedDataFrame =
dataFrame.transform(df => addRandomWeight(df, dimensionColumns))

val partitionCount = weightedDataFrame.rdd.getNumPartitions

Expand All @@ -214,10 +200,10 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)
desiredCubeSize
}

val dimensionCount = columnsToIndex.length
val dimensionCount = revision.dimensionCount
val selectionColumns =
if (isReplication) columnsToIndex ++ Seq(weightColumnName, cubeToReplicateColumnName)
else columnsToIndex ++ Seq(weightColumnName)
if (isReplication) dimensionColumns ++ Seq(weightColumnName, cubeToReplicateColumnName)
else dimensionColumns ++ Seq(weightColumnName)

val partitionedEstimatedCubeWeights = weightedDataFrame
.selectExpr(selectionColumns: _*)
Expand All @@ -229,8 +215,8 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)
announcedSet,
replicatedSet)
rows.foreach { row =>
val values = columnsToIndex.map(row.getAs[Any])
val point = rowValuesToPoint(values, spaceRevision)
val values = dimensionColumns.map(row.getAs[Any])
val point = rowValuesToPoint(values, revision)
val weight = Weight(row.getAs[Int](weightColumnName))
if (isReplication) {
val parentBytes = row.getAs[Array[Byte]](cubeToReplicateColumnName)
Expand Down Expand Up @@ -258,7 +244,7 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)

val findTargetCubeIds =
udf((rowValues: Seq[Any], weightValue: Int, parentBytes: Any) => {
val point = rowValuesToPoint(rowValues, spaceRevision)
val point = rowValuesToPoint(rowValues, revision)
val weight = Weight(weightValue)
val parent = parentBytes match {
case bytes: Array[Byte] => Some(CubeId(dimensionCount, bytes))
Expand All @@ -281,7 +267,7 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)
cubeColumnName,
explode(
findTargetCubeIds(
rowValuesColumn(columnsToIndex),
rowValuesColumn(dimensionColumns),
col(weightColumnName), {
if (isReplication) col(cubeToReplicateColumnName)
else lit(null)
Expand Down Expand Up @@ -312,7 +298,7 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)

}

private def rowValuesToPoint(values: Seq[Any], spaceRevision: SpaceRevision): Point = {
private def rowValuesToPoint(values: Seq[Any], spaceRevision: Revision): Point = {
val coordinates = IndexedSeq.newBuilder[Double]
for (value <- values) {
value match {
Expand All @@ -331,8 +317,8 @@ final class OTreeAlgorithmImpl(val desiredCubeSize: Int)
private def rowValuesColumn(columnsToIndex: Seq[String]): Column =
array(columnsToIndex.map(col): _*)

private def addRandomWeight(df: DataFrame, columnsToIndex: Seq[String]): DataFrame = {
val columns = getWeightContributorColumns(df.schema, columnsToIndex).map(name => df(name))
private def addRandomWeight(df: DataFrame, dimensionColumns: Seq[String]): DataFrame = {
val columns = getWeightContributorColumns(df.schema, dimensionColumns).map(name => df(name))
df.withColumn(weightColumnName, qbeastHash(columns: _*))
}

Expand Down
Loading