diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 84b6b0222ae..b6bc4d4225b 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -139,6 +139,8 @@ registry: confdir: "{{ config_root_dir }}/registry" kafka: + topicsPrefix: "{{ kafka_topics_prefix | default('') }}" + topicsUserEventPrefix: "{{ kafka_topics_userEvent_prefix | default(kafka_topics_prefix) | default('') }}" ssl: client_authentication: required keystore: diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 64724c41d54..575aaf4f683 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -176,6 +176,10 @@ "{{ kafka_topics_health_retentionMS | default() }}" "CONFIG_whisk_kafka_topics_health_segmentBytes": "{{ kafka_topics_health_segmentBytes | default() }}" + "CONFIG_whisk_kafka_topics_prefix": + "{{ kafka.topicsPrefix }}" + "CONFIG_whisk_kafka_topics_userEvent_prefix": + "{{ kafka.topicsUserEventPrefix }}" "CONFIG_whisk_kafka_common_securityProtocol": "{{ kafka.protocol }}" "CONFIG_whisk_kafka_common_sslTruststoreLocation": diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index ea4ce48114b..f520515795c 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -216,6 +216,8 @@ "CONFIG_whisk_kafka_topics_invoker_retentionBytes": "{{ kafka_topics_invoker_retentionBytes | default() }}" "CONFIG_whisk_kafka_topics_invoker_retentionMs": "{{ kafka_topics_invoker_retentionMS | default() }}" "CONFIG_whisk_kakfa_topics_invoker_segmentBytes": "{{ kafka_topics_invoker_segmentBytes | default() }}" + "CONFIG_whisk_kafka_topics_prefix": "{{ kafka.topicsPrefix }}" + "CONFIG_whisk_kafka_topics_userEvent_prefix": "{{ kafka.topicsUserEventPrefix }}" "CONFIG_whisk_kafka_common_securityProtocol": "{{ kafka.protocol }}" "CONFIG_whisk_kafka_common_sslTruststoreLocation": "/conf/{{ kafka.ssl.keystore.name }}" "CONFIG_whisk_kafka_common_sslTruststorePassword": "{{ kafka.ssl.keystore.password }}" diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index a894360cdab..230e16d4308 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -193,6 +193,10 @@ whisk { retention-bytes = 1073741824 retention-ms = 3600000 } + prefix = "" + user-event { + prefix = "" + } } metrics { diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala index 5417cfd3b17..f3cdf0009fa 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala @@ -28,9 +28,11 @@ object UserEvents { val enabled = loadConfigOrThrow[UserEventsConfig](ConfigKeys.userEvents).enabled + val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix) + def send(producer: MessageProducer, em: => EventMessage) = { if (enabled) { - producer.send("events", em) + producer.send(userEventTopicPrefix + "events", em) } } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index 19ad39de3c9..1058e533203 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -212,6 +212,8 @@ object ConfigKeys { val kafkaProducer = s"$kafka.producer" val kafkaConsumer = s"$kafka.consumer" val kafkaTopics = s"$kafka.topics" + val kafkaTopicsPrefix = s"$kafkaTopics.prefix" + val kafkaTopicsUserEventPrefix = s"$kafkaTopics.user-event.prefix" val memory = "whisk.memory" val timeLimit = "whisk.time-limit" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala index eb9cce9105d..b798d884a69 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala @@ -19,9 +19,10 @@ package org.apache.openwhisk.core.ack import org.apache.kafka.common.errors.RecordTooLargeException import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.core.ConfigKeys import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer} import org.apache.openwhisk.core.entity._ - +import pureconfig._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -29,6 +30,9 @@ class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventS implicit logging: Logging, ec: ExecutionContext) extends ActiveAck { + + private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix) + override def apply(tid: TransactionId, activationResult: WhiskActivation, blockingInvoke: Boolean, @@ -38,7 +42,7 @@ class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventS implicit val transid: TransactionId = tid def send(msg: AcknowledegmentMessage, recovery: Boolean = false) = { - producer.send(topic = "completed" + controllerInstance.asString, msg).andThen { + producer.send(topic = topicPrefix + "completed" + controllerInstance.asString, msg).andThen { case Success(_) => val info = if (recovery) s"recovery ${msg.messageType}" else msg.messageType logging.info(this, s"posted $info of activation ${acknowledegment.activationId}") diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala index c8c432b9ad8..c5b5e019b3a 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala @@ -24,12 +24,11 @@ import scala.concurrent.duration.DurationInt import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.actor.ActorSystem import akka.actor.Props import spray.json._ import org.apache.openwhisk.common.Logging -import org.apache.openwhisk.core.WhiskConfig +import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.core.connector.Message import org.apache.openwhisk.core.connector.MessageFeed import org.apache.openwhisk.core.connector.MessagingProvider @@ -41,6 +40,7 @@ import org.apache.openwhisk.core.entity.WhiskPackage import org.apache.openwhisk.core.entity.WhiskRule import org.apache.openwhisk.core.entity.WhiskTrigger import org.apache.openwhisk.spi.SpiLoader +import pureconfig._ case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message { override def serialize = CacheInvalidationMessage.serdes.write(this).compactPrint @@ -101,5 +101,6 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: } object RemoteCacheInvalidation { - val cacheInvalidationTopic = "cacheInvalidation" + val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix) + val cacheInvalidationTopic = topicPrefix + "cacheInvalidation" } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala index 935219685ed..a8ead9da2dd 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala @@ -32,7 +32,7 @@ import spray.json.DefaultJsonProtocol._ import spray.json._ import org.apache.openwhisk.common.Https.HttpsConfig import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId} -import org.apache.openwhisk.core.WhiskConfig +import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.core.connector.MessagingProvider import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation} @@ -187,6 +187,9 @@ object Controller { protected val interface = loadConfigOrThrow[String]("whisk.controller.interface") protected val readinessThreshold = loadConfig[Double]("whisk.controller.readiness-fraction") + val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix) + val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix) + // requiredProperties is a Map whose keys define properties that must be bound to // a value, and whose values are default values. A null value in the Map means there is // no default value specified, so it must appear in the properties file @@ -263,10 +266,10 @@ object Controller { val msgProvider = SpiLoader.get[MessagingProvider] Seq( - ("completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)), - ("health", "health", None), - ("cacheInvalidation", "cache-invalidation", None), - ("events", "events", None)).foreach { + (topicPrefix + "completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)), + (topicPrefix + "health", "health", None), + (topicPrefix + "cacheInvalidation", "cache-invalidation", None), + (userEventTopicPrefix + "events", "events", None)).foreach { case (topic, topicConfigurationKey, maxMessageBytes) => if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) { abort(s"failure during msgProvider.ensureTopic for topic $topic") diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala index 3d0cc2f287e..78202071fa1 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala @@ -30,6 +30,7 @@ import pureconfig.generic.auto._ import org.apache.openwhisk.common.LoggingMarkers._ import org.apache.openwhisk.common._ import org.apache.openwhisk.core.connector._ +import org.apache.openwhisk.core.controller.Controller import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} @@ -177,7 +178,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig, invoker: InvokerInstanceId): Future[RecordMetadata] = { implicit val transid: TransactionId = msg.transid - val topic = s"invoker${invoker.toInt}" + val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}" MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START) val start = transid.started( diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala index 20225127c3c..ee839b3c62a 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala @@ -23,8 +23,10 @@ import akka.stream.ActorMaterializer import org.apache.openwhisk.common.{Logging, TransactionId} import org.apache.openwhisk.core.WhiskConfig import org.apache.openwhisk.core.connector._ +import org.apache.openwhisk.core.controller.Controller import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.spi.Spi + import scala.concurrent.duration._ /** @@ -94,7 +96,7 @@ trait LoadBalancerProvider extends Spi { def createFeedFactory(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging): FeedFactory = { - val activeAckTopic = s"completed${instance.asString}" + val activeAckTopic = s"${Controller.topicPrefix}completed${instance.asString}" val maxActiveAcksPerPoll = 128 val activeAckPollDuration = 1.second diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 28c1a835c81..14d3ff4e263 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -36,6 +36,7 @@ import org.apache.openwhisk.core.connector._ import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size.SizeLong import org.apache.openwhisk.common.LoggingMarkers._ +import org.apache.openwhisk.core.controller.Controller import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive} import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.spi.SpiLoader @@ -352,7 +353,11 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider { InvokerPool.props( (f, i) => f.actorOf(InvokerActor.props(i, instance)), (m, i) => sendActivationToInvoker(messagingProducer, m, i), - messagingProvider.getConsumer(whiskConfig, s"health${instance.asString}", "health", maxPeek = 128), + messagingProvider.getConsumer( + whiskConfig, + s"${Controller.topicPrefix}health${instance.asString}", + s"${Controller.topicPrefix}health", + maxPeek = 128), monitor)) } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 1b0c8bf797e..9dac97275ae 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -72,6 +72,8 @@ object Invoker { protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol") + val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix) + /** * An object which records the environment variables required for this component to run. */ @@ -174,7 +176,7 @@ object Invoker { initKamon(assignedInvokerId) val topicBaseName = "invoker" - val topicName = topicBaseName + assignedInvokerId + val topicName = topicPrefix + topicBaseName + assignedInvokerId val maxMessageBytes = Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT) val invokerInstance = diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 6aa088444e9..87814b3b899 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -118,7 +118,7 @@ class InvokerReactive( } /** Initialize message consumers */ - private val topic = s"invoker${instance.toInt}" + private val topic = s"${Invoker.topicPrefix}invoker${instance.toInt}" private val maximumContainers = (poolConfig.userMemory / MemoryLimit.MIN_MEMORY).toInt private val msgProvider = SpiLoader.get[MessagingProvider] @@ -294,7 +294,7 @@ class InvokerReactive( private val healthProducer = msgProvider.getProducer(config) Scheduler.scheduleWaitAtMost(1.seconds)(() => { - healthProducer.send("health", PingMessage(instance)).andThen { + healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen { case Failure(t) => logging.error(this, s"failed to ping the controller: $t") } }) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala index d9bb08d9c7f..61f99271a95 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala @@ -138,12 +138,9 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE */ val memoryQueueFactory = "" // TODO: TBD - val schedulerConsumer = msgProvider.getConsumer( - config, - s"scheduler${schedulerId.asString}", - s"scheduler${schedulerId.asString}", - maxPeek, - maxPollInterval = TimeLimit.MAX_DURATION + 1.minute) + val topic = s"${Scheduler.topicPrefix}scheduler${schedulerId.asString}" + val schedulerConsumer = + msgProvider.getConsumer(config, topic, topic, maxPeek, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute) implicit val trasnid = TransactionId.containerCreation @@ -171,6 +168,8 @@ object Scheduler { protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol") + val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix) + /** * The scheduler has two ports, one for akka-remote and the other for akka-grpc. */ @@ -236,8 +235,11 @@ object Scheduler { val msgProvider = SpiLoader.get[MessagingProvider] Seq( - ("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)), - ("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))) + (topicPrefix + "scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)), + ( + topicPrefix + "creationAck" + instanceId.asString, + "creationAck", + Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))) .foreach { case (topic, topicConfigurationKey, maxMessageBytes) => if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) { diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2 index cdae2bddaa7..5ce7f312d80 100644 --- a/tests/src/test/resources/application.conf.j2 +++ b/tests/src/test/resources/application.conf.j2 @@ -29,6 +29,10 @@ whisk { retention-bytes = 1073741824 retention-ms = 3600000 } + prefix = "{{ kafka.topicsPrefix }}" + user-event { + prefix = "{{ kafka.topicsUserEventPrefix }}" + } } common { security-protocol: {{ kafka.protocol }}