Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add prefix for topics #5062

Merged
merged 1 commit into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ registry:
confdir: "{{ config_root_dir }}/registry"

kafka:
topicsPrefix: "{{ kafka_topics_prefix | default('') }}"
topicsUserEventPrefix: "{{ kafka_topics_userEvent_prefix | default(kafka_topics_prefix) | default('') }}"
ssl:
client_authentication: required
keystore:
Expand Down
4 changes: 4 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@
"{{ kafka_topics_health_retentionMS | default() }}"
"CONFIG_whisk_kafka_topics_health_segmentBytes":
"{{ kafka_topics_health_segmentBytes | default() }}"
"CONFIG_whisk_kafka_topics_prefix":
"{{ kafka.topicsPrefix }}"
"CONFIG_whisk_kafka_topics_userEvent_prefix":
"{{ kafka.topicsUserEventPrefix }}"
"CONFIG_whisk_kafka_common_securityProtocol":
"{{ kafka.protocol }}"
"CONFIG_whisk_kafka_common_sslTruststoreLocation":
Expand Down
2 changes: 2 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@
"CONFIG_whisk_kafka_topics_invoker_retentionBytes": "{{ kafka_topics_invoker_retentionBytes | default() }}"
"CONFIG_whisk_kafka_topics_invoker_retentionMs": "{{ kafka_topics_invoker_retentionMS | default() }}"
"CONFIG_whisk_kakfa_topics_invoker_segmentBytes": "{{ kafka_topics_invoker_segmentBytes | default() }}"
"CONFIG_whisk_kafka_topics_prefix": "{{ kafka.topicsPrefix }}"
"CONFIG_whisk_kafka_topics_userEvent_prefix": "{{ kafka.topicsUserEventPrefix }}"
"CONFIG_whisk_kafka_common_securityProtocol": "{{ kafka.protocol }}"
"CONFIG_whisk_kafka_common_sslTruststoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
"CONFIG_whisk_kafka_common_sslTruststorePassword": "{{ kafka.ssl.keystore.password }}"
Expand Down
4 changes: 4 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ whisk {
retention-bytes = 1073741824
retention-ms = 3600000
}
prefix = ""
user-event {
prefix = ""
}
}

metrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ object UserEvents {

val enabled = loadConfigOrThrow[UserEventsConfig](ConfigKeys.userEvents).enabled

val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix)

