Skip to content

Commit

Permalink
#1299 Requirement changed: -value + -key combination removed, now onl…
Browse files Browse the repository at this point in the history
…y subject is loaded + fallback subject-value. (unit-tests removed)

 - topicName -> subject rewording
  • Loading branch information
dk1844 committed Jun 23, 2020
1 parent 1a72028 commit 228b086
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 231 deletions.
2 changes: 1 addition & 1 deletion menas/src/main/resources/application.properties.template
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ menas.oozie.splineMongoURL=mongodb://localhost:27017

#----------- Schema Registry
# URL of schema registry [optional]:
# if omitted, the ability to load schmema by topic name will be hidden.
# if omitted, the ability to load schema by topic name will be hidden.
menas.schemaRegistryBaseUrl=https://localhost:8081

# When using secure schema registry, following paths and passwords must be specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import za.co.absa.enceladus.model.menas._
import za.co.absa.enceladus.utils.schema.SchemaUtils

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}


@RestController
Expand Down Expand Up @@ -79,27 +80,23 @@ class SchemaController @Autowired()(

@PostMapping(Array("/registry"))
@ResponseStatus(HttpStatus.CREATED)
def handleTopicStem(@AuthenticationPrincipal principal: UserDetails,
@RequestParam topicStem: String,
@RequestParam mergeWithKey: Boolean,
def handleSubject(@AuthenticationPrincipal principal: UserDetails,
@RequestParam subject: String,
@RequestParam version: Int,
@RequestParam name: String,
@RequestParam format: Optional[String]): CompletableFuture[Option[Schema]] = {

val schemaType: SchemaType.Value = SchemaType.fromOptSchemaName(format)

val valueSchemaResponse = schemaRegistryService.loadSchemaBySubjectName(s"$topicStem-value")
val valueSparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(valueSchemaResponse.fileContent)

val combinedStructType = if (mergeWithKey) {
val keySchemaResponse = schemaRegistryService.loadSchemaBySubjectName(s"$topicStem-key")
val keySparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(keySchemaResponse.fileContent)

SchemaUtils.combineStructTypes(valueSparkStruct, keySparkStruct)
} else {
valueSparkStruct
val valueSchemaResponse = Try {
schemaRegistryService.loadSchemaBySubjectName(s"$subject")
} match {
case Success(schemaResponse) => schemaResponse
case Failure(_) => schemaRegistryService.loadSchemaBySubjectName(s"$subject-value") // fallback to -value
}

val valueSparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(valueSchemaResponse.fileContent)

val menasFile = MenasAttachment(refCollection = RefCollection.SCHEMA.name().toLowerCase,
refName = name,
refVersion = version + 1, // version is the current one, refVersion is the to-be-created one
Expand All @@ -108,7 +105,7 @@ class SchemaController @Autowired()(
fileContent = valueSchemaResponse.fileContent.getBytes,
fileMIMEType = valueSchemaResponse.mimeType)

uploadSchemaToMenas(principal.getUsername, menasFile, combinedStructType, schemaType)
uploadSchemaToMenas(principal.getUsername, menasFile, valueSparkStruct, schemaType)
}

@PostMapping(Array("/upload"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ abstract class BaseRestApiTest extends BaseRepositoryTest {
fromRemote(urlPath, headers, parameters)
}

def sendPostTopicName[T](urlPath: String,
parameters: Map[String, Any],
headers: HttpHeaders = new HttpHeaders())
(implicit ct: ClassTag[T]): ResponseEntity[T] = {
require(parameters.keySet.contains("topicStem") && parameters.keySet.contains("mergeWithKey"),
s"parameters map must contain the 'topicStem' and 'mergeWithKey' entry, but only $parameters was found")
def sendPostSubject[T](urlPath: String,
parameters: Map[String, Any],
headers: HttpHeaders = new HttpHeaders())
(implicit ct: ClassTag[T]): ResponseEntity[T] = {
require(parameters.keySet.contains("subject"),
s"parameters map must contain the 'subject', but only $parameters was found")

fromRemote(urlPath, headers, parameters)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@ import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder
import com.github.tomakehurst.wiremock.core.WireMockConfiguration
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.types.{DataType, StructType}
import org.junit.runner.RunWith
import org.mockito.Mockito
import org.scalatest.BeforeAndAfterAll
import org.scalatest.mockito.MockitoSugar
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.http.MediaType
import org.springframework.http.{HttpStatus, MediaType}
import org.springframework.test.context.ActiveProfiles
import org.springframework.test.context.junit4.SpringRunner
import za.co.absa.enceladus.menas.TestResourcePath
import za.co.absa.enceladus.menas.integration.fixtures._
import za.co.absa.enceladus.menas.models.Validation
import za.co.absa.enceladus.menas.models.rest.RestResponse
import za.co.absa.enceladus.menas.models.rest.errors.{SchemaFormatError, SchemaParsingError}
import za.co.absa.enceladus.menas.models.{SchemaApiAvailability, Validation}
import za.co.absa.enceladus.menas.repositories.RefCollection
import za.co.absa.enceladus.menas.services.SchemaRegistryService
import za.co.absa.enceladus.menas.utils.SchemaType
import za.co.absa.enceladus.menas.utils.converters.SparkMenasSchemaConvertor
import za.co.absa.enceladus.model.menas.MenasReference
Expand All @@ -47,7 +49,7 @@ import scala.collection.immutable.HashMap
@RunWith(classOf[SpringRunner])
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles(Array("withEmbeddedMongo"))
class SchemaApiIntegrationSuite extends BaseRestApiTest with BeforeAndAfterAll {
class SchemaApiIntegrationSuite extends BaseRestApiTest with BeforeAndAfterAll with MockitoSugar {

private val port = 8877 // same port as in test/resources/application.conf in the `menas.schemaRegistryBaseUrl` key
private val wireMockServer = new WireMockServer(WireMockConfiguration.wireMockConfig().port(port))
Expand Down Expand Up @@ -1009,20 +1011,20 @@ class SchemaApiIntegrationSuite extends BaseRestApiTest with BeforeAndAfterAll {
}

s"POST $apiUrl/registry" should {
def topicPath(topicName: String) = s"/subjects/$topicName/versions/latest/schema"
def subjectPath(subjectName: String) = s"/subjects/$subjectName/versions/latest/schema"

"return 201" when {
"an avro schema has no errors" should {
"return -value schema for the topicName (no merging)" in {
"load schema by subject name as-is" in {
val schema = SchemaFactory.getDummySchema()
schemaFixture.add(schema)

wireMockServer.stubFor(get(urlPathEqualTo(topicPath("myTopic1-value")))
wireMockServer.stubFor(get(urlPathEqualTo(subjectPath("myTopic1-value")))
.willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.Avro.ok)))

val params = HashMap[String, Any](
"name" -> schema.name, "version" -> schema.version, "format" -> "avro", "topicStem" -> "myTopic1", "mergeWithKey" -> false)
val responseRemoteLoaded = sendPostTopicName[Schema](s"$apiUrl/registry", params)
"name" -> schema.name, "version" -> schema.version, "format" -> "avro", "subject" -> "myTopic1-value")
val responseRemoteLoaded = sendPostSubject[Schema](s"$apiUrl/registry", params)
assertCreated(responseRemoteLoaded)

val actual = responseRemoteLoaded.getBody
Expand All @@ -1031,26 +1033,25 @@ class SchemaApiIntegrationSuite extends BaseRestApiTest with BeforeAndAfterAll {
assert(actual.fields.length == 7)
}

"return -value + -key schema for the topicName (merged together)" in {
"load schema by subject name -value fallback" in {
val schema = SchemaFactory.getDummySchema()
schemaFixture.add(schema)

wireMockServer.stubFor(get(urlPathEqualTo(topicPath("myTopic1-value")))
.willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.AvroCombining.value)))
wireMockServer.stubFor(get(urlPathEqualTo(topicPath("myTopic1-key")))
.willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.AvroCombining.key)))
wireMockServer.stubFor(get(urlPathEqualTo(subjectPath("myTopic2"))) // will fail
.willReturn(notFound()))

wireMockServer.stubFor(get(urlPathEqualTo(subjectPath("myTopic2-value"))) // fallback will kick in
.willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.Avro.ok)))

val params = HashMap[String, Any](
"name" -> schema.name, "version" -> schema.version, "format" -> "avro", "topicStem" -> "myTopic1", "mergeWithKey" -> true)
val responseRemoteLoaded = sendPostTopicName[Schema](s"$apiUrl/registry", params)
"name" -> schema.name, "version" -> schema.version, "format" -> "avro", "subject" -> "myTopic2")
val responseRemoteLoaded = sendPostSubject[Schema](s"$apiUrl/registry", params)
assertCreated(responseRemoteLoaded)

val expectedSchema: StructType = DataType.fromJson(readTestResourceAsString(TestResourcePath.AvroCombining.expectedCombination)).asInstanceOf[StructType]

val actualMenasSchema = responseRemoteLoaded.getBody
val actualSparkSchema = convertor.convertMenasToSparkFields(actualMenasSchema.fields)

assert(actualSparkSchema == expectedSchema.fields.toSeq)
val actual = responseRemoteLoaded.getBody
assert(actual.name == schema.name)
assert(actual.version == schema.version + 1)
assert(actual.fields.length == 7)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion menas/ui/components/dataset/datasetInfo.fragment.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
press="toSchema"
cust:name="{dataset>/schemaName}"
cust:version="{dataset>/schemaVersion}"
id="currentDatasetSchmea"/>
id="currentDatasetSchema"/>
<Label text="Last Update"/>
<Text id="currentDatasetLastUpdate"
text="{path: 'dataset>/lastUpdated', formatter: 'Formatters.stringDateShortFormatter'}"/>
Expand Down
38 changes: 9 additions & 29 deletions menas/ui/components/schema/schemaDetail.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ sap.ui.define([
const auditUtils = new AuditTrail(auditTable);
auditUtils.applyTableUtils();

this._model.setProperty("/topicNameStem", "");
this._model.setProperty("/topicMergeWithKey", false);
this._model.setProperty("/subjectName", "");

// initially, registry integration is disabled in UI - get enabled by qeurying schemaApiAvailability
this._model.setProperty("/registryEnabled", false);
Expand Down Expand Up @@ -171,18 +170,15 @@ sap.ui.define([
}
},

handleTopicSubmit: function (oParams) {
handleSubjectSubmit: function (oParams) {
const schema = this._model.getProperty("/currentSchema");
const topicStem = this._model.getProperty("/topicNameStem");
const mergeWithKey = this._model.getProperty("/topicMergeWithKey");

const subjectName = this._model.getProperty("/subjectName");

sap.ui.core.BusyIndicator.show();

let data = {
"format": "avro", // the only supported Schema registry type
"topicStem": topicStem,
"mergeWithKey": mergeWithKey,
"subject": subjectName,
"name": schema.name,
"version": schema.version
};
Expand All @@ -196,7 +192,7 @@ sap.ui.define([
headers: {
'X-CSRF-TOKEN': localStorage.getItem("csrfToken")
},
complete: this.handleRemoteLoadFromTopicNameComplete
complete: this.handleRemoteLoadFromSubjectNameComplete
});
},

Expand Down Expand Up @@ -266,8 +262,8 @@ sap.ui.define([
return isOkToSubmit;
},

handleRemoteLoadFromTopicNameComplete: function (ajaxResponse) {
this.handleRemoteLoadComplete(ajaxResponse, "topicName")
handleRemoteLoadFromSubjectNameComplete: function (ajaxResponse) {
this.handleRemoteLoadComplete(ajaxResponse, "subject")
},

handleRemoteLoadFromUrlComplete: function (ajaxResponse) {
Expand All @@ -282,8 +278,7 @@ sap.ui.define([

if (status === 201) {
this.byId("remoteUrl").setValue("");
model.setProperty("/topicNameStem", "");
model.setProperty("/topicMergeWithKey", false);
model.setProperty("/subjectName", "");

MessageToast.show("Schema successfully loaded.");
let oData = JSON.parse(ajaxResponse.responseText);
Expand All @@ -303,7 +298,7 @@ sap.ui.define([
if (source === "remote") {
msg = `Error retrieving the schema file from "${this.byId("remoteUrl").getValue()}".${errorMessageDetails}`
} else {
msg = `Error retrieving the schema file by topic-name stem "${this.byId("topicNameBase").getValue()}".${errorMessageDetails}`
msg = `Error retrieving the schema file by topic-name stem "${this.byId("subjectName").getValue()}".${errorMessageDetails}`
}
} else {
msg = `Error parsing the schema file. Ensure that the file is a valid avro schema and ` +
Expand Down Expand Up @@ -433,21 +428,6 @@ sap.ui.define([
fixSchemaRegistryUrl: function (str) {
// trailing slash gets stripped
return str.replace(this.schemaRegistryRx, '$1/schema');
},

valueTopicFormatter: function(topicNameStem) {
return `topic: ${topicNameStem}-value`;
},

valueKeyTopicFormatter: function(topicNameStem) {
return `topics: ${topicNameStem}-value, ${topicNameStem}-key`;
},

topicFormatter: function(topicNameStem, includeKey) {
if (includeKey)
return this.valueKeyTopicFormatter(topicNameStem);
else
return this.valueTopicFormatter(topicNameStem);
}

});
Expand Down
34 changes: 9 additions & 25 deletions menas/ui/components/schema/schemaDetail.view.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,39 +93,23 @@
<Panel headerText="Load schema from schema registry" expanded="false" expandable="true"
visible="{/registryEnabled}">
<FormattedText htmlText='&lt;p>
A new schema can be loaded from an internally-defined schema registry by specifying a
topic-name &lt;em>stem&lt;/em> - based on which the topic name(s) will be derived.&lt;/p>
A new schema can be loaded from an internally-defined schema registry by specifying
a subject name that exists in the schema repository.&lt;/p>
&lt;p>For a given stem, &lt;code>-value&lt;/code> will be appended to get the primary topic name to be fetched.
Optionally, &lt;code>-key&lt;/code> variant can also be loaded and merged with the -value topic.&lt;/p>
&lt;p>Should the loading of the schema fail, a fallback of
&lt;em>subject_name&lt;/em>&lt;code>-value&lt;/code> will be attempted, too.&lt;/p>
&lt;p>Merging of the &lt;code>-key&lt;/code> and &lt;code>-value&lt;/code> schemas works as follows:
&lt;ul>
&lt;li>fields from both &lt;code>-value&lt;/code> and &lt;code>-key&lt;/code> are in the result provided there is no name-duplication&lt;/li>
&lt;li>for non-struct field names duplicates, fields from &lt;code>-value&lt;/code> take precedence&lt;/li>
&lt;li>struct-fields of the same name are recursively deep-combined&lt;/li>
&lt;ul>
&lt;/p>
'/>
<Label text="Specify topic name stem:" labelFor="topicNameBase"/>
<Label text="Specify subject name:" labelFor="subjectName"/>
<VBox width="35em">
<Input id="topicNameBase" type="Text" placeholder="Enter topic name stem (without the -value suffix), e.g. myName123"
value="{/topicNameStem}" valueLiveUpdate="true">
<Input id="subjectName" type="Text" placeholder="Enter subject name e.g. myName123-value"
value="{/subjectName}" valueLiveUpdate="true">
<layoutData>
<FlexItemData growFactor="1"/>
</layoutData>
</Input>
<Label text="Topic to be loaded from: {/topicNameStem}-value"/>
<CheckBox id="mergeWithKey" selected="{/topicMergeWithKey}" editable="true"
text="Load {/topicNameStem}-key, too, and merge it with {/topicNameStem}-value"/>

<Button text="Submit {
parts : [
{path: '/topicNameStem'},
{path: '/topicMergeWithKey'}
],
formatter: '.topicFormatter'
}" press="handleTopicSubmit" enabled="{= ${/topicNameStem}.length > 0 }"/>
<Label text="Subject to be loaded from: {/subjectName}; fallback: {/subjectName}-value"/>
<Button text="Submit {/subjectName} " press="handleSubjectSubmit" enabled="{= ${/subjectName}.length > 0 }"/>
</VBox>
</Panel>
</VBox>
Expand Down
Loading

0 comments on commit 228b086

Please sign in to comment.