Skip to content

Commit

Permalink
#12 Fix range when scroll is used in search. Scroll is used when the …
Browse files Browse the repository at this point in the history
…number of requested elements is greater than 2*pageSize.
  • Loading branch information
To-om committed Apr 14, 2017
1 parent e8a9a97 commit fb677c0
Showing 1 changed file with 36 additions and 30 deletions.
66 changes: 36 additions & 30 deletions app/org/elastic4play/database/DBFind.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,23 @@ package org.elastic4play.database

import javax.inject.{ Inject, Singleton }

import scala.annotation.implicitNotFound
import scala.collection.mutable
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.{ DurationLong, FiniteDuration }
import scala.util.{ Failure, Success, Try }

import akka.NotUsed
import akka.actor.{ PoisonPill, Props, Stash, actorRef2Scala }
import akka.pattern.ask
import akka.stream.Materializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.util.Timeout

import play.api.{ Configuration, Logger }
import play.api.libs.json.{ JsNull, JsObject, JsString, Json }

import com.sksamuel.elastic4s.{ RichSearchHit, RichSearchResponse, SearchDefinition }
import com.sksamuel.elastic4s.ElasticDsl.{ clear, searchScroll }
import com.sksamuel.elastic4s.{ RichSearchHit, RichSearchResponse, SearchDefinition }
import org.elastic4play.{ InternalError, SearchError }
import play.api.libs.json.{ JsNull, JsObject, JsString, Json }
import play.api.{ Configuration, Logger }

import org.elastic4play.{ InternalError, SearchError, Timed }
import scala.collection.mutable
import scala.concurrent.duration.{ DurationLong, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

/**
* Service class responsible for entity search
Expand Down Expand Up @@ -79,7 +75,7 @@ class DBFind(
/**
* Execute the search definition using scroll
*/
private[database] def searchWithScroll(searchDefinition: SearchDefinition, limit: Int): (Source[RichSearchHit, NotUsed], Future[Long]) = {
private[database] def searchWithScroll(searchDefinition: SearchDefinition, offset: Int, limit: Int): (Source[RichSearchHit, NotUsed], Future[Long]) = {
implicit val timeout = Timeout(keepAlive)
import akka.stream.scaladsl._

Expand All @@ -89,22 +85,23 @@ class DBFind(
db,
searchDefinition limit pageSize,
keepAliveStr,
offset,
limit))
.toMat(Sink.asPublisher(true))(Keep.both)
.run()
val total = (actorRef ? SearchPublisher.Start).flatMap {
case Success(l: Long) => Future.successful(l)
case Failure(t) => Future.failed(t)
case r => Future.failed(InternalError(s"Unexpected actor response : %r"))
case _ => Future.failed(InternalError(s"Unexpected actor response : %r"))
}
(Source.fromPublisher(pub), total)
}

/**
* Execute the search definition
*/
private[database] def searchWithoutScroll(searchDefinition: SearchDefinition, limit: Int): (Source[RichSearchHit, NotUsed], Future[Long]) = {
val resp = db.execute(searchDefinition limit limit)
private[database] def searchWithoutScroll(searchDefinition: SearchDefinition, offset: Int, limit: Int): (Source[RichSearchHit, NotUsed], Future[Long]) = {
val resp = db.execute(searchDefinition.start(offset).limit(limit))
val total = resp.map(_.totalHits)
val src = Source
.fromFuture(resp)
Expand Down Expand Up @@ -136,14 +133,14 @@ class DBFind(
def apply(range: Option[String], sortBy: Seq[String])(query: (String) => SearchDefinition): (Source[JsObject, NotUsed], Future[Long]) = {
val (offset, limit) = getOffsetAndLimitFromRange(range)
val sortDef = DBUtils.sortDefinition(sortBy)
val searchDefinition = query(indexName) fields ("_source", "_routing", "_parent") start offset sort (sortDef: _*)
val searchDefinition = query(indexName).fields("_source", "_routing", "_parent").sort(sortDef: _*)

log.debug(s"search ${searchDefinition._builder}")
val (src, total) = if (limit > pageSize) {
searchWithScroll(searchDefinition, limit)
val (src, total) = if (limit > 2 * pageSize) {
searchWithScroll(searchDefinition, offset, limit)
}
else {
searchWithoutScroll(searchDefinition, limit)
searchWithoutScroll(searchDefinition, offset, limit)
}

(src.map(hit2json), total)
Expand All @@ -153,7 +150,7 @@ class DBFind(
* Execute the search definition
* This function is used to run aggregations
*/
def apply(query: (String) => SearchDefinition) = {
def apply(query: (String) => SearchDefinition): Future[RichSearchResponse] = {
val searchDefinition = query(indexName)
db.execute(searchDefinition)
.recoverWith {
Expand All @@ -180,10 +177,11 @@ class SearchPublisher(
db: DBConfiguration,
searchDefinition: SearchDefinition,
keepAliveStr: String,
offset: Int,
max: Int) extends ActorPublisher[RichSearchHit] with Stash {
import SearchPublisher._
import context.dispatcher
import akka.stream.actor.ActorPublisherMessage._
import context.dispatcher
private val queue: mutable.Queue[RichSearchHit] = mutable.Queue.empty
private var processed: Long = 0
private var scrollId: Option[String] = None
Expand All @@ -193,7 +191,7 @@ class SearchPublisher(
* Initial state of the actor.
* It only handles the "Start" message; all other messages are stashed.
*/
def receive = {
def receive: Receive = {
case Start =>
val _sender = sender
db.execute(searchDefinition.scroll(keepAliveStr)).onComplete {
Expand All @@ -207,7 +205,7 @@ class SearchPublisher(
onError(t)
self ! PoisonPill
}
context become fetching
context become fetching(offset)
unstashAll()
case _ =>
stash()
Expand All @@ -218,17 +216,18 @@ class SearchPublisher(
* If the queue is not empty, send the queued entities.
* If the queue does not hold enough entities, request the next page and switch to the fetching state.
*/
def ready: Receive = {
def ready(remainingOffset: Int): Receive = {
case Request(n) if n > queue.size =>
require(scrollId.isDefined)
db.execute(searchScroll(scrollId.get).keepAlive(keepAliveStr)).onComplete {
case Success(result) =>
self ! result
case Failure(t) =>
onComplete()
onError(t)
self ! PoisonPill
}
context become fetching
context become fetching(remainingOffset)
self ! Request(n - queue.size)
send(queue.size.toLong)
case Request(n) =>
Expand All @@ -239,19 +238,26 @@ class SearchPublisher(
* In this state, the actor retrieves the next page of results using scroll
* and adds them to the local queue
*/
def fetching: Receive = {
case Request(n) =>
def fetching(remainingOffset: Int): Receive = {
case Request(_) =>
require(queue.isEmpty) // must be empty or why did we not send it before switching to this mode?
stash()
case resp: RichSearchResponse if resp.isTimedOut =>
onComplete()
onError(SearchError("Request terminated early or timed out", null))
context.stop(self)
case resp: RichSearchResponse if resp.isEmpty =>
onComplete()
context.stop(self)
case resp: RichSearchResponse =>
queue.enqueue(resp.hits: _*)
context become ready
val l = resp.hits.length
if (l > remainingOffset) {
queue.enqueue(resp.hits.drop(remainingOffset): _*)
context become ready(0)
}
else {
context become ready(remainingOffset - l)
}
unstashAll()
}

Expand Down

0 comments on commit fb677c0

Please sign in to comment.