Skip to content

Commit

Permalink
Fix code remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
vanjaftn committed Nov 14, 2023
1 parent 277b7ca commit a6efadf
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 70 deletions.
7 changes: 7 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_filter.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,23 @@ import zio.elasticsearch.ElasticAggregation.filterAggregation

You can create a `Filter` aggregation using the `filterAggregation` method in the following manner:
```scala
import zio.elasticsearch.ElasticQuery.term

val aggregation: FilterAggregation = filterAggregation(name = "filterAggregation", query = term(field = Document.stringField, value = "test"))
```

If you want to add aggregation (on the same level), you can use `withAgg` method:
```scala
import zio.elasticsearch.ElasticQuery.term

val multipleAggregations: MultipleAggregations = filterAggregation(name = "filterAggregation", query = term(field = Document.stringField, value = "test")).withAgg(maxAggregation(name = "maxAggregation", field = Document.doubleField))
```

If you want to add another sub-aggregation, you can use `withSubAgg` method:
```scala
import zio.elasticsearch.ElasticQuery.term
import zio.elasticsearch.ElasticAggregation.maxAggregation

val aggregationWithSubAgg: FilterAggregation = filterAggregation(name = "filterAggregation", query = term(field = Document.stringField, value = "test")).withSubAgg(maxAggregation(name = "maxAggregation", field = Document.intField))
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using filter aggregation") {
val expectedResponse = (
test("aggregate using filter aggregation with max aggregation as a sub aggregation") {
val expectedResult = (
"aggregation",
FilterAggregationResult(
docCount = 2,
Expand Down Expand Up @@ -159,7 +159,7 @@ object HttpExecutorSpec extends IntegrationSpec {
.execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation))
.aggregations

} yield assert(aggsRes.head)(equalTo(expectedResponse))
} yield assert(aggsRes.head)(equalTo(expectedResult))
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private[elasticsearch] final case class Cardinality(name: String, field: String,
}
}

sealed trait FilterAggregation extends SingleElasticAggregation with WithSubAgg[FilterAggregation] with WithAgg
sealed trait FilterAggregation extends SingleElasticAggregation with WithAgg with WithSubAgg[FilterAggregation]

private[elasticsearch] final case class Filter(
name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,10 @@ private[elasticsearch] final class HttpExecutor private (esConfig: ElasticConfig
case HttpOk =>
response.body.fold(
e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")),
value => {
value =>
ZIO.succeed(new AggregateResult(value.aggs.map { case (key, response) =>
(key, toResult(response))
}))
}
)
case _ =>
ZIO.fail(handleFailuresFromCustomResponse(response))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import zio.json.ast.Json
import zio.json.ast.Json.Obj
import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField}

private[elasticsearch] sealed trait AggregationBucket

sealed trait AggregationResponse

object AggregationResponse {
Expand All @@ -31,16 +33,13 @@ object AggregationResponse {
AvgAggregationResult(value)
case CardinalityAggregationResponse(value) =>
CardinalityAggregationResult(value)
case FilterAggregationResponse(docCount, subAggregations) => {
case FilterAggregationResponse(docCount, subAggregations) =>
FilterAggregationResult(
docCount = docCount,
subAggregations = subAggregations.fold(Map[String, AggregationResult]())(a =>
a.map { case (key, response) =>
(key, toResult(response))
}
)
subAggregations = subAggregations.fold(Map[String, AggregationResult]())(_.map { case (key, response) =>
(key, toResult(response))
})
)
}
case MaxAggregationResponse(value) =>
MaxAggregationResult(value)
case MinAggregationResponse(value) =>
Expand Down Expand Up @@ -85,25 +84,13 @@ private[elasticsearch] object CardinalityAggregationResponse {
DeriveJsonDecoder.gen[CardinalityAggregationResponse]
}

private[elasticsearch] final case class StatsAggregationResponse(
count: Int,
min: Double,
max: Double,
avg: Double,
sum: Double
) extends AggregationResponse

private[elasticsearch] object StatsAggregationResponse {
implicit val decoder: JsonDecoder[StatsAggregationResponse] = DeriveJsonDecoder.gen[StatsAggregationResponse]
}

private[elasticsearch] final case class FilterAggregationResponse(
@jsonField("doc_count")
docCount: Int,
subAggregations: Option[Map[String, AggregationResponse]] = None
) extends AggregationResponse

private[elasticsearch] object FilterAggregationResponse {
private[elasticsearch] object FilterAggregationResponse extends JsonDecoderOps {
implicit val decoder: JsonDecoder[FilterAggregationResponse] = Obj.decoder.mapOrFail { case Obj(fields) =>
val allFields = fields.flatMap { case (field, data) =>
field match {
Expand Down Expand Up @@ -140,19 +127,10 @@ private[elasticsearch] object FilterAggregationResponse {
case str if str.contains("sum#") =>
Some(field -> SumAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("terms#") =>
Some(
field -> TermsAggregationResponse(
docErrorCount = objFields("doc_count_error_upper_bound").unsafeAs[Int],
sumOtherDocCount = objFields("sum_other_doc_count").unsafeAs[Int],
buckets = objFields("buckets")
.unsafeAs[Chunk[Json]]
.map(_.unsafeAs[TermsAggregationBucket](TermsAggregationBucket.decoder))
)
)
Some(field -> data.unsafeAs[TermsAggregationResponse](TermsAggregationResponse.decoder))
case str if str.contains("value_count#") =>
Some(field -> ValueCountAggregationResponse(value = objFields("value").unsafeAs[Int]))
}

}
}.toMap

Expand Down Expand Up @@ -184,17 +162,17 @@ private[elasticsearch] object FilterAggregationResponse {
(field.split("#")(1), data.asInstanceOf[ValueCountAggregationResponse])
}
}

Right(FilterAggregationResponse.apply(docCount, Option(subAggs).filter(_.nonEmpty)))
}
}

final implicit class JsonDecoderOps(json: Json) {
private[elasticsearch] sealed trait JsonDecoderOps {
implicit class JsonDecoderOps(json: Json) {
def unsafeAs[A](implicit decoder: JsonDecoder[A]): A =
(json.as[A]: @unchecked) match {
case Right(decoded) => decoded
}
}

}

private[elasticsearch] final case class MaxAggregationResponse(value: Double) extends AggregationResponse
Expand Down Expand Up @@ -224,15 +202,25 @@ private[elasticsearch] object PercentilesAggregationResponse {
DeriveJsonDecoder.gen[PercentilesAggregationResponse]
}

private[elasticsearch] final case class StatsAggregationResponse(
count: Int,
min: Double,
max: Double,
avg: Double,
sum: Double
) extends AggregationResponse

private[elasticsearch] object StatsAggregationResponse {
implicit val decoder: JsonDecoder[StatsAggregationResponse] = DeriveJsonDecoder.gen[StatsAggregationResponse]
}

private[elasticsearch] final case class SumAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object SumAggregationResponse {
implicit val decoder: JsonDecoder[SumAggregationResponse] = DeriveJsonDecoder.gen[SumAggregationResponse]

}

private[elasticsearch] sealed trait AggregationBucket

private[elasticsearch] final case class TermsAggregationResponse(
@jsonField("doc_count_error_upper_bound")
docErrorCount: Int,
Expand All @@ -252,7 +240,7 @@ private[elasticsearch] final case class TermsAggregationBucket(
subAggregations: Option[Map[String, AggregationResponse]] = None
) extends AggregationBucket

private[elasticsearch] object TermsAggregationBucket {
private[elasticsearch] object TermsAggregationBucket extends JsonDecoderOps {
implicit val decoder: JsonDecoder[TermsAggregationBucket] = Obj.decoder.mapOrFail { case Obj(fields) =>
val allFields = fields.flatMap { case (field, data) =>
field match {
Expand Down Expand Up @@ -291,15 +279,7 @@ private[elasticsearch] object TermsAggregationBucket {
case str if str.contains("sum#") =>
Some(field -> SumAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("terms#") =>
Some(
field -> TermsAggregationResponse(
docErrorCount = objFields("doc_count_error_upper_bound").unsafeAs[Int],
sumOtherDocCount = objFields("sum_other_doc_count").unsafeAs[Int],
buckets = objFields("buckets")
.unsafeAs[Chunk[Json]]
.map(_.unsafeAs[TermsAggregationBucket](TermsAggregationBucket.decoder))
)
)
Some(field -> Some(field -> data.unsafeAs[TermsAggregationResponse](TermsAggregationResponse.decoder)))
case str if str.contains("value_count#") =>
Some(field -> ValueCountAggregationResponse(value = objFields("value").unsafeAs[Int]))
}
Expand Down Expand Up @@ -335,16 +315,8 @@ private[elasticsearch] object TermsAggregationBucket {
(field.split("#")(1), data.asInstanceOf[ValueCountAggregationResponse])
}
}

Right(TermsAggregationBucket.apply(key, docCount, Option(subAggs).filter(_.nonEmpty)))
}

final implicit class JsonDecoderOps(json: Json) {
def unsafeAs[A](implicit decoder: JsonDecoder[A]): A =
(json.as[A]: @unchecked) match {
case Right(decoded) => decoded
}
}
}

private[elasticsearch] final case class ValueCountAggregationResponse(value: Int) extends AggregationResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ final case class FilterAggregationResult private[elasticsearch] (
case None =>
Right(None)
}

}

final case class MaxAggregationResult private[elasticsearch] (value: Double) extends AggregationResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,36 +135,36 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
val aggregationWithSubAggregation =
filterAggregation("aggregation", query).withSubAgg(minAggregation("subAggregation", TestDocument.intField))
val aggregationWithMultipleSubAggregations = filterAggregation("aggregation", query)
.withSubAgg(minAggregation("minSubAggregation", TestDocument.intField))
.withSubAgg(maxAggregation("maxSubAggregation", TestDocument.intField))
.withSubAgg(minAggregation("minSubAggregation", TestDocument.intField))

assert(aggregation)(
equalTo(
Filter(
name = "aggregation",
subAggregations = Chunk.empty,
query = query
query = query,
subAggregations = Chunk.empty
)
)
) &&
assert(aggregationWithSubAggregation)(
equalTo(
Filter(
name = "aggregation",
subAggregations = Chunk(minAggregation("subAggregation", TestDocument.intField)),
query = query
query = query,
subAggregations = Chunk(minAggregation("subAggregation", TestDocument.intField))
)
)
) &&
assert(aggregationWithMultipleSubAggregations)(
equalTo(
Filter(
name = "aggregation",
query = query,
subAggregations = Chunk(
maxAggregation("maxSubAggregation", TestDocument.intField),
minAggregation("minSubAggregation", TestDocument.intField)
),
query = query
minAggregation("minSubAggregation", TestDocument.intField),
maxAggregation("maxSubAggregation", TestDocument.intField)
)
)
)
)
Expand Down Expand Up @@ -694,8 +694,8 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
val aggregationWithSubAggregation =
filterAggregation("aggregation", query).withSubAgg(minAggregation("subAggregation", TestDocument.intField))
val aggregationWithMultipleSubAggregations = filterAggregation("aggregation", query)
.withSubAgg(minAggregation("minSubAggregation", TestDocument.intField))
.withSubAgg(maxAggregation("maxSubAggregation", TestDocument.intField))
.withSubAgg(minAggregation("minSubAggregation", TestDocument.intField))

val expected =
"""
Expand Down

0 comments on commit a6efadf

Please sign in to comment.