Skip to content

Commit

Permalink
Proto schema registry refresh optimizations (#1040)
Browse files Browse the repository at this point in the history
* backend: optimize proto refresh to only compile changed schemas, use pointers throughout

* backend: improve handling deletion of schemas better

* backend: use len of errors to detect if we have any errors

* backend: just return schemas and errors

* backend: add check for nil or empty schemaRes in case of errors
  • Loading branch information
bojand committed Feb 5, 2024
1 parent 8a44126 commit 50874a7
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 38 deletions.
36 changes: 21 additions & 15 deletions backend/pkg/schema/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,19 @@ func (c *Client) GetSchemaBySubject(ctx context.Context, subject, version string
}

// GetSchemasBySubject gets all versioned schemas for a given subject.
func (c *Client) GetSchemasBySubject(ctx context.Context, subject string, showSoftDeleted bool) ([]SchemaVersionedResponse, error) {
func (c *Client) GetSchemasBySubject(ctx context.Context, subject string, showSoftDeleted bool) ([]*SchemaVersionedResponse, error) {
versionRes, err := c.GetSubjectVersions(ctx, subject, showSoftDeleted)
if err != nil {
return nil, err
}

results := []SchemaVersionedResponse{}
results := []*SchemaVersionedResponse{}
for _, sv := range versionRes.Versions {
sr, err := c.GetSchemaBySubject(ctx, subject, strconv.FormatInt(int64(sv), 10), showSoftDeleted)
if err != nil {
return nil, fmt.Errorf("failed to get schema by subject %s and version %d", subject, sv)
}
results = append(results, *sr)
results = append(results, sr)
}

return results, nil
Expand Down Expand Up @@ -595,8 +595,8 @@ func (c *Client) GetSchemaTypes(ctx context.Context) ([]string, error) {
}

// GetSchemas retrieves all stored schemas from a schema registry.
func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]SchemaVersionedResponse, error) {
var schemas []SchemaVersionedResponse
func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]*SchemaVersionedResponse, []error) {
var schemas []*SchemaVersionedResponse
req := c.client.R().
SetContext(ctx).
SetResult(&schemas)
Expand All @@ -607,7 +607,7 @@ func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]Schema

res, err := req.Get("/schemas")
if err != nil {
return nil, fmt.Errorf("get schemas failed: %w", err)
return nil, []error{fmt.Errorf("get schemas failed: %w", err)}
}

if res.StatusCode() == http.StatusNotFound {
Expand All @@ -619,9 +619,9 @@ func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]Schema
if res.IsError() {
restErr, ok := res.Error().(*RestError)
if !ok {
return nil, fmt.Errorf("get schemas failed: Status code %d", res.StatusCode())
return nil, []error{fmt.Errorf("get schemas failed: Status code %d", res.StatusCode())}
}
return nil, restErr
return nil, []error{restErr}
}

return schemas, nil
Expand Down Expand Up @@ -687,38 +687,44 @@ func (c *Client) CreateSchema(ctx context.Context, subjectName string, schema Sc

// GetSchemasIndividually returns all schemas by describing all schemas one by one. This may be used against
// schema registry that don't support the /schemas endpoint that returns a list of all registered schemas.
func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted bool) ([]SchemaVersionedResponse, error) {
func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted bool) ([]*SchemaVersionedResponse, []error) {
subjectsRes, err := c.GetSubjects(ctx, showSoftDeleted)
if err != nil {
return nil, fmt.Errorf("failed to get subjects to fetch schemas for: %w", err)
return nil, []error{fmt.Errorf("failed to get subjects to fetch schemas for: %w", err)}
}

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // limit max concurrency to 10 requests at a time

schemas := make([]SchemaVersionedResponse, 0, len(subjectsRes.Subjects))
schemas := make([]*SchemaVersionedResponse, 0, len(subjectsRes.Subjects))
errors := make([]error, 0, len(subjectsRes.Subjects))
mutex := sync.Mutex{}

for _, subject := range subjectsRes.Subjects {
subject := subject

g.Go(func() error {
srRes, err := c.GetSchemasBySubject(ctx, subject, showSoftDeleted)
if err != nil {
return err
mutex.Lock()
errors = append(errors, err)
mutex.Unlock()
return nil
}

mutex.Lock()
schemas = append(schemas, srRes...)
mutex.Unlock()
return err
return nil
})
}

// is this even possible?
if err := g.Wait(); err != nil {
return nil, err
return nil, []error{err}
}

return schemas, nil
return schemas, errors
}

