Skip to content

Commit

Permalink
Preliminary architecture of ArbitraryQuery support
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Feb 23, 2025
1 parent b5be929 commit f71b65a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 54 deletions.
8 changes: 8 additions & 0 deletions core/src/main/scala/lightdb/ArbitraryQuery.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package lightdb

import fabric.Json

case class ArbitraryQuery(query: String, params: Map[String, Json] = Map.empty) {
def param(name: String, value: Json): ArbitraryQuery = copy(params = params + (name -> value))
def params(tuples: (String, Json)*): ArbitraryQuery = copy(params = params ++ tuples.toMap)
}
8 changes: 7 additions & 1 deletion core/src/main/scala/lightdb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc], V](model: Mo
countTotal: Boolean = false,
scoreDocs: Boolean = false,
minDocScore: Option[Double] = None,
facets: List[FacetQuery[Doc]] = Nil) { query =>
facets: List[FacetQuery[Doc]] = Nil,
arbitraryQuery: Option[ArbitraryQuery] = None) { query =>
private type Q = Query[Doc, Model, V]

def scored: Q = copy(scoreDocs = true)
Expand All @@ -35,6 +36,8 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc], V](model: Mo
minDocScore = Some(min)
)

def withArbitraryQuery(query: ArbitraryQuery): Q = copy(arbitraryQuery = Some(query))

def clearFilters: Q = copy(filter = None)

def filter(f: Model => Filter[Doc]): Q = {
Expand Down Expand Up @@ -114,6 +117,9 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc], V](model: Mo
))

def search(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = {
if (arbitraryQuery.nonEmpty && !store.supportsArbitraryQuery) {
throw new UnsupportedOperationException(s"Arbitrary query is set, but not allowed with this store (${store.getClass.getSimpleName})")
}
val storeMode = store.storeMode
if (Query.Validation || (Query.WarnFilteringWithoutIndex && storeMode.isAll)) {
val notIndexed = filter.toList.flatMap(_.fields(model)).filter(!_.indexed)
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/lightdb/store/Store.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala

abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name: String,
model: Model) extends Initializable with Disposable {
def supportsArbitraryQuery: Boolean = false

protected def id(doc: Doc): Id[Doc] = doc._id
lazy val idField: UniqueIndex[Doc, Id[Doc]] = model._id

Expand Down
114 changes: 61 additions & 53 deletions sql/src/main/scala/lightdb/sql/SQLStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import scala.language.implicitConversions
abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, model: Model) extends Store[Doc, Model](name, model) {
protected def connectionManager: ConnectionManager

override def supportsArbitraryQuery: Boolean = true

override protected def initialize(): Task[Unit] = Task.next {
transaction { implicit transaction =>
initTransaction()
Expand Down Expand Up @@ -346,60 +348,66 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name:

override def doSearch[V](query: Query[Doc, Model, V])
(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = Task {
var extraFields = List.empty[SQLPart]
val fields = query.conversion match {
case Conversion.Value(field) => List(field)
case Conversion.Doc() | Conversion.Converted(_) => this.fields
case Conversion.Materialized(fields) => fields
case Conversion.DocAndIndexes() => if (storeMode.isIndexes) {
this.fields.filter(_.indexed)
} else {
this.fields
}
case Conversion.Json(fields) => fields
case d: Conversion.Distance[Doc, _] =>
extraFields = extraFields ::: extraFieldsForDistance(d)
this.fields
}
val state = getState
val b = SQLQueryBuilder(
store = this,
state = state,
fields = fields.map(f => fieldPart(f)) ::: extraFields,
filters = query.filter.map(filter2Part).toList,
group = Nil,
having = Nil,
sort = query.sort.collect {
case Sort.ByField(index, direction) =>
val dir = if (direction == SortDirection.Descending) "DESC" else "ASC"
SQLPart(s"${index.name} $dir")
case Sort.ByDistance(field, _, direction) => sortByDistance(field, direction)
},
limit = Some(query.limit),
offset = query.offset
)
val results = b.execute()
val rs = results.rs
state.register(rs)
val total = if (query.countTotal) {
Some(b.queryTotal())
} else {
None
query.arbitraryQuery match {
case Some(aq) =>
// TODO: Support arbitrary query
???
case None =>
var extraFields = List.empty[SQLPart]
val fields = query.conversion match {
case Conversion.Value(field) => List(field)
case Conversion.Doc() | Conversion.Converted(_) => this.fields
case Conversion.Materialized(fields) => fields
case Conversion.DocAndIndexes() => if (storeMode.isIndexes) {
this.fields.filter(_.indexed)
} else {
this.fields
}
case Conversion.Json(fields) => fields
case d: Conversion.Distance[Doc, _] =>
extraFields = extraFields ::: extraFieldsForDistance(d)
this.fields
}
val state = getState
val b = SQLQueryBuilder(
store = this,
state = state,
fields = fields.map(f => fieldPart(f)) ::: extraFields,
filters = query.filter.map(filter2Part).toList,
group = Nil,
having = Nil,
sort = query.sort.collect {
case Sort.ByField(index, direction) =>
val dir = if (direction == SortDirection.Descending) "DESC" else "ASC"
SQLPart(s"${index.name} $dir")
case Sort.ByDistance(field, _, direction) => sortByDistance(field, direction)
},
limit = Some(query.limit),
offset = query.offset
)
val results = b.execute()
val rs = results.rs
state.register(rs)
val total = if (query.countTotal) {
Some(b.queryTotal())
} else {
None
}
val stream = rapid.Stream.fromIterator[(V, Double)](Task {
val iterator = rs2Iterator(rs, query.conversion)
val ps = rs.getStatement.asInstanceOf[PreparedStatement]
ActionIterator(iterator.map(v => v -> 0.0), onClose = () => state.returnPreparedStatement(b.sql, ps))
})
SearchResults(
model = model,
offset = query.offset,
limit = query.limit,
total = total,
streamWithScore = stream,
facetResults = Map.empty,
transaction = transaction
)
}
val stream = rapid.Stream.fromIterator[(V, Double)](Task {
val iterator = rs2Iterator(rs, query.conversion)
val ps = rs.getStatement.asInstanceOf[PreparedStatement]
ActionIterator(iterator.map(v => v -> 0.0), onClose = () => state.returnPreparedStatement(b.sql, ps))
})
SearchResults(
model = model,
offset = query.offset,
limit = query.limit,
total = total,
streamWithScore = stream,
facetResults = Map.empty,
transaction = transaction
)
}

protected def sortByDistance[G <: Geo](field: Field[_, List[G]], direction: SortDirection): SQLPart = {
Expand Down

0 comments on commit f71b65a

Please sign in to comment.