Skip to content

Commit

Permalink
Rename elastic4s-pekko-http-streams to elastic4s-reactivestreams-pekko (#3224)

Browse files Browse the repository at this point in the history

* Rename elastic4s-pekko-http-streams to elastic4s-reactivestreams-pekko

* Remove deprecated method

* Remove commented out test

* Convert long literals to uppercase
  • Loading branch information
Philippus authored Nov 28, 2024
1 parent 11ec550 commit 17e1718
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 249 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ lazy val scala3Projects: Seq[ProjectReference] = Seq(
httpstreams,
akkastreams,
pekkostreams,
pekkohttpstreams
reactivestreamspekko
)
lazy val scala3_root = Project("elastic4s-scala3", file("scala3"))
.settings(name := "elastic4s")
Expand Down Expand Up @@ -242,9 +242,9 @@ lazy val httpstreams = (project in file("elastic4s-http-streams"))
)
)

lazy val pekkohttpstreams = (project in file("elastic4s-pekko-http-streams"))
lazy val reactivestreamspekko = (project in file("elastic4s-reactivestreams-pekko"))
.dependsOn(core, testkit % "test", jackson % "test")
.settings(name := "elastic4s-pekko-http-streams")
.settings(name := "elastic4s-reactivestreams-pekko")
.settings(scala3Settings)
.settings(libraryDependencies ++=
Seq(
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.sksamuel.elastic4s.pekko.http.streams
package com.sksamuel.elastic4s.pekko.reactivestreams

import org.apache.pekko.actor._
import com.sksamuel.elastic4s.requests.bulk.{BulkCompatibleRequest, BulkRequest, BulkResponseItem}
Expand All @@ -23,7 +23,7 @@ import scala.util.{Failure, Success}
* @param builder used to turn elements of T into IndexDefinitions so they can be used in the bulk indexer
* @tparam T the type of element provided by the publisher this subscriber will subscribe with
*/
class BulkIndexingSubscriber[T] private[streams] (
class BulkIndexingSubscriber[T] private[reactivestreams] (
client: ElasticClient,
builder: RequestBuilder[T],
config: SubscriberConfig[T]
Expand Down Expand Up @@ -93,16 +93,16 @@ class BulkActor[T](client: ElasticClient,
private var completed = false

// total number of documents requested from our publisher
private var requested: Long = 0l
private var requested: Long = 0L

// total number of documents acknowledged at the elasticsearch cluster level but pending confirmation of index
private var sent: Long = 0l
private var sent: Long = 0L

// total number of documents confirmed as successful
private var confirmed: Long = 0l
private var confirmed: Long = 0L

// total number of documents that failed the retry attempts and are ignored
private var failed: Long = 0l
private var failed: Long = 0L

// Create a scheduler if a flushInterval is provided. This scheduler will be used to force indexing, otherwise
// we can be stuck at batchSize-1 waiting for the nth message to arrive.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.sksamuel.elastic4s.pekko.http.streams
package com.sksamuel.elastic4s.pekko.reactivestreams

import org.apache.pekko.actor.ActorRefFactory

Expand Down Expand Up @@ -50,12 +50,6 @@ object ReactiveElastic {
): ScrollPublisher =
publisher(search(indexes).query("*:*").scroll(keepAlive), elements)

@deprecated("Use publisher that takes an Indexes parameter instead", "8.15.4")
def publisher(indexesTypes: IndexesAndTypes, elements: Long = Long.MaxValue, keepAlive: String = "1m")(
implicit actorRefFactory: ActorRefFactory
): ScrollPublisher =
publisher(search(indexesTypes.indexes).query("*:*").scroll(keepAlive), elements)

def publisher(q: SearchRequest)(implicit actorRefFactory: ActorRefFactory): ScrollPublisher =
publisher(q, Long.MaxValue)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.sksamuel.elastic4s.pekko.http.streams
package com.sksamuel.elastic4s.pekko.reactivestreams

import org.apache.pekko.actor.{Actor, ActorRefFactory, PoisonPill, Props, Stash}
import com.sksamuel.elastic4s.requests.searches.{SearchHit, SearchRequest, SearchResponse}
import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess}
import com.sksamuel.elastic4s.ext.OptionImplicits.RichOption
import com.sksamuel.elastic4s.pekko.http.streams.PublishActor.Ready
import com.sksamuel.elastic4s.pekko.reactivestreams.PublishActor.Ready
import org.reactivestreams.{Publisher, Subscriber, Subscription}
import org.slf4j.{Logger, LoggerFactory}

Expand All @@ -21,7 +21,7 @@ import scala.util.{Failure, Success}
* @param maxItems the maximum number of elements to return
* @param actorRefFactory an Actor reference factory required by the publisher
*/
class ScrollPublisher private[streams] (client: ElasticClient, search: SearchRequest, maxItems: Long)(
class ScrollPublisher private[reactivestreams] (client: ElasticClient, search: SearchRequest, maxItems: Long)(
implicit actorRefFactory: ActorRefFactory
) extends Publisher[SearchHit] {
require(search.keepAlive.isDefined, "Search Definition must have a scroll to be used as Publisher")
Expand All @@ -44,7 +44,7 @@ class ScrollSubscription(client: ElasticClient, query: SearchRequest, s: Subscri

private val actor = actorRefFactory.actorOf(Props(new PublishActor(client, query, s, max)))

private[streams] def ready(): Unit =
private[reactivestreams] def ready(): Unit =
actor ! PublishActor.Ready

override def cancel(): Unit =
Expand Down Expand Up @@ -76,7 +76,7 @@ class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_
protected val logger: Logger = LoggerFactory.getLogger(getClass.getName)

private var scrollId: String = _
private var processed: Long = 0
private var processed: Long = 0L
private val queue: mutable.Queue[SearchHit] = mutable.Queue.empty

// Parse the keep alive setting out of the original query.
Expand All @@ -96,7 +96,7 @@ class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_

private def send(k: Long): Unit = {
require(queue.size >= k)
for (_ <- 0l until k)
for (_ <- 0L until k)
if (max == 0 || processed < max) {
s.onNext(queue.dequeue)
processed = processed + 1
Expand Down
Loading

0 comments on commit 17e1718

Please sign in to comment.