diff --git a/modules/example/src/main/scala/example/ExampleApp.scala b/modules/example/src/main/scala/example/ExampleApp.scala index 84d7f8434..845826d98 100644 --- a/modules/example/src/main/scala/example/ExampleApp.scala +++ b/modules/example/src/main/scala/example/ExampleApp.scala @@ -7,13 +7,30 @@ import zio.elasticsearch._ object ExampleApp extends ZIOAppDefault { override def run: Task[Unit] = { - val index = IndexName("kibana_sample_data_ecommerce") - val docId = DocumentId("wzvbqIQB22WMP-4s4h8K") + val index = IndexName("documentindex") + val docId = DocumentId("SrBtwIQBGQ08MH8or_0t") val routing = Some(Routing("10")) (for { - _ <- Console.printLine("Welcome to an example app...") - _ <- Console.printLine(s"Looking for the document '$docId' in '$index' index...'") + _ <- Console.printLine("Welcome to an example app...") + _ <- Console.printLine(s"Creating document with id '$docId' in '$index' index...'") + newDocId <- + ElasticRequest + .create(index = index, id = docId, doc = ExampleDocument("docId", "docName", 11), routing = routing) + .execute + _ <- Console.printLine(newDocId) + _ <- Console.printLine("Creating document with same id as previous (unsuccessfully)...") + newDocId2 <- + ElasticRequest + .create(index = index, id = docId, doc = ExampleDocument("docId2", "docName2", 22), routing = routing) + .execute + _ <- Console.printLine(newDocId2) + _ <- Console.printLine("Updating existing document...") + _ <- + ElasticRequest + .upsert(index = index, id = docId, doc = ExampleDocument("docId3", "docName3", 33), routing = routing) + .execute + _ <- Console.printLine("Getting updated document...") res <- ElasticRequest.getById[ExampleDocument](index, docId, routing).execute _ <- Console.printLine(res) } yield ()).provide(ElasticExecutor.local, HttpClientZioBackend.layer()) diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticCreateResponse.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticCreateResponse.scala new file mode 100644 index 000000000..d8a10da1a --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticCreateResponse.scala @@ -0,0 +1,13 @@ +package zio.elasticsearch + +import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField} + +final case class ElasticCreateResponse( + @jsonField("_id") + id: String +) + +object ElasticCreateResponse { + implicit val decoder: JsonDecoder[ElasticCreateResponse] = + DeriveJsonDecoder.gen[ElasticCreateResponse] +} diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticGetResponse.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticGetResponse.scala new file mode 100644 index 000000000..439262e25 --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticGetResponse.scala @@ -0,0 +1,14 @@ +package zio.elasticsearch + +import zio.json.ast.Json +import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField} + +private[elasticsearch] final case class ElasticGetResponse( + found: Boolean, + @jsonField("_source") + source: Json +) + +private[elasticsearch] object ElasticGetResponse { + implicit val decoder: JsonDecoder[ElasticGetResponse] = DeriveJsonDecoder.gen[ElasticGetResponse] +} diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala index 9b248c696..218b5633b 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala @@ -20,13 +20,13 @@ object ElasticRequest { doc: A, routing: Option[Routing] = None ): ElasticRequest[Unit] = - Create(index, Some(id), Document.from(doc), routing) + Create(index, Some(id), Document.from(doc), routing).map(_ => ()) def create[A: Schema]( index: IndexName, doc: A, routing: Option[Routing] - ): ElasticRequest[Unit] = + ): ElasticRequest[Option[DocumentId]] = Create(index, None, Document.from(doc), routing) def getById[A: Schema]( @@ -52,7 +52,7 @@ object ElasticRequest { id: Option[DocumentId], document: Document, routing: Option[Routing] = None - ) extends ElasticRequest[Unit] + ) extends ElasticRequest[Option[DocumentId]] private[elasticsearch] final case class CreateOrUpdate( index: IndexName, diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticResponse.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticResponse.scala deleted file mode 100644 index 20d7d526a..000000000 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticResponse.scala +++ /dev/null @@ -1,14 +0,0 @@ -package zio.elasticsearch - -import zio.json.ast.Json -import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField} - -final case class ElasticResponse( - found: Boolean, - @jsonField("_source") - source: Json -) - -object ElasticResponse { - implicit val decoder: JsonDecoder[ElasticResponse] = DeriveJsonDecoder.gen[ElasticResponse] -} diff --git a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala index d5602eb6d..871296eed 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala @@ -1,10 +1,11 @@ package zio.elasticsearch -import sttp.client3._ import sttp.client3.ziojson._ +import sttp.client3.{SttpBackend, UriContext, basicRequest => request} +import sttp.model.MediaType.ApplicationJson import sttp.model.Uri +import zio.Task import zio.elasticsearch.ElasticRequest._ -import zio.{Task, ZIO} private[elasticsearch] final class HttpElasticExecutor private (config: ElasticConfig, client: SttpBackend[Task, Any]) extends ElasticExecutor { @@ -15,27 +16,58 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC override def execute[A](request: ElasticRequest[A]): Task[A] = request match { - case _: Create => ZIO.unit - case _: CreateOrUpdate => ZIO.unit + case r: Create => executeCreate(r) + case r: CreateOrUpdate => executeCreateOrUpdate(r) case r: GetById => executeGetById(r) case map @ Map(_, _) => execute(map.request).map(map.mapper) } - private def executeGetById(getById: GetById): Task[Option[Document]] = { - val uri = uri"$basePath/${getById.index}/$Doc/${getById.id}".withParam("routing", getById.routing.map(_.value)) - basicRequest + private def executeGetById(r: GetById): Task[Option[Document]] = { + val uri = uri"$basePath/${r.index}/$Doc/${r.id}".withParam("routing", r.routing.map(_.value)) + request .get(uri) - .response(asJson[ElasticResponse]) + .response(asJson[ElasticGetResponse]) .send(client) .map(_.body.toOption) - .map(_.flatMap(d => if (d.found) Option(Document.from(d.source)) else None)) + .map(_.flatMap(d => if (d.found) Some(Document.from(d.source)) else None)) } + private def executeCreate(r: Create): Task[Option[DocumentId]] = { + def createUri(maybeDocumentId: Option[DocumentId]) = + maybeDocumentId match { + case Some(documentId) => + uri"$basePath/${r.index}/$Create/$documentId".withParam("routing", r.routing.map(_.value)) + case None => + uri"$basePath/${r.index}/$Doc".withParam("routing", r.routing.map(_.value)) + } + + request + .post(createUri(r.id)) + .contentType(ApplicationJson) + .response(asJson[ElasticCreateResponse]) + .body(r.document.json) + .send(client) + .map(_.body.toOption) + .map(_.flatMap(body => Some(DocumentId(body.id)))) + } + + private def executeCreateOrUpdate(r: CreateOrUpdate): Task[Unit] = { + val u = uri"$basePath/${r.index}/$Doc/${r.id}" + .withParam("routing", r.routing.map(_.value)) + + request + .put(u) + .contentType(ApplicationJson) + .body(r.document.json) + .send(client) + .unit + } } private[elasticsearch] object HttpElasticExecutor { - private final val Doc = "_doc" + private final val Doc = "_doc" + private final val Create = "_create" def apply(config: ElasticConfig, client: SttpBackend[Task, Any]) = new HttpElasticExecutor(config, client)