Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid repeated reading of the DeltaLog #65

Merged
merged 16 commits into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/QbeastBlock.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.qbeast.core.model

/**
 * Container class for Qbeast file's metadata
 *
 * @param path the path of the file
 * @param revision the revision number; the companion factory reads it from the
 *                 "revision" metadata entry
 * @param minWeight the minimum weight; the companion factory reads it from the
 *                  "minWeight" metadata entry
 * @param maxWeight the maximum weight; the companion factory reads it from the
 *                  "maxWeight" metadata entry
 * @param state the state, kept as the raw string found in the "state" metadata entry
 * @param elementCount the element count; the companion factory reads it from the
 *                     "elementCount" metadata entry
 * @param size the size of the file (NOTE(review): presumably in bytes - confirm)
 * @param modificationTime the modification time of the file
 *                         (NOTE(review): presumably epoch milliseconds - confirm)
 */

case class QbeastBlock(
path: String,
revision: Long,
minWeight: Weight,
maxWeight: Weight,
state: String,
elementCount: Long,
size: Long,
modificationTime: Long)

/**
 * Factory helpers for building [[QbeastBlock]] instances from raw metadata.
 */
object QbeastBlock {

  /** Metadata entries that must be present before a block can be built. */
  private val metadataKeys =
    Set("minWeight", "maxWeight", "state", "revision", "elementCount")

  /**
   * Verifies that every required entry is present in the given metadata.
   *
   * @param blockMetadata the metadata map to inspect
   * @throws IllegalArgumentException naming the first required key found to be absent
   */
  private def checkBlockMetadata(blockMetadata: Map[String, String]): Unit =
    for (required <- metadataKeys if !blockMetadata.contains(required)) {
      throw new IllegalArgumentException(s"Missing metadata key $required")
    }

