From 129530e2b329bcbb83cf04f2289a7fc85c853868 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Mon, 11 Aug 2025 19:19:28 -0400 Subject: [PATCH 1/3] adding exclude files + other changes to get registration to work for me --- .../dockercompose/chip_ingress_set/protos.go | 380 +++++++++++++----- 1 file changed, 272 insertions(+), 108 deletions(-) diff --git a/framework/components/dockercompose/chip_ingress_set/protos.go b/framework/components/dockercompose/chip_ingress_set/protos.go index d7b367b6c..b86024447 100644 --- a/framework/components/dockercompose/chip_ingress_set/protos.go +++ b/framework/components/dockercompose/chip_ingress_set/protos.go @@ -29,6 +29,7 @@ type ProtoSchemaSet struct { Ref string `toml:"ref"` // ref or tag or commit SHA Folders []string `toml:"folders"` // if not provided, all protos will be fetched, otherwise only protos in these folders will be fetched SubjectPrefix string `toml:"subject_prefix"` // optional prefix for subjects + ExcludeFiles []string `toml:"exclude_files"` // files to exclude from registration (e.g., ['workflows/v2/execution_status.proto']) } // SubjectNamingStrategyFn is a function that is used to determine the subject name for a given proto file in a given repo @@ -50,7 +51,6 @@ func validateRepoConfiguration(repoConfig ProtoSchemaSet) error { if repoConfig.Ref != "" { return errors.New("ref is not supported with local protos with 'file://' prefix") } - return nil } @@ -76,9 +76,13 @@ func DefaultRegisterAndFetchProtos(ctx context.Context, client *github.Client, p } func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSchemaSets []ProtoSchemaSet, schemaRegistryURL string, repoToSubjectNamingStrategy RepositoryToSubjectNamingStrategyFn) error { - framework.L.Debug().Msgf("Registering and fetching protos from %d repositories", len(protoSchemaSets)) + framework.L.Info().Msgf("Registering and fetching protos from %d repositories", len(protoSchemaSets)) for _, protoSchemaSet := range protoSchemaSets { + framework.L.Info().Msgf("Processing proto schema set: %s", protoSchemaSet.URI) + if len(protoSchemaSet.ExcludeFiles) > 0 { + framework.L.Info().Msgf("Excluding files: %s", strings.Join(protoSchemaSet.ExcludeFiles, ", ")) + } if valErr := validateRepoConfiguration(protoSchemaSet); valErr != nil { return errors.Wrapf(valErr, "invalid repo configuration for schema set: %v", protoSchemaSet) } @@ -89,22 +93,18 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch return client } - var client *github.Client - if token := os.Getenv("GITHUB_TOKEN"); token != "" { ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token}) tc := oauth2.NewClient(ctx, ts) - client = github.NewClient(tc) - } else { - framework.L.Warn().Msg("GITHUB_TOKEN is not set, using unauthenticated GitHub client. This may cause rate limiting issues when downloading proto files") - client = github.NewClient(nil) + return github.NewClient(tc) } - return client + framework.L.Warn().Msg("GITHUB_TOKEN is not set, using unauthenticated GitHub client. This may cause rate limiting issues when downloading proto files") + return github.NewClient(nil) } for _, protoSchemaSet := range protoSchemaSets { - protos, protosErr := fetchProtoFilesInFolders(ctx, ghClientFn, protoSchemaSet.URI, protoSchemaSet.Ref, protoSchemaSet.Folders) + protos, protosErr := fetchProtoFilesInFolders(ctx, ghClientFn, protoSchemaSet.URI, protoSchemaSet.Ref, protoSchemaSet.Folders, protoSchemaSet.ExcludeFiles) if protosErr != nil { return errors.Wrapf(protosErr, "failed to fetch protos from %s", protoSchemaSet.URI) } @@ -129,7 +129,7 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch subjects[proto.Path] = subjectMessage } - registerErr := registerAllWithTopologicalSortingByTrial(schemaRegistryURL, protoMap, subjects) + registerErr := registerAllWithTopologicalSorting(schemaRegistryURL, protoMap, subjects) if registerErr != nil { return errors.Wrapf(registerErr, "failed to register protos from %s", protoSchemaSet.URI) } @@ -168,46 +168,64 @@ func extractPackageNameWithRegex(protoSrc string) (string, error) { return matches[1], nil } -// we use simple regex to extract top-level message names from a proto file -// so that we don't need to parse the proto file with a parser (which would require a lot of dependencies) +// extractTopLevelMessageNamesWithRegex extracts top-level message and enum names from a proto file using regex. func extractTopLevelMessageNamesWithRegex(protoSrc string) ([]string, error) { - matches := regexp.MustCompile(`(?m)^\s*message\s+(\w+)\s*{`).FindAllStringSubmatch(protoSrc, -1) + // Extract message names + messageMatches := regexp.MustCompile(`(?m)^\s*message\s+(\w+)\s*{`).FindAllStringSubmatch(protoSrc, -1) var names []string - for _, match := range matches { + for _, match := range messageMatches { + if len(match) >= 2 { + names = append(names, match[1]) + } + } + + // Extract enum names + enumMatches := regexp.MustCompile(`(?m)^\s*enum\s+(\w+)\s*{`).FindAllStringSubmatch(protoSrc, -1) + for _, match := range enumMatches { if len(match) >= 2 { names = append(names, match[1]) } } if len(names) == 0 { - return nil, fmt.Errorf("no message names found in %s", protoSrc) + return nil, fmt.Errorf("no message or enum names found in proto source") } return names, nil } -// Fetches .proto files from a GitHub repo optionally scoped to specific folders. It is recommended to use `*github.Client` with auth token to avoid rate limiting. -func fetchProtoFilesInFolders(ctx context.Context, clientFn func() *github.Client, uri, ref string, folders []string) ([]protoFile, error) { - framework.L.Debug().Msgf("Fetching proto files from %s in folders: %s", uri, strings.Join(folders, ", ")) +// extractImportStatements extracts import statements from a proto source file using regex. +func extractImportStatements(protoSrc string) []string { + matches := regexp.MustCompile(`(?m)^\s*import\s+"([^"]+)"\s*;`).FindAllStringSubmatch(protoSrc, -1) + var imports []string + for _, match := range matches { + if len(match) >= 2 { + imports = append(imports, match[1]) + } + } + return imports +} +// fetchProtoFilesInFolders fetches .proto files from a GitHub repo optionally scoped to specific folders. +// It is recommended to use `*github.Client` with auth token to avoid rate limiting. +func fetchProtoFilesInFolders(ctx context.Context, clientFn func() *github.Client, uri, ref string, folders []string, excludeFiles []string) ([]protoFile, error) { if strings.HasPrefix(uri, "file://") { - return fetchProtosFromFilesystem(uri, folders) + return fetchProtosFromFilesystem(uri, folders, excludeFiles) } parts := strings.Split(strings.TrimPrefix(uri, "https://"), "/") - - return fetchProtosFromGithub(ctx, clientFn, parts[1], parts[2], ref, folders) + return fetchProtosFromGithub(ctx, clientFn, parts[1], parts[2], ref, folders, excludeFiles) } -func fetchProtosFromGithub(ctx context.Context, clientFn func() *github.Client, owner, repository, ref string, folders []string) ([]protoFile, error) { - cachedFiles, found, cacheErr := loadCachedProtoFiles(owner, repository, ref, folders) - if cacheErr != nil { - framework.L.Warn().Msgf("Failed to load cached proto files for %s/%s at ref %s: %v", owner, repository, ref, cacheErr) - } +func fetchProtosFromGithub(ctx context.Context, clientFn func() *github.Client, owner, repository, ref string, folders []string, excludeFiles []string) ([]protoFile, error) { + cachedFiles, found, cacheErr := loadCachedProtoFiles(owner, repository, ref, folders, excludeFiles) if cacheErr == nil && found { framework.L.Debug().Msgf("Using cached proto files for %s/%s at ref %s", owner, repository, ref) return cachedFiles, nil } + if cacheErr != nil { + framework.L.Warn().Msgf("Failed to load cached proto files for %s/%s at ref %s: %v", owner, repository, ref, cacheErr) + } client := clientFn() var files []protoFile @@ -230,13 +248,11 @@ searchLoop: } // if folders are specified, check prefix match - var folderFound string if len(folders) > 0 { matched := false for _, folder := range folders { if strings.HasPrefix(*entry.Path, strings.TrimSuffix(folder, "/")+"/") { matched = true - folderFound = folder break } } @@ -245,10 +261,25 @@ searchLoop: } } + // if excludeFiles are specified, check if the file should be excluded + if len(excludeFiles) > 0 { + excluded := false + for _, exclude := range excludeFiles { + if strings.HasPrefix(*entry.Path, exclude) { + framework.L.Debug().Msgf("Excluding proto file %s (matches exclude pattern: %s)", *entry.Path, exclude) + excluded = true + break + } + } + if excluded { + continue searchLoop + } + } + rawURL := fmt.Sprintf("https://raw.githubusercontent.com/%s/%s/%s/%s", owner, repository, sha, *entry.Path) resp, respErr := http.Get(rawURL) if respErr != nil { - return nil, errors.Wrapf(respErr, "failed tofetch %s", *entry.Path) + return nil, errors.Wrapf(respErr, "failed to fetch %s", *entry.Path) } defer resp.Body.Close() @@ -261,26 +292,19 @@ searchLoop: return nil, errors.Wrapf(bodyErr, "failed to read body for %s", *entry.Path) } - // subtract the folder from the path if it was provided, because if it is imported by some other protos - // most probably it will be imported as a relative path, so we need to remove the folder from the path - protoPath := *entry.Path - if folderFound != "" { - protoPath = strings.TrimPrefix(protoPath, strings.TrimSuffix(folderFound, "/")+"/") - } - files = append(files, protoFile{ Name: filepath.Base(*entry.Path), - Path: protoPath, + Path: *entry.Path, Content: string(body), }) } - framework.L.Debug().Msgf("Fetched %d proto files from Github's %s/%s", len(files), owner, repository) - if len(files) == 0 { return nil, fmt.Errorf("no proto files found in %s/%s in folders %s", owner, repository, strings.Join(folders, ", ")) } + framework.L.Info().Msgf("Fetched %d proto files from %s/%s", len(files), owner, repository) + saveErr := saveProtoFilesToCache(owner, repository, ref, files) if saveErr != nil { framework.L.Warn().Msgf("Failed to save proto files to cache for %s/%s at ref %s: %v", owner, repository, ref, saveErr) @@ -289,7 +313,7 @@ searchLoop: return files, nil } -func loadCachedProtoFiles(owner, repository, ref string, _ []string) ([]protoFile, bool, error) { +func loadCachedProtoFiles(owner, repository, ref string, folders []string, excludeFiles []string) ([]protoFile, bool, error) { cachePath, cacheErr := cacheFilePath(owner, repository, ref) if cacheErr != nil { return nil, false, errors.Wrapf(cacheErr, "failed to get cache file path for %s/%s at ref %s", owner, repository, ref) @@ -299,7 +323,7 @@ func loadCachedProtoFiles(owner, repository, ref string, _ []string) ([]protoFil return nil, false, nil // cache not found } - cachedFiles, cachedErr := fetchProtosFromFilesystem("file://"+cachePath, []string{}) // ignore folders since, we already filtered them when fetching from GitHub + cachedFiles, cachedErr := fetchProtosFromFilesystem("file://"+cachePath, folders, excludeFiles) if cachedErr != nil { return nil, false, errors.Wrapf(cachedErr, "failed to load cached proto files from %s", cachePath) } @@ -316,15 +340,14 @@ func saveProtoFilesToCache(owner, repository, ref string, files []protoFile) err for _, file := range files { path := filepath.Join(cachePath, file.Path) if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { - return errors.Wrapf(err, "failed to create directory for cache file %s", cachePath) + return errors.Wrapf(err, "failed to create directory for cache file %s", path) } if writeErr := os.WriteFile(path, []byte(file.Content), 0755); writeErr != nil { - return errors.Wrapf(writeErr, "failed to write cached proto files to %s", cachePath) + return errors.Wrapf(writeErr, "failed to write cached proto file to %s", path) } } framework.L.Debug().Msgf("Saved %d proto files to cache at %s", len(files), cachePath) - return nil } @@ -336,10 +359,10 @@ func cacheFilePath(owner, repository, ref string) (string, error) { return filepath.Join(homeDir, ".local", "share", "beholder", "protobufs", owner, repository, ref), nil } -func fetchProtosFromFilesystem(uri string, folders []string) ([]protoFile, error) { +func fetchProtosFromFilesystem(uri string, folders []string, excludeFiles []string) ([]protoFile, error) { var files []protoFile - protoDirPath := strings.TrimPrefix(uri, "file://") + walkErr := filepath.Walk(protoDirPath, func(path string, info os.FileInfo, err error) error { if err != nil { return err @@ -349,24 +372,39 @@ func fetchProtosFromFilesystem(uri string, folders []string) ([]protoFile, error return nil } - var folderFound string + if !strings.HasSuffix(path, ".proto") { + return nil + } + + relativePath := strings.TrimPrefix(strings.TrimPrefix(path, protoDirPath), "/") + + // if folders are specified, check prefix match if len(folders) > 0 { matched := false for _, folder := range folders { - if strings.HasPrefix(strings.TrimPrefix(strings.TrimPrefix(path, protoDirPath), "/"), folder) { + if strings.HasPrefix(relativePath, folder) { matched = true - folderFound = folder break } } - if !matched { return nil } } - if !strings.HasSuffix(path, ".proto") { - return nil + // if excludeFiles are specified, check if the file should be excluded + if len(excludeFiles) > 0 { + excluded := false + for _, exclude := range excludeFiles { + if strings.HasPrefix(relativePath, exclude) { + framework.L.Debug().Msgf("Excluding proto file %s (matches exclude pattern: %s)", relativePath, exclude) + excluded = true + break + } + } + if excluded { + return nil + } } content, contentErr := os.ReadFile(path) @@ -374,32 +412,24 @@ func fetchProtosFromFilesystem(uri string, folders []string) ([]protoFile, error return errors.Wrapf(contentErr, "failed to read file at %s", path) } - // subtract the folder from the path if it was provided, because if it is imported by some other protos - // most probably it will be imported as a relative path, so we need to remove the folder from the path - protoPath := strings.TrimPrefix(strings.TrimPrefix(path, protoDirPath), "/") - if folderFound != "" { - protoPath = strings.TrimPrefix(strings.TrimPrefix(protoPath, folderFound), strings.TrimSuffix(folderFound, "/")) - protoPath = strings.TrimPrefix(protoPath, "/") - } - files = append(files, protoFile{ Name: filepath.Base(path), - Path: protoPath, + Path: relativePath, Content: string(content), }) return nil }) + if walkErr != nil { return nil, errors.Wrapf(walkErr, "failed to walk through directory %s", protoDirPath) } - framework.L.Debug().Msgf("Fetched %d proto files from local %s", len(files), protoDirPath) - if len(files) == 0 { return nil, fmt.Errorf("no proto files found in '%s' in folders %s", protoDirPath, strings.Join(folders, ", ")) } + framework.L.Debug().Msgf("Fetched %d proto files from local %s", len(files), protoDirPath) return files, nil } @@ -422,74 +452,122 @@ type schemaStatus struct { Version int } -// registerAllWithTopologicalSortingByTrial tries to register protos that have not been registered yet, and if it fails, it tries again with a different order -// it keeps doing this until all protos are registered or it fails to register any more protos -func registerAllWithTopologicalSortingByTrial( +// registerAllWithTopologicalSorting registers protos in dependency order using topological sorting +func registerAllWithTopologicalSorting( schemaRegistryURL string, protoMap map[string]string, // path -> proto source subjectMap map[string]string, // path -> subject ) error { - framework.L.Info().Msgf("🔄 registering %d protobuf schemas", len(protoMap)) + framework.L.Info().Msgf("Registering %d protobuf schemas", len(protoMap)) + + // Build dependency graph and sort topologically + dependencies, depErr := buildDependencyGraph(protoMap) + if depErr != nil { + return errors.Wrap(depErr, "failed to build dependency graph") + } + + sortedFiles, sortErr := topologicalSort(dependencies) + if sortErr != nil { + return errors.Wrap(sortErr, "failed to sort files topologically") + } + + framework.L.Info().Msgf("Registration order (topologically sorted): %v", sortedFiles) + schemas := map[string]*schemaStatus{} for path, src := range protoMap { schemas[path] = &schemaStatus{Source: src} } - refs := []map[string]any{} - - for { - progress := false - failures := []string{} + // Register files in topological order + for _, path := range sortedFiles { + schema, exists := schemas[path] + if !exists { + framework.L.Warn().Msgf("File %s not found in schemas map", path) + continue + } - for path, schema := range schemas { - if schema.Registered { - continue - } + if schema.Registered { + continue + } - subject, ok := subjectMap[path] - if !ok { - failures = append(failures, fmt.Sprintf("%s: no subject found", path)) - continue - } + subject, ok := subjectMap[path] + if !ok { + return fmt.Errorf("no subject found for %s", path) + } - singleProtoFailures := []error{} - framework.L.Debug().Msgf("🔄 registering %s as %s", path, subject) - _, registerErr := registerSingleProto(schemaRegistryURL, subject, schema.Source, refs) - if registerErr != nil { - failures = append(failures, fmt.Sprintf("%s: %v", path, registerErr)) - singleProtoFailures = append(singleProtoFailures, registerErr) - continue + // Build references only for files that have dependencies + var fileRefs []map[string]any + if deps, hasDeps := dependencies[path]; hasDeps && len(deps) > 0 { + for _, dep := range deps { + if depSubject, depExists := subjectMap[dep]; depExists { + // The schema registry expects import names without the 'workflows/' prefix + importName := dep + if strings.HasPrefix(dep, "workflows/") { + importName = strings.TrimPrefix(dep, "workflows/") + } + + fileRefs = append(fileRefs, map[string]any{ + "name": importName, + "subject": depSubject, + "version": 1, + }) + } } + } + // Check if schema is already registered + if existingID, exists := checkSchemaExists(schemaRegistryURL, subject); exists { + framework.L.Info().Msgf("Schema %s already exists with ID %d, skipping registration", subject, existingID) schema.Registered = true - schema.Version = 1 - refs = append(refs, map[string]any{ - "name": path, - "subject": subject, - "version": 1, - }) + schema.Version = existingID + continue + } - framework.L.Info().Msgf("✔ registered: %s as %s", path, subject) + // The schema registry expects import statements without the 'workflows/' prefix + modifiedSchema := strings.ReplaceAll(schema.Source, `"workflows/v2/`, `"v2/`) - progress = true + _, registerErr := registerSingleProto(schemaRegistryURL, subject, modifiedSchema, fileRefs) + if registerErr != nil { + return errors.Wrapf(registerErr, "failed to register %s as %s", path, subject) } - if !progress { - if len(failures) > 0 { - framework.L.Error().Msg("❌ Failed to register remaining schemas:") - for _, msg := range failures { - framework.L.Error().Msg(" " + msg) - } - return fmt.Errorf("unable to register %d schemas", len(failures)) - } - break - } + schema.Registered = true + schema.Version = 1 + + framework.L.Info().Msgf("✔ Registered: %s as %s", path, subject) } framework.L.Info().Msgf("✅ Successfully registered %d schemas", len(protoMap)) return nil } +// checkSchemaExists checks if a schema already exists in the registry +func checkSchemaExists(registryURL, subject string) (int, bool) { + url := fmt.Sprintf("%s/subjects/%s/versions", registryURL, subject) + + resp, err := http.Get(url) + if err != nil { + framework.L.Debug().Msgf("Failed to check schema existence for %s: %v", subject, err) + return 0, false + } + defer resp.Body.Close() + + if resp.StatusCode == 200 { + var versions []struct { + ID int `json:"id"` + } + if err := json.NewDecoder(resp.Body).Decode(&versions); err != nil { + framework.L.Debug().Msgf("Failed to decode versions for %s: %v", subject, err) + return 0, false + } + if len(versions) > 0 { + return versions[len(versions)-1].ID, true + } + } + + return 0, false +} + func registerSingleProto( registryURL, subject, schemaSrc string, references []map[string]any, @@ -533,6 +611,92 @@ func registerSingleProto( } framework.L.Debug().Msgf("Registered schema %s with ID %d", subject, result.ID) - return result.ID, nil } + +// buildDependencyGraph builds a dependency graph from protobuf files +func buildDependencyGraph(protoMap map[string]string) (map[string][]string, error) { + dependencies := make(map[string][]string) + + framework.L.Info().Msgf("Building dependency graph for %d proto files", len(protoMap)) + + // Initialize dependencies map + for path := range protoMap { + dependencies[path] = []string{} + } + + // Parse imports and build dependency graph + for path, content := range protoMap { + imports := extractImportStatements(content) + + for _, importPath := range imports { + if strings.HasPrefix(importPath, "google/protobuf/") { + // Skip Google protobuf imports as they're not in our protoMap + continue + } + + // Check if this import exists in our protoMap + if _, exists := protoMap[importPath]; exists { + // Check for self-reference + if importPath == path { + framework.L.Warn().Msgf("Self-reference detected: %s imports itself!", path) + continue + } + + dependencies[path] = append(dependencies[path], importPath) + } else { + framework.L.Warn().Msgf("Import %s in %s not found in protoMap", importPath, path) + } + } + } + + return dependencies, nil +} + +// topologicalSort performs topological sorting using Kahn's algorithm +func topologicalSort(dependencies map[string][]string) ([]string, error) { + // Calculate in-degrees (how many files each file depends on) + inDegree := make(map[string]int) + for file := range dependencies { + inDegree[file] = 0 + } + + // Count dependencies for each file + for file, deps := range dependencies { + inDegree[file] = len(deps) + } + + // Find files with no dependencies (in-degree = 0) + var queue []string + for file, degree := range inDegree { + if degree == 0 { + queue = append(queue, file) + } + } + + var result []string + for len(queue) > 0 { + file := queue[0] + queue = queue[1:] + result = append(result, file) + + // Reduce in-degree for files that depend on the current file + for dependent, deps := range dependencies { + for _, dep := range deps { + if dep == file { + inDegree[dependent]-- + if inDegree[dependent] == 0 { + queue = append(queue, dependent) + } + } + } + } + } + + // Check for cycles + if len(result) != len(dependencies) { + return nil, fmt.Errorf("circular dependency detected in protobuf files") + } + + return result, nil +} From c4fac956e7259a63229b598ef16f5aae13c8a819 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Tue, 12 Aug 2025 09:55:34 -0400 Subject: [PATCH 2/3] updates to register both v1 and v2 files --- .../components/dockercompose/chip_ingress_set/protos.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/framework/components/dockercompose/chip_ingress_set/protos.go b/framework/components/dockercompose/chip_ingress_set/protos.go index b86024447..5d9047682 100644 --- a/framework/components/dockercompose/chip_ingress_set/protos.go +++ b/framework/components/dockercompose/chip_ingress_set/protos.go @@ -501,6 +501,8 @@ func registerAllWithTopologicalSorting( for _, dep := range deps { if depSubject, depExists := subjectMap[dep]; depExists { // The schema registry expects import names without the 'workflows/' prefix + // So if the import is "workflows/v1/metadata.proto", the name should be "v1/metadata.proto" + // And if the import is "workflows/v2/cre_info.proto", the name should be "v2/cre_info.proto" importName := dep if strings.HasPrefix(dep, "workflows/") { importName = strings.TrimPrefix(dep, "workflows/") @@ -524,7 +526,9 @@ func registerAllWithTopologicalSorting( } // The schema registry expects import statements without the 'workflows/' prefix - modifiedSchema := strings.ReplaceAll(schema.Source, `"workflows/v2/`, `"v2/`) + // So we need to modify the protobuf content to replace "workflows/v1/..." with "v1/..." and "workflows/v2/..." with "v2/..." + modifiedSchema := strings.ReplaceAll(schema.Source, `"workflows/v1/`, `"v1/`) + modifiedSchema = strings.ReplaceAll(modifiedSchema, `"workflows/v2/`, `"v2/`) _, registerErr := registerSingleProto(schemaRegistryURL, subject, modifiedSchema, fileRefs) if registerErr != nil { From e00f3e7f77437ac3d1f950f3481d7bb9ecf1f4ff Mon Sep 17 00:00:00 2001 From: Patrick Huie Date: Wed, 27 Aug 2025 14:52:09 +0200 Subject: [PATCH 3/3] pr comments --- .../dockercompose/chip_ingress_set/protos.go | 76 ++++++++++++++----- 1 file changed, 56 insertions(+), 20 deletions(-) diff --git a/framework/components/dockercompose/chip_ingress_set/protos.go b/framework/components/dockercompose/chip_ingress_set/protos.go index 5d9047682..b068f6515 100644 --- a/framework/components/dockercompose/chip_ingress_set/protos.go +++ b/framework/components/dockercompose/chip_ingress_set/protos.go @@ -79,9 +79,9 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch framework.L.Info().Msgf("Registering and fetching protos from %d repositories", len(protoSchemaSets)) for _, protoSchemaSet := range protoSchemaSets { - framework.L.Info().Msgf("Processing proto schema set: %s", protoSchemaSet.URI) + framework.L.Debug().Msgf("Processing proto schema set: %s", protoSchemaSet.URI) if len(protoSchemaSet.ExcludeFiles) > 0 { - framework.L.Info().Msgf("Excluding files: %s", strings.Join(protoSchemaSet.ExcludeFiles, ", ")) + framework.L.Debug().Msgf("Excluding files: %s", strings.Join(protoSchemaSet.ExcludeFiles, ", ")) } if valErr := validateRepoConfiguration(protoSchemaSet); valErr != nil { return errors.Wrapf(valErr, "invalid repo configuration for schema set: %v", protoSchemaSet) @@ -129,7 +129,7 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch subjects[proto.Path] = subjectMessage } - registerErr := registerAllWithTopologicalSorting(schemaRegistryURL, protoMap, subjects) + registerErr := registerAllWithTopologicalSorting(schemaRegistryURL, protoMap, subjects, protoSchemaSet.Folders) if registerErr != nil { return errors.Wrapf(registerErr, "failed to register protos from %s", protoSchemaSet.URI) } @@ -303,7 +303,7 @@ searchLoop: return nil, fmt.Errorf("no proto files found in %s/%s in folders %s", owner, repository, strings.Join(folders, ", ")) } - framework.L.Info().Msgf("Fetched %d proto files from %s/%s", len(files), owner, repository) + framework.L.Debug().Msgf("Fetched %d proto files from %s/%s", len(files), owner, repository) saveErr := saveProtoFilesToCache(owner, repository, ref, files) if saveErr != nil { @@ -457,6 +457,7 @@ func registerAllWithTopologicalSorting( schemaRegistryURL string, protoMap map[string]string, // path -> proto source subjectMap map[string]string, // path -> subject + folders []string, // folders configuration used to determine import prefix transformations ) error { framework.L.Info().Msgf("Registering %d protobuf schemas", len(protoMap)) @@ -471,7 +472,7 @@ func registerAllWithTopologicalSorting( return errors.Wrap(sortErr, "failed to sort files topologically") } - framework.L.Info().Msgf("Registration order (topologically sorted): %v", sortedFiles) + framework.L.Debug().Msgf("Registration order (topologically sorted): %v", sortedFiles) schemas := map[string]*schemaStatus{} for path, src := range protoMap { @@ -495,18 +496,18 @@ func registerAllWithTopologicalSorting( return fmt.Errorf("no subject found for %s", path) } + // Determine which folder prefixes should be stripped based on configuration + prefixesToStrip := determineFolderPrefixesToStrip(folders) + // Build references only for files that have dependencies var fileRefs []map[string]any if deps, hasDeps := dependencies[path]; hasDeps && len(deps) > 0 { for _, dep := range deps { if depSubject, depExists := subjectMap[dep]; depExists { - // The schema registry expects import names without the 'workflows/' prefix - // So if the import is "workflows/v1/metadata.proto", the name should be "v1/metadata.proto" - // And if the import is "workflows/v2/cre_info.proto", the name should be "v2/cre_info.proto" - importName := dep - if strings.HasPrefix(dep, "workflows/") { - importName = strings.TrimPrefix(dep, "workflows/") - } + // The schema registry expects import names without the configured folder prefixes + // So if folders=["workflows"] and the import is "workflows/v1/metadata.proto", + // the name should be "v1/metadata.proto" + importName := stripFolderPrefix(dep, prefixesToStrip) fileRefs = append(fileRefs, map[string]any{ "name": importName, @@ -519,16 +520,15 @@ func registerAllWithTopologicalSorting( // Check if schema is already registered if existingID, exists := checkSchemaExists(schemaRegistryURL, subject); exists { - framework.L.Info().Msgf("Schema %s already exists with ID %d, skipping registration", subject, existingID) + framework.L.Debug().Msgf("Schema %s already exists with ID %d, skipping registration", subject, existingID) schema.Registered = true schema.Version = existingID continue } - // The schema registry expects import statements without the 'workflows/' prefix - // So we need to modify the protobuf content to replace "workflows/v1/..." with "v1/..." and "workflows/v2/..." with "v2/..." - modifiedSchema := strings.ReplaceAll(schema.Source, `"workflows/v1/`, `"v1/`) - modifiedSchema = strings.ReplaceAll(modifiedSchema, `"workflows/v2/`, `"v2/`) + // The schema registry expects import statements without the configured folder prefixes + // Transform the schema content to remove these prefixes from import statements + modifiedSchema := transformSchemaContent(schema.Source, prefixesToStrip) _, registerErr := registerSingleProto(schemaRegistryURL, subject, modifiedSchema, fileRefs) if registerErr != nil { @@ -618,11 +618,44 @@ func registerSingleProto( return result.ID, nil } +// determineFolderPrefixesToStrip determines which folder prefixes should be stripped from import paths +// based on the folders configuration. The schema registry expects import names to be relative to the +// configured folders, so we strip these prefixes to make imports work correctly. +func determineFolderPrefixesToStrip(folders []string) []string { + var prefixes []string + for _, folder := range folders { + // Ensure folder ends with / for prefix matching + prefix := strings.TrimSuffix(folder, "/") + "/" + prefixes = append(prefixes, prefix) + } + return prefixes +} + +// stripFolderPrefix removes any configured folder prefixes from the given path +func stripFolderPrefix(path string, prefixes []string) string { + for _, prefix := range prefixes { + if strings.HasPrefix(path, prefix) { + return strings.TrimPrefix(path, prefix) + } + } + return path +} + +// transformSchemaContent removes folder prefixes from import statements in protobuf source +func transformSchemaContent(content string, prefixes []string) string { + modified := content + for _, prefix := range prefixes { + // Transform import statements like "workflows/v1/" to "v1/" + modified = strings.ReplaceAll(modified, `"`+prefix, `"`) + } + return modified +} + // buildDependencyGraph builds a dependency graph from protobuf files func buildDependencyGraph(protoMap map[string]string) (map[string][]string, error) { dependencies := make(map[string][]string) - framework.L.Info().Msgf("Building dependency graph for %d proto files", len(protoMap)) + framework.L.Debug().Msgf("Building dependency graph for %d proto files", len(protoMap)) // Initialize dependencies map for path := range protoMap { @@ -641,9 +674,12 @@ func buildDependencyGraph(protoMap map[string]string) (map[string][]string, erro // Check if this import exists in our protoMap if _, exists := protoMap[importPath]; exists { - // Check for self-reference + // Check for self-reference - this indicates either an invalid proto file + // or a potential bug in our import/path handling if importPath == path { - framework.L.Warn().Msgf("Self-reference detected: %s imports itself!", path) + framework.L.Warn().Msgf("Self-reference detected: file %s imports itself (import: %s). This suggests either an invalid proto file or a path normalization issue. Skipping this dependency to avoid cycles.", path, importPath) + // Continue without adding the dependency to avoid cycles, but don't fail registration + // as this might be a recoverable issue or edge case continue }