From a6ad9e418e605894c5e96e5601c3e1b8ded4166a Mon Sep 17 00:00:00 2001 From: Dominic Kim Date: Mon, 18 Jan 2021 17:14:41 +0900 Subject: [PATCH] [New Scheduler] Add duration checker (#4984) * Add a duration checker for Elasticsearch. * Add configurations for the ElasticSearchDurationCheckerTests class * Use a private helper function to execute queries. * Add an Ansible variable for the duration checker. * Apply scalaFmt * Include test cases for duration checker to system tests. * Setup ElasticSearch for system tests. * Increase patience config to wait for response longer. * Add postfixOps --- ansible/group_vars/all | 3 + .../apache/openwhisk/core/WhiskConfig.scala | 2 + .../ElasticSearchActivationStore.scala | 44 +- .../queue/ElasticSearchDurationChecker.scala | 245 +++++++++++ .../scheduler/queue/NoopDurationChecker.scala | 47 ++ tests/build.gradle | 4 + tests/src/test/resources/application.conf.j2 | 16 +- ...cSearchDurationCheckResultFormatTest.scala | 124 ++++++ .../ElasticSearchDurationCheckerTests.scala | 406 ++++++++++++++++++ tools/travis/setupPrereq.sh | 1 + tools/travis/setupSystem.sh | 2 +- 11 files changed, 875 insertions(+), 19 deletions(-) create mode 100644 core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala create mode 100644 core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 342ad1d7938..564a0212b67 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -421,3 +421,6 @@ metrics: port: "{{ metrics_kamon_statsd_port | default('8125') }}" user_events: "{{ user_events_enabled | default(false) | lower }}" + 
+durationChecker: + timeWindow: "{{ duration_checker_time_window | default('1 d') }}" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index 7c0ec2b3998..84e75e8240d 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -272,6 +272,8 @@ object ConfigKeys { val metrics = "whisk.metrics" val featureFlags = "whisk.feature-flags" + val durationChecker = s"whisk.duration-checker" + val whiskConfig = "whisk.config" val sharedPackageExecuteOnly = s"whisk.shared-packages-execute-only" val swaggerUi = "whisk.swagger-ui" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala index 5d110a7328a..8f257126b45 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala @@ -57,27 +57,17 @@ case class ElasticSearchActivationStoreConfig(protocol: String, class ElasticSearchActivationStore( httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None, - elasticSearchConfig: ElasticSearchActivationStoreConfig = - loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore), + elasticSearchConfig: ElasticSearchActivationStoreConfig, useBatching: Boolean = false)(implicit actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) extends ActivationStore { import com.sksamuel.elastic4s.http.ElasticDsl._ + import ElasticSearchActivationStore.{generateIndex, httpClientCallback} private implicit val 
executionContextExecutor: ExecutionContextExecutor = actorSystem.dispatcher - private val httpClientCallback = new HttpClientConfigCallback { - override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = { - val provider = new BasicCredentialsProvider - provider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password)) - httpClientBuilder.setDefaultCredentialsProvider(provider) - } - } - private val client = ElasticClient( ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"), @@ -407,10 +397,6 @@ class ElasticSearchActivationStore( activationId.toString.split("/")(0) } - private def generateIndex(namespace: String): String = { - elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase - } - private def generateRangeQuery(key: String, since: Option[Instant], upto: Option[Instant]): RangeQuery = { rangeQuery(key) .gte(since.map(_.toEpochMilli).getOrElse(minStart)) @@ -418,7 +404,31 @@ class ElasticSearchActivationStore( } } +object ElasticSearchActivationStore { + val elasticSearchConfig: ElasticSearchActivationStoreConfig = + loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore) + + val httpClientCallback = new HttpClientConfigCallback { + override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = { + val provider = new BasicCredentialsProvider + provider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password)) + httpClientBuilder.setDefaultCredentialsProvider(provider) + } + } + + def generateIndex(namespace: String): String = { + elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase + } +} + object ElasticSearchActivationStoreProvider extends ActivationStoreProvider { + import 
ElasticSearchActivationStore.elasticSearchConfig + override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) = - new ElasticSearchActivationStore(useBatching = true)(actorSystem, actorMaterializer, logging) + new ElasticSearchActivationStore(elasticSearchConfig = elasticSearchConfig, useBatching = true)( + actorSystem, + actorMaterializer, + logging) } diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala new file mode 100644 index 00000000000..88e1f2ff177 --- /dev/null +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/ElasticSearchDurationChecker.scala @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.openwhisk.core.scheduler.queue
+
+import akka.actor.ActorSystem
+import com.sksamuel.elastic4s.http.ElasticDsl._
+import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
+import com.sksamuel.elastic4s.searches.queries.Query
+import com.sksamuel.elastic4s.{ElasticDate, ElasticDateMath, Seconds}
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.WhiskActionMetaData
+import org.apache.openwhisk.spi.Spi
+import pureconfig.loadConfigOrThrow
+import spray.json.{JsArray, JsNumber, JsValue, RootJsonFormat, deserializationError, _}
+
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
+import scala.util.{Failure, Try}
+import pureconfig.generic.auto._
+
+/** Reports the average activation duration of an action. */
+trait DurationChecker {
+  /**
+   * Looks up the average duration for `actionMetaData` and completes the future with the outcome.
+   * `callback` lets the caller post-process the raw result; ElasticSearchDurationChecker completes
+   * its future with whatever `callback` returns.
+   */
+  def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
+    callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult]
+}
+
+/**
+ * @param averageDuration average of the matched `duration` values, None when ElasticSearch reports null
+ * @param hitCount the `hits.total` value of the response
+ * @param took the `took` value of the response
+ */
+case class DurationCheckResult(averageDuration: Option[Double], hitCount: Long, took: Long)
+
+object ElasticSearchDurationChecker {
+  val FilterAggregationName = "filterAggregation"
+  val AverageAggregationName = "averageAggregation"
+
+  implicit val serde: ElasticSearchDurationCheckResultFormat = new ElasticSearchDurationCheckResultFormat()
+
+  /** Date-math expression for `now - timeWindow`, with the window truncated to whole seconds. */
+  def getFromDate(timeWindow: FiniteDuration): ElasticDateMath =
+    ElasticDate.now minus (timeWindow.toSeconds.toInt, Seconds)
+}
+
+/**
+ * Queries the activation index of a namespace for the average `duration` of the activations of an
+ * action whose `@timestamp` lies within the last `timeWindow`.
+ */
+class ElasticSearchDurationChecker(private val client: ElasticClient, val timeWindow: FiniteDuration)(
+  implicit val actorSystem: ActorSystem,
+  implicit val logging: Logging)
+    extends DurationChecker {
+  import ElasticSearchDurationChecker._
+  import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore.generateIndex
+
+  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+
+  override def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
+    callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult] = {
+    val index = generateIndex(invocationNamespace)
+    val fqn = actionMetaData.fullyQualifiedName(false)
+    val fromDate = getFromDate(timeWindow)
+
+    logging.info(this, s"check average duration for $fqn in $index for last $timeWindow")
+
+    actionMetaData.binding match {
+      case Some(binding) =>
+        // Bound package action: activations are matched by the binding annotation and the action name.
+        val boolQueryResult = List(
+          matchQuery("annotations.binding", s"$binding"),
+          matchQuery("name", actionMetaData.name),
+          rangeQuery("@timestamp").gte(fromDate))
+
+        executeQuery(boolQueryResult, callback, index)
+
+      case None =>
+        // Regular action: activations are matched by the exact fully qualified path.
+        val queryResult = List(matchQuery("path.keyword", fqn.toString), rangeQuery("@timestamp").gte(fromDate))
+
+        executeQuery(queryResult, callback, index)
+    }
+  }
+
+  // Runs the averaging search and decodes the response; a failure is logged and stays in the future.
+  private def executeQuery(boolQueryResult: List[Query],
+                           callback: DurationCheckResult => DurationCheckResult,
+                           index: String): Future[DurationCheckResult] = {
+    client
+      .execute {
+        (search(index) query {
+          boolQuery must {
+            boolQueryResult
+          }
+        } aggregations
+          avgAgg(AverageAggregationName, "duration")).size(0)
+      }
+      .map { res =>
+        logging.debug(this, s"ElasticSearch query results: $res")
+        Try(serde.read(res.body.getOrElse("").parseJson))
+      }
+      .flatMap(Future.fromTry)
+      .map(callback(_))
+      .andThen {
+        case Failure(t) =>
+          logging.error(this, s"failed to check the average duration: ${t}")
+      }
+  }
+}
+
+object ElasticSearchDurationCheckerProvider extends DurationCheckerProvider {
+  import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore._
+
+  override def instance(actorSystem: ActorSystem, log: Logging): ElasticSearchDurationChecker = {
+    implicit val as: ActorSystem = actorSystem
+    implicit val logging: Logging = log
+
+    val elasticClient =
+      ElasticClient(
+        ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
+        NoOpRequestConfigCallback,
+        httpClientCallback)
+
+    new ElasticSearchDurationChecker(elasticClient, durationCheckerConfig.timeWindow)
+  }
+}
+
+trait DurationCheckerProvider extends Spi {
+  // NOTE(review): this is a strict val, so every provider - including ones that never read it -
+  // fails to initialise when `whisk.duration-checker` is absent from the configuration.
+  val durationCheckerConfig: DurationCheckerConfig =
+    loadConfigOrThrow[DurationCheckerConfig](ConfigKeys.durationChecker)
+
+  def instance(actorSystem: ActorSystem, logging: Logging): DurationChecker
+}
+
+class ElasticSearchDurationCheckResultFormat extends RootJsonFormat[DurationCheckResult] {
+  import ElasticSearchDurationChecker._
+  import spray.json.DefaultJsonProtocol._
+
+  /**
+   * Parses an ElasticSearch search response into a [[DurationCheckResult]].
+   *
+   * Only `aggregations`, `hits.total` and `took` are read; any other field is ignored. The average
+   * is taken from a filter aggregation when the response contains one:
+   * {{{
+   * {
+   *   "aggregations": { "filterAggregation": { "averageAggregation": { "value": 13 }, "doc_count": 3 } },
+   *   "hits": { "hits": [], "max_score": 0, "total": 6 },
+   *   "timed_out": false,
+   *   "took": 0
+   * }
+   * }}}
+   * and otherwise from a top-level average aggregation:
+   * {{{
+   * {
+   *   "aggregations": { "averageAggregation": { "value": 14 } },
+   *   "hits": { "hits": [], "max_score": 0, "total": 3 },
+   *   "timed_out": false,
+   *   "took": 0
+   * }
+   * }}}
+   * A `null` average yields `averageDuration = None`. `hitCount` is always `hits.total`; the
+   * `doc_count` of a filter aggregation is not consulted.
+   *
+   * NOTE(review): being `implicit`, this method doubles as a JsValue => DurationCheckResult conversion
+   * wherever an instance's members are imported - presumably unintended, confirm before relying on it.
+   * @throws spray.json.DeserializationException if the response has neither of the shapes above
+   */
+  implicit def read(json: JsValue): DurationCheckResult = {
+    val jsObject = json.asJsObject
+
+    jsObject.getFields("aggregations", "took", "hits") match {
+      case Seq(aggregations, took, hits) =>
+        val hitCount = hits.asJsObject.getFields("total").headOption
+        val filterAggregations = aggregations.asJsObject.getFields(FilterAggregationName)
+        val averageAggregations = aggregations.asJsObject.getFields(AverageAggregationName)
+
+        (filterAggregations, averageAggregations, hitCount) match {
+          case (filterAggregations, _, Some(count)) if filterAggregations.nonEmpty =>
+            val averageDuration =
+              filterAggregations.headOption.flatMap(
+                _.asJsObject
+                  .getFields(AverageAggregationName)
+                  .headOption
+                  .flatMap(_.asJsObject.getFields("value").headOption))
+
+            averageDuration match {
+              case Some(JsNull) =>
+                DurationCheckResult(None, count.convertTo[Long], took.convertTo[Long])
+
+              case Some(duration) =>
+                DurationCheckResult(Some(duration.convertTo[Double]), count.convertTo[Long], took.convertTo[Long])
+
+              case t => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $t")
+            }
+
+          case (_, averageAggregations, Some(count)) if averageAggregations.nonEmpty =>
+            val averageDuration = averageAggregations.headOption.flatMap(_.asJsObject.getFields("value").headOption)
+
+            averageDuration match {
+              case Some(JsNull) =>
+                DurationCheckResult(None, count.convertTo[Long], took.convertTo[Long])
+
+              case Some(duration) =>
+                DurationCheckResult(Some(duration.convertTo[Double]), count.convertTo[Long], took.convertTo[Long])
+
+              case t => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $t")
+            }
+
+          case t => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $t")
+        }
+
+      case other => deserializationError(s"Cannot deserialize DurationCheckResult: invalid input. Raw input: $other")
+    }
+
+  }
+
+  // Not used by the checker itself. A missing average is written as JSON null instead of throwing.
+  override def write(obj: DurationCheckResult): JsValue = {
+    JsArray(obj.averageDuration.fold[JsValue](JsNull)(JsNumber(_)), JsNumber(obj.hitCount), JsNumber(obj.took))
+  }
+}
+
+case class DurationCheckerConfig(timeWindow: FiniteDuration)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala new file mode 100644 index 00000000000..441bfed53c4 --- /dev/null +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/NoopDurationChecker.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue
+
+import akka.actor.ActorSystem
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.entity.WhiskActionMetaData
+
+import scala.concurrent.Future
+
+object NoopDurationCheckerProvider extends DurationCheckerProvider {
+  override def instance(actorSystem: ActorSystem, log: Logging): NoopDurationChecker = {
+    // The no-op checker needs neither the actor system nor the logger; both are accepted only
+    // because the DurationCheckerProvider signature requires them.
+    new NoopDurationChecker()
+  }
+}
+
+object NoopDurationChecker {
+  implicit val serde: ElasticSearchDurationCheckResultFormat = new ElasticSearchDurationCheckResultFormat()
+}
+
+class NoopDurationChecker extends DurationChecker {
+  // Always answers with a zero average and no hits. NOTE(review): `callback` is never invoked here,
+  // whereas ElasticSearchDurationChecker maps its result through it - confirm this is intended.
+  override def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
+    callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult] = {
+    // The result is a constant, so the future is completed right away instead of scheduling a task
+    // on the global execution context.
+    Future.successful(DurationCheckResult(Some(0), 0, 0))
+  }
+}
diff --git a/tests/build.gradle b/tests/build.gradle index f2fdd2ac7ad..57c3bd733c1 100644 --- a/tests/build.gradle +++ b/tests/build.gradle @@ -40,6 +40,7 @@ def leanExcludes = [ def projectsWithCoverage = [ ':common:scala', ':core:controller', + ':core:scheduler', ':core:invoker', ':tools:admin', ':core:cosmosdb:cache-invalidator' @@ -52,6 +53,7 @@ def systemIncludes = [
"org/apache/openwhisk/core/apigw/actions/test/**", "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*", "org/apache/openwhisk/core/controller/test/*ControllerApiTests*", + "org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*", "apigw/healthtests/**", "ha/**", "services/**", @@ -70,6 +72,7 @@ ext.testSets = [ "org/apache/openwhisk/standalone/**", "org/apache/openwhisk/core/cli/test/**", "org/apache/openwhisk/core/limits/**", + "org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*", "**/*CacheConcurrencyTests*", "**/*ControllerApiTests*", "org/apache/openwhisk/testEntities/**", @@ -221,6 +224,7 @@ dependencies { compile project(':common:scala') compile project(':core:controller') + compile project(':core:scheduler') compile project(':core:invoker') compile project(':core:cosmosdb:cache-invalidator') compile project(':core:monitoring:user-events') diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2 index 53d1cc98b48..90802bc625d 100644 --- a/tests/src/test/resources/application.conf.j2 +++ b/tests/src/test/resources/application.conf.j2 @@ -97,13 +97,27 @@ whisk { parameter-storage { current = "off" } - + elasticsearch { docker-image = "{{ elasticsearch.docker_image | default('docker.elastic.co/elasticsearch/elasticsearch:' ~ elasticsearch.version ) }}" } helm.release = "release" runtime.delete.timeout = "30 seconds" + + duration-checker { + time-window = "{{ durationChecker.timeWindow }}" + } + + activation-store { + elasticsearch { + protocol = "{{ db.elasticsearch.protocol }}" + hosts = "{{ elasticsearch_connect_string }}" + index-pattern = "{{ db.elasticsearch.index_pattern }}" + username = "{{ db.elasticsearch.auth.admin.username }}" + password = "{{ db.elasticsearch.auth.admin.password }}" + } + } } #test-only overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf) diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala
new file mode 100644
index 00000000000..fe28e73ca48
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckResultFormatTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler.queue.test
+
+import org.apache.openwhisk.core.scheduler.queue.{DurationCheckResult, ElasticSearchDurationCheckResultFormat}
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import spray.json._
+
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchDurationCheckResultFormatTest extends FlatSpec with Matchers with ScalaFutures {
+  behavior of "ElasticSearchDurationCheckResultFormatTest"
+
+  val serde = new ElasticSearchDurationCheckResultFormat()
+
+  it should "deserialize the data correctly" in {
+    val normalData = """{
+      "_shards": {
+        "failed": 0,
+        "skipped": 0,
+        "successful": 5,
+        "total": 5
+      },
+      "aggregations": {
+        "filterAggregation": {
+          "averageAggregation": {
+            "value": 14
+          },
+          "doc_count": 3
+        }
+      },
+      "hits": {
+        "hits": [],
+        "max_score": 0,
+        "total": 3
+      },
+      "timed_out": false,
+      "took": 2
+    }"""
+
+    val bindingData = """{
+      "_shards": {
+        "failed": 0,
+        "skipped": 0,
+        "successful": 5,
+        "total": 5
+      },
+      "aggregations": {
+        "averageAggregation": {
+          "value": 12
+        }
+      },
+      "hits": {
+        "hits": [],
+        "max_score": 0,
+        "total": 2
+      },
+      "timed_out": false,
+      "took": 0
+    }"""
+
+    val expected1 = DurationCheckResult(Some(14), 3, 2)
+    val expected2 = DurationCheckResult(Some(12), 2, 0)
+    val result1 = serde.read(normalData.parseJson)
+    val result2 = serde.read(bindingData.parseJson)
+
+    result1 shouldBe expected1
+    result2 shouldBe expected2
+  }
+
+  // The write method is not used by the checker; this test only pins its output for the sake of symmetry.
+  it should "serialize the data correctly" in {
+    val data = DurationCheckResult(Some(14), 3, 2)
+    val expected =
+      """[14, 3, 2]
+        |
+        |""".stripMargin
+    val result = serde.write(data)
+
+    result shouldBe expected.parseJson
+  }
+
+  it should "throw an exception when data is not in the expected format" in {
+    val malformedData = """{
+      "_shards": {
+        "failed": 0,
+        "skipped": 0,
+        "successful": 5,
+        "total": 5
+      },
+      "averageAggregation": {
+        "value": 14
+      },
+      "hits": {
+        "hits": [],
+        "max_score": 0,
+        "total": 3
+      },
+      "timed_out": false,
+      "took": 2
+    }"""
+
+    assertThrows[DeserializationException] {
+      serde.read(malformedData.parseJson)
+    }
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala new file mode 100644 index 00000000000..867c0cdf573 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheckerTests.scala @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.openwhisk.core.scheduler.queue.test + +import akka.stream.ActorMaterializer +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback} +import common._ +import common.rest.WskRestOperations +import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials} +import org.apache.http.impl.client.BasicCredentialsProvider +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder +import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStoreConfig +import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.entity.test.ExecHelpers +import org.apache.openwhisk.core.scheduler.queue.{DurationCheckResult, ElasticSearchDurationChecker} +import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback +import org.junit.runner.RunWith +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FlatSpec, Matchers} +import org.scalatestplus.junit.JUnitRunner +import pureconfig.generic.auto._ +import pureconfig.loadConfigOrThrow +import java.time.Instant +import java.time.temporal.ChronoUnit +import scala.language.postfixOps + +import scala.collection.mutable +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.duration._ + +/** + * This test will try to fetch the average duration from activation documents. This class guarantees the minimum compatibility. + * In case there are any updates in the activation document, it will catch the difference between the expected and the actual format.
+ */
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchDurationCheckerTests
+    extends FlatSpec
+    with Matchers
+    with ScalaFutures
+    with WskTestHelpers
+    with StreamLogging
+    with ExecHelpers
+    with BeforeAndAfterAll
+    with BeforeAndAfter {
+
+  private val namespace = "durationCheckNamespace"
+  val wskadmin: RunCliCmd = new RunCliCmd {
+    override def baseCommand: mutable.Buffer[String] = WskAdmin.baseCommand
+  }
+  implicit val mt: ActorMaterializer = ActorMaterializer()
+  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+  implicit val timeoutConfig: PatienceConfig = PatienceConfig(5 seconds, 15 milliseconds)
+
+  private val auth = BasicAuthenticationAuthKey()
+  implicit val wskprops: WskProps = WskProps(authKey = auth.compact, namespace = namespace)
+  implicit val transid: TransactionId = TransactionId.testing
+
+  val wsk = new WskRestOperations
+  val elasticSearchConfig: ElasticSearchActivationStoreConfig =
+    loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore)
+
+  val testIndex: String = generateIndex(namespace)
+  val concurrency = 1
+  val actionMem: ByteSize = 256.MB
+  val defaultDurationCheckWindow = 5.seconds
+
+  private val httpClientCallback = new HttpClientConfigCallback {
+    override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
+      val provider = new BasicCredentialsProvider
+      provider.setCredentials(
+        AuthScope.ANY,
+        new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
+      httpClientBuilder.setDefaultCredentialsProvider(provider)
+    }
+  }
+
+  private val client =
+    ElasticClient(
+      ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
+      NoOpRequestConfigCallback,
+      httpClientCallback)
+
+  private val elasticSearchDurationChecker = new ElasticSearchDurationChecker(client, defaultDurationCheckWindow)
+
+  override def beforeAll(): Unit = {
+    val res = wskadmin.cli(Seq("user", "create", namespace, "-u", auth.compact))
+    res.exitCode shouldBe 0
+
+    println(s"namespace: $namespace, auth: ${auth.compact}")
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    // Wait for the index deletion (best effort: a timeout must not skip the rest of the teardown).
+    scala.util.Try(scala.concurrent.Await.ready(client.execute(deleteIndex(testIndex)), 10.seconds))
+    client.close()
+    wskadmin.cli(Seq("user", "delete", namespace))
+    logLines.foreach(println)
+    super.afterAll()
+  }
+
+  behavior of "ElasticSearchDurationChecker"
+
+  it should "fetch the proper duration from ES" in withAssetCleaner(wskprops) { (_, assetHelper) =>
+    val actionName = "avgDuration"
+    val dummyActionName = "dummyAction"
+
+    var totalDuration = 0L
+    val count = 3
+
+    assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
+      action.create(actionName, Some(TestUtils.getTestActionFilename("hello.js")))
+    }
+
+    assetHelper.withCleaner(wsk.action, dummyActionName) { (action, _) =>
+      action.create(dummyActionName, Some(TestUtils.getTestActionFilename("hello.js")))
+    }
+
+    val actionMetaData =
+      WhiskActionMetaData(
+        EntityPath(namespace),
+        EntityName(actionName),
+        js10MetaData(Some("jsMain"), binary = false),
+        limits = actionLimits(actionMem, concurrency))
+
+    val run1 = wsk.action.invoke(actionName, Map())
+    withActivation(wsk.activation, run1) { activation =>
+      activation.response.status shouldBe "success"
+    }
+    // wait for 1s
+    Thread.sleep(1000)
+
+    val start = Instant.now()
+    val run2 = wsk.action.invoke(dummyActionName, Map())
+    withActivation(wsk.activation, run2) { activation =>
+      activation.response.status shouldBe "success"
+    }
+
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(actionName, Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+        totalDuration += activation.duration
+      }
+    }
+    val end = Instant.now()
+    val timeWindow = math.ceil(ChronoUnit.MILLIS.between(start, end) / 1000.0).seconds
+    val durationChecker = new ElasticSearchDurationChecker(client, timeWindow)
+
+    // it should aggregate only the activations created within the time window measured above
+    val durationCheckResult: DurationCheckResult =
+      durationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+    /**
+     * Illustrative response only: the query issued by the checker returns a top-level "averageAggregation"
+     {
+       "_shards": {
+         "failed": 0,
+         "skipped": 0,
+         "successful": 5,
+         "total": 5
+       },
+       "aggregations": {
+         "filterAggregation": {
+           "averageAggregation": {
+             "value": 14
+           },
+           "doc_count": 3
+         }
+       },
+       "hits": {
+         "hits": [],
+         "max_score": 0,
+         "total": 3
+       },
+       "timed_out": false,
+       "took": 2
+     }
+     */
+    truncateDouble(durationCheckResult.averageDuration.getOrElse(0.0)) shouldBe truncateDouble(
+      totalDuration.toDouble / count.toDouble)
+    durationCheckResult.hitCount shouldBe count
+  }
+
+  it should "fetch proper average duration for a package action" in withAssetCleaner(wskprops) { (_, assetHelper) =>
+    val packageName = "samplePackage"
+    val actionName = "packageAction"
+    val fqn = s"$namespace/$packageName/$actionName"
+
+    val actionMetaData =
+      WhiskActionMetaData(
+        EntityPath(s"$namespace/$packageName"),
+        EntityName(actionName),
+        js10MetaData(Some("jsMain"), binary = false),
+        limits = actionLimits(actionMem, concurrency))
+
+    var totalDuration = 0L
+    val count = 3
+
+    assetHelper.withCleaner(wsk.pkg, packageName) { (pkg, _) =>
+      pkg.create(packageName)
+    }
+
+    assetHelper.withCleaner(wsk.action, fqn) { (action, _) =>
+      action.create(fqn, Some(TestUtils.getTestActionFilename("hello.js")))
+    }
+
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(fqn, Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+      }
+    }
+    // wait for 1s
+    Thread.sleep(1000)
+
+    val start = Instant.now()
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(fqn, Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+        totalDuration += activation.duration
+      }
+    }
+    val end = Instant.now()
+    val timeWindow = math.ceil(ChronoUnit.MILLIS.between(start, end) / 1000.0).seconds
+    val durationChecker = new ElasticSearchDurationChecker(client, timeWindow)
+    val durationCheckResult: DurationCheckResult =
+      durationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+    /**
+     * Illustrative response only: the query issued by the checker returns a top-level "averageAggregation"
+     {
+       "_shards": {
+         "failed": 0,
+         "skipped": 0,
+         "successful": 5,
+         "total": 5
+       },
+       "aggregations": {
+         "filterAggregation": {
+           "averageAggregation": {
+             "value": 13
+           },
+           "doc_count": 3
+         }
+       },
+       "hits": {
+         "hits": [],
+         "max_score": 0,
+         "total": 6
+       },
+       "timed_out": false,
+       "took": 0
+     }
+     */
+    truncateDouble(durationCheckResult.averageDuration.getOrElse(0.0)) shouldBe truncateDouble(
+      totalDuration.toDouble / count.toDouble)
+    durationCheckResult.hitCount shouldBe count
+  }
+
+  it should "fetch the duration for binding action" in withAssetCleaner(wskprops) { (_, assetHelper) =>
+    val packageName = "testPackage"
+    val actionName = "testAction"
+    val originalFQN = s"$namespace/$packageName/$actionName"
+    val boundPackageName = "boundPackage"
+
+    val actionMetaData =
+      WhiskActionMetaData(
+        EntityPath(s"$namespace/$boundPackageName"),
+        EntityName(actionName),
+        js10MetaData(Some("jsMain"), binary = false),
+        limits = actionLimits(actionMem, concurrency),
+        binding = Some(EntityPath(s"$namespace/$packageName")))
+
+    var totalDuration = 0L
+    val count = 3
+
+    assetHelper.withCleaner(wsk.pkg, packageName) { (pkg, _) =>
+      pkg.create(packageName, shared = Some(true))
+    }
+
+    assetHelper.withCleaner(wsk.action, originalFQN) { (action, _) =>
+      action.create(originalFQN, Some(TestUtils.getTestActionFilename("hello.js")))
+    }
+
+    assetHelper.withCleaner(wsk.pkg, boundPackageName) { (pkg, _) =>
+      pkg.bind(packageName, boundPackageName)
+    }
+
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(s"$boundPackageName/$actionName", Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+      }
+    }
+    // wait for 1s
+    Thread.sleep(1000)
+
+    val start = Instant.now()
+    1 to count foreach { _ =>
+      val run = wsk.action.invoke(s"$boundPackageName/$actionName", Map())
+      withActivation(wsk.activation, run) { activation =>
+        activation.response.status shouldBe "success"
+        totalDuration += activation.duration
+      }
+    }
+    val end = Instant.now()
+    val timeWindow = math.ceil(ChronoUnit.MILLIS.between(start, end) / 1000.0).seconds
+    val durationChecker = new ElasticSearchDurationChecker(client, timeWindow)
+    val durationCheckResult: DurationCheckResult =
+      durationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+    /**
+     * Expected sample data
+     {
+       "_shards": {
+         "failed": 0,
+         "skipped": 0,
+         "successful": 5,
+         "total": 5
+       },
+       "aggregations": {
+         "averageAggregation": {
+           "value": 14
+         }
+       },
+       "hits": {
+         "hits": [],
+         "max_score": 0,
+         "total": 3
+       },
+       "timed_out": false,
+       "took": 0
+     }
+     */
+    truncateDouble(durationCheckResult.averageDuration.getOrElse(0.0)) shouldBe truncateDouble(
+      totalDuration.toDouble / count.toDouble)
+    durationCheckResult.hitCount shouldBe count
+  }
+
+  it should "return nothing properly if there is no activation yet" in withAssetCleaner(wskprops) { (_, _) =>
+    val actionName = "noneAction"
+
+    val actionMetaData =
+      WhiskActionMetaData(
+        EntityPath(s"$namespace"),
+        EntityName(actionName),
+        js10MetaData(Some("jsMain"), binary = false),
+        limits = actionLimits(actionMem, concurrency))
+
+    val durationCheckResult: DurationCheckResult =
+      elasticSearchDurationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+    durationCheckResult.averageDuration shouldBe None
+    durationCheckResult.hitCount shouldBe 0
+  }
+
+  it should "return nothing properly if there is no activation for binding action yet" in withAssetCleaner(wskprops) {
+    (_, _) =>
+      val packageName = "testPackage2"
+      val actionName = "noneAction"
+      val boundPackageName = "boundPackage2"
+
+      val actionMetaData =
+        WhiskActionMetaData(
+          EntityPath(s"$namespace/$boundPackageName"),
+          EntityName(actionName),
+          js10MetaData(Some("jsMain"), binary = false),
+          limits = actionLimits(actionMem, concurrency),
+          binding = Some(EntityPath(s"${namespace}/${packageName}")))
+
+      val durationCheckResult: DurationCheckResult =
+        elasticSearchDurationChecker.checkAverageDuration(namespace, actionMetaData)(res => res).futureValue
+
+      durationCheckResult.averageDuration shouldBe None
+      durationCheckResult.hitCount shouldBe 0
+  }
+
+  private def truncateDouble(number: Double, scale: Int = 2) = {
+    BigDecimal(number).setScale(scale, BigDecimal.RoundingMode.HALF_UP).toDouble
+  }
+
+  private def generateIndex(namespace: String): String = {
+    elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
+  }
+}
diff --git a/tools/travis/setupPrereq.sh b/tools/travis/setupPrereq.sh index f820c283231..7832938cdca 100755 --- a/tools/travis/setupPrereq.sh +++ b/tools/travis/setupPrereq.sh @@ -31,6 +31,7 @@ $ANSIBLE_CMD prereq.yml $ANSIBLE_CMD couchdb.yml $ANSIBLE_CMD initdb.yml $ANSIBLE_CMD wipe.yml +$ANSIBLE_CMD elasticsearch.yml $ANSIBLE_CMD properties.yml -e manifest_file="$RUNTIMES_MANIFEST" echo "Time taken for ${0##*/} is $SECONDS secs" diff --git a/tools/travis/setupSystem.sh b/tools/travis/setupSystem.sh index 0750dddcf1c..9f99cf7b013 100755 --- a/tools/travis/setupSystem.sh +++ b/tools/travis/setupSystem.sh @@ -26,7 +26,7 @@ RUNTIMES_MANIFEST=${1:-"/ansible/files/runtimes.json"} cd $ROOTDIR/ansible -$ANSIBLE_CMD openwhisk.yml -e manifest_file="$RUNTIMES_MANIFEST" +$ANSIBLE_CMD openwhisk.yml -e manifest_file="$RUNTIMES_MANIFEST" -e db_activation_backend=ElasticSearch $ANSIBLE_CMD apigateway.yml $ANSIBLE_CMD routemgmt.yml