  /**
   * Builds a QbeastBlock from a file path, its metadata map and file attributes.
   *
   * @param path the path of the file
   * @param blockMetadata the metadata entries; must contain "revision", "minWeight",
   *                      "maxWeight", "state" and "elementCount"
   * @param size the size of the file
   * @param modificationTime the modification time of the file
   * @return the block populated from the arguments and the parsed metadata
   * @throws IllegalArgumentException if a required metadata key is missing
   * @throws NumberFormatException if a numeric metadata value cannot be parsed
   */
  def apply(
      path: String,
      blockMetadata: Map[String, String],
      size: Long,
      modificationTime: Long): QbeastBlock = {
    checkBlockMetadata(blockMetadata)

    // Parsed in the same order as the constructor arguments so that the first
    // malformed value reported stays the same.
    val revision = blockMetadata("revision").toLong
    val minWeight = Weight(blockMetadata("minWeight").toInt)
    val maxWeight = Weight(blockMetadata("maxWeight").toInt)
    val state = blockMetadata("state")
    val elementCount = blockMetadata("elementCount").toLong

    QbeastBlock(
      path,
      revision,
      minWeight,
      maxWeight,
      state,
      elementCount,
      size,
      modificationTime)
  }

}
56 changes: 0 additions & 56 deletions core/src/main/scala/io/qbeast/core/model/QbeastFile.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,13 @@ case class IndexStatus(
* Container for the status information of a cube
* @param cubeId the identifier of the cube
* @param maxWeight the max weight of the cube
* @param normalizedWeight the normalized weight of the cube
* @param files the name of the files belonging to the cube
* @param files the files belonging to the cube
*/
case class CubeStatus(maxWeight: Weight, normalizedWeight: NormalizedWeight, files: IISeq[String])
case class CubeStatus(
cubeId: CubeId,
maxWeight: Weight,
normalizedWeight: NormalizedWeight,
files: IISeq[QbeastBlock])
extends Serializable

/**
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/io/qbeast/core/model/Weight.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ object Weight {
*/
val MinValue: Weight = Weight(Int.MinValue)

private val offset: Double = MinValue.value.toDouble
private val range: Double = MaxValue.value.toDouble - offset
private[qbeast] val offset: Double = MinValue.value.toDouble
private[qbeast] val range: Double = MaxValue.value.toDouble - offset

/**
* Creates a weight from a given fraction. The fraction must
Expand Down
38 changes: 38 additions & 0 deletions core/src/test/scala/io/qbeast/core/model/QbeastBlockTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.qbeast.core.model

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

/**
 * Unit tests for the factory that builds a QbeastBlock from a metadata map.
 */
class QbeastBlockTest extends AnyFlatSpec with Matchers {

  "QbeastBlock" should "find all the keys in the map" in {
    val blockMetadata: Map[String, String] = Map(
      "minWeight" -> "19217",
      "maxWeight" -> "11111111",
      "state" -> "FlOODED",
      "revision" -> "1",
      "elementCount" -> "777")

    // Distinct values for size and modificationTime, so that swapping the two
    // pass-through arguments would be detected.
    val qbeastBlock = QbeastBlock("path", blockMetadata, 2L, 3L)

    // Values parsed from the metadata map
    qbeastBlock.minWeight shouldBe Weight(19217)
    qbeastBlock.maxWeight shouldBe Weight(11111111)
    qbeastBlock.state shouldBe "FlOODED"
    qbeastBlock.revision shouldBe 1
    qbeastBlock.elementCount shouldBe 777

    // Values handed through unchanged
    qbeastBlock.path shouldBe "path"
    qbeastBlock.size shouldBe 2L
    qbeastBlock.modificationTime shouldBe 3L
  }

  it should "throw exception if key not found" in {
    val blockMetadata = Map.empty[String, String]
    a[IllegalArgumentException] shouldBe thrownBy(QbeastBlock("path", blockMetadata, 0L, 0L))
  }

  it should "throw error if the types are different" in {
    val blockMetadata: Map[String, String] = Map(
      "minWeight" -> "19217",
      "maxWeight" -> "11111111",
      "state" -> "FlOODED",
      "revision" -> "bad_type",
      "elementCount" -> "777")

    // Parsing "bad_type" as a number raises NumberFormatException, which is a
    // subclass of IllegalArgumentException.
    a[IllegalArgumentException] shouldBe thrownBy(QbeastBlock("path", blockMetadata, 0L, 0L))
  }
}
41 changes: 0 additions & 41 deletions core/src/test/scala/io/qbeast/core/model/QbeastFileTest.scala

This file was deleted.

10 changes: 5 additions & 5 deletions src/main/scala/io/qbeast/spark/delta/CubeDataLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package io.qbeast.spark.delta

import io.qbeast.core.model.{CubeId, QTableID, Revision}
import io.qbeast.spark.utils.{State, TagUtils}
import io.qbeast.spark.utils.{State, TagColumns}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.{DataFrame, SparkSession}
Expand Down Expand Up @@ -61,10 +61,10 @@ case class CubeDataLoader(tableID: QTableID) {
def loadCubeData(cube: CubeId, revision: Revision): DataFrame = {

// Select the files of the requested revision and cube whose state is not ANNOUNCED.
// Column inequality has to be written with =!=. Plain != is Scala's object
// inequality: it yields a Boolean instead of a Column, which would turn the state
// condition into a constant and let ANNOUNCED files through.
val cubeBlocks = snapshot.allFiles
  .where(
    TagColumns.revision === lit(revision.revisionID.toString) &&
      TagColumns.cube === lit(cube.string) &&
      TagColumns.state =!= lit(State.ANNOUNCED))
  .collect()

val fileNames = cubeBlocks.map(f => new Path(tableID.id, f.path).toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package io.qbeast.spark.delta

import io.qbeast.core.model.{QTableID, TableChanges}
import io.qbeast.spark.utils.TagUtils
import io.qbeast.spark.utils.TagColumns
import org.apache.spark.sql.delta.actions.{
Action,
AddFile,
Expand All @@ -14,6 +14,7 @@ import org.apache.spark.sql.delta.actions.{
}
import org.apache.spark.sql.delta.commands.DeltaCommand
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaOptions, OptimisticTransaction}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisExceptionFactory, SaveMode, SparkSession}

Expand Down Expand Up @@ -60,9 +61,8 @@ private[delta] case class DeltaMetadataWriter(
val cubeStrings = deltaReplicatedSet.map(_.string)
val cubeBlocks =
deltaLog.snapshot.allFiles
.filter(file =>
file.tags(TagUtils.revision) == revision.revisionID.toString &&
cubeStrings.contains(file.tags(TagUtils.cube)))
.where(TagColumns.revision === lit(revision.revisionID.toString) &&
TagColumns.cube.isInCollection(cubeStrings))
.collect()

val newReplicatedFiles = cubeBlocks.map(ReplicatedFile(_))
Expand Down
27 changes: 16 additions & 11 deletions src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,19 @@
package io.qbeast.spark.delta

import io.qbeast.IISeq
import io.qbeast.core.model.{
IndexStatus,
QbeastSnapshot,
ReplicatedSet,
Revision,
RevisionID,
mapper
}
import io.qbeast.spark.utils.MetadataConfig
import org.apache.spark.sql.AnalysisExceptionFactory
import io.qbeast.core.model._
import io.qbeast.spark.utils.{MetadataConfig, TagColumns}
import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{AnalysisExceptionFactory, Dataset}

/**
* Qbeast Snapshot that provides information about the current index state.
*
* @param snapshot the internal Delta Lakes log snapshot
*/
case class DeltaQbeastSnapshot(snapshot: Snapshot) extends QbeastSnapshot {
case class DeltaQbeastSnapshot(private val snapshot: Snapshot) extends QbeastSnapshot {

/**
 * Whether the wrapped snapshot reports version -1.
 * NOTE(review): presumably the version of a table without any commit yet - confirm.
 */
def isInitial: Boolean = snapshot.version == -1

Expand Down Expand Up @@ -160,4 +155,14 @@ case class DeltaQbeastSnapshot(snapshot: Snapshot) extends QbeastSnapshot {
}
}

/**
 * Selects the files of the snapshot that are tagged with the given revision.
 *
 * @param revisionID the revision identifier
 * @return the Dataset of AddFile entries whose revision tag equals the identifier
 */
def loadRevisionBlocks(revisionID: RevisionID): Dataset[AddFile] = {
  // The identifier is compared in its string form
  val belongsToRevision = TagColumns.revision === lit(revisionID.toString)
  snapshot.allFiles.where(belongsToRevision)
}

}
Loading