From 17e1718826df7978f4f5092f04795d0e15ace2c9 Mon Sep 17 00:00:00 2001 From: Philippus Baalman Date: Thu, 28 Nov 2024 20:17:22 +0100 Subject: [PATCH] Rename elastic4s-pekko-http-streams to elastic4s-reactivestreams-pekko (#3224) * Rename elastic4s-pekko-http-streams to elastic4s-reactivestreams-pekko * Remove deprecated method * Remove commented out test * Convert long literals to uppercase --- build.sbt | 6 +- ...ulkIndexingSubscriberIntegrationTest.scala | 214 ------------------ .../pekko/http/streams/package.scala | 6 - .../BulkIndexingSubscriber.scala | 12 +- .../reactivestreams}/ReactiveElastic.scala | 8 +- .../reactivestreams}/ScrollPublisher.scala | 12 +- .../src/test/resources/log4j2.xml | 0 ...ulkIndexingSubscriberIntegrationTest.scala | 76 +++++++ .../BulkIndexingSubscriberWhiteboxTest.scala | 2 +- .../ScrollPublisherIntegrationTest.scala | 3 +- .../ScrollPublisherUnitTest.scala | 2 +- .../ScrollPublisherVerificationTest.scala | 4 +- .../SubscriberListenerTest.scala | 2 +- .../pekko/reactivestreams/package.scala | 6 + 14 files changed, 104 insertions(+), 249 deletions(-) delete mode 100644 elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriberIntegrationTest.scala delete mode 100644 elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/package.scala rename {elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams => elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams}/BulkIndexingSubscriber.scala (98%) rename {elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams => elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams}/ReactiveElastic.scala (84%) rename {elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams => elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams}/ScrollPublisher.scala (95%) rename {elastic4s-pekko-http-streams => elastic4s-reactivestreams-pekko}/src/test/resources/log4j2.xml (100%) create mode 100644 elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriberIntegrationTest.scala rename {elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams => elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams}/BulkIndexingSubscriberWhiteboxTest.scala (97%) rename {elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams => elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams}/ScrollPublisherIntegrationTest.scala (97%) rename {elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams => elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams}/ScrollPublisherUnitTest.scala (91%) rename {elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams => elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams}/ScrollPublisherVerificationTest.scala (97%) rename {elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams => elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams}/SubscriberListenerTest.scala (95%) create mode 100644 elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/package.scala diff --git a/build.sbt b/build.sbt index 75d1ef848..817794f89 100644 --- a/build.sbt +++ b/build.sbt @@ -118,7 +118,7 @@ lazy val scala3Projects: Seq[ProjectReference] = Seq( httpstreams, akkastreams, pekkostreams, - pekkohttpstreams + reactivestreamspekko ) lazy val scala3_root = Project("elastic4s-scala3", file("scala3")) .settings(name := "elastic4s") @@ -242,9 +242,9 @@ lazy val httpstreams = (project in file("elastic4s-http-streams")) ) ) -lazy val pekkohttpstreams = (project in file("elastic4s-pekko-http-streams")) +lazy val reactivestreamspekko = (project in file("elastic4s-reactivestreams-pekko")) .dependsOn(core, testkit % "test", jackson % "test") - .settings(name := "elastic4s-pekko-http-streams") + .settings(name := "elastic4s-reactivestreams-pekko") .settings(scala3Settings) .settings(libraryDependencies ++= Seq( diff --git a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriberIntegrationTest.scala b/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriberIntegrationTest.scala deleted file mode 100644 index 4be8dce5a..000000000 --- a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriberIntegrationTest.scala +++ /dev/null @@ -1,214 +0,0 @@ -package com.sksamuel.elastic4s.pekko.http.streams - -import com.sksamuel.elastic4s.ElasticDsl -import com.sksamuel.elastic4s.requests.bulk.BulkCompatibleRequest -import org.reactivestreams.{Publisher, Subscriber, Subscription} - -import scala.util.Random - -//class BulkIndexingSubscriberIntegrationTest extends WordSpec with DockerTests with Matchers with BeforeAndAfter { -// -// import ReactiveElastic._ -// -// import scala.concurrent.duration._ -// -// implicit val system: ActorSystem = ActorSystem() -// -// val indexName = "bulkindexsubint" -// val strictIndex = "bulkindexfail" -// -// def deleteIndx(name: String): Unit = Try { -// http.execute { -// ElasticDsl.deleteIndex(name) -// }.await -// } -// -// after { -// deleteIndx(indexName) -// deleteIndx(strictIndex) -// } -// -// def blockUntilCount(expected: Long, index: String): Unit = { -// blockUntil(s"Expected count of $expected") { () => -// val result = http.execute { -// search(index).matchAllQuery().size(0) -// }.await.right.get -// expected <= result.result.totalHits -// } -// } -// -// @deprecated -// def blockUntilCount(expected: Long, indexAndTypes: IndexAndTypes): Unit = { -// blockUntil(s"Expected count of $expected") { () => -// val result = http.execute { -// search(indexAndTypes).matchAllQuery().size(0) -// }.await.right.get -// expected <= result.result.totalHits -// } -// } -// -// def blockUntil(explain: String)(predicate: () => Boolean): Unit = { -// -// var backoff = 0 -// var done = false -// -// while (backoff <= 16 && !done) { -// if (backoff > 0) Thread.sleep(200 * backoff) -// backoff = backoff + 1 -// try { -// done = predicate() -// } catch { -// case e: Throwable => -// logger.warn("problem while testing predicate", e) -// } -// } -// -// require(done, s"Failed waiting on: $explain") -// } -// -// def ensureIndexExists(index: String): Unit = { -// Try { -// http.execute { -// createIndex(index) -// }.await -// } -// } -// -// Try { -// http.execute { -// deleteIndex(indexName) -// }.await -// } -// -// Try { -// http.execute { -// createIndex(indexName) -// }.await -// } -// -// "elastic-streams" should { -// "index all received data" in { -// ensureIndexExists(indexName) -// implicit val builder: ShipRequestBuilder = new ShipRequestBuilder(indexName) -// -// val completionLatch = new CountDownLatch(1) -// val subscriber = http.subscriber[Ship](10, 2, completionFn = () => completionLatch.countDown()) -// ShipPublisher.subscribe(subscriber) -// completionLatch.await(5, TimeUnit.SECONDS) -// -// blockUntilCount(Ship.ships.length, indexName) -// } -// -// "index all received data even if the subscriber never completes" in { -// ensureIndexExists(indexName) -// implicit val builder: ShipRequestBuilder = new ShipRequestBuilder(indexName) -// -// // The short interval is just for the sake of test execution time, it's not a recommendation -// val subscriber = http.subscriber[Ship](8, 2, flushInterval = Some(500.millis)) -// ShipEndlessPublisher.subscribe(subscriber) -// -// blockUntilCount(Ship.ships.length, indexName) -// } -// -// "index all received data and ignore failures" in { -// -// http.execute { -// createIndex(strictIndex).mappings( -// mapping("ships").fields( -// textField("name"), -// intField("description"), -// intField("size") -// ) dynamic Strict -// ) -// }.await -// -// implicit val builder: ShipRequestBuilder = new ShipRequestBuilder(strictIndex) -// -// val errorsExpected = 2 -// -// val completionLatch = new CountDownLatch(1) -// val ackLatch = new CountDownLatch(Ship.ships.length - errorsExpected) -// val errorLatch = new CountDownLatch(errorsExpected) -// val subscriber = http.subscriber[Ship](10, 2, listener = new ResponseListener[Ship] { -// override def onAck(resp: BulkResponseItem, ship: Ship): Unit = ackLatch.countDown() -// override def onFailure(resp: BulkResponseItem, ship: Ship): Unit = errorLatch.countDown() -// }, completionFn = () => completionLatch.countDown(), maxAttempts = 2, failureWait = 100.millis) -// ShipPublisher.subscribe(subscriber) -// completionLatch.await(5, TimeUnit.SECONDS) -// -// ackLatch.getCount should be(0) -// errorLatch.getCount should be(0) -// -// blockUntilCount(Ship.ships.length - errorsExpected, strictIndex) -// } -// } -//} -// -object Ship { - - val ships = List( - Ship("clipper"), - Ship("anaconda"), - Ship("courier", Some("Fast ship that delivers")), - Ship("python"), - Ship("fer-de-lance"), - Ship("sidewinder"), - Ship("cobra"), - Ship("viper"), - Ship("eagle"), - Ship("vulture"), - Ship("dropship", Some("Drop it while its hot")), - Ship("orca"), - Ship("type6"), - Ship("type7"), - Ship("type9"), - Ship("hauler"), - Ship("adder"), - Ship("asp explorer"), - Ship("diamondback") - ) - -} - -class ShipRequestBuilder(indexName: String = "bulkindexsubint") extends RequestBuilder[Ship] { - - import ElasticDsl._ - import com.sksamuel.elastic4s.jackson.ElasticJackson.Implicits._ - - override def request(ship: Ship): BulkCompatibleRequest = { - indexInto(s"$indexName/ships") source ship - } -} - -object ShipPublisher extends Publisher[Ship] { - - override def subscribe(s: Subscriber[_ >: Ship]): Unit = { - var remaining = Ship.ships - s.onSubscribe(new Subscription { - override def cancel(): Unit = () - override def request(n: Long): Unit = { - remaining.take(n.toInt).foreach(t => s.onNext(t)) - remaining = remaining.drop(n.toInt) - if (remaining.isEmpty) - s.onComplete() - } - }) - } -} - - -object ShipEndlessPublisher extends Publisher[Ship] { - - override def subscribe(s: Subscriber[_ >: Ship]): Unit = { - var remaining = Ship.ships - s.onSubscribe(new Subscription { - override def cancel(): Unit = () - override def request(n: Long): Unit = { - remaining.take(n.toInt).foreach(t => s.onNext(t)) - remaining = remaining.drop(n.toInt) - } - }) - } -} - -case class Ship(name: String, description: Option[String] = None, size: Int = Random.nextInt(100)) diff --git a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/package.scala b/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/package.scala deleted file mode 100644 index 633083075..000000000 --- a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/package.scala +++ /dev/null @@ -1,6 +0,0 @@ -package com.sksamuel.elastic4s.pekko.http - -package object streams { - val DEFAULT_TIMEOUT_MILLIS = 2000l - val PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 2000L -} diff --git a/elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriber.scala b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriber.scala similarity index 98% rename from elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriber.scala rename to elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriber.scala index c0a1ff705..1d4986f26 100644 --- a/elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriber.scala +++ b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriber.scala @@ -1,4 +1,4 @@ -package com.sksamuel.elastic4s.pekko.http.streams +package com.sksamuel.elastic4s.pekko.reactivestreams import org.apache.pekko.actor._ import com.sksamuel.elastic4s.requests.bulk.{BulkCompatibleRequest, BulkRequest, BulkResponseItem} @@ -23,7 +23,7 @@ import scala.util.{Failure, Success} * @param builder used to turn elements of T into IndexDefinitions so they can be used in the bulk indexer * @tparam T the type of element provided by the publisher this subscriber will subscribe with */ -class BulkIndexingSubscriber[T] private[streams] ( +class BulkIndexingSubscriber[T] private[reactivestreams] ( client: ElasticClient, builder: RequestBuilder[T], config: SubscriberConfig[T] @@ -93,16 +93,16 @@ class BulkActor[T](client: ElasticClient, private var completed = false // total number of documents requested from our publisher - private var requested: Long = 0l + private var requested: Long = 0L // total number of documents acknowledged at the elasticsearch cluster level but pending confirmation of index - private var sent: Long = 0l + private var sent: Long = 0L // total number of documents confirmed as successful - private var confirmed: Long = 0l + private var confirmed: Long = 0L // total number of documents that failed the retry attempts and are ignored - private var failed: Long = 0l + private var failed: Long = 0L // Create a scheduler if a flushInterval is provided. This scheduler will be used to force indexing, otherwise // we can be stuck at batchSize-1 waiting for the nth message to arrive. diff --git a/elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams/ReactiveElastic.scala b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ReactiveElastic.scala similarity index 84% rename from elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams/ReactiveElastic.scala rename to elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ReactiveElastic.scala index eebc824bd..3b4ac04d5 100644 --- a/elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams/ReactiveElastic.scala +++ b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ReactiveElastic.scala @@ -1,4 +1,4 @@ -package com.sksamuel.elastic4s.pekko.http.streams +package com.sksamuel.elastic4s.pekko.reactivestreams import org.apache.pekko.actor.ActorRefFactory @@ -50,12 +50,6 @@ object ReactiveElastic { ): ScrollPublisher = publisher(search(indexes).query("*:*").scroll(keepAlive), elements) - @deprecated("Use publisher that takes an Indexes parameter instead", "8.15.4") - def publisher(indexesTypes: IndexesAndTypes, elements: Long = Long.MaxValue, keepAlive: String = "1m")( - implicit actorRefFactory: ActorRefFactory - ): ScrollPublisher = - publisher(search(indexesTypes.indexes).query("*:*").scroll(keepAlive), elements) - def publisher(q: SearchRequest)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher = publisher(q, Long.MaxValue) diff --git a/elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisher.scala b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisher.scala similarity index 95% rename from elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisher.scala rename to elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisher.scala index 429b4bab4..0cce5fa7e 100644 --- a/elastic4s-pekko-http-streams/src/main/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisher.scala +++ b/elastic4s-reactivestreams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisher.scala @@ -1,10 +1,10 @@ -package com.sksamuel.elastic4s.pekko.http.streams +package com.sksamuel.elastic4s.pekko.reactivestreams import org.apache.pekko.actor.{Actor, ActorRefFactory, PoisonPill, Props, Stash} import com.sksamuel.elastic4s.requests.searches.{SearchHit, SearchRequest, SearchResponse} import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess} import com.sksamuel.elastic4s.ext.OptionImplicits.RichOption -import com.sksamuel.elastic4s.pekko.http.streams.PublishActor.Ready +import com.sksamuel.elastic4s.pekko.reactivestreams.PublishActor.Ready import org.reactivestreams.{Publisher, Subscriber, Subscription} import org.slf4j.{Logger, LoggerFactory} @@ -21,7 +21,7 @@ import scala.util.{Failure, Success} * @param maxItems the maximum number of elements to return * @param actorRefFactory an Actor reference factory required by the publisher */ -class ScrollPublisher private[streams] (client: ElasticClient, search: SearchRequest, maxItems: Long)( +class ScrollPublisher private[reactivestreams] (client: ElasticClient, search: SearchRequest, maxItems: Long)( implicit actorRefFactory: ActorRefFactory ) extends Publisher[SearchHit] { require(search.keepAlive.isDefined, "Search Definition must have a scroll to be used as Publisher") @@ -44,7 +44,7 @@ class ScrollSubscription(client: ElasticClient, query: SearchRequest, s: Subscri private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max))) - private[streams] def ready(): Unit = + private[reactivestreams] def ready(): Unit = actor ! PublishActor.Ready override def cancel(): Unit = @@ -76,7 +76,7 @@ class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_ protected val logger: Logger = LoggerFactory.getLogger(getClass.getName) private var scrollId: String = _ - private var processed: Long = 0 + private var processed: Long = 0L private val queue: mutable.Queue[SearchHit] = mutable.Queue.empty // Parse the keep alive setting out of the original query. @@ -96,7 +96,7 @@ class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_ private def send(k: Long): Unit = { require(queue.size >= k) - for (_ <- 0l until k) + for (_ <- 0L until k) if (max == 0 || processed < max) { s.onNext(queue.dequeue) processed = processed + 1 diff --git a/elastic4s-pekko-http-streams/src/test/resources/log4j2.xml b/elastic4s-reactivestreams-pekko/src/test/resources/log4j2.xml similarity index 100% rename from elastic4s-pekko-http-streams/src/test/resources/log4j2.xml rename to elastic4s-reactivestreams-pekko/src/test/resources/log4j2.xml diff --git a/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriberIntegrationTest.scala b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriberIntegrationTest.scala new file mode 100644 index 000000000..436aab677 --- /dev/null +++ b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriberIntegrationTest.scala @@ -0,0 +1,76 @@ +package com.sksamuel.elastic4s.pekko.reactivestreams + +import com.sksamuel.elastic4s.ElasticDsl +import com.sksamuel.elastic4s.requests.bulk.BulkCompatibleRequest +import org.reactivestreams.{Publisher, Subscriber, Subscription} + +import scala.util.Random + +object Ship { + + val ships = List( + Ship("clipper"), + Ship("anaconda"), + Ship("courier", Some("Fast ship that delivers")), + Ship("python"), + Ship("fer-de-lance"), + Ship("sidewinder"), + Ship("cobra"), + Ship("viper"), + Ship("eagle"), + Ship("vulture"), + Ship("dropship", Some("Drop it while its hot")), + Ship("orca"), + Ship("type6"), + Ship("type7"), + Ship("type9"), + Ship("hauler"), + Ship("adder"), + Ship("asp explorer"), + Ship("diamondback") + ) + +} + +class ShipRequestBuilder(indexName: String = "bulkindexsubint") extends RequestBuilder[Ship] { + + import ElasticDsl._ + import com.sksamuel.elastic4s.jackson.ElasticJackson.Implicits._ + + override def request(ship: Ship): BulkCompatibleRequest = { + indexInto(s"$indexName/ships") source ship + } +} + +object ShipPublisher extends Publisher[Ship] { + + override def subscribe(s: Subscriber[_ >: Ship]): Unit = { + var remaining = Ship.ships + s.onSubscribe(new Subscription { + override def cancel(): Unit = () + override def request(n: Long): Unit = { + remaining.take(n.toInt).foreach(t => s.onNext(t)) + remaining = remaining.drop(n.toInt) + if (remaining.isEmpty) + s.onComplete() + } + }) + } +} + + +object ShipEndlessPublisher extends Publisher[Ship] { + + override def subscribe(s: Subscriber[_ >: Ship]): Unit = { + var remaining = Ship.ships + s.onSubscribe(new Subscription { + override def cancel(): Unit = () + override def request(n: Long): Unit = { + remaining.take(n.toInt).foreach(t => s.onNext(t)) + remaining = remaining.drop(n.toInt) + } + }) + } +} + +case class Ship(name: String, description: Option[String] = None, size: Int = Random.nextInt(100)) diff --git a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriberWhiteboxTest.scala b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriberWhiteboxTest.scala similarity index 97% rename from elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriberWhiteboxTest.scala rename to elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriberWhiteboxTest.scala index 45251397e..c9525bbab 100644 --- a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/BulkIndexingSubscriberWhiteboxTest.scala +++ b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/BulkIndexingSubscriberWhiteboxTest.scala @@ -1,4 +1,4 @@ -package com.sksamuel.elastic4s.pekko.http.streams +package com.sksamuel.elastic4s.pekko.reactivestreams import org.apache.pekko.actor.ActorSystem import com.sksamuel.elastic4s.jackson.ElasticJackson diff --git a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisherIntegrationTest.scala b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisherIntegrationTest.scala similarity index 97% rename from elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisherIntegrationTest.scala rename to elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisherIntegrationTest.scala index 167ec624a..d30098901 100644 --- a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisherIntegrationTest.scala +++ b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisherIntegrationTest.scala @@ -1,7 +1,6 @@ -package com.sksamuel.elastic4s.pekko.http.streams +package com.sksamuel.elastic4s.pekko.reactivestreams import java.util.concurrent.{CountDownLatch, TimeUnit} - import org.apache.pekko.actor.ActorSystem import com.sksamuel.elastic4s.requests.indexes.IndexRequest import com.sksamuel.elastic4s.requests.searches.SearchHit diff --git a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisherUnitTest.scala b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisherUnitTest.scala similarity index 91% rename from elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisherUnitTest.scala rename to elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisherUnitTest.scala index 9fbcef2c1..ba6c5ce7e 100644 --- a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisherUnitTest.scala +++ b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisherUnitTest.scala @@ -1,4 +1,4 @@ -package com.sksamuel.elastic4s.pekko.http.streams +package com.sksamuel.elastic4s.pekko.reactivestreams import org.apache.pekko.actor.ActorSystem import com.sksamuel.elastic4s.testkit.DockerTests diff --git a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisherVerificationTest.scala b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisherVerificationTest.scala similarity index 97% rename from elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisherVerificationTest.scala rename to elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisherVerificationTest.scala index 0c19f0a80..f78f09054 100644 --- a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/ScrollPublisherVerificationTest.scala +++ b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/ScrollPublisherVerificationTest.scala @@ -1,4 +1,4 @@ -package com.sksamuel.elastic4s.pekko.http.streams +package com.sksamuel.elastic4s.pekko.reactivestreams import org.apache.pekko.actor.ActorSystem import com.sksamuel.elastic4s.jackson.ElasticJackson @@ -52,7 +52,7 @@ class ScrollPublisherVerificationTest private val query = search("scrollpubver").matchAllQuery().scroll("1m").limit(2) - override def boundedDepthOfOnNextAndRequestRecursion: Long = 2l + override def boundedDepthOfOnNextAndRequestRecursion: Long = 2L override def createFailedPublisher(): Publisher[SearchHit] = null diff --git a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/SubscriberListenerTest.scala b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/SubscriberListenerTest.scala similarity index 95% rename from elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/SubscriberListenerTest.scala rename to elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/SubscriberListenerTest.scala index 9ca556670..f4b97592c 100644 --- a/elastic4s-pekko-http-streams/src/test/scala/com/sksamuel/elastic4s/pekko/http/streams/SubscriberListenerTest.scala +++ b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/SubscriberListenerTest.scala @@ -1,4 +1,4 @@ -package com.sksamuel.elastic4s.pekko.http.streams +package com.sksamuel.elastic4s.pekko.reactivestreams import java.util.concurrent.{CountDownLatch, TimeUnit} diff --git a/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/package.scala b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/package.scala new file mode 100644 index 000000000..7fced2223 --- /dev/null +++ b/elastic4s-reactivestreams-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/reactivestreams/package.scala @@ -0,0 +1,6 @@ +package com.sksamuel.elastic4s.pekko + +package object reactivestreams { + val DEFAULT_TIMEOUT_MILLIS = 2000L + val PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 2000L +}