#1299 Schema loading from schema registry by topicNameStem.
 Secure schema registry access works by supplying keyStore & trustStore config values.
 Optionally, the value and key schemas can be combined; a non-trivial combination for struct-based schemas was added -> TODO: add tests!
 Menas template updated with the schema registry settings.

 SecureKafka.scala moved to utils and renamed to SecureConfig.

 Smart path joining added via /.
dk1844 committed Jun 17, 2020
1 parent 1e71722 commit e351748
Showing 10 changed files with 260 additions and 38 deletions.
10 changes: 10 additions & 0 deletions menas/src/main/resources/application.properties.template
@@ -144,3 +144,13 @@ menas.oozie.splineMongoURL=mongodb://localhost:27017
#Provide optional spark config options to be passed onto standardization and conformance jobs
#menas.oozie.extraSparkConfigs={"spark.eventLog.dir" : "hdfs:///spark2-history/", \
#"spark.history.fs.logDirectory": "hdfs:///spark2-history" }

#----------- Schema Registry
# URL of schema registry:
menas.schemaRegistryBaseUrl=https://schemaRegistryHostname:8081

# When using a secure schema registry, the following paths and passwords must be specified:
javax.net.ssl.trustStore=/path/to/truststore.jks
javax.net.ssl.trustStorePassword=somePassword
javax.net.ssl.keyStore=/path/to/keystore.jks
javax.net.ssl.keyStorePassword=somePassword
@@ -15,7 +15,6 @@

package za.co.absa.enceladus.menas.controllers

import java.net.URL
import java.util.Optional
import java.util.concurrent.CompletableFuture

@@ -27,24 +26,25 @@ import org.springframework.security.core.annotation.AuthenticationPrincipal
import org.springframework.security.core.userdetails.UserDetails
import org.springframework.web.bind.annotation._
import org.springframework.web.multipart.MultipartFile
import za.co.absa.enceladus.menas.models.rest.exceptions.{RemoteSchemaRetrievalException, SchemaParsingException}
import za.co.absa.enceladus.menas.models.rest.exceptions.SchemaParsingException
import za.co.absa.enceladus.menas.repositories.RefCollection
import za.co.absa.enceladus.menas.services.{AttachmentService, SchemaService}
import za.co.absa.enceladus.menas.services.{AttachmentService, SchemaRegistryService, SchemaService}
import za.co.absa.enceladus.menas.utils.SchemaType
import za.co.absa.enceladus.menas.utils.converters.SparkMenasSchemaConvertor
import za.co.absa.enceladus.menas.utils.parsers.SchemaParser
import za.co.absa.enceladus.model.Schema
import za.co.absa.enceladus.model.menas._
import za.co.absa.enceladus.utils.schema.SchemaUtils

import scala.io.Source
import scala.util.control.NonFatal

@RestController
@RequestMapping(Array("/api/schema"))
class SchemaController @Autowired()(
schemaService: SchemaService,
attachmentService: AttachmentService,
sparkMenasConvertor: SparkMenasSchemaConvertor)
sparkMenasConvertor: SparkMenasSchemaConvertor,
schemaRegistryService: SchemaRegistryService
)
extends VersionedModelController(schemaService) {

import za.co.absa.enceladus.menas.utils.implicits._
@@ -59,36 +59,54 @@ class SchemaController @Autowired()(
@RequestParam format: Optional[String]): CompletableFuture[Option[Schema]] = {

val schemaType: SchemaType.Value = SchemaType.fromOptSchemaName(format)
val (url, fileContent, mimeType) = {
try {
val url = new URL(remoteUrl)
val connection = url.openConnection()
val mimeType = SchemaController.avscContentType // only AVSC is expected to come from the schema registry
val fileStream = Source.fromInputStream(connection.getInputStream)
val fileContent = fileStream.mkString
fileStream.close()

(url, fileContent, mimeType)

} catch {
case NonFatal(e) =>
throw RemoteSchemaRetrievalException(schemaType, s"Could not retrieve a schema file from $remoteUrl. Please check the correctness of the URL and a presence of the schema at the mentioned endpoint", e)
}
}

val sparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(fileContent)
val schemaResponse = schemaRegistryService.loadSchemaByUrl(remoteUrl)
val sparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(schemaResponse.fileContent)

val menasFile = MenasAttachment(refCollection = RefCollection.SCHEMA.name().toLowerCase,
refName = name,
refVersion = version + 1, // version is the current one, refVersion is the to-be-created one
attachmentType = MenasAttachment.ORIGINAL_SCHEMA_ATTACHMENT,
filename = url.getFile,
fileContent = fileContent.getBytes,
fileMIMEType = mimeType)
filename = schemaResponse.url.getFile,
fileContent = schemaResponse.fileContent.getBytes,
fileMIMEType = schemaResponse.mimeType)

uploadSchemaToMenas(principal.getUsername, menasFile, sparkStruct, schemaType)
}

@PostMapping(Array("/topic"))
@ResponseStatus(HttpStatus.CREATED)
def handleTopicStem(@AuthenticationPrincipal principal: UserDetails,
@RequestParam topicStem: String,
@RequestParam mergeWithKey: Boolean,
@RequestParam version: Int,
@RequestParam name: String,
@RequestParam format: Optional[String]): CompletableFuture[Option[Schema]] = {

val schemaType: SchemaType.Value = SchemaType.fromOptSchemaName(format)

val valueSchemaResponse = schemaRegistryService.loadSchemaByTopicName(s"$topicStem-value")
val valueSparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(valueSchemaResponse.fileContent)

val combinedStructType = if (mergeWithKey) {
val keySchemaResponse = schemaRegistryService.loadSchemaByTopicName(s"$topicStem-key")
val keySparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(keySchemaResponse.fileContent)

SchemaUtils.combineStructTypes(valueSparkStruct, keySparkStruct)
} else {
valueSparkStruct
}

val menasFile = MenasAttachment(refCollection = RefCollection.SCHEMA.name().toLowerCase,
refName = name,
refVersion = version + 1, // version is the current one, refVersion is the to-be-created one
attachmentType = MenasAttachment.ORIGINAL_SCHEMA_ATTACHMENT,
filename = valueSchemaResponse.url.getFile, // only the value file gets saved as an attachment
fileContent = valueSchemaResponse.fileContent.getBytes,
fileMIMEType = valueSchemaResponse.mimeType)

uploadSchemaToMenas(principal.getUsername, menasFile, combinedStructType, schemaType)
}

@PostMapping(Array("/upload"))
@ResponseStatus(HttpStatus.CREATED)
def handleFileUpload(@AuthenticationPrincipal principal: UserDetails,
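To make the mergeWithKey step in handleTopicStem above more concrete, here is a rough sketch of what combining a value schema with a key schema could amount to for flat schemas. The field names are made up, and the merge shown is only a naive top-level union standing in for SchemaUtils.combineStructTypes, whose actual handling of nested struct-based schemas is, per the commit message, non-trivial and still awaiting tests:

import org.apache.spark.sql.types.{DecimalType, StringType, StructField, StructType}

// Hypothetical structs parsed from the "<topicStem>-value" and "<topicStem>-key" subjects.
val valueStruct = StructType(Seq(
  StructField("amount", DecimalType(18, 2), nullable = true),
  StructField("currency", StringType, nullable = true)
))
val keyStruct = StructType(Seq(
  StructField("accountId", StringType, nullable = false)
))

// Naive stand-in for SchemaUtils.combineStructTypes: keep all value fields and
// append the key fields whose names are not already present in the value schema.
val combined = StructType(
  valueStruct.fields ++ keyStruct.fields.filterNot(f => valueStruct.fieldNames.contains(f.name))
)
// combined.fieldNames: Array(amount, currency, accountId)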
@@ -0,0 +1,85 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.enceladus.menas.services

import java.net.URL

import com.typesafe.config.ConfigFactory
import org.springframework.beans.factory.annotation.{Autowired, Value}
import org.springframework.stereotype.Service
import za.co.absa.enceladus.menas.controllers.SchemaController
import za.co.absa.enceladus.menas.models.rest.exceptions.RemoteSchemaRetrievalException
import za.co.absa.enceladus.menas.services.SchemaRegistryService.SchemaResponse
import za.co.absa.enceladus.menas.utils.SchemaType
import za.co.absa.enceladus.utils.config.SecureConfig

import scala.io.Source
import scala.util.control.NonFatal

@Service
class SchemaRegistryService @Autowired()() {

@Value("${menas.schemaRegistryBaseUrl}")
val schemaRegistryBaseUrl: String = ""

private val config = ConfigFactory.load()
SecureConfig.setKeyStoreProperties(config)
SecureConfig.setTrustStoreProperties(config)

/**
* Loads the latest schema version for a topic name (e.g. topic1-value); the URL is constructed based on the [[schemaRegistryBaseUrl]].
* @param topicName topic name (subject) to load the latest schema for, e.g. topic1-value
* @return schema response holding the schema content, its MIME type and the resolved URL
*/
def loadSchemaByTopicName(topicName: String): SchemaResponse = {
val schemaUrl = SchemaRegistryService.getLatestSchemaUrl(schemaRegistryBaseUrl, topicName)
loadSchemaByUrl(schemaUrl)
}

/**
* Loads the schema from a fully specified URL.
* @param remoteUrl full URL of the schema file to retrieve
* @return schema response holding the schema content, its MIME type and the resolved URL
*/
def loadSchemaByUrl(remoteUrl: String): SchemaResponse = {
try {
val url = new URL(remoteUrl)
val connection = url.openConnection()
val mimeType = SchemaController.avscContentType // only AVSC is expected to come from the schema registry
val fileStream = Source.fromInputStream(connection.getInputStream)
val fileContent = fileStream.mkString
fileStream.close()

SchemaResponse(fileContent, mimeType, url)
} catch {
case NonFatal(e) =>
throw RemoteSchemaRetrievalException(SchemaType.Avro, s"Could not retrieve a schema file from $remoteUrl. " +
s"Please check the correctness of the URL and a presence of the schema at the mentioned endpoint", e)
}
}

}

object SchemaRegistryService {

import za.co.absa.enceladus.utils.implicits.StringImplicits._

def getLatestSchemaUrl(baseUrl: String, topicName: String): String =
baseUrl / "subjects" / topicName / "versions/latest/schema"

case class SchemaResponse(fileContent: String, mimeType: String, url: URL)

}
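For illustration, here is roughly what getLatestSchemaUrl produces for the placeholder base URL from application.properties.template and a hypothetical value subject (a sketch only; the host and topic names are made up):

import za.co.absa.enceladus.utils.implicits.StringImplicits._

// Mirrors getLatestSchemaUrl: baseUrl / "subjects" / topicName / "versions/latest/schema"
val baseUrl = "https://schemaRegistryHostname:8081" // placeholder host from the template
val subject = "mytopic1-value"                      // hypothetical "<topicStem>-value" subject
val latestSchemaUrl = baseUrl / "subjects" / subject / "versions/latest/schema"
// => "https://schemaRegistryHostname:8081/subjects/mytopic1-value/versions/latest/schema"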
@@ -40,12 +40,12 @@ import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.plugins.builtin.utils.SecureKafka
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.general.ProjectMetadataTools
import za.co.absa.enceladus.utils.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.enceladus.utils.performance.{PerformanceMeasurer, PerformanceMetricTools}
import za.co.absa.enceladus.utils.schema.SchemaUtils
import za.co.absa.enceladus.utils.config.SecureConfig
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

import scala.util.control.NonFatal
@@ -61,7 +61,7 @@ object DynamicConformanceJob {
def main(args: Array[String]) {
// This should be the first thing the app does to make secure Kafka work with our CA.
// After Spring activates JavaX, it will be too late.
SecureKafka.setSecureKafkaProperties(conf)
SecureConfig.setSecureKafkaProperties(conf)

SparkVersionGuard.fromDefaultSparkCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)

@@ -34,7 +34,6 @@ import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.plugins.builtin.utils.SecureKafka
import za.co.absa.enceladus.standardization.interpreter.StandardizationInterpreter
import za.co.absa.enceladus.standardization.interpreter.stages.PlainSchemaGenerator
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
@@ -47,6 +46,7 @@ import za.co.absa.enceladus.utils.udf.UDFLibrary
import za.co.absa.enceladus.utils.validation.ValidationException
import org.apache.spark.SPARK_VERSION
import za.co.absa.enceladus.common.plugin.PostProcessingService
import za.co.absa.enceladus.utils.config.SecureConfig

import scala.collection.immutable.HashMap
import scala.util.control.NonFatal
@@ -63,7 +63,7 @@ object StandardizationJob {
def main(args: Array[String]) {
// This should be the first thing the app does to make secure Kafka work with our CA.
// After Spring activates JavaX, it will be too late.
SecureKafka.setSecureKafkaProperties(conf)
SecureConfig.setSecureKafkaProperties(conf)

SparkVersionGuard.fromDefaultSparkCompatibilitySettings.ensureSparkVersionCompatibility(SPARK_VERSION)

@@ -13,7 +13,7 @@
* limitations under the License.
*/

package za.co.absa.enceladus.plugins.builtin.utils
package za.co.absa.enceladus.utils.config

import java.nio.file.{Files, Paths}

@@ -13,24 +13,43 @@
* limitations under the License.
*/

package za.co.absa.enceladus.plugins.builtin.utils
package za.co.absa.enceladus.utils.config

import com.typesafe.config.Config

object SecureKafka {
object SecureConfig {
/**
* Moves Kafka security configuration from the config to system properties
* if it is not defined there already.
* if it is not defined there already (regarding trustStore, keyStore and auth login config).
*
* This is needed to be executed at least once to initialize secure Kafka when running from Spark.
*
* @param conf A configuration.
*/
def setSecureKafkaProperties(conf: Config): Unit = {
setTrustStoreProperties(conf)
setKeyStoreProperties(conf)
ConfigUtils.setSystemPropertyFileFallback(conf, "java.security.auth.login.config")
}

/**
* Sets `javax.net.ssl.trustStore` and `javax.net.ssl.trustStorePassword` system properties from the same-name values
* in the `conf`
* @param conf config to look up values from
*/
def setTrustStoreProperties(conf: Config): Unit = {
ConfigUtils.setSystemPropertyFileFallback(conf, "javax.net.ssl.trustStore")
ConfigUtils.setSystemPropertyStringFallback(conf, "javax.net.ssl.trustStorePassword")
}

/**
* Sets `javax.net.ssl.keyStore` and `javax.net.ssl.keyStorePassword` system properties from the same-name values
* in the `conf`
* @param conf config to look up values from
*/
def setKeyStoreProperties(conf: Config): Unit = {
ConfigUtils.setSystemPropertyFileFallback(conf, "javax.net.ssl.keyStore")
ConfigUtils.setSystemPropertyStringFallback(conf, "javax.net.ssl.keyStorePassword")
ConfigUtils.setSystemPropertyFileFallback(conf, "java.security.auth.login.config")
}

}
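As a usage sketch mirroring the call sites in this commit: the Spark jobs push all Kafka-related security settings into system properties before anything else initializes JavaX, while Menas' SchemaRegistryService only needs the key store and trust store parts:

import com.typesafe.config.ConfigFactory
import za.co.absa.enceladus.utils.config.SecureConfig

val conf = ConfigFactory.load()

// Spark jobs (DynamicConformanceJob, StandardizationJob): trustStore, keyStore and JAAS login config
SecureConfig.setSecureKafkaProperties(conf)

// Menas (SchemaRegistryService): only the SSL stores are needed for the secure schema registry
SecureConfig.setKeyStoreProperties(conf)
SecureConfig.setTrustStoreProperties(conf)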
@@ -164,5 +164,32 @@ object StringImplicits {
case _ => Option(None, false)
}
}


private[implicits] def joinWithSingleSeparator(another: String, sep: String): String = {
val sb = new StringBuilder
sb.append(string.stripSuffix(sep))
sb.append(sep)
sb.append(another.stripPrefix(sep))
sb.mkString
}

/**
* Joins two strings with "/", stripping a single existing trailing/leading "/" at the join point:
* {{{
* "abc" / "123" -> "abc/123"
* "abc/" / "123" -> "abc/123"
* "abc" / "/123" -> "abc/123"
* "abc/" / "/123" -> "abc/123",
* but:
* "file:///" / "path" -> "file:///path",
* }}}
*
* @param another the string to append after the separator
* @return the two strings joined with a single "/"
*/
def /(another: String): String = { // scalastyle:ignore method.name
joinWithSingleSeparator(another, "/")
}
}
}
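Finally, a minimal test sketch for the new operator, based on the Scaladoc examples above (hypothetical and not part of this commit; a ScalaTest 3.1+ style suite is assumed):

import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.enceladus.utils.implicits.StringImplicits._

class StringPathJoinSuite extends AnyFunSuite {
  test("/ joins two strings with exactly one separator") {
    assert(("abc" / "123") == "abc/123")
    assert(("abc/" / "/123") == "abc/123")
  }

  test("/ strips only a single leading/trailing separator") {
    assert(("file:///" / "path") == "file:///path")
  }
}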
