Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(executor): Support Create and CreateOrUpdate requests #8

Merged
merged 5 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions modules/example/src/main/scala/example/ExampleApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,30 @@ import zio.elasticsearch._
object ExampleApp extends ZIOAppDefault {

override def run: Task[Unit] = {
val index = IndexName("kibana_sample_data_ecommerce")
val docId = DocumentId("wzvbqIQB22WMP-4s4h8K")
val index = IndexName("documentindex")
val docId = DocumentId("SrBtwIQBGQ08MH8or_0t")
val routing = Some(Routing("10"))

(for {
_ <- Console.printLine("Welcome to an example app...")
_ <- Console.printLine(s"Looking for the document '$docId' in '$index' index...'")
_ <- Console.printLine("Welcome to an example app...")
_ <- Console.printLine(s"Creating document with id '$docId' in '$index' index...'")
newDocId <-
ElasticRequest
.create(index = index, id = docId, doc = ExampleDocument("docId", "docName", 11), routing = routing)
.execute
_ <- Console.printLine(newDocId)
_ <- Console.printLine("Creating document with same id as previous (unsuccessfully)...")
newDocId2 <-
ElasticRequest
.create(index = index, id = docId, doc = ExampleDocument("docId2", "docName2", 22), routing = routing)
.execute
_ <- Console.printLine(newDocId2)
_ <- Console.printLine("Updating existing document...")
_ <-
ElasticRequest
.upsert(index = index, id = docId, doc = ExampleDocument("docId3", "docName3", 33), routing = routing)
.execute
_ <- Console.printLine("Getting updated document...")
res <- ElasticRequest.getById[ExampleDocument](index, docId, routing).execute
_ <- Console.printLine(res)
} yield ()).provide(ElasticExecutor.local, HttpClientZioBackend.layer())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package zio.elasticsearch

import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField}

final case class ElasticCreateResponse(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say this is ElasticUpsertResponse.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, it is only used for create.

@jsonField("_id")
id: String
)

object ElasticCreateResponse {
dbulaja98 marked this conversation as resolved.
Show resolved Hide resolved
implicit val decoder: JsonDecoder[ElasticCreateResponse] =
DeriveJsonDecoder.gen[ElasticCreateResponse]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package zio.elasticsearch
import zio.json.ast.Json
import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField}

final case class ElasticResponse(
final case class ElasticGetResponse(
found: Boolean,
@jsonField("_source")
source: Json
)

object ElasticResponse {
implicit val decoder: JsonDecoder[ElasticResponse] = DeriveJsonDecoder.gen[ElasticResponse]
object ElasticGetResponse {
implicit val decoder: JsonDecoder[ElasticGetResponse] = DeriveJsonDecoder.gen[ElasticGetResponse]
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ object ElasticRequest {
doc: A,
routing: Option[Routing] = None
): ElasticRequest[Unit] =
Create(index, Some(id), Document.from(doc), routing)
Create(index, Some(id), Document.from(doc), routing).map(_ => ())

def create[A: Schema](
index: IndexName,
doc: A,
routing: Option[Routing]
): ElasticRequest[Unit] =
): ElasticRequest[Option[DocumentId]] =
Create(index, None, Document.from(doc), routing)

def getById[A: Schema](
Expand All @@ -52,7 +52,7 @@ object ElasticRequest {
id: Option[DocumentId],
document: Document,
routing: Option[Routing] = None
) extends ElasticRequest[Unit]
) extends ElasticRequest[Option[DocumentId]]

private[elasticsearch] final case class CreateOrUpdate(
index: IndexName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package zio.elasticsearch

import sttp.client3._
import sttp.client3.ziojson._
import sttp.model.Header.contentType
import sttp.model.MediaType.ApplicationJson
import sttp.model.Uri
import zio.Task
import zio.elasticsearch.ElasticRequest._
import zio.{Task, ZIO}

private[elasticsearch] final class HttpElasticExecutor private (config: ElasticConfig, client: SttpBackend[Task, Any])
extends ElasticExecutor {
Expand All @@ -15,27 +17,58 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC

override def execute[A](request: ElasticRequest[A]): Task[A] =
request match {
case _: Create => ZIO.unit
case _: CreateOrUpdate => ZIO.unit
case r: Create => executeCreate(r)
case r: CreateOrUpdate => executeCreateOrUpdate(r)
case r: GetById => executeGetById(r)
case map @ Map(_, _) => execute(map.request).map(map.mapper)
}

private def executeGetById(getById: GetById): Task[Option[Document]] = {
val uri = uri"$basePath/${getById.index}/$Doc/${getById.id}".withParam("routing", getById.routing.map(_.value))
private def executeGetById(r: GetById): Task[Option[Document]] = {
val uri = uri"$basePath/${r.index}/$Doc/${r.id}".withParam("routing", r.routing.map(_.value))
basicRequest
.get(uri)
.response(asJson[ElasticResponse])
.response(asJson[ElasticGetResponse])
.send(client)
.map(_.body.toOption)
.map(_.flatMap(d => if (d.found) Option(Document.from(d.source)) else None))
}

private def executeCreate(r: Create): Task[Option[DocumentId]] = {
def createUri(maybeDocumentId: Option[DocumentId]) =
maybeDocumentId match {
case Some(documentId) =>
uri"$basePath/${r.index}/$Create/$documentId".withParam("routing", r.routing.map(_.value))
case None =>
uri"$basePath/${r.index}/$Doc".withParam("routing", r.routing.map(_.value))
}

basicRequest
.post(createUri(r.id))
.header(contentType(ApplicationJson))
.response(asJson[ElasticCreateResponse])
.body(r.document.json)
.send(client)
.map(_.body.toOption)
.map(_.flatMap(body => Option(DocumentId(body.id))))
dbulaja98 marked this conversation as resolved.
Show resolved Hide resolved
}

private def executeCreateOrUpdate(r: CreateOrUpdate): Task[Unit] = {
val u = uri"$basePath/${r.index}/$Doc/${r.id}"
.withParam("routing", r.routing.map(_.value))

basicRequest
.put(u)
.header(contentType(ApplicationJson))
.body(r.document.json)
.send(client)
.as(())
dbulaja98 marked this conversation as resolved.
Show resolved Hide resolved
}
}

private[elasticsearch] object HttpElasticExecutor {

private final val Doc = "_doc"
private final val Doc = "_doc"
private final val Create = "_create"

def apply(config: ElasticConfig, client: SttpBackend[Task, Any]) =
new HttpElasticExecutor(config, client)
Expand Down