// GetSchemaReferencesResponse is the response to fetching schema references.
Expand Down
118 changes: 98 additions & 20 deletions backend/pkg/schema/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ package schema
import (
"context"
"fmt"
"maps"
"strconv"
"strings"
"sync"
"time"

"github.com/hamba/avro/v2"
Expand Down Expand Up @@ -41,6 +43,11 @@ type Service struct {
// by subjects is needed to lookup references in avro schemas.
schemaBySubjectVersion *cache.Cache[string, *SchemaVersionedResponse]
avroSchemaByID *cache.Cache[uint32, avro.Schema]

// for protobuf schema refreshing and compiling
srRefreshMutex sync.RWMutex
protoSchemasByID map[int]*SchemaVersionedResponse
protoFDByID map[int]*desc.FileDescriptor
}

// NewService to access schema registry. Returns an error if connection can't be established.
Expand All @@ -67,35 +74,73 @@ func (s *Service) CheckConnectivity(ctx context.Context) error {

// GetProtoDescriptors returns all file descriptors in a map where the key is the schema id.
// The value is a set of file descriptors because each schema may references / imported proto schemas.
//
//nolint:gocognit,cyclop // complicated refresh logic
func (s *Service) GetProtoDescriptors(ctx context.Context) (map[int]*desc.FileDescriptor, error) {
// Singleflight makes sure to not run the function body if there are concurrent requests. We use this to avoid
// duplicate requests against the schema registry
key := "get-proto-descriptors"
v, err, _ := s.requestGroup.Do(key, func() (any, error) {
schemasRes, err := s.registryClient.GetSchemas(ctx, false)
if err != nil {
// If schema registry returns an error we want to retry it next time, so let's forget the key
s.requestGroup.Forget(key)
return nil, fmt.Errorf("failed to get schema from registry: %w", err)
_, err, _ := s.requestGroup.Do(key, func() (any, error) {
schemasRes, errs := s.registryClient.GetSchemas(ctx, false)
if len(errs) > 0 {
for _, err := range errs {
s.logger.Error("failed to get schema from registry", zap.Error(err))
}

if len(schemasRes) == 0 {
return nil, nil
}
}

s.srRefreshMutex.Lock()
defer s.srRefreshMutex.Unlock()

if s.protoSchemasByID == nil {
s.protoSchemasByID = make(map[int]*SchemaVersionedResponse, len(schemasRes))
}

// collect existing schema IDs
existingSchemaIDs := make(map[int]struct{}, len(s.protoSchemasByID))
for id := range s.protoSchemasByID {
existingSchemaIDs[id] = struct{}{}
}

// 1. Index all returned schemas by their respective subject name and version as stored in the schema registry
schemasBySubjectAndVersion := make(map[string]map[int]SchemaVersionedResponse)
schemasToCompile := make([]*SchemaVersionedResponse, 0, len(schemasRes))

newSchemaIDs := make(map[int]struct{}, len(schemasRes))

// Index all returned schemas by their respective subject name and version as stored in the schema registry
// Collect the new or updated schemas to compile
schemasBySubjectAndVersion := make(map[string]map[int]*SchemaVersionedResponse)
for _, schema := range schemasRes {
schema := schema

if schema.Type != TypeProtobuf {
continue
}
_, exists := schemasBySubjectAndVersion[schema.Subject]
if !exists {
schemasBySubjectAndVersion[schema.Subject] = make(map[int]SchemaVersionedResponse)
schemasBySubjectAndVersion[schema.Subject] = make(map[int]*SchemaVersionedResponse)
}
schemasBySubjectAndVersion[schema.Subject][schema.Version] = schema

if existing, ok := s.protoSchemasByID[schema.SchemaID]; !ok || !strings.EqualFold(existing.Schema, schema.Schema) {
schemasToCompile = append(schemasToCompile, schema)
}

newSchemaIDs[schema.SchemaID] = struct{}{}

s.protoSchemasByID[schema.SchemaID] = schema
}

// 2. Compile each subject with each of it's references into one or more filedescriptors so that they can be
compileStart := time.Now()

// 2. Compile each subject with each of it's references into one or more file descriptors so that they can be
// registered in their own proto registry.
fdBySchemaID := make(map[int]*desc.FileDescriptor)
for _, schema := range schemasRes {
newFDBySchemaID := make(map[int]*desc.FileDescriptor, len(schemasToCompile))
for _, schema := range schemasToCompile {
schema := schema

if schema.Type != TypeProtobuf {
continue
}
Expand All @@ -108,24 +153,57 @@ func (s *Service) GetProtoDescriptors(ctx context.Context) (map[int]*desc.FileDe
zap.Error(err))
continue
}
fdBySchemaID[schema.SchemaID] = fd
newFDBySchemaID[schema.SchemaID] = fd
}

compileDuration := time.Since(compileStart)

// merge
if s.protoFDByID == nil {
s.protoFDByID = make(map[int]*desc.FileDescriptor, len(newFDBySchemaID))
}

maps.Copy(s.protoFDByID, newFDBySchemaID)

schemasDeleted := 0

// remove schemas only if no errors
if len(errs) == 0 {
schemasIDsToDelete := make([]int, 0, len(s.protoSchemasByID)/2)

for id := range existingSchemaIDs {
if _, ok := newSchemaIDs[id]; !ok {
schemasIDsToDelete = append(schemasIDsToDelete, id)
}
}

for _, id := range schemasIDsToDelete {
delete(s.protoSchemasByID, id)
delete(s.protoFDByID, id)
}

schemasDeleted = len(schemasIDsToDelete)
}

return fdBySchemaID, nil
s.logger.Info("compiled new schemas",
zap.Int("updated_schemas", len(schemasToCompile)),
zap.Int("deleted_schemas", schemasDeleted),
zap.Duration("compile_duration", compileDuration))

return nil, nil
})
if err != nil {
return nil, err
}

descriptors, ok := v.(map[int]*desc.FileDescriptor)
if !ok {
return nil, fmt.Errorf("failed to type assert file descriptors")
}
s.srRefreshMutex.RLock()
descriptors := maps.Clone(s.protoFDByID)
s.srRefreshMutex.RUnlock()

return descriptors, nil
}

func (s *Service) addReferences(schema SchemaVersionedResponse, schemaRepository map[string]map[int]SchemaVersionedResponse, schemasByPath map[string]string) error {
func (s *Service) addReferences(schema *SchemaVersionedResponse, schemaRepository map[string]map[int]*SchemaVersionedResponse, schemasByPath map[string]string) error {
for _, ref := range schema.References {
refSubject, exists := schemaRepository[ref.Subject]
if !exists {
Expand All @@ -147,7 +225,7 @@ func (s *Service) addReferences(schema SchemaVersionedResponse, schemaRepository
return nil
}

func (s *Service) compileProtoSchemas(schema SchemaVersionedResponse, schemaRepository map[string]map[int]SchemaVersionedResponse) (*desc.FileDescriptor, error) {
func (s *Service) compileProtoSchemas(schema *SchemaVersionedResponse, schemaRepository map[string]map[int]*SchemaVersionedResponse) (*desc.FileDescriptor, error) {
// 1. Let's find the references for each schema and put the references' schemas into our in memory filesystem.
schemasByPath := make(map[string]string)
schemasByPath[schema.Subject] = schema.Schema
Expand Down
8 changes: 5 additions & 3 deletions taskfiles/backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ tasks:

install-gofumpt:
vars:
GOFUMPT_VERSION: 0.4.0
GOFUMPT_VERSION: 0.5.0
desc: install gofumpt go formater
deps:
- install-go
Expand All @@ -50,7 +50,7 @@ tasks:

install-gci:
vars:
GCI_VERSION: 0.9.0
GCI_VERSION: 0.12.1
desc: install gci
deps:
- install-go
Expand Down Expand Up @@ -108,7 +108,9 @@ tasks:
cmds:
- '{{ .BUILD_ROOT }}/bin/go/goimports -l -w -local "github.com/redpanda-data/console/backend" .'
- '{{ .BUILD_ROOT }}/bin/go/gofumpt -l -w .'
- '{{ .BUILD_ROOT }}/bin/go/gci write -s standard -s default -s ''Prefix(github.com/redpanda-data/console/backend)'' .'
# https://github.com/daixiang0/gci/issues/174
# moves "maps" into the wrong place
# - '{{ .BUILD_ROOT }}/bin/go/gci write -s default -s standard -s ''prefix(github.com/redpanda-data/console/backend)'' .'

lint:
desc: Run Go linters for backend code
Expand Down

0 comments on commit 50874a7

Please sign in to comment.