diff --git a/build.sbt b/build.sbt
index 817794f89..516c3502a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -118,6 +118,7 @@ lazy val scala3Projects: Seq[ProjectReference] = Seq(
   httpstreams,
   akkastreams,
   pekkostreams,
+  reactivestreamsakka,
   reactivestreamspekko
 )
 lazy val scala3_root = Project("elastic4s-scala3", file("scala3"))
@@ -242,6 +243,19 @@ lazy val httpstreams = (project in file("elastic4s-http-streams"))
     )
   )
 
+lazy val reactivestreamsakka = (project in file("elastic4s-reactivestreams-akka"))
+  .dependsOn(core, testkit % "test", jackson % "test")
+  .settings(name := "elastic4s-reactivestreams-akka")
+  .settings(scala3Settings)
+  .settings(libraryDependencies ++=
+    Seq(
+      Dependencies.akkaActor,
+      Dependencies.akkaStream,
+      Dependencies.reactiveStreamsTck,
+      Dependencies.scalaTestPlusTestng
+    )
+  )
+
 lazy val reactivestreamspekko = (project in file("elastic4s-reactivestreams-pekko"))
   .dependsOn(core, testkit % "test", jackson % "test")
   .settings(name := "elastic4s-reactivestreams-pekko")
diff --git a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/BulkIndexingSubscriber.scala b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/BulkIndexingSubscriber.scala
index 00464e049..5d5340b5c 100644
--- a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/BulkIndexingSubscriber.scala
+++ b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/BulkIndexingSubscriber.scala
@@ -23,6 +23,7 @@ import scala.util.{Failure, Success}
  * @param builder used to turn elements of T into IndexDefinitions so they can be used in the bulk indexer
  * @tparam T the type of element provided by the publisher this subscriber will subscribe with
  */
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 class BulkIndexingSubscriber[T] private[streams] (
   client: ElasticClient,
   builder: RequestBuilder[T],
@@ -61,6 +62,7 @@ class BulkIndexingSubscriber[T] private[streams] (
     actor ! PoisonPill
 }
 
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 object BulkActor {
 
   // signifies that the downstream publisher has completed (NOT that a bulk request has succeeded)
@@ -78,6 +80,7 @@ object BulkActor {
 
 }
 
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 class BulkActor[T](client: ElasticClient,
                    subscription: Subscription,
                    builder: RequestBuilder[T],
@@ -281,6 +284,7 @@ class BulkActor[T](client: ElasticClient,
  *
  * @tparam T the type of elements this builder supports
  */
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 trait RequestBuilder[T] {
   def request(t: T): BulkCompatibleRequest
 }
@@ -288,11 +292,13 @@ trait RequestBuilder[T] {
 /**
  * Notified on each acknowledgement
  */
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 trait ResponseListener[-T] {
   def onAck(resp: BulkResponseItem, original: T): Unit
   def onFailure(resp: BulkResponseItem, original: T): Unit = ()
 }
 
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 object ResponseListener {
   val noop: ResponseListener[Any] = new ResponseListener[Any] {
     override def onAck(resp: BulkResponseItem, original: Any): Unit = ()
@@ -322,6 +328,7 @@ object ResponseListener {
  * Once an index is performed (either by this flush value or because docs arrived in time)
  * the flush after schedule is reset.
 **/
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 case class SubscriberConfig[T](batchSize: Int = 100,
                                concurrentRequests: Int = 5,
                                refreshAfterOp: Boolean = false,
diff --git a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ReactiveElastic.scala b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ReactiveElastic.scala
index bbcb6bbc7..05be0027e 100644
--- a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ReactiveElastic.scala
+++ b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ReactiveElastic.scala
@@ -7,6 +7,7 @@ import com.sksamuel.elastic4s.{ElasticClient, Indexes, IndexesAndTypes}
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 object ReactiveElastic {
 
   implicit class ReactiveElastic(client: ElasticClient) {
diff --git a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala
index a21e10da0..cb770b30c 100644
--- a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala
+++ b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala
@@ -21,6 +21,7 @@
  * @param maxItems the maximum number of elements to return
  * @param actorRefFactory an Actor reference factory required by the publisher
  */
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 class ScrollPublisher private[streams] (client: ElasticClient, search: SearchRequest, maxItems: Long)(
   implicit actorRefFactory: ActorRefFactory
 ) extends Publisher[SearchHit] {
@@ -38,6 +39,7 @@
   }
 }
 
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 class ScrollSubscription(client: ElasticClient, query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)(
   implicit actorRefFactory: ActorRefFactory
 ) extends Subscription {
@@ -61,11 +63,13 @@
   }
 }
 
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 object PublishActor {
   object Ready
   case class Request(n: Long)
 }
 
+@deprecated("Use the elastic4s-reactivestreams-akka package", "8.16.0")
 class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)
   extends Actor
     with Stash {
diff --git a/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/BulkIndexingSubscriber.scala b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/BulkIndexingSubscriber.scala
new file mode 100644
index 000000000..ab008505b
--- /dev/null
+++ b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/BulkIndexingSubscriber.scala
@@ -0,0 +1,336 @@
+package com.sksamuel.elastic4s.akka.reactivestreams
+
+import akka.actor._
+import com.sksamuel.elastic4s.requests.bulk.{BulkCompatibleRequest, BulkRequest, BulkResponseItem}
+import com.sksamuel.elastic4s.requests.common.RefreshPolicy
+import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess}
+import org.reactivestreams.{Subscriber, Subscription}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.higherKinds
+import scala.util.{Failure, Success}
+
+/**
+ * An implementation of the reactive API Subscriber.
+ * This subscriber will bulk index received elements. The bulk nature means that the elasticsearch
+ * index operations are performed as a bulk call, the size of which is controlled by the batchSize param.
+ *
+ * The received elements must be converted into an elastic4s bulk compatible definition, such as index or delete.
+ * This is done by the RequestBuilder.
+ *
+ * @param client used to connect to the cluster
+ * @param builder used to turn elements of T into IndexDefinitions so they can be used in the bulk indexer
+ * @tparam T the type of element provided by the publisher this subscriber will subscribe with
+ */
+class BulkIndexingSubscriber[T] private[reactivestreams] (
+  client: ElasticClient,
+  builder: RequestBuilder[T],
+  config: SubscriberConfig[T]
+)(implicit actorRefFactory: ActorRefFactory)
+    extends Subscriber[T] {
+
+  private var actor: ActorRef = _
+
+  override def onSubscribe(sub: Subscription): Unit = {
+    // rule 1.9 https://github.com/reactive-streams/reactive-streams-jvm#1.9
+    // when the provided Subscriber is null it MUST throw a java.lang.NullPointerException to the caller
+    if (sub == null) throw new NullPointerException()
+    if (actor == null)
+      actor = actorRefFactory.actorOf(Props(new BulkActor(client, sub, builder, config)))
+    else
+      // rule 2.5, must cancel subscription if onSubscribe has been invoked twice
+      // https://github.com/reactive-streams/reactive-streams-jvm#2.5
+      sub.cancel()
+  }
+
+  override def onNext(t: T): Unit = {
+    if (t == null) throw new NullPointerException("On next should not be called until onSubscribe has returned")
+    actor ! t
+  }
+
+  override def onError(t: Throwable): Unit = {
+    if (t == null) throw new NullPointerException()
+    actor ! t
+  }
+
+  override def onComplete(): Unit =
+    actor ! BulkActor.Completed
+
+  def close(): Unit =
+    actor ! PoisonPill
+}
+
+object BulkActor {
+
+  // signifies that the downstream publisher has completed (NOT that a bulk request has succeeded)
+  case object Completed
+
+  case object ForceIndexing
+
+  case class Result[T](items: Seq[BulkResponseItem], originals: Seq[T], requestNext: Boolean)
+
+  case class FailedResult[T](items: Seq[BulkResponseItem], originals: Seq[T], requestNext: Boolean)
+
+  case class Request(n: Int)
+
+  case class Send[T](req: BulkRequest, originals: Seq[T], attempts: Int)
+
+}
+
+class BulkActor[T](client: ElasticClient,
+                   subscription: Subscription,
+                   builder: RequestBuilder[T],
+                   config: SubscriberConfig[T])
+    extends Actor {
+
+  import com.sksamuel.elastic4s.ElasticDsl._
+  import context.{dispatcher, system}
+
+  private val buffer = new ArrayBuffer[T]()
+  buffer.sizeHint(config.batchSize)
+
+  private var completed = false
+
+  // total number of documents requested from our publisher
+  private var requested: Long = 0L
+
+  // total number of documents acknowledged at the elasticsearch cluster level but pending confirmation of index
+  private var sent: Long = 0L
+
+  // total number of documents confirmed as successful
+  private var confirmed: Long = 0L
+
+  // total number of documents that failed the retry attempts and are ignored
+  private var failed: Long = 0L
+
+  // Create a scheduler if a flushInterval is provided. This scheduler will be used to force indexing, otherwise
+  // we can be stuck at batchSize-1 waiting for the nth message to arrive.
+  //
+  // It has been suggested we can use ReceiveTimeout here, but one reason we can't is that BulkResult messages
+  // will cause the timeout period to be reset, and they shouldn't interfere with the flush interval.
+  private val flushIntervalScheduler: Option[Cancellable] = config.flushInterval.map { interval =>
+    system.scheduler.scheduleWithFixedDelay(interval, interval, self, BulkActor.ForceIndexing)
+  }
+
+  // If flushAfter is specified then after each message, a scheduler is created to force indexing if no documents
+  // are received within the given duration.
+  private var flushAfterScheduler: Option[Cancellable] = None
+
+  private def resetFlushAfterScheduler(): Unit = {
+    flushAfterScheduler.foreach(_.cancel())
+    flushAfterScheduler = config.flushAfter.map { interval =>
+      system.scheduler.scheduleOnce(interval, self, BulkActor.ForceIndexing)
+    }
+  }
+
+  // requests our initial starting batches; we can request them all at once, and then just request a new batch
+  // each time we complete a batch
+  override def preStart(): Unit =
+    self ! BulkActor.Request(config.batchSize * config.concurrentRequests)
+
+  def receive: PartialFunction[Any, Unit] = {
+    case t: Throwable => handleError(t)
+
+    case BulkActor.Completed =>
+      // since we are completed at the publisher level, we should send all remaining documents because a complete
+      // batch cannot happen now
+      if (buffer.nonEmpty)
+        index()
+      completed = true
+      shutdownIfAllConfirmed()
+
+    case BulkActor.Request(n) =>
+      subscription.request(n)
+      requested = requested + n
+
+    case msg: BulkActor.Send[T] => send(msg.req, msg.originals, msg.attempts)
+
+    case BulkActor.ForceIndexing =>
+      if (buffer.nonEmpty)
+        index()
+
+    case msg: BulkActor.Result[T] =>
+      confirmed = confirmed + msg.items.size
+      msg.items
+        .zip(msg.originals)
+        .foreach {
+          case (item, original) =>
+            config.listener.onAck(item, original)
+        }
+      checkCompleteOrRequestNext(msg.items.size, msg.requestNext)
+
+    case msg: BulkActor.FailedResult[T] =>
+      failed = failed + msg.items.size
+      msg.items
+        .zip(msg.originals)
+        .foreach {
+          case (item, original) =>
+            config.listener.onFailure(item, original)
+        }
+      checkCompleteOrRequestNext(msg.items.size, msg.requestNext)
+
+    case t: T =>
+      buffer.append(t)
+      if (buffer.size == config.batchSize)
+        index()
+      else
+        resetFlushAfterScheduler()
+  }
+
+  // need to check if we're completed, because if we are then this might be the last pending conf
+  // and if it is, we can shutdown.
+  private def checkCompleteOrRequestNext(n: Int, requestNext: Boolean): Unit =
+    if (completed) shutdownIfAllConfirmed()
+    else if (requestNext) self ! BulkActor.Request(n)
+
+  // Stops the schedulers if they exist and invokes the final functions
+  override def postStop(): Unit = {
+    flushIntervalScheduler.map(_.cancel())
+    flushAfterScheduler.map(_.cancel())
+    if (failed == 0)
+      config.successFn()
+    config.completionFn()
+  }
+
+  private def shutdownIfAllConfirmed(): Unit =
+    if (confirmed + failed == sent)
+      context.stop(self)
+
+  private def send(req: BulkRequest, originals: Seq[T], attempts: Int): Unit = {
+    require(req.requests.size == originals.size, "Requests size does not match originals size")
+
+    def filterByIndexes[S](sequence: Seq[S], indexes: Set[Int]) =
+      sequence.zipWithIndex
+        .filter { case (_, index) => indexes.contains(index) }
+        .map { case (seqItem, _) => seqItem }
+
+    def getOriginalForResponse(response: BulkResponseItem) = originals(response.itemId)
+
+    // returns just the requests that failed, as a new bulk definition (+ originals)
+    def getRetryDef(failures: Seq[BulkResponseItem]): (BulkRequest, Seq[T]) = {
+      val policy = if (config.refreshAfterOp) RefreshPolicy.Immediate else RefreshPolicy.NONE
+
+      val failureIds = failures.map(_.itemId).toSet
+      val retryOriginals = filterByIndexes(originals, failureIds)
+      val failedReqs = filterByIndexes(req.requests, failureIds)
+
+      (BulkRequest(failedReqs).refresh(policy), retryOriginals)
+    }
+
+    val f = client.execute(req)
+    f.onComplete {
+      case Failure(e) => self ! e
+      case Success(failure: RequestFailure) => self ! new RuntimeException(failure.toString)
+      case Success(RequestSuccess(_, _, _, result)) =>
+        if (result.hasSuccesses)
+          self ! BulkActor.Result(
+            result.successes,
+            result.successes.map(getOriginalForResponse),
+            requestNext = !result.errors)
+        if (result.errors) {
+          val failures = if (attempts > 0) {
+            val (retriable, nonRetriable) = result.failures.partition(failure =>
+              config.retryFailure(failure, getOriginalForResponse(failure)))
+            // all retriable failures need to be resent, if retries are left, but only after we wait for the
+            // failureWait period to avoid flooding the cluster
+            if (retriable.nonEmpty) {
+              val (retryDef, originals) = getRetryDef(retriable)
+              system.scheduler.scheduleOnce(config.failureWait, self, BulkActor.Send(retryDef, originals, attempts - 1))
+            }
+            nonRetriable
+          } else result.failures
+          self ! BulkActor.FailedResult(failures, failures.map(getOriginalForResponse), requestNext = attempts == 0)
+        }
+    }
+  }
+
+  private def handleError(t: Throwable): Unit = {
+    // if there is an error we will cancel the subscription, as we cannot be sure we can handle further elements,
+    // and the error may be from outside the subscriber
+    subscription.cancel()
+    config.errorFn(t)
+    buffer.clear()
+    context.stop(self)
+  }
+
+  private def index(): Unit = {
+
+    def bulkDef: BulkRequest = {
+      val defs = buffer.map(t => builder.request(t)).toSeq
+      val policy = if (config.refreshAfterOp) RefreshPolicy.Immediate else RefreshPolicy.NONE
+      BulkRequest(defs).refresh(policy)
+    }
+
+    sent = sent + buffer.size
+    self ! BulkActor.Send(bulkDef, buffer.toList, config.maxAttempts)
+
+    buffer.clear()
+
+    // buffer is now empty so there is no point keeping a scheduled flush-after operation
+    flushAfterScheduler.foreach(_.cancel())
+    flushAfterScheduler = None
+  }
+}
+
+/**
+ * An implementation of this typeclass must provide a bulk compatible request for the given instance of T.
+ * The bulk compatible request will then be sent to elastic.
+ *
+ * A bulk compatible request can be either an index, update, or delete.
+ *
+ * @tparam T the type of elements this builder supports
+ */
+trait RequestBuilder[T] {
+  def request(t: T): BulkCompatibleRequest
+}
+
+/**
+ * Notified on each acknowledgement
+ */
+trait ResponseListener[-T] {
+  def onAck(resp: BulkResponseItem, original: T): Unit
+  def onFailure(resp: BulkResponseItem, original: T): Unit = ()
+}
+
+object ResponseListener {
+  val noop: ResponseListener[Any] = new ResponseListener[Any] {
+    override def onAck(resp: BulkResponseItem, original: Any): Unit = ()
+  }
+}
+
+/**
+ * @param listener a listener which is notified on each acknowledged batch item
+ * @param batchSize the number of elements to group together per batch, aside from the last batch
+ * @param concurrentRequests the number of concurrent batch operations
+ * @param refreshAfterOp if the index should be refreshed after each bulk operation
+ * @param completionFn a function which is invoked when all sent requests have been acknowledged and the publisher
+ *                     has completed. Note: this function is always invoked, regardless of whether there was an error
+ * @param successFn a function which is only invoked when all operations have completed successfully
+ * @param errorFn a function which is invoked after there is an error
+ * @param failureWait the timeout before retrying failed requests. Usually a failed request is elasticsearch's way of
+ *                    indicating backpressure, so this parameter determines how long to wait between requests.
+ * @param retryFailure a function which is invoked for every failure in a bulk request, and returns true if the
+ *                     failure should be retried or false if it should not.
+ * @param maxAttempts the max number of times to try a request. If it fails too many times it probably isn't
+ *                    backpressure but an error with the document.
+ * @param flushInterval used to schedule periodic bulk indexing. This can be set to avoid waiting for a complete batch
+ *                      for a long period of time. It is also used if the publisher will never complete.
+ *                      This ensures that all elements are indexed, even if the last batch is smaller than batchSize.
+ * @param flushAfter used to schedule an index if no document has been received within the given duration.
+ *                   Once an index is performed (either by this flush value or because docs arrived in time)
+ *                   the flush-after schedule is reset.
+ **/
+case class SubscriberConfig[T](batchSize: Int = 100,
+                               concurrentRequests: Int = 5,
+                               refreshAfterOp: Boolean = false,
+                               listener: ResponseListener[T] = ResponseListener.noop,
+                               completionFn: () => Unit = () => (),
+                               successFn: () => Unit = () => (),
+                               errorFn: Throwable => Unit = _ => (),
+                               failureWait: FiniteDuration = 2.seconds,
+                               retryFailure: (BulkResponseItem, T) => Boolean = (_: BulkResponseItem, _: T) => true,
+                               maxAttempts: Int = 5,
+                               flushInterval: Option[FiniteDuration] = None,
+                               flushAfter: Option[FiniteDuration] = None)
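
The new module keeps the programming model of the deprecated elastic4s-http-streams code: you implement the `RequestBuilder[T]` typeclass and tune batching through `SubscriberConfig`. Below is a minimal sketch of how the two fit together; the `Article` type, the `articles` index name, and the JSON body are hypothetical, used purely for illustration.

```scala
import akka.actor.ActorSystem
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.akka.reactivestreams.{RequestBuilder, ResponseListener, SubscriberConfig}
import com.sksamuel.elastic4s.requests.bulk.BulkCompatibleRequest

import scala.concurrent.duration._

object BulkIndexingSketch {

  // hypothetical element type, used only for this example
  case class Article(id: String, title: String)

  implicit val system: ActorSystem = ActorSystem()

  // the typeclass the subscriber needs: convert each stream element into a bulk-compatible request
  implicit object ArticleRequestBuilder extends RequestBuilder[Article] {
    override def request(a: Article): BulkCompatibleRequest =
      indexInto("articles").id(a.id).doc(s"""{"title":"${a.title}"}""")
  }

  // batching and retry behaviour are tuned via SubscriberConfig; omitted fields keep the defaults above
  val config: SubscriberConfig[Article] = SubscriberConfig[Article](
    batchSize = 50,                  // bulk requests of up to 50 documents
    concurrentRequests = 2,          // at most two in-flight bulk requests
    flushInterval = Some(5.seconds), // push out a partial batch every 5 seconds
    listener = ResponseListener.noop
  )
}
```

Setting `flushInterval` is the usual guard against a stream that stalls at batchSize-1, which is exactly the scenario the scheduler comments in `BulkActor` above describe.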
diff --git a/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ReactiveElastic.scala b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ReactiveElastic.scala
new file mode 100644
index 000000000..d82b4c42a
--- /dev/null
+++ b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ReactiveElastic.scala
@@ -0,0 +1,59 @@
+package com.sksamuel.elastic4s.akka.reactivestreams
+
+import akka.actor.ActorRefFactory
+import com.sksamuel.elastic4s.requests.searches.SearchRequest
+import com.sksamuel.elastic4s.{ElasticClient, IndexesAndTypes}
+
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+
+object ReactiveElastic {
+
+  implicit class ReactiveElastic(client: ElasticClient) {
+
+    import com.sksamuel.elastic4s.ElasticDsl._
+
+    def subscriber[T](config: SubscriberConfig[T])(implicit builder: RequestBuilder[T],
+                                                   actorRefFactory: ActorRefFactory): BulkIndexingSubscriber[T] =
+      new BulkIndexingSubscriber[T](client, builder, config)
+
+    def subscriber[T](
+      batchSize: Int = 100,
+      concurrentRequests: Int = 5,
+      refreshAfterOp: Boolean = false,
+      listener: ResponseListener[T] = ResponseListener.noop,
+      typedListener: ResponseListener[T] = ResponseListener.noop,
+      completionFn: () => Unit = () => (),
+      errorFn: Throwable => Unit = _ => (),
+      flushInterval: Option[FiniteDuration] = None,
+      flushAfter: Option[FiniteDuration] = None,
+      failureWait: FiniteDuration = 2.seconds,
+      maxAttempts: Int = 5
+    )(implicit builder: RequestBuilder[T], actorRefFactory: ActorRefFactory): BulkIndexingSubscriber[T] = {
+      val config = SubscriberConfig(
+        batchSize = batchSize,
+        concurrentRequests = concurrentRequests,
+        refreshAfterOp = refreshAfterOp,
+        listener = listener,
+        completionFn = completionFn,
+        errorFn = errorFn,
+        failureWait = failureWait,
+        flushInterval = flushInterval,
+        flushAfter = flushAfter,
+        maxAttempts = maxAttempts
+      )
+      subscriber(config)
+    }
+
+    def publisher(indexesTypes: IndexesAndTypes, elements: Long = Long.MaxValue, keepAlive: String = "1m")(
+      implicit actorRefFactory: ActorRefFactory
+    ): ScrollPublisher =
+      publisher(search(indexesTypes.indexes).query("*:*").scroll(keepAlive), elements)
+
+    def publisher(q: SearchRequest)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher =
+      publisher(q, Long.MaxValue)
+
+    def publisher(q: SearchRequest, elements: Long)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher =
+      new ScrollPublisher(client, q, elements)
+  }
+}
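
With `ReactiveElastic._` in scope, subscribers and publishers hang directly off the client, so wiring a stream is a couple of lines. A sketch under the same assumptions as the previous example (`client`, `Article`, the implicit `ArticleRequestBuilder` and `config` are presumed to exist in the caller's code; `articlePublisher` stands in for any Reactive Streams `Publisher[Article]` you already have):

```scala
import akka.actor.ActorSystem
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.akka.reactivestreams.ReactiveElastic._

implicit val system: ActorSystem = ActorSystem()

// bulk-index everything articlePublisher emits, using the builder and config from the previous sketch
val subscriber = client.subscriber[Article](config)
articlePublisher.subscribe(subscriber)

// stream every document in the index; the query must carry a scroll keep-alive
val allArticles = client.publisher(search("articles").matchAllQuery().scroll("1m"))
```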
diff --git a/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisher.scala b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisher.scala
new file mode 100644
index 000000000..94b6f34c6
--- /dev/null
+++ b/elastic4s-reactivestreams-akka/src/main/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisher.scala
@@ -0,0 +1,177 @@
+package com.sksamuel.elastic4s.akka.reactivestreams
+
+import akka.actor.{Actor, ActorRefFactory, PoisonPill, Props, Stash}
+import com.sksamuel.elastic4s.requests.searches.{SearchHit, SearchRequest, SearchResponse}
+import com.sksamuel.elastic4s.akka.reactivestreams.PublishActor.Ready
+import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess}
+import com.sksamuel.elastic4s.ext.OptionImplicits.RichOption
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.mutable
+import scala.util.{Failure, Success}
+
+/**
+ * An implementation of the reactive API Publisher, that publishes documents using an elasticsearch
+ * scroll cursor. The initial query must be provided to the publisher, and there are helpers to create
+ * a query for all documents in an index (and type).
+ *
+ * @param client a client for the cluster
+ * @param search the initial search query to execute
+ * @param maxItems the maximum number of elements to return
+ * @param actorRefFactory an Actor reference factory required by the publisher
+ */
+class ScrollPublisher private[reactivestreams] (client: ElasticClient, search: SearchRequest, maxItems: Long)(
+  implicit actorRefFactory: ActorRefFactory
+) extends Publisher[SearchHit] {
+  require(search.keepAlive.isDefined, "Search Definition must have a scroll to be used as Publisher")
+
+  override def subscribe(s: Subscriber[_ >: SearchHit]): Unit = {
+    // Rule 1.9: the subscriber cannot be null
+    if (s == null) throw new NullPointerException("Rule 1.9: Subscriber cannot be null")
+    val subscription = new ScrollSubscription(client, search, s, maxItems)
+    s.onSubscribe(subscription)
+    // rule 1.03: the subscription should not invoke any onNext calls until the onSubscribe call has returned.
+    // even though the user might call request in onSubscribe, we can't start sending the results yet.
+    // this ready method signals to the actor that it's ok to start sending data.
+    subscription.ready()
+  }
+}
+
+class ScrollSubscription(client: ElasticClient, query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)(
+  implicit actorRefFactory: ActorRefFactory
+) extends Subscription {
+
+  private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max)))
+
+  private[reactivestreams] def ready(): Unit =
+    actor ! PublishActor.Ready
+
+  override def cancel(): Unit =
+    // Rule 3.5: this call is idempotent, is fast, and thread safe
+    // Rule 3.7: after cancelling, further calls should be no-ops, which is handled by the actor
+    // we don't mind the subscriber having any pending requests before cancellation is processed
+    actor ! PoisonPill
+
+  override def request(n: Long): Unit = {
+    // Rule 3.9
+    if (n < 1) s.onError(new java.lang.IllegalArgumentException("Rule 3.9: Must request > 0 elements"))
+    // Rule 3.4: this method returns quickly as the search request is non-blocking
+    actor ! PublishActor.Request(n)
+  }
+}
+
+object PublishActor {
+  object Ready
+  case class Request(n: Long)
+}
+
+class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)
+    extends Actor
+    with Stash {
+
+  import com.sksamuel.elastic4s.ElasticDsl._
+  import context.dispatcher
+
+  protected val logger: Logger = LoggerFactory.getLogger(getClass.getName)
+
+  private var scrollId: String = _
+  private var processed: Long = 0L
+  private val queue: mutable.Queue[SearchHit] = mutable.Queue.empty
+
+  // Parse the keep-alive setting out of the original query.
+  private val keepAlive = query.keepAlive.map(_.toString).getOrElse("1m")
+
+  // rule 1.03: the subscription should not send any results until the onSubscribe call has returned.
+  // even though the user might call request in onSubscribe, we can't start sending the results yet.
+  // the Ready message signals to the actor that it's ok to start sending data. In the meantime we just stash requests.
+  override def receive: PartialFunction[Any, Unit] = {
+    case Ready =>
+      context become ready
+      logger.info("Scroll publisher has become 'Ready'")
+      unstashAll()
+    case _ =>
+      stash()
+  }
+
+  private def send(k: Long): Unit = {
+    require(queue.size >= k)
+    for (_ <- 0L until k)
+      if (max == 0 || processed < max) {
+        s.onNext(queue.dequeue())
+        processed = processed + 1
+        if (processed == max && max > 0) {
+          s.onComplete()
+          context.stop(self)
+        }
+      }
+  }
+
+  // ready is the standard state; we can service requests and request more from upstream as well
+  private def ready: Actor.Receive = {
+    // if a request comes in for more than is currently available,
+    // we will send a request for more while sending what we can now
+    case PublishActor.Request(n) if n > queue.size =>
+      val toRequest = n - queue.size
+      logger.debug(
+        s"Request for $n items, but only ${queue.size} available; sending ${queue.size} now, requesting $toRequest from upstream"
+      )
+      Option(scrollId) match {
+        case None     => client.execute(query).onComplete(result => self ! result)
+        case Some(id) => client.execute(searchScroll(id) keepAlive keepAlive).onComplete(result => self ! result)
+      }
+      // we switch state while we're waiting on elasticsearch, so we know not to send another request to ES,
+      // because we are using a scroll and can only have one active request at a time.
+      context become fetching
+      logger.info("Scroll publisher has become 'Fetching'")
+      // queue up a new request to handle the remaining ones required when the ES response comes in
+      self ! PublishActor.Request(toRequest)
+      send(queue.size)
+    // in this case, we have enough available so just send 'em
+    case PublishActor.Request(n) =>
+      logger.debug(s"Request for $n items; sending")
+      send(n)
+  }
+
+  // the fetching state is when we're waiting for a reply from es for a request we sent
+  private def fetching: Actor.Receive = {
+    // if we're in fetching mode, it's because we ran out of results to send,
+    // so any requests must be stashed until a fresh batch arrives
+    case PublishActor.Request(n) =>
+      logger.debug(s"Request for $n items but we're already waiting on a response; stashing request")
+      require(queue.isEmpty) // must be empty or why did we not send it before switching to this mode?
+      stash()
+    // if the request to elastic failed we will terminate the subscription
+    case Failure(t) =>
+      logger.warn("Elasticsearch returned a failure; will terminate the subscription", t)
+      s.onError(t)
+      context.stop(self)
+    case Success(resp: RequestFailure) =>
+      logger.warn("Request errored; will terminate the subscription; {}", resp.error.toString)
+      s.onError(new RuntimeException(resp.error.toString))
+      context.stop(self)
+    // handle when the es request times out
+    case Success(resp: RequestSuccess[SearchResponse]) if resp.result.isTimedOut =>
+      logger.warn("Elasticsearch request timed out; will terminate the subscription")
+      s.onError(new RuntimeException("Request terminated early or timed out"))
+      context.stop(self)
+    // if we had no results from ES then we have nothing left to publish and our work here is done
+    case Success(resp: RequestSuccess[SearchResponse]) if resp.result.isEmpty =>
+      logger.debug("Response from ES came back empty; this means no more items upstream so will complete subscription")
+      scrollId = resp.result.scrollId.getOrElse(scrollId)
+      s.onComplete()
+      logger.debug("Stopping publisher actor")
+      context.stop(self)
+    // more results, so we can unleash the beast (stashed requests) and switch back to ready mode
+    case Success(resp: RequestSuccess[SearchResponse]) =>
+      scrollId = resp.result.scrollId.getOrError("Response did not include a scroll id")
+      queue ++= resp.result.hits.hits
+      context become ready
+      unstashAll()
+  }
+
+  override def postStop(): Unit = {
+    super.postStop()
+    client.execute(clearScroll(scrollId))
+  }
+}
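
Demand drives everything in the publisher above: each `request(n)` either drains the internal queue or flips the actor into 'fetching' for another scroll page. A minimal hand-rolled consumer sketch that requests one hit at a time, so the scroll advances no faster than processing (the class name and the println bodies are illustrative only):

```scala
import com.sksamuel.elastic4s.requests.searches.SearchHit
import org.reactivestreams.{Subscriber, Subscription}

// a sketch of a downstream consumer: request one hit at a time, so demand
// (and therefore the scroll fetches above) is driven strictly by processing speed
class PrintingSubscriber extends Subscriber[SearchHit] {
  private var subscription: Subscription = _

  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    s.request(1) // initial demand; the publisher stashes it until Ready (the rule 1.03 handling above)
  }

  override def onNext(hit: SearchHit): Unit = {
    println(hit.sourceAsString)
    subscription.request(1) // ask for the next hit only after handling this one
  }

  override def onError(t: Throwable): Unit = t.printStackTrace()
  override def onComplete(): Unit = println("scroll exhausted")
}
```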
diff --git a/elastic4s-reactivestreams-akka/src/test/resources/log4j2.xml b/elastic4s-reactivestreams-akka/src/test/resources/log4j2.xml
new file mode 100644
index 000000000..15a4d18c6
--- /dev/null
+++ b/elastic4s-reactivestreams-akka/src/test/resources/log4j2.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/BulkIndexingSubscriberIntegrationTest.scala b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/BulkIndexingSubscriberIntegrationTest.scala
new file mode 100644
index 000000000..89dbd2317
--- /dev/null
+++ b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/BulkIndexingSubscriberIntegrationTest.scala
@@ -0,0 +1,76 @@
+package com.sksamuel.elastic4s.akka.reactivestreams
+
+import com.sksamuel.elastic4s.ElasticDsl
+import com.sksamuel.elastic4s.requests.bulk.BulkCompatibleRequest
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
+
+import scala.util.Random
+
+object Ship {
+
+  val ships = List(
+    Ship("clipper"),
+    Ship("anaconda"),
+    Ship("courier", Some("Fast ship that delivers")),
+    Ship("python"),
+    Ship("fer-de-lance"),
+    Ship("sidewinder"),
+    Ship("cobra"),
+    Ship("viper"),
+    Ship("eagle"),
+    Ship("vulture"),
+    Ship("dropship", Some("Drop it while its hot")),
+    Ship("orca"),
+    Ship("type6"),
+    Ship("type7"),
+    Ship("type9"),
+    Ship("hauler"),
+    Ship("adder"),
+    Ship("asp explorer"),
+    Ship("diamondback")
+  )
+
+}
+
+class ShipRequestBuilder(indexName: String = "bulkindexsubint") extends RequestBuilder[Ship] {
+
+  import ElasticDsl._
+  import com.sksamuel.elastic4s.jackson.ElasticJackson.Implicits._
+
+  override def request(ship: Ship): BulkCompatibleRequest = {
+    indexInto(s"$indexName/ships") source ship
+  }
+}
+
+object ShipPublisher extends Publisher[Ship] {
+
+  override def subscribe(s: Subscriber[_ >: Ship]): Unit = {
+    var remaining = Ship.ships
+    s.onSubscribe(new Subscription {
+      override def cancel(): Unit = ()
+      override def request(n: Long): Unit = {
+        remaining.take(n.toInt).foreach(t => s.onNext(t))
+        remaining = remaining.drop(n.toInt)
+        if (remaining.isEmpty)
+          s.onComplete()
+      }
+    })
+  }
+}
+
+object ShipEndlessPublisher extends Publisher[Ship] {
+
+  override def subscribe(s: Subscriber[_ >: Ship]): Unit = {
+    var remaining = Ship.ships
+    s.onSubscribe(new Subscription {
+      override def cancel(): Unit = ()
+      override def request(n: Long): Unit = {
+        remaining.take(n.toInt).foreach(t => s.onNext(t))
+        remaining = remaining.drop(n.toInt)
+      }
+    })
+  }
+}
+
+case class Ship(name: String, description: Option[String] = None, size: Int = Random.nextInt(100))
diff --git a/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/BulkIndexingSubscriberWhiteboxTest.scala b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/BulkIndexingSubscriberWhiteboxTest.scala
new file mode 100644
index 000000000..22619bc59
--- /dev/null
+++ b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/BulkIndexingSubscriberWhiteboxTest.scala
@@ -0,0 +1,90 @@
+package com.sksamuel.elastic4s.akka.reactivestreams
+
+import akka.actor.ActorSystem
+import com.sksamuel.elastic4s.jackson.ElasticJackson
+import com.sksamuel.elastic4s.requests.bulk.BulkCompatibleRequest
+import com.sksamuel.elastic4s.testkit.DockerTests
+import org.reactivestreams.tck.SubscriberWhiteboxVerification.{SubscriberPuppet, WhiteboxSubscriberProbe}
+import org.reactivestreams.tck.{SubscriberWhiteboxVerification, TestEnvironment}
+import org.reactivestreams.{Subscriber, Subscription}
+import org.scalatestplus.testng.TestNGSuiteLike
+
+class BulkIndexingSubscriberWhiteboxTest
+    extends SubscriberWhiteboxVerification[Item](new TestEnvironment(DEFAULT_TIMEOUT_MILLIS))
+    with TestNGSuiteLike
+    with DockerTests {
+
+  implicit val system: ActorSystem = ActorSystem()
+
+  try {
+    client.execute {
+      createIndex("bulkindexwhitebox")
+    }.await
+  } catch {
+    case _: Exception =>
+  }
+
+  object ItemRequestBuilder extends RequestBuilder[Item] {
+
+    import ElasticJackson.Implicits._
+
+    override def request(t: Item): BulkCompatibleRequest = indexInto("bulkindexwhitebox").doc(t)
+  }
+
+  override def createSubscriber(probe: WhiteboxSubscriberProbe[Item]): Subscriber[Item] = {
+    new BulkIndexingSubscriber[Item](client, ItemRequestBuilder, SubscriberConfig()) {
+
+      override def onSubscribe(s: Subscription): Unit = {
+        super.onSubscribe(s)
+        // register a successful Subscription, and create a Puppet,
+        // for the WhiteboxVerification to be able to drive its tests:
+        probe.registerOnSubscribe(new SubscriberPuppet() {
+
+          def triggerRequest(elements: Long): Unit = {
+            s.request(elements)
+          }
+
+          def signalCancel(): Unit = {
+            s.cancel()
+          }
+        })
+      }
+
+      override def onComplete(): Unit = {
+        super.onComplete()
+        probe.registerOnComplete()
+      }
+
+      override def onError(t: Throwable): Unit = {
+        probe.registerOnError(t)
+      }
+
+      override def onNext(t: Item): Unit = {
+        super.onNext(t)
+        probe.registerOnNext(t)
+      }
+    }
+  }
+
+  override def createElement(element: Int): Item = castles(element)
+
+  val castles = Array(
+    Item("bodium"),
+    Item("hever"),
+    Item("tower of london"),
+    Item("canarvon"),
+    Item("conwy"),
+    Item("beaumaris"),
+    Item("bolsover"),
+    Item("conningsbrough"),
+    Item("tintagel"),
+    Item("rochester"),
+    Item("dover"),
+    Item("hexham"),
+    Item("harleigh"),
Item("white"), + Item("radley"), + Item("berkeley") + ) +} + +case class Item(name: String) diff --git a/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisherIntegrationTest.scala b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisherIntegrationTest.scala new file mode 100644 index 000000000..b33cee940 --- /dev/null +++ b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisherIntegrationTest.scala @@ -0,0 +1,81 @@ +package com.sksamuel.elastic4s.akka.reactivestreams + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import akka.actor.ActorSystem +import com.sksamuel.elastic4s.requests.indexes.IndexRequest +import com.sksamuel.elastic4s.requests.searches.SearchHit +import com.sksamuel.elastic4s.testkit.DockerTests +import org.reactivestreams.{Subscriber, Subscription} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import scala.util.Try + +class ScrollPublisherIntegrationTest extends AnyWordSpec with DockerTests with Matchers { + + import ReactiveElastic._ + import com.sksamuel.elastic4s.jackson.ElasticJackson.Implicits._ + + private val indexName = getClass.getSimpleName.toLowerCase + + private implicit val system: ActorSystem = ActorSystem() + + val emperors = Array( + Item("Augustus"), + Item("Tiberius"), + Item("Caligua"), + Item("Claudius"), + Item("Nero"), + Item("Galba"), + Item("Otho"), + Item("Vitellius"), + Item("Vespasian"), + Item("Titus"), + Item("Domitian"), + Item("Nerva"), + Item("Trajan"), + Item("Hadrian"), + Item("Antoninus Pius"), + Item("Marcus Aurelius"), + Item("Commodus"), + Item("Pertinax"), + Item("Diocletion") + ) + + implicit object RichSearchHitRequestBuilder extends RequestBuilder[SearchHit] { + override def request(hit: SearchHit): IndexRequest = { + indexInto(indexName).doc(hit.sourceAsString) + } + } + + Try { + client.execute { + createIndex(indexName) + }.await + } + + client.execute { + bulk(emperors.map(indexInto(indexName).source(_))).refreshImmediately + }.await + + "elastic-streams" should { + "publish all data from the index" in { + + val publisher = client.publisher(search(indexName) query "*:*" scroll "1m") + + val completionLatch = new CountDownLatch(1) + val documentLatch = new CountDownLatch(emperors.length) + + publisher.subscribe(new Subscriber[SearchHit] { + override def onComplete(): Unit = completionLatch.countDown() + override def onError(t: Throwable): Unit = fail(t) + override def onSubscribe(s: Subscription): Unit = s.request(1000) + override def onNext(t: SearchHit): Unit = documentLatch.countDown() + }) + + completionLatch.await(10, TimeUnit.SECONDS) shouldBe true + documentLatch.await(10, TimeUnit.SECONDS) shouldBe true + } + } +} diff --git a/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisherUnitTest.scala b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisherUnitTest.scala new file mode 100644 index 000000000..534c4f3b7 --- /dev/null +++ b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisherUnitTest.scala @@ -0,0 +1,20 @@ +package com.sksamuel.elastic4s.akka.reactivestreams + +import akka.actor.ActorSystem +import com.sksamuel.elastic4s.testkit.DockerTests +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class 
+class ScrollPublisherUnitTest extends AnyWordSpec with Matchers with DockerTests {
+
+  import ReactiveElastic._
+
+  implicit val system: ActorSystem = ActorSystem()
+
+  "elastic-streams" should {
+    "throw exception if search definition has no scroll" in {
+      an[IllegalArgumentException] should be thrownBy
+        client.publisher(search("scrollpubint") query "*:*")
+    }
+  }
+}
diff --git a/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisherVerificationTest.scala b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisherVerificationTest.scala
new file mode 100644
index 000000000..ee378f93c
--- /dev/null
+++ b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/ScrollPublisherVerificationTest.scala
@@ -0,0 +1,64 @@
+package com.sksamuel.elastic4s.akka.reactivestreams
+
+import akka.actor.ActorSystem
+import com.sksamuel.elastic4s.jackson.ElasticJackson
+import com.sksamuel.elastic4s.requests.searches.SearchHit
+import com.sksamuel.elastic4s.testkit.DockerTests
+import org.reactivestreams.Publisher
+import org.reactivestreams.tck.{PublisherVerification, TestEnvironment}
+import org.scalatestplus.testng.TestNGSuiteLike
+
+import scala.util.Try
+
+class ScrollPublisherVerificationTest
+    extends PublisherVerification[SearchHit](
+      new TestEnvironment(DEFAULT_TIMEOUT_MILLIS),
+      PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS
+    )
+    with TestNGSuiteLike
+    with DockerTests {
+
+  import ElasticJackson.Implicits._
+
+  implicit val system: ActorSystem = ActorSystem()
+
+  Try {
+    client.execute {
+      deleteIndex("scrollpubver")
+    }.await
+  }
+
+  Try {
+    client.execute {
+      createIndex("scrollpubver")
+    }.await
+  }
+
+  client.execute {
+    bulk(
+      indexInto("scrollpubver") source Empire("Parthian", "Persia", "Ctesiphon"),
+      indexInto("scrollpubver") source Empire("Ptolemaic", "Egypt", "Alexandria"),
+      indexInto("scrollpubver") source Empire("British", "Worldwide", "London"),
+      indexInto("scrollpubver") source Empire("Achaemenid", "Persia", "Babylon"),
+      indexInto("scrollpubver") source Empire("Sasanian", "Persia", "Ctesiphon"),
+      indexInto("scrollpubver") source Empire("Mongol", "East Asia", "Avarga"),
+      indexInto("scrollpubver") source Empire("Roman", "Mediterranean", "Rome"),
+      indexInto("scrollpubver") source Empire("Sumerian", "Mesopotamia", "Uruk"),
+      indexInto("scrollpubver") source Empire("Klingon", "Space", "Kronos"),
+      indexInto("scrollpubver") source Empire("Romulan", "Space", "Romulus"),
+      indexInto("scrollpubver") source Empire("Cardassian", "Space", "Cardassia Prime"),
+      indexInto("scrollpubver") source Empire("Egyptian", "Egypt", "Memphis"),
+      indexInto("scrollpubver") source Empire("Babylonian", "Levant", "Babylon")
+    ).refreshImmediately
+  }.await
+
+  private val query = search("scrollpubver").matchAllQuery().scroll("1m").limit(2)
+
+  override def boundedDepthOfOnNextAndRequestRecursion: Long = 2L
+
+  override def createFailedPublisher(): Publisher[SearchHit] = null
+
+  override def createPublisher(elements: Long): Publisher[SearchHit] = {
+    new ScrollPublisher(client, query, elements)
+  }
+}
+
+case class Empire(name: String, location: String, capital: String)
diff --git a/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/SubscriberListenerTest.scala b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/SubscriberListenerTest.scala
new file mode 100644
index 000000000..1fd1de62c
--- /dev/null
+++ b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/SubscriberListenerTest.scala
@@ -0,0 +1,40 @@
+package com.sksamuel.elastic4s.akka.reactivestreams
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import akka.actor.ActorSystem
+import com.sksamuel.elastic4s.requests.bulk.BulkResponseItem
+import com.sksamuel.elastic4s.testkit.DockerTests
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+import scala.util.Try
+
+class SubscriberListenerTest extends AnyWordSpec with Matchers with DockerTests {
+
+  import ReactiveElastic._
+
+  implicit val system: ActorSystem = ActorSystem()
+  implicit val builder: ShipRequestBuilder = new ShipRequestBuilder()
+
+  Try {
+    client.execute {
+      createIndex("subscriberlistenertest")
+    }.await
+  }
+
+  "Reactive streams subscriber" should {
+    "invoke listener for each confirmed doc" ignore {
+
+      val latch = new CountDownLatch(Ship.ships.length)
+
+      val config = SubscriberConfig(listener = new ResponseListener[Ship] {
+        def onAck(resp: BulkResponseItem, original: Ship): Unit = latch.countDown()
+      })
+      val subscriber = client.subscriber[Ship](config)
+      ShipPublisher.subscribe(subscriber)
+
+      latch.await(1, TimeUnit.MINUTES) shouldBe true
+    }
+  }
+}
diff --git a/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/package.scala b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/package.scala
new file mode 100644
index 000000000..b4f5dd373
--- /dev/null
+++ b/elastic4s-reactivestreams-akka/src/test/scala/com/sksamuel/elastic4s/akka/reactivestreams/package.scala
@@ -0,0 +1,6 @@
+package com.sksamuel.elastic4s.akka
+
+package object reactivestreams {
+  val DEFAULT_TIMEOUT_MILLIS = 2000L
+  val PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 2000L
+}
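
Taken together, the module supports the classic scroll-to-bulk pipeline, for example copying one index into another. A sketch assuming a configured `ElasticClient` named `client` (left as `???` here); the index names are placeholders, and the builder mirrors `RichSearchHitRequestBuilder` from the integration test above:

```scala
import akka.actor.ActorSystem
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.akka.reactivestreams.ReactiveElastic._
import com.sksamuel.elastic4s.akka.reactivestreams.{RequestBuilder, SubscriberConfig}
import com.sksamuel.elastic4s.requests.bulk.BulkCompatibleRequest
import com.sksamuel.elastic4s.requests.searches.SearchHit

object ReindexSketch {

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()

    // assumed to be configured elsewhere; how the client is built is out of scope for this sketch
    val client: ElasticClient = ???

    // re-index every hit verbatim into the target index
    implicit val builder: RequestBuilder[SearchHit] = new RequestBuilder[SearchHit] {
      override def request(hit: SearchHit): BulkCompatibleRequest =
        indexInto("articles-v2").doc(hit.sourceAsString)
    }

    // scroll everything out of the source index and bulk-index it into the target
    val publisher  = client.publisher(search("articles").matchAllQuery().scroll("1m"))
    val subscriber = client.subscriber[SearchHit](
      SubscriberConfig[SearchHit](completionFn = () => println("reindex complete"))
    )

    publisher.subscribe(subscriber)
  }
}
```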