def send(producer: MessageProducer, em: => EventMessage) = {
if (enabled) {
producer.send("events", em)
producer.send(userEventTopicPrefix + "events", em)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ object ConfigKeys {
val kafkaProducer = s"$kafka.producer"
val kafkaConsumer = s"$kafka.consumer"
val kafkaTopics = s"$kafka.topics"
val kafkaTopicsPrefix = s"$kafkaTopics.prefix"
val kafkaTopicsUserEventPrefix = s"$kafkaTopics.user-event.prefix"

val memory = "whisk.memory"
val timeLimit = "whisk.time-limit"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ package org.apache.openwhisk.core.ack

import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer}
import org.apache.openwhisk.core.entity._

import pureconfig._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventSender: Option[EventSender])(
implicit logging: Logging,
ec: ExecutionContext)
extends ActiveAck {

private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)

override def apply(tid: TransactionId,
activationResult: WhiskActivation,
blockingInvoke: Boolean,
Expand All @@ -38,7 +42,7 @@ class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventS
implicit val transid: TransactionId = tid

def send(msg: AcknowledegmentMessage, recovery: Boolean = false) = {
producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
producer.send(topic = topicPrefix + "completed" + controllerInstance.asString, msg).andThen {
case Success(_) =>
val info = if (recovery) s"recovery ${msg.messageType}" else msg.messageType
logging.info(this, s"posted $info of activation ${acknowledegment.activationId}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import scala.concurrent.duration.DurationInt
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import akka.actor.ActorSystem
import akka.actor.Props
import spray.json._
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.Message
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.connector.MessagingProvider
Expand All @@ -41,6 +40,7 @@ import org.apache.openwhisk.core.entity.WhiskPackage
import org.apache.openwhisk.core.entity.WhiskRule
import org.apache.openwhisk.core.entity.WhiskTrigger
import org.apache.openwhisk.spi.SpiLoader
import pureconfig._

case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message {
override def serialize = CacheInvalidationMessage.serdes.write(this).compactPrint
Expand Down Expand Up @@ -101,5 +101,6 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance:
}

object RemoteCacheInvalidation {
val cacheInvalidationTopic = "cacheInvalidation"
val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
val cacheInvalidationTopic = topicPrefix + "cacheInvalidation"
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
Expand Down Expand Up @@ -187,6 +187,9 @@ object Controller {
protected val interface = loadConfigOrThrow[String]("whisk.controller.interface")
protected val readinessThreshold = loadConfig[Double]("whisk.controller.readiness-fraction")

val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix)

// requiredProperties is a Map whose keys define properties that must be bound to
// a value, and whose values are default values. A null value in the Map means there is
// no default value specified, so it must appear in the properties file
Expand Down Expand Up @@ -263,10 +266,10 @@ object Controller {
val msgProvider = SpiLoader.get[MessagingProvider]

Seq(
("completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
("health", "health", None),
("cacheInvalidation", "cache-invalidation", None),
("events", "events", None)).foreach {
(topicPrefix + "completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
(topicPrefix + "health", "health", None),
(topicPrefix + "cacheInvalidation", "cache-invalidation", None),
(userEventTopicPrefix + "events", "events", None)).foreach {
case (topic, topicConfigurationKey, maxMessageBytes) =>
if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) {
abort(s"failure during msgProvider.ensureTopic for topic $topic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import pureconfig.generic.auto._
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
Expand Down Expand Up @@ -177,7 +178,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
invoker: InvokerInstanceId): Future[RecordMetadata] = {
implicit val transid: TransactionId = msg.transid

val topic = s"invoker${invoker.toInt}"
val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"

MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
val start = transid.started(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.spi.Spi

import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -94,7 +96,7 @@ trait LoadBalancerProvider extends Spi {
def createFeedFactory(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem,
logging: Logging): FeedFactory = {

val activeAckTopic = s"completed${instance.asString}"
val activeAckTopic = s"${Controller.topicPrefix}completed${instance.asString}"
val maxActiveAcksPerPoll = 128
val activeAckPollDuration = 1.second

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size.SizeLong
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.spi.SpiLoader
Expand Down Expand Up @@ -352,7 +353,11 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
InvokerPool.props(
(f, i) => f.actorOf(InvokerActor.props(i, instance)),
(m, i) => sendActivationToInvoker(messagingProducer, m, i),
messagingProvider.getConsumer(whiskConfig, s"health${instance.asString}", "health", maxPeek = 128),
messagingProvider.getConsumer(
whiskConfig,
s"${Controller.topicPrefix}health${instance.asString}",
s"${Controller.topicPrefix}health",
maxPeek = 128),
monitor))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ object Invoker {

protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol")

val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)

/**
* An object which records the environment variables required for this component to run.
*/
Expand Down Expand Up @@ -174,7 +176,7 @@ object Invoker {
initKamon(assignedInvokerId)

val topicBaseName = "invoker"
val topicName = topicBaseName + assignedInvokerId
val topicName = topicPrefix + topicBaseName + assignedInvokerId

val maxMessageBytes = Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)
val invokerInstance =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class InvokerReactive(
}

/** Initialize message consumers */
private val topic = s"invoker${instance.toInt}"
private val topic = s"${Invoker.topicPrefix}invoker${instance.toInt}"
private val maximumContainers = (poolConfig.userMemory / MemoryLimit.MIN_MEMORY).toInt
private val msgProvider = SpiLoader.get[MessagingProvider]

Expand Down Expand Up @@ -294,7 +294,7 @@ class InvokerReactive(

private val healthProducer = msgProvider.getProducer(config)
Scheduler.scheduleWaitAtMost(1.seconds)(() => {
healthProducer.send("health", PingMessage(instance)).andThen {
healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen {
case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,9 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
*/
val memoryQueueFactory = "" // TODO: TBD

val schedulerConsumer = msgProvider.getConsumer(
config,
s"scheduler${schedulerId.asString}",
s"scheduler${schedulerId.asString}",
maxPeek,
maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
val topic = s"${Scheduler.topicPrefix}scheduler${schedulerId.asString}"
val schedulerConsumer =
msgProvider.getConsumer(config, topic, topic, maxPeek, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)

implicit val trasnid = TransactionId.containerCreation

Expand Down Expand Up @@ -171,6 +168,8 @@ object Scheduler {

protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")

val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)

/**
* The scheduler has two ports, one for akka-remote and the other for akka-grpc.
*/
Expand Down Expand Up @@ -236,8 +235,11 @@ object Scheduler {
val msgProvider = SpiLoader.get[MessagingProvider]

Seq(
("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))
(topicPrefix + "scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
(
topicPrefix + "creationAck" + instanceId.asString,
"creationAck",
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))
.foreach {
case (topic, topicConfigurationKey, maxMessageBytes) =>
if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) {
Expand Down
4 changes: 4 additions & 0 deletions tests/src/test/resources/application.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ whisk {
retention-bytes = 1073741824
retention-ms = 3600000
}
prefix = "{{ kafka.topicsPrefix }}"
user-event {
prefix = "{{ kafka.topicsUserEventPrefix }}"
}
}
common {
security-protocol: {{ kafka.protocol }}
Expand Down