Skip to content

Commit

Permalink
Remove else statement that causes schemas to not be set correctly (#99)
Browse files Browse the repository at this point in the history
* Remove else statement that causes schemas to not be set correctly

* Add return error on version generation failure

* Expand comment

---------

Co-authored-by: Maarten van der Heijden <maarten.van.der.heijden@ing.com>
  • Loading branch information
survivorbat and Maarten van der Heijden authored Sep 10, 2023
1 parent 96a09e7 commit baa74d8
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 103 deletions.
64 changes: 35 additions & 29 deletions mockSchemaRegistryClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ type MockSchemaRegistryClient struct {
// schemaRegistryURL is used to form errors
schemaRegistryURL string

// schemaCache is a map of subject to a map of versions to the actual schema
schemaCache map[string]map[int]*Schema
// schemaVersions is a map of subject to a map of versions to the actual schema
schemaVersions map[string]map[int]*Schema

// idCache is a map of schema ID to the actual schema
idCache map[int]*Schema
// schemaIDs is a map of schema ID to the actual schema
schemaIDs map[int]*Schema

// idCounter is used to generate unique IDs for each schema
idCounter int
Expand All @@ -41,8 +41,8 @@ type MockSchemaRegistryClient struct {
func CreateMockSchemaRegistryClient(mockURL string) *MockSchemaRegistryClient {
mockClient := &MockSchemaRegistryClient{
schemaRegistryURL: mockURL,
schemaCache: map[string]map[int]*Schema{},
idCache: map[int]*Schema{},
schemaVersions: map[string]map[int]*Schema{},
schemaIDs: map[int]*Schema{},
}

return mockClient
Expand Down Expand Up @@ -74,9 +74,9 @@ func (mck *MockSchemaRegistryClient) SetSchema(id int, subject string, schema st
return nil, errInvalidSchemaType
}

resultFromSchemaCache, ok := mck.schemaCache[subject]
resultFromSchemaCache, ok := mck.schemaVersions[subject]
if !ok {
return mck.generateVersion(id, subject, schema, schemaType, version), nil
return mck.generateVersion(id, subject, schema, schemaType, version)
}

// Verify if it's not the same schema as an existing version
Expand All @@ -91,12 +91,12 @@ func (mck *MockSchemaRegistryClient) SetSchema(id int, subject string, schema st
}
}

return mck.generateVersion(id, subject, schema, schemaType, version), nil
return mck.generateVersion(id, subject, schema, schemaType, version)
}

// GetSchema Returns a Schema for the given ID
func (mck *MockSchemaRegistryClient) GetSchema(schemaID int) (*Schema, error) {
thisSchema, ok := mck.idCache[schemaID]
thisSchema, ok := mck.schemaIDs[schemaID]
if !ok {
posErr := url.Error{
Op: "GET",
Expand Down Expand Up @@ -134,7 +134,7 @@ func (mck *MockSchemaRegistryClient) GetSchemaVersions(subject string) ([]int, e
// GetSchemaByVersion Returns the given Schema according to the passed in subject and version number
func (mck *MockSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*Schema, error) {
var schema *Schema
schemaVersionMap, ok := mck.schemaCache[subject]
schemaVersionMap, ok := mck.schemaVersions[subject]
if !ok {
posErr := url.Error{
Op: "GET",
Expand Down Expand Up @@ -165,7 +165,7 @@ func (mck *MockSchemaRegistryClient) GetSchemaByVersion(subject string, version
func (mck *MockSchemaRegistryClient) GetSubjects() ([]string, error) {
var allSubjects []string

for subject := range mck.schemaCache {
for subject := range mck.schemaVersions {
allSubjects = append(allSubjects, subject)
}

Expand All @@ -179,13 +179,13 @@ func (mck *MockSchemaRegistryClient) GetSubjectsIncludingDeleted() ([]string, er

// DeleteSubject removes given subject from the cache
func (mck *MockSchemaRegistryClient) DeleteSubject(subject string, _ bool) error {
delete(mck.schemaCache, subject)
delete(mck.schemaVersions, subject)
return nil
}

// DeleteSubjectByVersion removes given subject's version from cache
func (mck *MockSchemaRegistryClient) DeleteSubjectByVersion(subject string, version int, _ bool) error {
_, ok := mck.schemaCache[subject]
_, ok := mck.schemaVersions[subject]
if !ok {
posErr := url.Error{
Op: "DELETE",
Expand All @@ -195,9 +195,9 @@ func (mck *MockSchemaRegistryClient) DeleteSubjectByVersion(subject string, vers
return &posErr
}

for schemaVersion := range mck.schemaCache[subject] {
for schemaVersion := range mck.schemaVersions[subject] {
if schemaVersion == version {
delete(mck.schemaCache[subject], schemaVersion)
delete(mck.schemaVersions[subject], schemaVersion)
return nil
}
}
Expand Down Expand Up @@ -275,24 +275,30 @@ qualify for key/value subjects, it expects to have a `concrete subject` passed o
*/

// generateVersion the next version of the schema for the given subject, givenVersion can be set to -1 to generate one.
func (mck *MockSchemaRegistryClient) generateVersion(id int, subject string, schema string, schemaType SchemaType, givenVersion int) *Schema {
versions := mck.allVersions(subject)
func (mck *MockSchemaRegistryClient) generateVersion(id int, subject string, schema string, schemaType SchemaType, givenVersion int) (*Schema, error) {
schemaVersionMap := map[int]*Schema{}
currentVersion := 1

// Determine if a version was given
if givenVersion >= 0 {
currentVersion = givenVersion
} else {
// Otherwise, determine if we need to generate one from existing versions
if len(versions) > 0 {
schemaVersionMap = mck.schemaCache[subject]
}

// if existing versions are found, make sure to load in the version map
if existingMap := mck.schemaVersions[subject]; len(existingMap) > 0 {
schemaVersionMap = existingMap

// If no version was given, and existing versions are found, +1 the new number from the latest version
if givenVersion <= 0 {
versions := mck.allVersions(subject)
currentVersion = versions[len(versions)-1] + 1
}
}

// Add a codec, required otherwise Codec() panics
codec, _ := goavro.NewCodec(schema)
// Add a codec, required otherwise Codec() panics and the mock registry is unusable
codec, err := goavro.NewCodec(schema)
if err != nil {
return nil, err
}

schemaToRegister := &Schema{
id: id,
Expand All @@ -303,16 +309,16 @@ func (mck *MockSchemaRegistryClient) generateVersion(id int, subject string, sch
}

schemaVersionMap[currentVersion] = schemaToRegister
mck.schemaCache[subject] = schemaVersionMap
mck.idCache[schemaToRegister.id] = schemaToRegister
mck.schemaVersions[subject] = schemaVersionMap
mck.schemaIDs[schemaToRegister.id] = schemaToRegister

return schemaToRegister
return schemaToRegister, nil
}

// allVersions returns all versions for a given subject, assumes it exists
func (mck *MockSchemaRegistryClient) allVersions(subject string) []int {
var versions []int
result, ok := mck.schemaCache[subject]
result, ok := mck.schemaVersions[subject]

if ok {
for version := range result {
Expand Down
Loading

0 comments on commit baa74d8

Please sign in to comment.