Remove Map and phantom type parameter ERT and support streaming (#99)
Co-authored-by: Arnold Lacko <arnold.lacko@bexio.com>
Co-authored-by: markaya <markoristic15@gmail.com>
Co-authored-by: Dimitrije Bulaja <dbulaja98@gmail.com>
4 people authored Mar 6, 2023
1 parent 23ae467 commit cebe4f7
Showing 15 changed files with 487 additions and 379 deletions.
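The heart of the change: ElasticRequest loses its phantom result-type parameter, so search[A](index, query) becomes an untyped search(index, query) whose result is decoded explicitly with documentAs[A]. A minimal before/after sketch in the style of the example module — the Repo type and the unsafeWrap index wiring are illustrative assumptions standing in for the example's GitHubRepo and Index:

import zio.Task
import zio.elasticsearch.ElasticQuery.matchAll
import zio.elasticsearch.{ElasticRequest, Elasticsearch, IndexName}
import zio.prelude.Newtype.unsafeWrap
import zio.schema.{DeriveSchema, Schema}

// Illustrative document type; any type with a zio-schema Schema works.
final case class Repo(id: String, name: String)
object Repo {
  implicit val schema: Schema[Repo] = DeriveSchema.gen[Repo]
}

final case class RepoService(elasticsearch: Elasticsearch) {
  private val index: IndexName = unsafeWrap(IndexName)("repositories")

  // Before #99: elasticsearch.execute(ElasticRequest.search[Repo](index, matchAll))
  // After #99: the request carries no result type; decoding is an explicit, separate step.
  def findAll(): Task[List[Repo]] =
    elasticsearch.execute(ElasticRequest.search(index, matchAll)).documentAs[Repo]
}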
modules/example/src/main/scala/example/RepositoriesElasticsearch.scala
@@ -32,12 +32,13 @@ import zio.prelude.Newtype.unsafeWrap
final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {

def findAll(): Task[List[GitHubRepo]] =
-   elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, matchAll))
+   elasticsearch.execute(ElasticRequest.search(Index, matchAll)).documentAs[GitHubRepo]

def findById(organization: String, id: String): Task[Option[GitHubRepo]] =
for {
routing <- routingOf(organization)
-   res <- elasticsearch.execute(ElasticRequest.getById[GitHubRepo](Index, DocumentId(id)).routing(routing))
+   res <-
+     elasticsearch.execute(ElasticRequest.getById(Index, DocumentId(id)).routing(routing)).documentAs[GitHubRepo]
} yield res

def create(repository: GitHubRepo): Task[CreationOutcome] =
@@ -75,7 +76,7 @@ final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {
} yield res

def search(query: ElasticQuery[_]): Task[List[GitHubRepo]] =
-   elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, query))
+   elasticsearch.execute(ElasticRequest.search(Index, query)).documentAs[GitHubRepo]

private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] =
Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e))
124 changes: 112 additions & 12 deletions modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala
@@ -16,11 +16,15 @@

package zio.elasticsearch

import zio.Chunk
import zio.elasticsearch.ElasticQuery._
import zio.stream.{Sink, ZSink}
import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import scala.util.Random

object HttpExecutorSpec extends IntegrationSpec {

def spec: Spec[TestEnvironment, Any] = {
@@ -31,7 +35,7 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genCustomer) { customer =>
for {
docId <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, customer))
-   res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, docId))
+   res <- ElasticExecutor.execute(ElasticRequest.getById(index, docId)).documentAs[CustomerDocument]
} yield assert(res)(isSome(equalTo(customer)))
}
},
@@ -67,7 +71,7 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genDocumentId, genCustomer) { (documentId, customer) =>
for {
_ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer))
-   doc <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId))
+   doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield assert(doc)(isSome(equalTo(customer)))
}
},
@@ -76,7 +80,7 @@ object HttpExecutorSpec extends IntegrationSpec {
for {
_ <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer))
_ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer))
-   doc <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId))
+   doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield assert(doc)(isSome(equalTo(secondCustomer)))
}
}
@@ -131,20 +135,22 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genDocumentId, genCustomer) { (documentId, customer) =>
for {
_ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer))
-   res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId))
+   res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield assert(res)(isSome(equalTo(customer)))
}
},
test("return None if the document does not exist") {
checkOnce(genDocumentId) { documentId =>
-   assertZIO(ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)))(isNone)
+   assertZIO(
+     ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
+   )(isNone)
}
},
test("fail with throwable if decoding fails") {
checkOnce(genDocumentId, genEmployee) { (documentId, employee) =>
val result = for {
_ <- ElasticExecutor.execute(ElasticRequest.upsert[EmployeeDocument](index, documentId, employee))
-   res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId))
+   res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield res

assertZIO(result.exit)(
@@ -170,7 +176,8 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
query = range("balance").gte(100)
-   res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query))
+   res <-
+     ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(isNonEmpty)
}
} @@ around(
@@ -193,7 +200,9 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
query = range("age").gte(0)
-   res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](secondSearchIndex, query))
+   res <- ElasticExecutor
+            .execute(ElasticRequest.search(secondSearchIndex, query))
+            .documentAs[CustomerDocument]
} yield res

assertZIO(result.exit)(
@@ -224,7 +233,8 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
query = ElasticQuery.contains("name.keyword", firstCustomer.name.take(3))
-   res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query))
+   res <-
+     ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(Assertion.contains(firstCustomer))
}
} @@ around(
@@ -247,7 +257,8 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
query = ElasticQuery.startsWith("name.keyword", firstCustomer.name.take(3))
-   res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query))
+   res <-
+     ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(Assertion.contains(firstCustomer))
}
} @@ around(
@@ -271,14 +282,101 @@ object HttpExecutorSpec extends IntegrationSpec {
)
query =
wildcard("name.keyword", s"${firstCustomer.name.take(2)}*${firstCustomer.name.takeRight(2)}")
-   res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query))
+   res <-
+     ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(Assertion.contains(firstCustomer))
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
)
) @@ shrinks(0),
suite("searching documents and returning them as a stream")(
test("search for documents using range query") {
checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) {
(firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) =>
val sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- ElasticExecutor.execute(
ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer)
)
_ <- ElasticExecutor.execute(
ElasticRequest
.upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer)
.refreshTrue
)
query = range("balance").gte(100)
res <- ElasticExecutor.stream(ElasticRequest.search(firstSearchIndex, query)).run(sink)
} yield assert(res)(isNonEmpty)
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("search for documents using range query with multiple pages") {
checkOnce(genCustomer) { customer =>
def sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll))
reqs = (0 to 203).map { _ =>
ElasticRequest.create[CustomerDocument](
secondSearchIndex,
customer.copy(id = Random.alphanumeric.take(5).mkString, balance = 150)
)
}
_ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
query = range("balance").gte(100)
res <- ElasticExecutor
.stream(
ElasticRequest.search(secondSearchIndex, query)
)
.run(sink)
} yield assert(res)(hasSize(equalTo(204)))
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
),
test("search for documents using range query with multiple pages and return type") {
checkOnce(genCustomer) { customer =>
def sink: Sink[Throwable, CustomerDocument, Nothing, Chunk[CustomerDocument]] =
ZSink.collectAll[CustomerDocument]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll))
reqs = (0 to 200).map { _ =>
ElasticRequest.create[CustomerDocument](
secondSearchIndex,
customer.copy(id = Random.alphanumeric.take(5).mkString, balance = 150)
)
}
_ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
query = range("balance").gte(100)
res <- ElasticExecutor
.streamAs[CustomerDocument](ElasticRequest.search(secondSearchIndex, query))
.run(sink)
} yield assert(res)(hasSize(equalTo(201)))
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
),
test("search for documents using range query - empty stream") {
val sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
query = range("balance").gte(100)
res <- ElasticExecutor.stream(ElasticRequest.search(firstSearchIndex, query)).run(sink)
} yield assert(res)(hasSize(equalTo(0)))
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
)
) @@ shrinks(0),
suite("deleting by query")(
test("successfully delete all matched documents") {
checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer, genDocumentId, genCustomer) {
@@ -314,7 +412,9 @@ object HttpExecutorSpec extends IntegrationSpec {
deleteQuery = range("balance").gte(300)
_ <-
ElasticExecutor.execute(ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue)
-   res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](deleteByQueryIndex, matchAll))
+   res <- ElasticExecutor
+            .execute(ElasticRequest.search(deleteByQueryIndex, matchAll))
+            .documentAs[CustomerDocument]
} yield assert(res)(hasSameElements(List(firstCustomer.copy(balance = 150))))
}
} @@ around(
10 changes: 2 additions & 8 deletions modules/library/src/main/scala/zio/elasticsearch/Document.scala
@@ -16,19 +16,13 @@

package zio.elasticsearch

import zio.json.ast.Json
import zio.schema.Schema
- import zio.schema.codec.JsonCodec.JsonDecoder
- import zio.schema.codec.{DecodeError, JsonCodec}
+ import zio.schema.codec.JsonCodec

- private[elasticsearch] final case class Document(json: String) {
-   def decode[A](implicit schema: Schema[A]): Either[DecodeError, A] = JsonDecoder.decode(schema, json)
- }
+ private[elasticsearch] final case class Document(json: String)

private[elasticsearch] object Document {
def from[A](doc: A)(implicit schema: Schema[A]): Document = Document(
JsonCodec.jsonEncoder(schema).encodeJson(doc, indent = None).toString
)

def from(json: Json): Document = new Document(json.toString)
}
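With decode removed, Document is a plain JSON carrier; schema-based decoding presumably now happens where results are interpreted (behind documentAs/streamAs) rather than on Document itself. The deleted method reduced to a one-liner, restated here as a free-standing sketch:

import zio.schema.Schema
import zio.schema.codec.DecodeError
import zio.schema.codec.JsonCodec.JsonDecoder

object DocumentDecoding {
  // The decode step formerly on Document: interpret a raw JSON payload as an A
  // using its zio-schema Schema (exactly the call the removed method made).
  def decodeDocument[A](json: String)(implicit schema: Schema[A]): Either[DecodeError, A] =
    JsonDecoder.decode(schema, json)
}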
modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala
@@ -17,10 +17,17 @@
package zio.elasticsearch

import sttp.client3.SttpBackend
import zio.elasticsearch.ElasticRequest.Search
import zio.schema.Schema
import zio.stream.{Stream, ZStream}
import zio.{RIO, Task, URLayer, ZIO, ZLayer}

private[elasticsearch] trait ElasticExecutor {
-   def execute[A](request: ElasticRequest[A, _]): Task[A]
+   def execute[A](request: ElasticRequest[A]): Task[A]
+
+   def stream(request: Search): Stream[Throwable, Item]
+
+   def streamAs[A: Schema](request: Search): Stream[Throwable, A]
}

object ElasticExecutor {
@@ -30,6 +37,12 @@ object ElasticExecutor {
lazy val local: URLayer[SttpBackend[Task, Any], ElasticExecutor] =
ZLayer.succeed(ElasticConfig.Default) >>> live

-   private[elasticsearch] def execute[A](request: ElasticRequest[A, _]): RIO[ElasticExecutor, A] =
+   private[elasticsearch] def execute[A](request: ElasticRequest[A]): RIO[ElasticExecutor, A] =
      ZIO.serviceWithZIO[ElasticExecutor](_.execute(request))
+
+   private[elasticsearch] def stream(request: Search): ZStream[ElasticExecutor, Throwable, Item] =
+     ZStream.serviceWithStream[ElasticExecutor](_.stream(request))
+
+   private[elasticsearch] def streamAs[A: Schema](request: Search): ZStream[ElasticExecutor, Throwable, A] =
+     ZStream.serviceWithStream[ElasticExecutor](_.streamAs[A](request))
}
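Consuming the new entry points is plain ZIO Stream usage: stream emits raw Items, while streamAs decodes each hit through its Schema. A sketch mirroring the integration tests — it must live inside the zio.elasticsearch package, as the tests do, because these executor methods are package-private; CustomerDocument is the tests' fixture type and the index name below is an assumption:

package zio.elasticsearch

import zio.elasticsearch.ElasticQuery.range
import zio.prelude.Newtype.unsafeWrap
import zio.stream.ZSink
import zio.{Chunk, ZIO}

object StreamingUsage {
  val index: IndexName = unsafeWrap(IndexName)("customers")

  // Raw hits: Item is the library's undecoded search-hit type.
  val raw: ZIO[ElasticExecutor, Throwable, Chunk[Item]] =
    ElasticExecutor
      .stream(ElasticRequest.search(index, range("balance").gte(100)))
      .run(ZSink.collectAll[Item])

  // Typed hits: each page is decoded to CustomerDocument as it arrives.
  val typed: ZIO[ElasticExecutor, Throwable, Chunk[CustomerDocument]] =
    ElasticExecutor
      .streamAs[CustomerDocument](ElasticRequest.search(index, range("balance").gte(100)))
      .run(ZSink.collectAll[CustomerDocument])
}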
modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala
@@ -20,6 +20,8 @@ import zio.json.ast.Json
import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField}

private[elasticsearch] final case class ElasticQueryResponse(
@jsonField("_scroll_id")
scrollId: Option[String],
took: Int,
@jsonField("timed_out")
timedOut: Boolean,
@@ -50,7 +52,7 @@ private[elasticsearch] final case class Hits(
total: Total,
@jsonField("max_score")
maxScore: Option[Double] = None,
-   hits: List[Item]
+   hits: List[Hit]
)

private[elasticsearch] object Hits {
@@ -63,7 +65,7 @@ private[elasticsearch] object Total {
implicit val decoder: JsonDecoder[Total] = DeriveJsonDecoder.gen[Total]
}

- private[elasticsearch] final case class Item(
+ private[elasticsearch] final case class Hit(
@jsonField("_index")
index: String,
@jsonField("_type")
@@ -76,6 +78,6 @@ private[elasticsearch] final case class Item(
source: Json
)

- private[elasticsearch] object Item {
-   implicit val decoder: JsonDecoder[Item] = DeriveJsonDecoder.gen[Item]
+ private[elasticsearch] object Hit {
+   implicit val decoder: JsonDecoder[Hit] = DeriveJsonDecoder.gen[Hit]
}
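The new _scroll_id field is what makes multi-page streaming work: the first search hands back a scroll id, and each following page is fetched with it until the hits are exhausted (the 204-document test above exercises exactly this). Purely illustrative — not the executor's actual code — here is that pagination shape as a ZStream, with fetchPage a hypothetical helper:

import zio.stream.ZStream
import zio.{Chunk, Task}

object ScrollPagination {
  // The state is the optional scroll id (None triggers the initial search). The
  // hypothetical fetchPage returns one page of hits plus the scroll id for the
  // next page, or None once the scroll is exhausted, which ends the stream.
  def scrolled[A](fetchPage: Option[String] => Task[(Chunk[A], Option[String])]): ZStream[Any, Throwable, A] =
    ZStream.paginateChunkZIO(Option.empty[String]) { scrollId =>
      fetchPage(scrollId).map { case (hits, next) => (hits, next.map(Some(_))) }
    }
}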
(The remaining 10 changed files are not shown.)
