(dsl): Support Extended stats aggregation #363

Merged
Changes from 3 commits
41 changes: 41 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_extended_stats.md
@@ -0,0 +1,41 @@
---
id: elastic_aggregation_extended_stats
title: "Extended stats Aggregation"
---

The `Extended stats` aggregation is a multi-value metrics aggregation that provides statistical information (count, sum, min, max, average, sum of squares, variance and standard deviation of a field) over numeric values extracted from the aggregated documents.
The `Extended stats` aggregation is an extended version of the [`Stats`](https://lambdaworks.github.io/zio-elasticsearch/overview/aggregations/elastic_aggregation_stats) aggregation.
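
To make the reported statistics concrete, the following dependency-free sketch computes them from their textbook definitions. It is only an illustration under stated assumptions: `ExtendedStatsByHand`, `Summary` and `summarize` are hypothetical names that are not part of the library, and Elasticsearch may compute the same quantities differently internally.

```scala
// Dependency-free sketch: computes the reported statistics from their textbook definitions.
// ExtendedStatsByHand, Summary and summarize are hypothetical names, not library API.
object ExtendedStatsByHand {

  final case class Summary(
    count: Int,
    min: Double,
    max: Double,
    avg: Double,
    sum: Double,
    sumOfSquares: Double,
    variancePopulation: Double,
    varianceSampling: Double,
    stdDeviationPopulation: Double,
    stdDeviationSampling: Double
  )

  // Assumes at least two values, so that the sampling variance (division by n - 1) is defined.
  def summarize(values: Seq[Double]): Summary = {
    val count        = values.size
    val sum          = values.sum
    val avg          = sum / count
    val sumOfSquares = values.map(v => v * v).sum
    // Sum of squared distances from the mean, shared by both variance flavours.
    val squaredDiffs       = values.map(v => (v - avg) * (v - avg)).sum
    val variancePopulation = squaredDiffs / count
    val varianceSampling   = squaredDiffs / (count - 1)

    Summary(
      count = count,
      min = values.reduce((a, b) => math.min(a, b)),
      max = values.reduce((a, b) => math.max(a, b)),
      avg = avg,
      sum = sum,
      sumOfSquares = sumOfSquares,
      variancePopulation = variancePopulation,
      varianceSampling = varianceSampling,
      stdDeviationPopulation = math.sqrt(variancePopulation),
      stdDeviationSampling = math.sqrt(varianceSampling)
    )
  }
}
```

For the two values 50 and 100, `summarize` gives a count of 2, an average of 75, a sum of squares of 12500, a population variance of 625 and a sampling variance of 1250, which agrees with the expectations in the integration test of this change.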

In order to use the `Extended stats` aggregation, import the following:
```scala
import zio.elasticsearch.aggregation.ExtendedStatsAggregation
import zio.elasticsearch.ElasticAggregation.extendedStatsAggregation
```

You can create an `Extended stats` aggregation using the `extendedStatsAggregation` method this way:
```scala
val aggregation: ExtendedStatsAggregation = extendedStatsAggregation(name = "extendedStatsAggregation", field = "intField")
```
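
As a rough guide to what is sent to Elasticsearch, the `toJson` method added in this change nests the `field` under an `extended_stats` object keyed by the aggregation name, and adds `missing` and `sigma` only when they are set. The sketch below reproduces that shape by hand. `ExtendedStatsJsonSketch`, `quote` and `render` are hypothetical names that are not part of the library, and the number formatting of the real encoder may differ.

```scala
// Dependency-free sketch of the JSON shape; not the library's encoder.
object ExtendedStatsJsonSketch {

  // Assumes the given string needs no JSON escaping.
  private def quote(s: String): String = "\"" + s + "\""

  def render(
    name: String,
    field: String,
    missing: Option[Double] = None,
    sigma: Option[Double] = None
  ): String = {
    // Optional parameters only appear in the output when they are defined.
    val entries = Seq(
      Some(quote("field") + ":" + quote(field)),
      missing.map(m => quote("missing") + ":" + m),
      sigma.map(s => quote("sigma") + ":" + s)
    ).flatten

    "{" + quote(name) + ":{" + quote("extended_stats") + ":{" + entries.mkString(",") + "}}}"
  }
}
```

Calling `ExtendedStatsJsonSketch.render("extendedStatsAggregation", "intField")` yields an object with a single `field` entry, because neither optional parameter is set.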

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Extended stats` aggregation using the `extendedStatsAggregation` method this way:
```scala
// Document.intField must be a numeric value, as required by the Extended stats aggregation
val aggregation: ExtendedStatsAggregation = extendedStatsAggregation(name = "extendedStatsAggregation", field = Document.intField)
```

If you want to change the `missing` parameter, you can use the `missing` method:
```scala
val aggregationWithMissing: ExtendedStatsAggregation = extendedStatsAggregation(name = "extendedStatsAggregation", field = Document.intField).missing(10.0)
```
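
In Elasticsearch, the `missing` parameter defines how documents without a value for the field are treated: by default they are ignored, and when `missing` is set they are treated as if they had that value. A minimal dependency-free sketch of this behaviour follows; `MissingSketch` and `effectiveValues` are hypothetical names and not part of the library.

```scala
// Dependency-free sketch of how `missing` changes the set of aggregated values.
object MissingSketch {

  // Each element models one document: Some(value) if the field is present, None otherwise.
  def effectiveValues(docs: Seq[Option[Double]], missing: Option[Double]): Seq[Double] =
    missing match {
      case Some(default) => docs.map(_.getOrElse(default)) // documents without the field use the default
      case None          => docs.flatten                    // documents without the field are ignored
    }
}
```

With `missing` set to `10.0`, a document that lacks the field contributes `10.0` to every statistic, including the count.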

If you want to change the `sigma` parameter, you can use the `sigma` method:
```scala
val aggregationWithSigma: ExtendedStatsAggregation = extendedStatsAggregation(name = "extendedStatsAggregation", field = Document.intField).sigma(3.0)
```
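
The `sigma` parameter controls how many standard deviations above and below the mean the returned standard deviation bounds cover. Elasticsearch uses 2 when the parameter is not set. The sketch below shows the arithmetic only; `SigmaBoundsSketch`, `Bounds` and `bounds` are hypothetical names and not part of the library.

```scala
// Dependency-free sketch of the standard deviation bounds arithmetic.
object SigmaBoundsSketch {

  final case class Bounds(upper: Double, lower: Double)

  // The default of 2.0 mirrors the Elasticsearch default for sigma.
  def bounds(avg: Double, stdDeviation: Double, sigma: Double = 2.0): Bounds =
    Bounds(upper = avg + sigma * stdDeviation, lower = avg - sigma * stdDeviation)
}
```

With an average of 75 and a standard deviation of 25, `sigma = 3.0` gives bounds of 150 and 0, whereas the default gives 125 and 25.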

If you want to add another aggregation (on the same level), you can use the `withAgg` method:
```scala
val multipleAggregations: MultipleAggregations = extendedStatsAggregation(name = "extendedStatsAggregation1", field = Document.intField).withAgg(extendedStatsAggregation(name = "extendedStatsAggregation2", field = Document.doubleField))
```

You can find more information about `Extended stats` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-extendedstats-aggregation.html#search-aggregations-metrics-extendedstats-aggregation).
2 changes: 1 addition & 1 deletion docs/overview/aggregations/elastic_aggregation_stats.md
@@ -3,7 +3,7 @@ id: elastic_aggregation_stats
title: "Stats Aggregation"
---

The `Stats` aggregation is a multi-value metrics aggregation that provides statistical information (count, sum, min, max, and average of a field) over numeric values extracted from the aggregated documents.
The `Stats` aggregation is a multi-value metrics aggregation that provides statistical information (count, sum, min, max and average of a field) over numeric values extracted from the aggregated documents.

In order to use the `Stats` aggregation import the following:
```scala
@@ -104,6 +104,48 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using extended stats aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 100))
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 50))
.refreshTrue
)
aggregation = extendedStatsAggregation(name = "aggregation", field = TestDocument.intField).sigma(3)
aggsRes <-
Executor
.execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation))
.asExtendedStatsAggregation("aggregation")
} yield assert(aggsRes.head.count)(equalTo(2)) &&
assert(aggsRes.head.min)(equalTo(50.0)) &&
assert(aggsRes.head.max)(equalTo(100.0)) &&
assert(aggsRes.head.avg)(equalTo(75.0)) &&
assert(aggsRes.head.sum)(equalTo(150.0)) &&
assert(aggsRes.head.sumOfSquares)(equalTo(12500.0)) &&
assert(aggsRes.head.variance)(equalTo(625.0)) &&
assert(aggsRes.head.variancePopulation)(equalTo(625.0)) &&
assert(aggsRes.head.varianceSampling)(equalTo(1250.0)) &&
assert(aggsRes.head.stdDeviation)(equalTo(25.0)) &&
assert(aggsRes.head.stdDeviationPopulation)(equalTo(25.0)) &&
assert(aggsRes.head.stdDeviationSampling)(equalTo(35.35533905932738)) &&
assert(aggsRes.head.stdDeviationBoundsResult.upper)(equalTo(150.0)) &&
assert(aggsRes.head.stdDeviationBoundsResult.lower)(equalTo(0.0)) &&
assert(aggsRes.head.stdDeviationBoundsResult.upperPopulation)(equalTo(150.0)) &&
assert(aggsRes.head.stdDeviationBoundsResult.lowerPopulation)(equalTo(0.0)) &&
assert(aggsRes.head.stdDeviationBoundsResult.upperSampling)(equalTo(181.06601717798213)) &&
assert(aggsRes.head.stdDeviationBoundsResult.lowerSampling)(equalTo(-31.066017177982133))
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using max aggregation") {
val expectedResponse = ("aggregationInt", MaxAggregationResult(value = 20.0))
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
@@ -113,6 +113,38 @@ object ElasticAggregation {
final def cardinalityAggregation(name: String, field: String): CardinalityAggregation =
Cardinality(name = name, field = field, missing = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] using the specified
* parameters.
*
* @param name
* aggregation name
* @param field
* the type-safe field for which extended stats aggregation will be executed
* @tparam A
* expected number type
* @return
* an instance of [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] that represents extended stats
* aggregation to be performed.
*/
final def extendedStatsAggregation[A: Numeric](name: String, field: Field[_, A]): ExtendedStatsAggregation =
ExtendedStats(name = name, field = field.toString, missing = None, sigma = None)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] using the specified
* parameters.
*
* @param name
* aggregation name
* @param field
* the field for which extended stats aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] that represents extended stats
* aggregation to be performed.
*/
final def extendedStatsAggregation(name: String, field: String): ExtendedStatsAggregation =
ExtendedStats(name = name, field = field, missing = None, sigma = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MaxAggregation]] using the specified parameters.
*
@@ -246,6 +278,8 @@ object ElasticAggregation {
* the name of the aggregation
* @param field
* the type-safe field for which stats aggregation will be executed
* @tparam A
* expected number type
* @return
* an instance of [[zio.elasticsearch.aggregation.StatsAggregation]] that represents stats aggregation to be
* performed.
@@ -143,6 +143,49 @@ private[elasticsearch] final case class Cardinality(name: String, field: String,
}
}

sealed trait ExtendedStatsAggregation
extends SingleElasticAggregation
with HasMissing[ExtendedStatsAggregation]
with WithAgg {

/**
* Sets the `sigma` parameter for the [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]]. The `sigma` parameter
* controls how many standard deviations plus/minus from the mean the `std_deviation_bounds` object should display.
*
* @param value
* the value to use for sigma parameter
* @return
* an instance of the [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] enriched with the `sigma`
* parameter.
*/
def sigma(value: Double): ExtendedStatsAggregation
}

private[elasticsearch] final case class ExtendedStats(
name: String,
field: String,
missing: Option[Double],
sigma: Option[Double]
) extends ExtendedStatsAggregation { self =>

def missing(value: Double): ExtendedStatsAggregation =
self.copy(missing = Some(value))

def sigma(value: Double): ExtendedStatsAggregation =
self.copy(sigma = Some(value))

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def toJson: Json = {
val missingJson: Json = missing.fold(Obj())(m => Obj("missing" -> m.toJson))

val sigmaJson: Json = sigma.fold(Obj())(m => Obj("sigma" -> m.toJson))

Obj(name -> Obj("extended_stats" -> (Obj("field" -> field.toJson) merge missingJson merge sigmaJson)))
}
}

sealed trait MaxAggregation extends SingleElasticAggregation with HasMissing[MaxAggregation] with WithAgg

private[elasticsearch] final case class Max(name: String, field: String, missing: Option[Double])
@@ -31,6 +31,43 @@ object AggregationResponse {
AvgAggregationResult(value)
case CardinalityAggregationResponse(value) =>
CardinalityAggregationResult(value)
case ExtendedStatsAggregationResponse(
count,
min,
max,
avg,
sum,
sumOfSquares,
variance,
variancePopulation,
varianceSampling,
stdDeviation,
stdDeviationPopulation,
stdDeviationSampling,
stdDeviationBoundsResponse
) =>
ExtendedStatsAggregationResult(
count,
min,
max,
avg,
sum,
sumOfSquares,
variance,
variancePopulation,
varianceSampling,
stdDeviation,
stdDeviationPopulation,
stdDeviationSampling,
StdDeviationBoundsResult(
stdDeviationBoundsResponse.upper,
stdDeviationBoundsResponse.lower,
stdDeviationBoundsResponse.upperPopulation,
stdDeviationBoundsResponse.lowerPopulation,
stdDeviationBoundsResponse.upperSampling,
stdDeviationBoundsResponse.lowerSampling
)
Collaborator

Name the parameters, since there are a lot of them.

)
case MaxAggregationResponse(value) =>
MaxAggregationResult(value)
case MinAggregationResponse(value) =>
@@ -77,6 +114,34 @@ private[elasticsearch] object CardinalityAggregationResponse {
DeriveJsonDecoder.gen[CardinalityAggregationResponse]
}

private[elasticsearch] final case class ExtendedStatsAggregationResponse(
Collaborator

Please use camel case. You can use @jsonField("...") to specify the name of the JSON field.

count: Int,
min: Double,
max: Double,
avg: Double,
sum: Double,
@jsonField("sum_of_squares")
sumOfSquares: Double,
variance: Double,
@jsonField("variance_population")
variancePopulation: Double,
@jsonField("variance_sampling")
varianceSampling: Double,
@jsonField("std_deviation")
stdDeviation: Double,
@jsonField("std_deviation_population")
stdDeviationPopulation: Double,
@jsonField("std_deviation_sampling")
stdDeviationSampling: Double,
@jsonField("std_deviation_bounds")
stdDeviationBoundsResponse: StdDeviationBoundsResponse
) extends AggregationResponse

private[elasticsearch] object ExtendedStatsAggregationResponse {
implicit val decoder: JsonDecoder[ExtendedStatsAggregationResponse] =
DeriveJsonDecoder.gen[ExtendedStatsAggregationResponse]
}

private[elasticsearch] final case class MaxAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object MaxAggregationResponse {
@@ -116,6 +181,24 @@ private[elasticsearch] object StatsAggregationResponse {
implicit val decoder: JsonDecoder[StatsAggregationResponse] = DeriveJsonDecoder.gen[StatsAggregationResponse]
}

private[elasticsearch] final case class StdDeviationBoundsResponse(
upper: Double,
lower: Double,
@jsonField("upper_population")
upperPopulation: Double,
@jsonField("lower_population")
lowerPopulation: Double,
@jsonField("upper_sampling")
upperSampling: Double,
@jsonField("lower_sampling")
lowerSampling: Double
) extends AggregationResponse

private[elasticsearch] object StdDeviationBoundsResponse {
implicit val decoder: JsonDecoder[StdDeviationBoundsResponse] =
DeriveJsonDecoder.gen[StdDeviationBoundsResponse]
}

private[elasticsearch] final case class SumAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object SumAggregationResponse {
@@ -161,6 +244,26 @@ private[elasticsearch] object TermsAggregationBucket {
Some(field -> AvgAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("cardinality#") =>
Some(field -> CardinalityAggregationResponse(value = objFields("value").unsafeAs[Int]))
case str if str.contains("extended_stats#") =>
Some(
field -> ExtendedStatsAggregationResponse(
count = objFields("count").unsafeAs[Int],
min = objFields("min").unsafeAs[Double],
max = objFields("max").unsafeAs[Double],
avg = objFields("avg").unsafeAs[Double],
sum = objFields("sum").unsafeAs[Double],
sumOfSquares = objFields("sum_of_squares").unsafeAs[Double],
variance = objFields("variance").unsafeAs[Double],
variancePopulation = objFields("variance_population").unsafeAs[Double],
varianceSampling = objFields("variance_sampling").unsafeAs[Double],
stdDeviation = objFields("std_deviation").unsafeAs[Double],
stdDeviationPopulation = objFields("std_deviation_population").unsafeAs[Double],
stdDeviationSampling = objFields("std_deviation_sampling").unsafeAs[Double],
stdDeviationBoundsResponse = objFields("std_deviation_bounds").unsafeAs[StdDeviationBoundsResponse](
StdDeviationBoundsResponse.decoder
)
)
)
case str if str.contains("max#") =>
Some(field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("min#") =>
@@ -208,6 +311,8 @@ private[elasticsearch] object TermsAggregationBucket {
(field.split("#")(1), data.asInstanceOf[AvgAggregationResponse])
case str if str.contains("cardinality#") =>
(field.split("#")(1), data.asInstanceOf[CardinalityAggregationResponse])
case str if str.contains("extended_stats#") =>
(field.split("#")(1), data.asInstanceOf[ExtendedStatsAggregationResponse])
case str if str.contains("max#") =>
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
case str if str.contains("min#") =>
@@ -76,6 +76,10 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
WeightedAvgAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("avg#") =>
AvgAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("cardinality#") =>
CardinalityAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("extended_stats#") =>
ExtendedStatsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("max#") =>
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("min#") =>
@@ -88,8 +92,6 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
StatsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("sum#") =>
SumAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("cardinality#") =>
CardinalityAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("terms#") =>
TermsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("value_count#") =>
12 changes: 12 additions & 0 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
@@ -84,6 +84,18 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w
def asCardinalityAggregation(name: String): RIO[R, Option[CardinalityAggregationResult]] =
aggregationAs[CardinalityAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as an instance of
* [[result.ExtendedStatsAggregationResult]].
*/
def asExtendedStatsAggregation(name: String): RIO[R, Option[ExtendedStatsAggregationResult]] =
aggregationAs[ExtendedStatsAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*