diff --git a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala
index bc44a98bf..e2ee2f4eb 100644
--- a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala
+++ b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala
@@ -32,12 +32,13 @@ import zio.prelude.Newtype.unsafeWrap
 final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {

   def findAll(): Task[List[GitHubRepo]] =
-    elasticsearch.execute(ElasticRequest.search(Index, matchAll)).result[GitHubRepo]
+    elasticsearch.execute(ElasticRequest.search(Index, matchAll)).documentAs[GitHubRepo]

   def findById(organization: String, id: String): Task[Option[GitHubRepo]] =
     for {
       routing <- routingOf(organization)
-      res     <- elasticsearch.execute(ElasticRequest.getById(Index, DocumentId(id)).routing(routing)).result[GitHubRepo]
+      res <-
+        elasticsearch.execute(ElasticRequest.getById(Index, DocumentId(id)).routing(routing)).documentAs[GitHubRepo]
     } yield res

   def create(repository: GitHubRepo): Task[CreationOutcome] =
@@ -75,7 +76,7 @@ final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {
     } yield res

   def search(query: ElasticQuery[_]): Task[List[GitHubRepo]] =
-    elasticsearch.execute(ElasticRequest.search(Index, query)).result[GitHubRepo]
+    elasticsearch.execute(ElasticRequest.search(Index, query)).documentAs[GitHubRepo]

   private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] =
     Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e))
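For context on the rename above: `result` becomes `documentAs` on the value returned by `execute`. A minimal caller-side sketch, assuming the example module's `Elasticsearch` service, `Index` value and `GitHubRepo` schema; the `RepositorySearch` wrapper and its method names are made up for illustration, and `startsWith` is one of the query constructors exercised in the integration spec further down.

import zio.Task
import zio.elasticsearch.{DocumentId, ElasticQuery, ElasticRequest}

// Hypothetical wrapper mirroring RepositoriesElasticsearch: `documentAs[A]`
// decodes the returned documents through the implicit Schema[A] and fails
// the Task with a DecodingException when a document does not conform.
final case class RepositorySearch(elasticsearch: Elasticsearch) {
  def byNamePrefix(prefix: String): Task[List[GitHubRepo]] =
    elasticsearch
      .execute(ElasticRequest.search(Index, ElasticQuery.startsWith("name.keyword", prefix)))
      .documentAs[GitHubRepo]

  def byId(id: String): Task[Option[GitHubRepo]] =
    elasticsearch
      .execute(ElasticRequest.getById(Index, DocumentId(id)))
      .documentAs[GitHubRepo]
}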
diff --git a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala
index 91387d6b9..7c9afc629 100644
--- a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala
+++ b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala
@@ -20,7 +20,7 @@ import zio.Chunk
 import zio.elasticsearch.ElasticQuery._
 import zio.schema.Schema
 import zio.schema.codec.DecodeError
-import zio.stream.{ZPipeline, ZSink}
+import zio.stream.{Sink, ZPipeline, ZSink}
 import zio.test.Assertion._
 import zio.test.TestAspect._
 import zio.test._
@@ -37,7 +37,7 @@ object HttpExecutorSpec extends IntegrationSpec {
       checkOnce(genCustomer) { customer =>
         for {
           docId <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, customer))
-          res   <- ElasticExecutor.execute(ElasticRequest.getById(index, docId)).result[CustomerDocument]
+          res   <- ElasticExecutor.execute(ElasticRequest.getById(index, docId)).documentAs[CustomerDocument]
         } yield assert(res)(isSome(equalTo(customer)))
       }
     },
@@ -73,7 +73,7 @@ object HttpExecutorSpec extends IntegrationSpec {
       checkOnce(genDocumentId, genCustomer) { (documentId, customer) =>
         for {
           _   <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer))
-          doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument]
+          doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
         } yield assert(doc)(isSome(equalTo(customer)))
       }
     },
@@ -82,7 +82,7 @@ object HttpExecutorSpec extends IntegrationSpec {
         for {
           _   <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer))
           _   <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer))
-          doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument]
+          doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
         } yield assert(doc)(isSome(equalTo(secondCustomer)))
       }
     }
@@ -137,13 +137,15 @@ object HttpExecutorSpec extends IntegrationSpec {
       checkOnce(genDocumentId, genCustomer) { (documentId, customer) =>
         for {
           _   <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer))
-          res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument]
+          res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
         } yield assert(res)(isSome(equalTo(customer)))
       }
     },
     test("return None if the document does not exist") {
       checkOnce(genDocumentId) { documentId =>
-        assertZIO(ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument])(
+        assertZIO(
+          ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
+        )(
           isNone
         )
       }
@@ -152,7 +154,7 @@ object HttpExecutorSpec extends IntegrationSpec {
       checkOnce(genDocumentId, genEmployee) { (documentId, employee) =>
        val result = for {
          _   <- ElasticExecutor.execute(ElasticRequest.upsert[EmployeeDocument](index, documentId, employee))
-         res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument]
+         res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
        } yield res

        assertZIO(result.exit)(
@@ -179,7 +181,7 @@ object HttpExecutorSpec extends IntegrationSpec {
            )
            query = range("balance").gte(100)
            res <-
-             ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).result[CustomerDocument]
+             ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
          } yield assert(res)(isNonEmpty)
        }
      } @@ around(
@@ -203,7 +205,9 @@ object HttpExecutorSpec extends IntegrationSpec {
            )
            query = range("age").gte(0)
            res <-
-             ElasticExecutor.execute(ElasticRequest.search(secondSearchIndex, query)).result[CustomerDocument]
+             ElasticExecutor
+               .execute(ElasticRequest.search(secondSearchIndex, query))
+               .documentAs[CustomerDocument]
          } yield res

        assertZIO(result.exit)(
@@ -235,7 +239,7 @@ object HttpExecutorSpec extends IntegrationSpec {
            )
            query = ElasticQuery.contains("name.keyword", firstCustomer.name.take(3))
            res <-
-             ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).result[CustomerDocument]
+             ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
          } yield assert(res)(Assertion.contains(firstCustomer))
        }
      } @@ around(
@@ -259,7 +263,7 @@ object HttpExecutorSpec extends IntegrationSpec {
            )
            query = ElasticQuery.startsWith("name.keyword", firstCustomer.name.take(3))
            res <-
-             ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).result[CustomerDocument]
+             ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
          } yield assert(res)(Assertion.contains(firstCustomer))
        }
      } @@ around(
@@ -284,7 +288,7 @@ object HttpExecutorSpec extends IntegrationSpec {
            )
            query = wildcard("name.keyword", s"${firstCustomer.name.take(2)}*${firstCustomer.name.takeRight(2)}")
            res <-
-             ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).result[CustomerDocument]
+             ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
          } yield assert(res)(Assertion.contains(firstCustomer))
        }
      } @@ around(
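For reference, the queries exercised by the hunks above, pulled out on their own; every constructor comes from `zio.elasticsearch.ElasticQuery` (imported wholesale via `ElasticQuery._` at the top of this spec), and the field names and values here are placeholders.

import zio.elasticsearch.ElasticQuery
import zio.elasticsearch.ElasticQuery._

val byBalance   = range("balance").gte(100)                      // numeric lower bound
val bySubstring = ElasticQuery.contains("name.keyword", "abc")   // term appears anywhere in the keyword
val byPrefix    = ElasticQuery.startsWith("name.keyword", "abc") // keyword starts with the term
val byPattern   = wildcard("name.keyword", "ab*cd")              // explicit wildcard pattern
val everything  = matchAll                                       // no filtering at all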
@@ -292,11 +296,11 @@ object HttpExecutorSpec extends IntegrationSpec {
        ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
      )
    ) @@ shrinks(0),
-   suite("searching documents and returning them as ZStream")(
-     test("search for document using range query") {
+   suite("searching documents and returning them as a stream")(
+     test("search for documents using range query") {
       checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) {
         (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) =>
-          val sink: ZSink[Any, Throwable, RawItem, Nothing, Chunk[RawItem]] = ZSink.collectAll[RawItem]
+          val sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item]

           for {
             _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
@@ -319,12 +323,12 @@ object HttpExecutorSpec extends IntegrationSpec {
       ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex, None)),
       ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
     ),
-     test("search for document using range query with multiple pages") {
+     test("search for documents using range query with multiple pages") {
       checkOnce(genCustomer) { customer =>
-        def sink: ZSink[Any, Throwable, CustomerDocument, Nothing, Chunk[CustomerDocument]] =
+        def sink: Sink[Throwable, CustomerDocument, Nothing, Chunk[CustomerDocument]] =
           ZSink.collectAll[CustomerDocument]

-        def pipeline[A: Schema]: ZPipeline[Any, Nothing, RawItem, Either[DecodeError, A]] =
+        def pipeline[A: Schema]: ZPipeline[Any, Nothing, Item, Either[DecodeError, A]] =
           ZPipeline.map(_.documentAs[A])
         def pipeline_2[A]: ZPipeline[Any, Nothing, Either[DecodeError, A], A] =
           ZPipeline.collectWhileRight
@@ -349,6 +353,35 @@ object HttpExecutorSpec extends IntegrationSpec {
       } @@ around(
        ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex, None)),
        ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
+      ),
+     test("search for documents using range query with multiple pages and return type") {
+       checkOnce(genCustomer) { customer =>
+         def sink: Sink[Throwable, CustomerDocument, Nothing, Chunk[CustomerDocument]] =
+           ZSink.collectAll[CustomerDocument]
+
+         for {
+           _    <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll))
+           reqs  = (1 to 50).map { _ =>
+                     ElasticRequest.create[CustomerDocument](
+                       secondSearchIndex,
+                       customer.copy(id = Random.alphanumeric.take(5).mkString, balance = 150)
+                     )
+                   }
+           _    <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*))
+           _    <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*))
+           _    <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*))
+           _    <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
+           query = range("balance").gte(100)
+           res  <- ElasticExecutor
+                     .streamAs[CustomerDocument](
+                       ElasticRequest.search(secondSearchIndex, query)
+                     )
+                     .run(sink)
+         } yield assert(res)(hasSize(Assertion.equalTo(200)))
+       }
+     } @@ around(
+       ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex, None)),
+       ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
      )
    ) @@ shrinks(0),
    suite("deleting by query")(
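The multi-page tests above decode the raw stream by hand; the sketch below spells that pattern out end to end, using the same `ZPipeline.map(_.documentAs[A])` / `ZPipeline.collectWhileRight` combination. `CustomerDocument`, `secondSearchIndex` and the `ElasticExecutor` environment are this spec's fixtures, the helper names are made up, and decoding stops at the first document that fails to decode.

import zio.{Chunk, RIO}
import zio.schema.Schema
import zio.schema.codec.DecodeError
import zio.stream.{ZPipeline, ZSink}

def decode[A: Schema]: ZPipeline[Any, Nothing, Item, Either[DecodeError, A]] =
  ZPipeline.map(_.documentAs[A])

def keepDecoded[A]: ZPipeline[Any, Nothing, Either[DecodeError, A], A] =
  ZPipeline.collectWhileRight

// Collect every hit with balance >= 100, decoded as CustomerDocument.
val allMatches: RIO[ElasticExecutor, Chunk[CustomerDocument]] =
  ElasticExecutor
    .stream(ElasticRequest.search(secondSearchIndex, range("balance").gte(100)))
    .via(decode[CustomerDocument] >>> keepDecoded)
    .run(ZSink.collectAll[CustomerDocument])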
@@ -388,7 +421,7 @@ object HttpExecutorSpec extends IntegrationSpec {
                    ElasticExecutor.execute(ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue)
           res <- ElasticExecutor
                    .execute(ElasticRequest.search(deleteByQueryIndex, matchAll))
-                   .result[CustomerDocument]
+                   .documentAs[CustomerDocument]
         } yield assert(res)(hasSameElements(List(firstCustomer.copy(balance = 150))))
       }
     } @@ around(
diff --git a/modules/library/src/main/scala/zio/elasticsearch/Document.scala b/modules/library/src/main/scala/zio/elasticsearch/Document.scala
index bd24d8660..c6c8f5de0 100644
--- a/modules/library/src/main/scala/zio/elasticsearch/Document.scala
+++ b/modules/library/src/main/scala/zio/elasticsearch/Document.scala
@@ -16,19 +16,13 @@

 package zio.elasticsearch

-import zio.json.ast.Json
 import zio.schema.Schema
-import zio.schema.codec.JsonCodec.JsonDecoder
-import zio.schema.codec.{DecodeError, JsonCodec}
+import zio.schema.codec.JsonCodec

-private[elasticsearch] final case class Document(json: String) {
-  def decode[A](implicit schema: Schema[A]): Either[DecodeError, A] = JsonDecoder.decode(schema, json)
-}
+private[elasticsearch] final case class Document(json: String)

 private[elasticsearch] object Document {
   def from[A](doc: A)(implicit schema: Schema[A]): Document = Document(
     JsonCodec.jsonEncoder(schema).encodeJson(doc, indent = None).toString
   )
-
-  def from(json: Json): Document = new Document(json.toString)
 }
diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala
index 12664c79f..559c3f075 100644
--- a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala
+++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala
@@ -19,12 +19,15 @@ package zio.elasticsearch

 import sttp.client3.SttpBackend
 import zio.elasticsearch.ElasticRequest.GetByQuery
 import zio.stream.ZStream
+import zio.schema.Schema
 import zio.{RIO, Task, URLayer, ZIO, ZLayer}

 private[elasticsearch] trait ElasticExecutor {
   def execute[A](request: ElasticRequest[A]): Task[A]

-  def stream(request: GetByQuery): ZStream[Any, Throwable, RawItem]
+  def stream(request: GetByQuery): ZStream[Any, Throwable, Item]
+
+  def streamAs[A: Schema](request: GetByQuery): ZStream[Any, Throwable, A]
 }

 object ElasticExecutor {
@@ -37,6 +40,9 @@ object ElasticExecutor {
   private[elasticsearch] def execute[A](request: ElasticRequest[A]): RIO[ElasticExecutor, A] =
     ZIO.serviceWithZIO[ElasticExecutor](_.execute(request))

-  private[elasticsearch] def stream(request: GetByQuery): ZStream[ElasticExecutor, Throwable, RawItem] =
+  private[elasticsearch] def stream(request: GetByQuery): ZStream[ElasticExecutor, Throwable, Item] =
     ZStream.serviceWithStream[ElasticExecutor](_.stream(request))
+
+  private[elasticsearch] def streamAs[A: Schema](request: GetByQuery): ZStream[ElasticExecutor, Throwable, A] =
+    ZStream.serviceWithStream[ElasticExecutor](_.streamAs[A](request))
 }
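Side by side, the two streaming entry points the executor now exposes. Both accessors are `private[elasticsearch]`, so this is spec-style code living inside the `zio.elasticsearch` package: `stream` emits raw `Item` hits, `streamAs` decodes them against a `Schema` as they are pulled. The index and document type are the integration spec's fixtures.

import zio.{Chunk, RIO}
import zio.stream.ZSink

// Raw hits: each element still carries its undecoded _source Json.
val raw: RIO[ElasticExecutor, Chunk[Item]] =
  ElasticExecutor
    .stream(ElasticRequest.search(firstSearchIndex, matchAll))
    .run(ZSink.collectAll[Item])

// Typed hits: decoding happens inside the stream via Schema[CustomerDocument].
val typed: RIO[ElasticExecutor, Chunk[CustomerDocument]] =
  ElasticExecutor
    .streamAs[CustomerDocument](ElasticRequest.search(firstSearchIndex, matchAll))
    .run(ZSink.collectAll[CustomerDocument])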
diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala
index 8c9439679..6fb0f422e 100644
--- a/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala
+++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala
@@ -52,7 +52,7 @@ private[elasticsearch] final case class Hits(
   total: Total,
   @jsonField("max_score")
   maxScore: Option[Double] = None,
-  hits: List[Item]
+  hits: List[Hit]
 )

 private[elasticsearch] object Hits {
@@ -65,7 +65,7 @@ private[elasticsearch] object Total {
   implicit val decoder: JsonDecoder[Total] = DeriveJsonDecoder.gen[Total]
 }

-private[elasticsearch] final case class Item(
+private[elasticsearch] final case class Hit(
   @jsonField("_index")
   index: String,
   @jsonField("_type")
@@ -78,6 +78,6 @@
   source: Json
 )

-private[elasticsearch] object Item {
-  implicit val decoder: JsonDecoder[Item] = DeriveJsonDecoder.gen[Item]
+private[elasticsearch] object Hit {
+  implicit val decoder: JsonDecoder[Hit] = DeriveJsonDecoder.gen[Hit]
 }
diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticResult.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticResult.scala
index 86e7d4708..3fe44e209 100644
--- a/modules/library/src/main/scala/zio/elasticsearch/ElasticResult.scala
+++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticResult.scala
@@ -18,18 +18,18 @@ package zio.elasticsearch

 import zio.prelude.ZValidation
 import zio.schema.Schema
-import zio.{Task, ZIO}
+import zio.{IO, Task, ZIO}

 sealed trait ElasticResult[F[_]] {
-  def result[A: Schema]: Task[F[A]]
+  def documentAs[A: Schema]: Task[F[A]]
 }

-final class GetResult private[elasticsearch] (private val doc: Option[Document]) extends ElasticResult[Option] {
-  override def result[A: Schema]: Task[Option[A]] =
+final class GetResult private[elasticsearch] (private val doc: Option[Item]) extends ElasticResult[Option] {
+  override def documentAs[A: Schema]: IO[DecodingException, Option[A]] =
     ZIO
       .fromEither(doc match {
-        case Some(document) =>
-          document.decode match {
+        case Some(item) =>
+          item.documentAs match {
            case Left(e)    => Left(DecodingException(s"Could not parse the document: ${e.message}"))
            case Right(doc) => Right(Some(doc))
          }
@@ -39,10 +39,10 @@ final class GetResult private[elasticsearch] (private val doc: Option[Document])
       .mapError(e => DecodingException(s"Could not parse the document: ${e.message}"))
 }

-final class SearchResult private[elasticsearch] (private val hits: List[Document]) extends ElasticResult[List] {
-  override def result[A: Schema]: Task[List[A]] =
+final class SearchResult private[elasticsearch] (private val hits: List[Item]) extends ElasticResult[List] {
+  override def documentAs[A: Schema]: IO[DecodingException, List[A]] =
     ZIO.fromEither {
-      ZValidation.validateAll(hits.map(d => ZValidation.fromEither(d.decode))).toEitherWith { errors =>
+      ZValidation.validateAll(hits.map(item => ZValidation.fromEither(item.documentAs))).toEitherWith { errors =>
         DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})")
       }
     }
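With the result types narrowed from `Task` to `IO[DecodingException, _]`, a caller that already holds a `SearchResult` can treat decoding failures separately from transport failures. A small sketch, again assuming the spec's `CustomerDocument` schema; the fall-back-to-empty behaviour is purely illustrative.

import zio.{UIO, ZIO}

// DecodingException is the only error documentAs can produce here, so the
// recovery below cannot accidentally swallow an HTTP or connection failure.
def customersOrEmpty(result: SearchResult): UIO[List[CustomerDocument]] =
  result
    .documentAs[CustomerDocument]
    .catchAll(_ => ZIO.succeed(List.empty[CustomerDocument]))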
diff --git a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala
index fe295312b..e389145a9 100644
--- a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala
+++ b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala
@@ -29,6 +29,7 @@ import sttp.model.StatusCode.{
 import zio.ZIO.logDebug
 import zio.elasticsearch.ElasticRequest._
 import zio.json.ast.Json.{Obj, Str}
+import zio.schema.Schema
 import zio.stream.ZStream
 import zio.{Chunk, Task, ZIO}

@@ -54,21 +55,19 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
       case r: GetByQuery => executeGetByQuery(r)
     }

-//  def stream[A: Schema](r: GetByQuery): ZStream[Any, Throwable, A] = {
-//    val pipeline: ZPipeline[Any, Nothing, Json, Document] = ZPipeline.map(Document.from)
-//    val pipeline2: ZPipeline[Any, Nothing, Document, Either[DecodeError, A]] = ZPipeline.map(_.decode)
-//    val pipeline3: ZPipeline[Any, Nothing, Either[DecodeError, A], A] = ZPipeline.collectWhileRight
-//
-//    ZStream.paginateChunkZIO("") { s =>
-//      if (s.isEmpty) executeGetByQueryWithScroll(r) else executeGetByScroll(s)
-//    } >>> pipeline >>> pipeline2 >>> pipeline3
-//  }
-
-  def stream(r: GetByQuery): ZStream[Any, Throwable, RawItem] =
+  def stream(r: GetByQuery): ZStream[Any, Throwable, Item] =
     ZStream.paginateChunkZIO("") { s =>
       if (s.isEmpty) executeGetByQueryWithScroll(r) else executeGetByScroll(s)
     }

+  def streamAs[A: Schema](r: GetByQuery): ZStream[Any, Throwable, A] =
+    ZStream
+      .paginateChunkZIO("") { s =>
+        if (s.isEmpty) executeGetByQueryWithScroll(r) else executeGetByScroll(s)
+      }
+      .map(_.documentAs[A])
+      .collectWhileRight
+
   private def executeBulk(r: Bulk): Task[Unit] = {
     val uri = (r.index match {
       case Some(index) => uri"${config.uri}/$index/$Bulk"
@@ -218,7 +217,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
         .response(asJson[ElasticGetResponse])
     ).flatMap { response =>
       response.code match {
-        case HttpOk       => ZIO.attempt(new GetResult(response.body.toOption.map(d => Document.from(d.source))))
+        case HttpOk       => ZIO.attempt(new GetResult(response.body.toOption.map(r => Item(r.source))))
         case HttpNotFound => ZIO.succeed(new GetResult(None))
         case _            => ZIO.fail(createElasticExceptionFromCustomResponse(response))
       }
@@ -239,19 +238,19 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
             e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")),
             value =>
               ZIO.succeed(
-                new SearchResult(value.results.map(_.toString).map(Document(_)))
-              ) // TODO have Json in Document instead of String???
+                new SearchResult(value.results.map(Item))
+              )
           )
         case _ => ZIO.fail(createElasticExceptionFromCustomResponse(response))
       }
     }

-  private def executeGetByQueryWithScroll(r: GetByQuery): ZIO[Any, Throwable, (Chunk[RawItem], Option[String])] =
+  private def executeGetByQueryWithScroll(r: GetByQuery): Task[(Chunk[Item], Option[String])] =
     sendRequestWithCustomResponse(
       request
        .post(
-          uri"${config.uri}/${r.index}/$Search".withParams(("scroll", "1m"))
+          uri"${config.uri}/${r.index}/$Search".withParams((Scroll, ScrollDefaultDuration))
        )
        .response(asJson[ElasticQueryResponse])
        .contentType(ApplicationJson)
@@ -261,20 +260,20 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
         case HttpOk =>
           response.body.fold(
             e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")),
-            value => ZIO.succeed((Chunk.fromIterable(value.results).map(RawItem), value.scrollId))
+            value => ZIO.succeed((Chunk.fromIterable(value.results).map(Item), value.scrollId))
           )
         case _ => ZIO.fail(createElasticExceptionFromCustomResponse(response))
       }
     }

-  private def executeGetByScroll(scrollId: String): ZIO[Any, Throwable, (Chunk[RawItem], Option[String])] =
+  private def executeGetByScroll(scrollId: String): Task[(Chunk[Item], Option[String])] =
     sendRequestWithCustomResponse(
       request
-        .post(uri"${config.uri}/$Search/scroll".withParams(("scroll", "1m")))
+        .post(uri"${config.uri}/$Search/$Scroll".withParams((Scroll, ScrollDefaultDuration)))
        .response(asJson[ElasticQueryResponse])
        .contentType(ApplicationJson)
-        .body(Obj("scroll_id" -> Str(scrollId)))
+        .body(Obj(ScrollId -> Str(scrollId)))
     ).flatMap { response =>
       response.code match {
         case HttpOk =>
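The two scroll helpers above are exactly what `stream` and `streamAs` feed into `ZStream.paginateChunkZIO`: start from an initial state (here the empty scroll id), return a chunk of elements plus `Some(nextState)` to keep paging or `None` to stop. A self-contained toy version of that contract, unrelated to Elasticsearch, just to make the shape explicit:

import zio.{Chunk, ZIO}
import zio.stream.ZStream

// Two fake "pages": the empty state yields the first page and a scroll id,
// any other state yields the second page and then signals the end with None.
val firstPage: (Chunk[Int], Option[String])  = (Chunk(1, 2), Some("scroll-1"))
val secondPage: (Chunk[Int], Option[String]) = (Chunk(3, 4), None)

val paged: ZStream[Any, Nothing, Int] =
  ZStream.paginateChunkZIO("") {
    case "" => ZIO.succeed(firstPage)  // real code calls the search endpoint here
    case _  => ZIO.succeed(secondPage) // ...and the scroll endpoint here
  }
// paged.runCollect evaluates to Chunk(1, 2, 3, 4)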
@@ -282,7 +281,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
             e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")),
             value =>
               if (value.results.isEmpty) ZIO.succeed((Chunk.empty, None))
-              else ZIO.succeed((Chunk.fromIterable(value.results).map(RawItem), Some(scrollId)))
+              else ZIO.succeed((Chunk.fromIterable(value.results).map(Item), value.scrollId.orElse(Some(scrollId))))
           )
         case _ => ZIO.fail(createElasticExceptionFromCustomResponse(response))
@@ -326,11 +325,14 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC

 private[elasticsearch] object HttpElasticExecutor {

-  private final val Bulk          = "_bulk"
-  private final val Create        = "_create"
-  private final val DeleteByQuery = "_delete_by_query"
-  private final val Doc           = "_doc"
-  private final val Search        = "_search"
+  private final val Bulk                  = "_bulk"
+  private final val Create                = "_create"
+  private final val DeleteByQuery         = "_delete_by_query"
+  private final val Doc                   = "_doc"
+  private final val Search                = "_search"
+  private final val Scroll                = "scroll"
+  private final val ScrollId              = "scroll_id"
+  private final val ScrollDefaultDuration = "1m"

   def apply(config: ElasticConfig, client: SttpBackend[Task, Any]) =
     new HttpElasticExecutor(config, client)
diff --git a/modules/library/src/main/scala/zio/elasticsearch/Item.scala b/modules/library/src/main/scala/zio/elasticsearch/Item.scala
new file mode 100644
index 000000000..760e37c79
--- /dev/null
+++ b/modules/library/src/main/scala/zio/elasticsearch/Item.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2022 LambdaWorks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zio.elasticsearch
+
+import zio.json.ast.Json
+import zio.schema.Schema
+import zio.schema.codec.DecodeError
+import zio.schema.codec.JsonCodec.JsonDecoder
+
+final case class Item(raw: Json) {
+  def documentAs[A](implicit schema: Schema[A]): Either[DecodeError, A] = JsonDecoder.decode(schema, raw.toString)
+}
diff --git a/modules/library/src/main/scala/zio/elasticsearch/RawItem.scala b/modules/library/src/main/scala/zio/elasticsearch/RawItem.scala
deleted file mode 100644
index c99dc1858..000000000
--- a/modules/library/src/main/scala/zio/elasticsearch/RawItem.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package zio.elasticsearch
-
-import zio.json.ast.Json
-import zio.schema.Schema
-import zio.schema.codec.DecodeError
-import zio.schema.codec.JsonCodec.JsonDecoder
-
-final case class RawItem(raw: Json) {
-  def documentAs[A](implicit schema: Schema[A]): Either[DecodeError, A] = JsonDecoder.decode(schema, raw.toString)
-}
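`Item` is the piece user code actually touches when streaming: it wraps the hit's `_source` JSON and defers decoding until a `Schema` is supplied. A minimal sketch with a made-up `Customer` case class (the library's own document fixtures are not part of this diff):

import zio.json.ast.Json
import zio.schema.{DeriveSchema, Schema}
import zio.schema.codec.DecodeError

final case class Customer(id: String, balance: Int)
object Customer {
  implicit val schema: Schema[Customer] = DeriveSchema.gen[Customer]
}

// The raw hit as it would come back inside a search response...
val hit: Item = Item(Json.Obj("id" -> Json.Str("c-1"), "balance" -> Json.Num(150)))

// ...and decoding on demand: a Left(DecodeError) is returned instead of throwing.
val decoded: Either[DecodeError, Customer] = hit.documentAs[Customer]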
diff --git a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala
deleted file mode 100644
index ccd153dc4..000000000
--- a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright 2022 LambdaWorks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package zio.elasticsearch
-
-import zio.Random.nextUUID
-import zio.elasticsearch.ElasticRequest._
-import zio.json.ast.Json
-import zio.stm.{STM, TMap, USTM, ZSTM}
-import zio.stream.ZStream
-import zio.{Task, ZIO}
-
-private[elasticsearch] final case class TestExecutor private (data: TMap[IndexName, TMap[DocumentId, Document]])
-    extends ElasticExecutor {
-  self =>
-
-  def execute[A](request: ElasticRequest[A]): Task[A] =
-    request match {
-      case Bulk(requests, _, _, _) =>
-        fakeBulk(requests)
-      case Create(index, document, _, _) =>
-        fakeCreate(index, document)
-      case CreateWithId(index, id, document, _, _) =>
-        fakeCreateWithId(index, id, document)
-      case CreateIndex(name, _) =>
-        fakeCreateIndex(name)
-      case CreateOrUpdate(index, id, document, _, _) =>
-        fakeCreateOrUpdate(index, id, document)
-      case DeleteById(index, id, _, _) =>
-        fakeDeleteById(index, id)
-      case DeleteByQuery(index, _, _, _) =>
-        fakeDeleteByQuery(index)
-      case DeleteIndex(name) =>
-        fakeDeleteIndex(name)
-      case Exists(index, id, _) =>
-        fakeExists(index, id)
-      case GetById(index, id, _) =>
-        fakeGetById(index, id)
-      case GetByQuery(index, _, _) =>
-        fakeGetByQuery(index)
-    }
-
-  override def stream(request: GetByQuery): ZStream[Any, Throwable, RawItem] = ???
-
-  private def fakeBulk(requests: List[BulkableRequest[_]]): Task[Unit] =
-    ZIO.attempt {
-      requests.map { r =>
-        execute(r)
-      }
-    }.unit
-
-  private def fakeCreate(index: IndexName, document: Document): Task[DocumentId] =
-    for {
-      uuid       <- nextUUID
-      documents  <- getDocumentsFromIndex(index).commit
-      documentId  = DocumentId(uuid.toString)
-      _          <- documents.put(documentId, document).commit
-    } yield documentId
-
-  private def fakeCreateWithId(index: IndexName, documentId: DocumentId, document: Document): Task[CreationOutcome] =
-    (for {
-      documents     <- getDocumentsFromIndex(index)
-      alreadyExists <- documents.contains(documentId)
-      _             <- documents.putIfAbsent(documentId, document)
-    } yield if (alreadyExists) AlreadyExists else Created).commit
-
-  private def fakeCreateIndex(index: IndexName): Task[CreationOutcome] =
-    (for {
-      alreadyExists  <- self.data.contains(index)
-      emptyDocuments <- TMap.empty[DocumentId, Document]
-      _              <- self.data.putIfAbsent(index, emptyDocuments)
-    } yield if (alreadyExists) AlreadyExists else Created).commit
-
-  private def fakeCreateOrUpdate(index: IndexName, documentId: DocumentId, document: Document): Task[Unit] =
-    (for {
-      documents <- getDocumentsFromIndex(index)
-      _         <- documents.put(documentId, document)
-    } yield ()).commit
-
-  private def fakeDeleteById(index: IndexName, documentId: DocumentId): Task[DeletionOutcome] =
-    (for {
-      documents <- getDocumentsFromIndex(index)
-      exists    <- documents.contains(documentId)
-      _         <- documents.delete(documentId)
-    } yield if (exists) Deleted else NotFound).commit
-
-  private def fakeDeleteByQuery(index: IndexName): Task[DeletionOutcome] =
-    (for {
-      exists <- self.data.contains(index)
-    } yield if (exists) Deleted else NotFound).commit
-  // until we have a way of using query to delete we can either delete all or delete none documents
-
-  private def fakeDeleteIndex(index: IndexName): Task[DeletionOutcome] =
-    (for {
-      exists <- self.data.contains(index)
-      _      <- self.data.delete(index)
-    } yield if (exists) Deleted else NotFound).commit
-
-  private def fakeExists(index: IndexName, documentId: DocumentId): Task[Boolean] =
-    (for {
-      documents <- getDocumentsFromIndex(index)
-      exists    <- documents.contains(documentId)
-    } yield exists).commit
-
-  private def fakeGetById(index: IndexName, documentId: DocumentId): Task[GetResult] =
-    (for {
-      documents     <- getDocumentsFromIndex(index)
-      maybeDocument <- documents.get(documentId)
-    } yield new GetResult(maybeDocument)).commit
-
-  private def fakeGetByQuery(index: IndexName): Task[SearchResult] = {
-    def createSearchResult(
-      index: IndexName,
-      documents: TMap[DocumentId, Document]
-    ): USTM[SearchResult] =
-      for {
-        items <-
-          documents.toList.map(
-            _.map { case (id, document) =>
-              Item(
-                index = index.toString,
-                `type` = "type",
-                id = id.toString,
-                score = 1,
-                source = Json.Str(document.json)
-              )
-            }
-          )
-      } yield new SearchResult(items.map(_.source.toString).map(Document(_)))
-
-    (for {
-      documents <- getDocumentsFromIndex(index)
-      response  <- createSearchResult(index, documents)
-    } yield response).commit
-  }
-
-  private def getDocumentsFromIndex(index: IndexName): ZSTM[Any, ElasticException, TMap[DocumentId, Document]] =
-    for {
-      maybeDocuments <- self.data.get(index)
-      documents <- maybeDocuments.fold[STM[ElasticException, TMap[DocumentId, Document]]](
-                     STM.fail[ElasticException](new ElasticException(s"Index $index does not exists!"))
-                   )(STM.succeed(_))
-    } yield documents
-}
diff --git a/modules/library/src/main/scala/zio/elasticsearch/package.scala b/modules/library/src/main/scala/zio/elasticsearch/package.scala
index 63967b9f7..78c8d0466 100644
--- a/modules/library/src/main/scala/zio/elasticsearch/package.scala
+++ b/modules/library/src/main/scala/zio/elasticsearch/package.scala
@@ -62,17 +62,7 @@ package object elasticsearch {
       def containsAny(name: String, params: List[String]): Boolean =
         params.exists(StringUtils.contains(name, _))

-  /*// TODO decide if this extension is favorable to avoid having an additional flatMap in user code
-  final implicit class ResultRIO[R, F[_]](zio: RIO[R, ElasticResult[F]]) {
-    def result[A: Schema]: RIO[R, F[A]] = zio.flatMap(_.result[A])
-  }
-
-  // TODO decide if this extension is favorable to avoid having an additional flatMap in user code
-  final implicit class ResultTask[F[_]](zio: Task[ElasticResult[F]]) {
-    def result[A: Schema]: Task[F[A]] = zio.flatMap(_.result[A])
-  }*/
-
-  final implicit class Result[R, F[_]](zio: ZIO[R, Throwable, ElasticResult[F]]) {
-    def result[A: Schema]: ZIO[R, Throwable, F[A]] = zio.flatMap(_.result[A])
+  final implicit class ZIOResultOps[R, F[_]](zio: RIO[R, ElasticResult[F]]) {
+    def documentAs[A: Schema]: RIO[R, F[A]] = zio.flatMap(_.documentAs[A])
   }
 }
diff --git a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala
index c8ba36cdd..82c55c9af 100644
--- a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala
+++ b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala
@@ -244,7 +244,7 @@ object HttpElasticExecutorSpec extends WireMockSpec {
             .getById(index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"))
             .routing(Routing("routing"))
         )
-        .result[GitHubRepo]
+        .documentAs[GitHubRepo]
     )(isSome(equalTo(repo)))
   },
   test("getting by query request") {
@@ -297,7 +297,7 @@ object HttpElasticExecutorSpec extends WireMockSpec {
     assertZIO(
       addStubMapping *> ElasticExecutor
        .execute(ElasticRequest.search(index = index, query = matchAll))
-        .result[GitHubRepo]
+        .documentAs[GitHubRepo]
     )(
       equalTo(List(repo))
    )
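Finally, the `ZIOResultOps` syntax from `package.scala` is what lets these specs write `execute(...).documentAs[...]` in one chain; it is nothing more than a `flatMap` over the `ElasticResult`. Both forms below are equivalent (`index`, `GitHubRepo` and the document id are this WireMock spec's fixtures):

import zio.RIO

// Using the extension method added in package.scala.
val chained: RIO[ElasticExecutor, Option[GitHubRepo]] =
  ElasticExecutor
    .execute(ElasticRequest.getById(index, DocumentId("V4x8q4UB3agN0z75fv5r")))
    .documentAs[GitHubRepo]

// The same thing written out as an explicit flatMap over the GetResult.
val viaFlatMap: RIO[ElasticExecutor, Option[GitHubRepo]] =
  ElasticExecutor
    .execute(ElasticRequest.getById(index, DocumentId("V4x8q4UB3agN0z75fv5r")))
    .flatMap(_.documentAs[GitHubRepo])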