From 50874a7c76ec175e0b6bf5280898fda50a15307c Mon Sep 17 00:00:00 2001 From: Bojan Date: Mon, 5 Feb 2024 16:03:13 -0400 Subject: [PATCH] Proto schema registry refresh optimizations (#1040) * backend: optimize proto refresh to only compile changed schemas, use pointers throughout * backend: improve handling of schema deletion * backend: use len of errors to detect if we have any errors * backend: just return schemas and errors * backend: add check for nil or empty schemasRes in case of errors --- backend/pkg/schema/client.go | 36 ++++++----- backend/pkg/schema/service.go | 118 ++++++++++++++++++++++++++++------ taskfiles/backend.yaml | 8 ++- 3 files changed, 124 insertions(+), 38 deletions(-) diff --git a/backend/pkg/schema/client.go b/backend/pkg/schema/client.go index 4250ee8d6..4bfecb6d2 100644 --- a/backend/pkg/schema/client.go +++ b/backend/pkg/schema/client.go @@ -211,19 +211,19 @@ func (c *Client) GetSchemaBySubject(ctx context.Context, subject, version string } // GetSchemasBySubject gets all versioned schemas for a given subject. 
-func (c *Client) GetSchemasBySubject(ctx context.Context, subject string, showSoftDeleted bool) ([]SchemaVersionedResponse, error) { +func (c *Client) GetSchemasBySubject(ctx context.Context, subject string, showSoftDeleted bool) ([]*SchemaVersionedResponse, error) { versionRes, err := c.GetSubjectVersions(ctx, subject, showSoftDeleted) if err != nil { return nil, err } - results := []SchemaVersionedResponse{} + results := []*SchemaVersionedResponse{} for _, sv := range versionRes.Versions { sr, err := c.GetSchemaBySubject(ctx, subject, strconv.FormatInt(int64(sv), 10), showSoftDeleted) if err != nil { return nil, fmt.Errorf("failed to get schema by subject %s and version %d", subject, sv) } - results = append(results, *sr) + results = append(results, sr) } return results, nil @@ -595,8 +595,8 @@ func (c *Client) GetSchemaTypes(ctx context.Context) ([]string, error) { } // GetSchemas retrieves all stored schemas from a schema registry. -func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]SchemaVersionedResponse, error) { - var schemas []SchemaVersionedResponse +func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]*SchemaVersionedResponse, []error) { + var schemas []*SchemaVersionedResponse req := c.client.R(). SetContext(ctx). 
SetResult(&schemas) @@ -607,7 +607,7 @@ func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]Schema res, err := req.Get("/schemas") if err != nil { - return nil, fmt.Errorf("get schemas failed: %w", err) + return nil, []error{fmt.Errorf("get schemas failed: %w", err)} } if res.StatusCode() == http.StatusNotFound { @@ -619,9 +619,9 @@ func (c *Client) GetSchemas(ctx context.Context, showSoftDeleted bool) ([]Schema if res.IsError() { restErr, ok := res.Error().(*RestError) if !ok { - return nil, fmt.Errorf("get schemas failed: Status code %d", res.StatusCode()) + return nil, []error{fmt.Errorf("get schemas failed: Status code %d", res.StatusCode())} } - return nil, restErr + return nil, []error{restErr} } return schemas, nil @@ -687,38 +687,44 @@ func (c *Client) CreateSchema(ctx context.Context, subjectName string, schema Sc // GetSchemasIndividually returns all schemas by describing all schemas one by one. This may be used against // schema registry that don't support the /schemas endpoint that returns a list of all registered schemas. 
-func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted bool) ([]SchemaVersionedResponse, error) { +func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted bool) ([]*SchemaVersionedResponse, []error) { subjectsRes, err := c.GetSubjects(ctx, showSoftDeleted) if err != nil { - return nil, fmt.Errorf("failed to get subjects to fetch schemas for: %w", err) + return nil, []error{fmt.Errorf("failed to get subjects to fetch schemas for: %w", err)} } g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) // limit max concurrency to 10 requests at a time - schemas := make([]SchemaVersionedResponse, 0, len(subjectsRes.Subjects)) + schemas := make([]*SchemaVersionedResponse, 0, len(subjectsRes.Subjects)) + errors := make([]error, 0, len(subjectsRes.Subjects)) mutex := sync.Mutex{} for _, subject := range subjectsRes.Subjects { subject := subject + g.Go(func() error { srRes, err := c.GetSchemasBySubject(ctx, subject, showSoftDeleted) if err != nil { - return err + mutex.Lock() + errors = append(errors, err) + mutex.Unlock() + return nil } mutex.Lock() schemas = append(schemas, srRes...) mutex.Unlock() - return err + return nil }) } + // Defensive check: every goroutine above returns nil, so g.Wait is not expected to return an error. if err := g.Wait(); err != nil { - return nil, err + return nil, []error{err} } - return schemas, nil + return schemas, errors } // GetSchemaReferencesResponse is the response to fetching schema references. diff --git a/backend/pkg/schema/service.go b/backend/pkg/schema/service.go index 8f28333f4..d7f003dc1 100644 --- a/backend/pkg/schema/service.go +++ b/backend/pkg/schema/service.go @@ -12,8 +12,10 @@ package schema import ( "context" "fmt" + "maps" "strconv" "strings" + "sync" "time" "github.com/hamba/avro/v2" @@ -41,6 +43,11 @@ type Service struct { // by subjects is needed to lookup references in avro schemas. 
schemaBySubjectVersion *cache.Cache[string, *SchemaVersionedResponse] avroSchemaByID *cache.Cache[uint32, avro.Schema] + + // for protobuf schema refreshing and compiling + srRefreshMutex sync.RWMutex + protoSchemasByID map[int]*SchemaVersionedResponse + protoFDByID map[int]*desc.FileDescriptor } // NewService to access schema registry. Returns an error if connection can't be established. @@ -67,35 +74,73 @@ func (s *Service) CheckConnectivity(ctx context.Context) error { // GetProtoDescriptors returns all file descriptors in a map where the key is the schema id. // The value is a set of file descriptors because each schema may references / imported proto schemas. +// +//nolint:gocognit,cyclop // complicated refresh logic func (s *Service) GetProtoDescriptors(ctx context.Context) (map[int]*desc.FileDescriptor, error) { // Singleflight makes sure to not run the function body if there are concurrent requests. We use this to avoid // duplicate requests against the schema registry key := "get-proto-descriptors" - v, err, _ := s.requestGroup.Do(key, func() (any, error) { - schemasRes, err := s.registryClient.GetSchemas(ctx, false) - if err != nil { - // If schema registry returns an error we want to retry it next time, so let's forget the key - s.requestGroup.Forget(key) - return nil, fmt.Errorf("failed to get schema from registry: %w", err) + _, err, _ := s.requestGroup.Do(key, func() (any, error) { + schemasRes, errs := s.registryClient.GetSchemas(ctx, false) + if len(errs) > 0 { + for _, err := range errs { + s.logger.Error("failed to get schema from registry", zap.Error(err)) + } + + if len(schemasRes) == 0 { + return nil, nil + } + } + + s.srRefreshMutex.Lock() + defer s.srRefreshMutex.Unlock() + + if s.protoSchemasByID == nil { + s.protoSchemasByID = make(map[int]*SchemaVersionedResponse, len(schemasRes)) + } + + // collect existing schema IDs + existingSchemaIDs := make(map[int]struct{}, len(s.protoSchemasByID)) + for id := range s.protoSchemasByID { + 
existingSchemaIDs[id] = struct{}{} } - // 1. Index all returned schemas by their respective subject name and version as stored in the schema registry - schemasBySubjectAndVersion := make(map[string]map[int]SchemaVersionedResponse) + schemasToCompile := make([]*SchemaVersionedResponse, 0, len(schemasRes)) + + newSchemaIDs := make(map[int]struct{}, len(schemasRes)) + + // Index all returned schemas by their respective subject name and version as stored in the schema registry + // Collect the new or updated schemas to compile + schemasBySubjectAndVersion := make(map[string]map[int]*SchemaVersionedResponse) for _, schema := range schemasRes { + schema := schema + if schema.Type != TypeProtobuf { continue } _, exists := schemasBySubjectAndVersion[schema.Subject] if !exists { - schemasBySubjectAndVersion[schema.Subject] = make(map[int]SchemaVersionedResponse) + schemasBySubjectAndVersion[schema.Subject] = make(map[int]*SchemaVersionedResponse) } schemasBySubjectAndVersion[schema.Subject][schema.Version] = schema + + if existing, ok := s.protoSchemasByID[schema.SchemaID]; !ok || !strings.EqualFold(existing.Schema, schema.Schema) { + schemasToCompile = append(schemasToCompile, schema) + } + + newSchemaIDs[schema.SchemaID] = struct{}{} + + s.protoSchemasByID[schema.SchemaID] = schema } - // 2. Compile each subject with each of it's references into one or more filedescriptors so that they can be + compileStart := time.Now() + + // 2. Compile each subject with each of its references into one or more file descriptors so that they can be // registered in their own proto registry. 
- fdBySchemaID := make(map[int]*desc.FileDescriptor) - for _, schema := range schemasRes { + newFDBySchemaID := make(map[int]*desc.FileDescriptor, len(schemasToCompile)) + for _, schema := range schemasToCompile { + schema := schema + if schema.Type != TypeProtobuf { continue } @@ -108,24 +153,57 @@ func (s *Service) GetProtoDescriptors(ctx context.Context) (map[int]*desc.FileDe zap.Error(err)) continue } - fdBySchemaID[schema.SchemaID] = fd + newFDBySchemaID[schema.SchemaID] = fd + } + + compileDuration := time.Since(compileStart) + + // merge + if s.protoFDByID == nil { + s.protoFDByID = make(map[int]*desc.FileDescriptor, len(newFDBySchemaID)) + } + + maps.Copy(s.protoFDByID, newFDBySchemaID) + + schemasDeleted := 0 + + // remove schemas only if no errors + if len(errs) == 0 { + schemasIDsToDelete := make([]int, 0, len(s.protoSchemasByID)/2) + + for id := range existingSchemaIDs { + if _, ok := newSchemaIDs[id]; !ok { + schemasIDsToDelete = append(schemasIDsToDelete, id) + } + } + + for _, id := range schemasIDsToDelete { + delete(s.protoSchemasByID, id) + delete(s.protoFDByID, id) + } + + schemasDeleted = len(schemasIDsToDelete) } - return fdBySchemaID, nil + s.logger.Info("compiled new schemas", + zap.Int("updated_schemas", len(schemasToCompile)), + zap.Int("deleted_schemas", schemasDeleted), + zap.Duration("compile_duration", compileDuration)) + + return nil, nil }) if err != nil { return nil, err } - descriptors, ok := v.(map[int]*desc.FileDescriptor) - if !ok { - return nil, fmt.Errorf("failed to type assert file descriptors") - } + s.srRefreshMutex.RLock() + descriptors := maps.Clone(s.protoFDByID) + s.srRefreshMutex.RUnlock() return descriptors, nil } -func (s *Service) addReferences(schema SchemaVersionedResponse, schemaRepository map[string]map[int]SchemaVersionedResponse, schemasByPath map[string]string) error { +func (s *Service) addReferences(schema *SchemaVersionedResponse, schemaRepository map[string]map[int]*SchemaVersionedResponse, schemasByPath 
map[string]string) error { for _, ref := range schema.References { refSubject, exists := schemaRepository[ref.Subject] if !exists { @@ -147,7 +225,7 @@ func (s *Service) addReferences(schema SchemaVersionedResponse, schemaRepository return nil } -func (s *Service) compileProtoSchemas(schema SchemaVersionedResponse, schemaRepository map[string]map[int]SchemaVersionedResponse) (*desc.FileDescriptor, error) { +func (s *Service) compileProtoSchemas(schema *SchemaVersionedResponse, schemaRepository map[string]map[int]*SchemaVersionedResponse) (*desc.FileDescriptor, error) { // 1. Let's find the references for each schema and put the references' schemas into our in memory filesystem. schemasByPath := make(map[string]string) schemasByPath[schema.Subject] = schema.Schema diff --git a/taskfiles/backend.yaml b/taskfiles/backend.yaml index 09a44a465..68fc8a226 100644 --- a/taskfiles/backend.yaml +++ b/taskfiles/backend.yaml @@ -37,7 +37,7 @@ tasks: install-gofumpt: vars: - GOFUMPT_VERSION: 0.4.0 + GOFUMPT_VERSION: 0.5.0 desc: install gofumpt go formater deps: - install-go @@ -50,7 +50,7 @@ tasks: install-gci: vars: - GCI_VERSION: 0.9.0 + GCI_VERSION: 0.12.1 desc: install gci deps: - install-go @@ -108,7 +108,9 @@ tasks: cmds: - '{{ .BUILD_ROOT }}/bin/go/goimports -l -w -local "github.com/redpanda-data/console/backend" .' - '{{ .BUILD_ROOT }}/bin/go/gofumpt -l -w .' - - '{{ .BUILD_ROOT }}/bin/go/gci write -s standard -s default -s ''Prefix(github.com/redpanda-data/console/backend)'' .' + # https://github.com/daixiang0/gci/issues/174 + # moves "maps" into the wrong place + # - '{{ .BUILD_ROOT }}/bin/go/gci write -s default -s standard -s ''prefix(github.com/redpanda-data/console/backend)'' .' lint: desc: Run Go linters for backend code