Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #27 from ps-dev/ADAPT1-1570-AllReplacementTopicsVa…
Browse files Browse the repository at this point in the history
…lidationsOnBothNewOldTopics

ADAPT1-1570: Enforce replacementTopics validations on both old and new topics
  • Loading branch information
aman-minz authored May 27, 2024
2 parents 5a797fa + 8d27f49 commit 97b57d3
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ sealed trait AdditionalValidation extends EnumEntry
sealed trait MetadataAdditionalValidation extends AdditionalValidation
sealed trait SchemaAdditionalValidation extends AdditionalValidation

// NOTE: Please note that any case object added here once must be retained throughout for schema to evolve.
object MetadataAdditionalValidation extends Enum[MetadataAdditionalValidation] {

case object replacementTopics extends MetadataAdditionalValidation

override val values: immutable.IndexedSeq[MetadataAdditionalValidation] = findValues
}

// NOTE: Please note that any value added here once must be retained throughout for schema to evolve.
object SchemaAdditionalValidation extends Enum[SchemaAdditionalValidation] {

case object defaultInRequiredField extends SchemaAdditionalValidation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ object TopicMetadataError {
override def message: String = s"Field 'replacementTopics' is required when the topic '$topic' is being deprecated!"
}

case class InvalidTopicFormatError(topic: String) extends TopicMetadataError {
override def message: String = s"$topic : ${Subject.invalidFormat}"
case class SelfRefReplacementTopicsError(topic: String) extends TopicMetadataError {
override def message: String = s"A non-deprecated topic '$topic' pointing to itself in replacementTopics is not useful!"
}

case class TopicDoesNotExist(topic: String) extends TopicMetadataError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,28 @@ package hydra.kafka.programs
import cats.effect.Sync
import cats.syntax.all._
import hydra.common.validation.{AdditionalValidation, AdditionalValidationUtil, MetadataAdditionalValidation, Validator}
import hydra.common.validation.Validator.{ValidationChain, valid}
import hydra.common.validation.Validator.valid
import hydra.kafka.algebras.{KafkaAdminAlgebra, MetadataAlgebra}
import hydra.kafka.model.DataClassification._
import hydra.kafka.model.TopicMetadataV2Request.Subject
import hydra.kafka.model._

class TopicMetadataV2Validator[F[_] : Sync](metadataAlgebra: MetadataAlgebra[F], kafkaAdmin: KafkaAdminAlgebra[F]) extends Validator {

def validate(updateMetadataV2Request: TopicMetadataV2Request, subject: Subject): F[Unit] =
def validate(request: TopicMetadataV2Request, subject: Subject): F[Unit] =
for {
metadata <- metadataAlgebra.getMetadataFor(subject)
_ <- validateSubDataClassification(updateMetadataV2Request.dataClassification, updateMetadataV2Request.subDataClassification)
_ <- validateTopicsExist(updateMetadataV2Request.replacementTopics)
_ <- validateTopicsExist(updateMetadataV2Request.previousTopics)
_ <- validatePreviousTopicsCannotPointItself(updateMetadataV2Request.previousTopics, subject.value)
_ <- validateSubDataClassification(request.dataClassification, request.subDataClassification)
_ <- validateTopicsExist(request.replacementTopics)
_ <- validateTopicsExist(request.previousTopics)
_ <- validateDeprecatedTopicHasReplacementTopic(request.deprecated, request.replacementTopics, subject.value)
_ <- validateNonDepSelfRefReplacementTopics(request.deprecated, request.replacementTopics, subject.value)
_ <- validatePreviousTopicsCannotPointItself(request.previousTopics, subject.value)
additionalValidations <- new AdditionalValidationUtil(
isExistingTopic = metadata.isDefined,
currentAdditionalValidations = metadata.flatMap(_.value.additionalValidations)
).pickValidations().getOrElse(List.empty).pure
_ <- validateAdditional(additionalValidations, updateMetadataV2Request, subject.value)
_ <- validateAdditional(additionalValidations)
} yield ()

private def validateSubDataClassification(dataClassification: DataClassification,
Expand Down Expand Up @@ -66,19 +68,24 @@ class TopicMetadataV2Validator[F[_] : Sync](metadataAlgebra: MetadataAlgebra[F],
(topicName, t.isDefined)
}

private def validateAdditional(additionalValidations: List[AdditionalValidation],
request: TopicMetadataV2Request,
topic: String): F[Unit] = {
private def validateAdditional(additionalValidations: List[AdditionalValidation]): F[Unit] = {
val validations = additionalValidations.collect {
case MetadataAdditionalValidation.replacementTopics =>
validateDeprecatedTopicHasReplacementTopic(request.deprecated, request.replacementTopics, topic)
// Add extra validations applicable on topics created after replacementTopics feature was introduced.
case MetadataAdditionalValidation.replacementTopics => valid
}
resultOf(validations.pure)
}

private def validateDeprecatedTopicHasReplacementTopic(deprecated: Boolean, replacementTopics: Option[List[String]], topic: String): ValidationChain = {
private def validateDeprecatedTopicHasReplacementTopic(deprecated: Boolean, replacementTopics: Option[List[String]], topic: String): F[Unit] = {
val hasReplacementTopicsIfDeprecated = if (deprecated) replacementTopics.exists(_.nonEmpty) else true
validate(hasReplacementTopicsIfDeprecated, TopicMetadataError.ReplacementTopicsMissingError(topic))
resultOf(validate(hasReplacementTopicsIfDeprecated, TopicMetadataError.ReplacementTopicsMissingError(topic)))
}

private def validateNonDepSelfRefReplacementTopics(deprecated: Boolean,
replacementTopics: Option[List[String]],
topic: String): F[Unit] = {
val topicNotDeprecatedDoesNotPointToSelf = if (!deprecated) replacementTopics.forall(!_.contains(topic)) else true
resultOf(validate(topicNotDeprecatedDoesNotPointToSelf, TopicMetadataError.SelfRefReplacementTopicsError(topic)))
}

private def validatePreviousTopicsCannotPointItself(maybeTopics: Option[List[String]], currentTopic: String): F[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package hydra.kafka.services

import java.net.InetAddress
import java.util.UUID

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.kafka.scaladsl.Consumer
Expand All @@ -25,8 +24,6 @@ import org.joda.time.format.ISODateTimeFormat

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import spray.json._
import DefaultJsonProtocol._

class StreamsManagerActor(
bootstrapKafkaConfig: Config,
Expand Down Expand Up @@ -129,6 +126,16 @@ object StreamsManagerActor {
c.getStringOpt("metadata-topic-name")
.getOrElse("_hydra.metadata.topic")

private def toOptionList(record: GenericRecord, fieldName: String): Option[List[String]] =
if (record.hasField(fieldName) && record.get(fieldName) != null) {
Option(record.get(fieldName).toString) map { value =>
val trimmed = value.trim.stripPrefix("[").stripSuffix("]")
trimmed.split(",").map(_.trim).toList
}
} else {
None
}

private[services] def createMetadataStream[K, V](
config: Config,
kafkaClientSecurityConfig: KafkaClientSecurityConfig,
Expand Down Expand Up @@ -165,12 +172,8 @@ object StreamsManagerActor {
record.get("streamType").toString,
record.get("derived").toString.toBoolean,
Try(Option(record.get("deprecated"))).toOption.flatten.map(_.toString.toBoolean),
Try(Option(record.get("replacementTopics"))).toOption.flatten map { rt =>
rt.toString.parseJson.convertTo[List[String]]
},
Try(Option(record.get("previousTopics"))).toOption.flatten map { pt =>
pt.toString.parseJson.convertTo[List[String]]
},
toOptionList(record, "replacementTopics"),
toOptionList(record, "previousTopics"),
record.get("dataClassification").toString,
Try(Option(record.get("subDataClassification"))).toOption.flatten.map(_.toString),
record.get("contact").toString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import hydra.common.config.ConfigSupport
import ConfigSupport._
import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig
import hydra.common.config.KafkaConfigUtils.kafkaSecurityEmptyConfig
import hydra.common.validation.MetadataAdditionalValidation
import hydra.core.akka.SchemaRegistryActor.{RegisterSchemaRequest, RegisterSchemaResponse}
import hydra.core.ingest.{HydraRequest, RequestParams}
import hydra.core.marshallers.{GenericSchema, HydraJsonSupport, StreamType, TopicMetadataRequest}
Expand All @@ -23,12 +22,10 @@ import hydra.kafka.services.StreamsManagerActor.{GetMetadata, GetMetadataRespons
import hydra.kafka.util.KafkaUtils
import hydra.kafka.util.KafkaUtils.TopicDetails

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.io.Source
import scala.util._
import spray.json.DefaultJsonProtocol

class TopicBootstrapActor(
schemaRegistryActor: ActorRef,
Expand Down Expand Up @@ -108,10 +105,10 @@ class TopicBootstrapActor(
.flatMap { metadataResponse =>
val (topicMetadataRequest, metadata) = metadataResponse.metadata.get(schema.subject) match {
case Some(topicMetadata) => (request, Some(topicMetadata))
case None => (request.copy(additionalValidations = Some(MetadataAdditionalValidation.values.toList)), None)
case None => (request, None)
}

TopicMetadataValidator.validate(topicMetadataRequest, schema, metadataResponse.metadata) match {
TopicMetadataValidator.validate(topicMetadataRequest, schema, kafkaUtils) match {
case Success(_) =>
executeEndpoint(topicMetadataRequest, metadata)
case Failure(ex: ValidatorException) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package hydra.kafka.services

import hydra.common.validation.MetadataAdditionalValidation
import hydra.core.marshallers.{GenericSchema, TopicMetadataRequest}
import hydra.kafka.model.TopicMetadata
import hydra.kafka.programs.TopicMetadataError
import hydra.kafka.util.KafkaUtils

import scala.util.{Failure, Success, Try}

Expand All @@ -19,14 +18,15 @@ object TopicMetadataValidator {
validFormat
)

def validate(metadataRequest: TopicMetadataRequest, schema: GenericSchema, metadataMap: Map[String, TopicMetadata]): Try[ValidationResponse] =
def validate(request: TopicMetadataRequest, schema: GenericSchema, kafkaUtils: KafkaUtils): Try[ValidationResponse] =
mergeValidationResponses(
List(
validateSubject(Option(schema)),
validateTopicsExist(metadataRequest.replacementTopics, metadataMap),
validateTopicsExist(metadataRequest.previousTopics, metadataMap),
validatePreviousTopicsCannotPointItself(metadataRequest.previousTopics, schema.subject),
validateAdditional(metadataRequest, schema.subject),
validateTopicsExist(request.replacementTopics, kafkaUtils),
validateTopicsExist(request.previousTopics, kafkaUtils),
validateDeprecatedTopicHasReplacementTopic(request.deprecated.contains(true), request.replacementTopics, schema.subject),
validateNonDepSelfRefReplacementTopics(request.deprecated.contains(true), request.replacementTopics, schema.subject),
validatePreviousTopicsCannotPointItself(request.previousTopics, schema.subject)
)
)

Expand Down Expand Up @@ -89,28 +89,27 @@ object TopicMetadataValidator {
}
}

private def validateAdditional(metadataRequest: TopicMetadataRequest, topic: String): Try[ValidationResponse] = {
val invalidReasons = metadataRequest.additionalValidations.getOrElse(Nil).collect {
case MetadataAdditionalValidation.replacementTopics =>
validateDeprecatedTopicHasReplacementTopic(metadataRequest.deprecated.contains(true), metadataRequest.replacementTopics, topic)
}.collect {
case Invalid(reason) => reason
}

if (invalidReasons.nonEmpty) {
Failure(ValidatorException(invalidReasons))
}
else {
private def validateDeprecatedTopicHasReplacementTopic(deprecated: Boolean,
replacementTopics: Option[List[String]],
topic: String): Try[ValidationResponse] = {
val isDeprecatedWithReplacementTopic = if (deprecated) replacementTopics.exists(_.nonEmpty) else true
if (isDeprecatedWithReplacementTopic) {
Success(Valid)
} else {
val errorMessage = TopicMetadataError.ReplacementTopicsMissingError(topic).message
Failure(ValidatorException(Seq(errorMessage)))
}
}

def validateDeprecatedTopicHasReplacementTopic(deprecated: Boolean, replacementTopics: Option[List[String]], topic: String): ValidationResponse = {
val isDeprecatedWithReplacementTopic = if (deprecated) replacementTopics.exists(_.nonEmpty) else true
if (isDeprecatedWithReplacementTopic) {
Valid
private def validateNonDepSelfRefReplacementTopics(deprecated: Boolean,
replacementTopics: Option[List[String]],
topic: String): Try[ValidationResponse] = {
val topicNotDeprecatedDoesNotPointToSelf = if (!deprecated) replacementTopics.forall(!_.contains(topic)) else true
if (topicNotDeprecatedDoesNotPointToSelf) {
Success(Valid)
} else {
Invalid(TopicMetadataError.ReplacementTopicsMissingError(topic).message)
val errorMessage = TopicMetadataError.SelfRefReplacementTopicsError(topic).message
Failure(ValidatorException(Seq(errorMessage)))
}
}

Expand All @@ -122,18 +121,18 @@ object TopicMetadataValidator {
case _ => Success(Valid)
}

private def validateTopicsExist(maybeTopics: Option[List[String]], metadataMap: Map[String, TopicMetadata]): Try[ValidationResponse] =
maybeTopics.map(areTopicsInMetadataMap(_, metadataMap)).getOrElse(Success(Valid))
private def validateTopicsExist(maybeTopics: Option[List[String]], kafkaUtils: KafkaUtils): Try[ValidationResponse] =
maybeTopics.map(doTopicsExist(_, kafkaUtils)).getOrElse(Success(Valid))

private def validatePreviousTopicsCannotPointItself(maybeTopics: Option[List[String]], currentTopic: String): Try[ValidationResponse] =
maybeTopics match {
case Some(topics) if topics.contains(currentTopic) => Failure(ValidatorException(List(s"Previous topics cannot point to itself, '$currentTopic'!")))
case _ => Success(Valid)
}

private def areTopicsInMetadataMap(topics: List[String], metadataMap: Map[String, TopicMetadata]): Try[ValidationResponse] = {
private def doTopicsExist(topics: List[String], kafkaUtils: KafkaUtils): Try[ValidationResponse] = {
val notExistingTopics = topics.collect {
case topic if !metadataMap.contains(topic) => s"Topic '$topic' does not exist within DVS!"
case topic if !kafkaUtils.topicExists(topic).get => s"Topic '$topic' does not exist!"
}

if (notExistingTopics.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ case class KafkaUtils(config: Map[String, AnyRef], kafkaClientSecurityConfig: Ka
}

def topicExists(name: String): Try[Boolean] = withClient { c =>
c.listTopics().names.get.asScala.exists(s => s == name)
c.listTopics().names.get.asScala.contains(name)
}

def topicNames(): Try[Seq[String]] =
Expand Down
Loading

0 comments on commit 97b57d3

Please sign in to comment.