diff --git a/cmd/populator/cmd/register/ocf_manifests.go b/cmd/populator/cmd/register/ocf_manifests.go index 54baf8492..52176df16 100644 --- a/cmd/populator/cmd/register/ocf_manifests.go +++ b/cmd/populator/cmd/register/ocf_manifests.go @@ -2,8 +2,11 @@ package register import ( "context" + "crypto/rand" + "encoding/hex" "fmt" "io/ioutil" + "net/url" "os" "os/exec" "path" @@ -81,9 +84,14 @@ func runDBPopulateWithSources(ctx context.Context, sources []string) (err error) }() // run server with merge file list from various sources + seenFiles := make(map[string]struct{}) var fileList []string var commits []string for _, src := range sourcesInfo { + err = filesAlreadyExists(seenFiles, src.files) + if err != nil { + return errors.Wrap(err, "while validating the source files") + } fileList = append(fileList, src.files...) commits = append(commits, strings.TrimSpace(string(src.gitHash))) } @@ -121,9 +129,6 @@ func runDBPopulateWithSources(ctx context.Context, sources []string) (err error) for _, src := range sourcesInfo { err = runDBPopulate(ctx, cfg, session, log, src) - if rErr := os.RemoveAll(src.dir); rErr != nil { - err = multierror.Append(err, rErr) - } if err != nil { return errors.Wrap(err, "while populating db") } @@ -152,22 +157,25 @@ func removeDuplicateSources(sources []string) []string { return result } -func getSourcesInfo(ctx context.Context, cfg dbpopulator.Config, log *zap.Logger, sources []string, parentDir string) ([]sourceInfo, error) { +func getSourcesInfo(ctx context.Context, cfg dbpopulator.Config, log *zap.Logger, sources []string, parent string) ([]sourceInfo, error) { var sourcesInfo []sourceInfo for _, source := range sources { - parent, err := ioutil.TempDir(parentDir, "*-hub") - if err != nil { - return nil, errors.Wrap(err, "while creating temporary directory") + encodePath := path.Clean(encodePath(source)) + dstDir := path.Join(parent, encodePath) + if encodePath == "." { + tempDir, err := createTempDirName("-local") + if err != nil { + return nil, errors.Wrap(err, "while creating temporary directory for local source") + } + dstDir = path.Join(parent, tempDir) } - dstDir := path.Join(parent, "hub") - - err = getter.Download(ctx, source, dstDir) + err := getter.Download(ctx, source, dstDir) if err != nil { return nil, errors.Wrap(err, "while downloading Hub manifests") } - log.Info("Populating downloaded manifests...", zap.String("path", cfg.ManifestsPath)) + log.Info("Populating downloaded manifests...", zap.String("source", source), zap.String("path", cfg.ManifestsPath)) rootDir := path.Join(dstDir, cfg.ManifestsPath) files, err := io.ListYAMLs(rootDir) if err != nil { @@ -188,6 +196,35 @@ func getSourcesInfo(ctx context.Context, cfg dbpopulator.Config, log *zap.Logger return sourcesInfo, nil } +func encodePath(path string) string { + return url.QueryEscape(path) +} + +func createTempDirName(suffix string) (string, error) { + randBytes := make([]byte, 16) + _, err := rand.Read(randBytes) + if err != nil { + return "", err + } + return hex.EncodeToString(randBytes) + suffix, nil +} + +func filesAlreadyExists(container map[string]struct{}, files []string) error { + for _, file := range files { + shortPath := getPathWithoutRootDir(file) + if _, ok := container[shortPath]; ok { + return fmt.Errorf("duplicate path for: %s", shortPath) + } + container[shortPath] = struct{}{} + } + return nil +} + +func getPathWithoutRootDir(fullPath string) string { + parts := strings.Split(fullPath, string(os.PathSeparator)) + return path.Join(parts[4:]...) +} + func runDBPopulate(ctx context.Context, cfg dbpopulator.Config, session neo4j.Session, log *zap.Logger, source sourceInfo) (err error) { start := time.Now() err = retry.Do(func() error { diff --git a/cmd/populator/docs/populator_register-ocf-manifests.md b/cmd/populator/docs/populator_register-ocf-manifests.md index 502c47918..905986adf 100644 --- a/cmd/populator/docs/populator_register-ocf-manifests.md +++ b/cmd/populator/docs/populator_register-ocf-manifests.md @@ -54,6 +54,12 @@ APP_JSON_PUBLISH_ADDR=http://{HOST_IP} ./populator register ocf-manifests --sour ``` Replace `HOST_IP` with your computer IP +There is an option to run populator with multiple sources with the usage of the `source` flag. +Example, which demonstrates this feature: + ```shell +./populator register ocf-manifests --source {PATH_TO_THE_MAIN_DIRECTORY_OF_THE_SOURCE_1} --source {PATH_TO_THE_MAIN_DIRECTORY_OF_THE_SOURCE_2} +``` + ## Configuration You can set the following environment variables to configure the Hub database populator: @@ -65,6 +71,6 @@ You can set the following environment variables to configure the Hub database po | APP_NEO4J_PASSWORD | yes | | Neo4h admin password | | APP_JSON_PUBLISH_ADDR | yes | | Address on which populator will serve JSON files | | APP_JSON_PUBLISH_PORT | no | `8080` | Port number on which populator will be listening | -| APP_MANIFESTS_PATH | no | `manifests` | Path to a directory in a repository where manifests are stored | +| APP_MANIFESTS_PATH | no | ` ` | Path to a directory in a repository where manifests are stored. In case of many sources the same path will be used. | | APP_UPDATE_ON_GIT_COMMIT | no | `false` | Flag to make populator populate data only when there are new changes in a repository | | APP_LOGGER_DEV_MODE | no | `false` | Enable development mode logging | diff --git a/deploy/kubernetes/charts/capact/charts/hub-public/templates/deployment.yaml b/deploy/kubernetes/charts/capact/charts/hub-public/templates/deployment.yaml index d1c5cafaf..4da63f19a 100644 --- a/deploy/kubernetes/charts/capact/charts/hub-public/templates/deployment.yaml +++ b/deploy/kubernetes/charts/capact/charts/hub-public/templates/deployment.yaml @@ -45,10 +45,12 @@ spec: value: {{ printf "http://%s.%s" (include "hub.fullname" .) .Release.Namespace }} - name: APP_JSON_PUBLISH_PORT value: "{{ .Values.populator.port }}" - - name: MANIFESTS_SOURCES - value: "{{ include "populator.manifestSources" . }}" + - name: APP_MANIFESTS_PATH + value: "{{ .Values.populator.manifestsPath}}" - name: APP_UPDATE_ON_GIT_COMMIT value: "{{ .Values.populator.updateOnGitCommit}}" + - name: MANIFESTS_SOURCES + value: "{{ include "populator.manifestSources" . }}" command: ["/bin/sh", "-c"] args: {{ .Values.populator.args }} diff --git a/deploy/kubernetes/charts/capact/charts/hub-public/values.yaml b/deploy/kubernetes/charts/capact/charts/hub-public/values.yaml index d0718e0c8..31d2e3a41 100644 --- a/deploy/kubernetes/charts/capact/charts/hub-public/values.yaml +++ b/deploy/kubernetes/charts/capact/charts/hub-public/values.yaml @@ -79,6 +79,7 @@ populator: pullPolicy: IfNotPresent port: 8081 updateOnGitCommit: true + manifestsPath: manifests manifestsLocations: - local: false # It's public repository @@ -86,4 +87,4 @@ populator: branch: main # sshKey is a base64 encoded private key used by populator to download manifests. It has read only access #sshKey: - args: ["while true; do /app register ocf-manifests --source $MANIFESTS_PATHS; sleep 600;done"] + args: ["while true; do /app register ocf-manifests --source $MANIFESTS_SOURCES; sleep 600;done"] diff --git a/pkg/sdk/dbpopulator/config.go b/pkg/sdk/dbpopulator/config.go index fa7cfe806..21cb251e0 100644 --- a/pkg/sdk/dbpopulator/config.go +++ b/pkg/sdk/dbpopulator/config.go @@ -24,7 +24,7 @@ type Config struct { // ManifestsPath is a path to a directory in a repository where // manifests are stored - ManifestsPath string `envconfig:"default=manifests"` + ManifestsPath string `envconfig:"optional"` // UpdateOnGitCommit makes populator to populate a new data // only when a git commit chaned in source repository diff --git a/pkg/sdk/dbpopulator/populator.go b/pkg/sdk/dbpopulator/populator.go index 7b2ae197f..ff52b0633 100644 --- a/pkg/sdk/dbpopulator/populator.go +++ b/pkg/sdk/dbpopulator/populator.go @@ -455,6 +455,11 @@ call apoc.periodic.iterate("MATCH (n:to_remove) return n", "DETACH DELETE n", {b yield batches, total return batches, total ` +const ( + commitEncodeSep = "," + maxStoredCommits = 1000 +) + // Populate imports Public Hub manifests into a Neo4j database. func Populate(ctx context.Context, log *zap.Logger, session neo4j.Session, paths []string, rootDir string, publishPath string) (bool, error) { err := populate(ctx, log, session, paths, rootDir, publishPath) @@ -483,19 +488,19 @@ func IsDataInDB(session neo4j.Session, log *zap.Logger, commits []string) (bool, return false, errors.Wrap(err, "while getting commit hash of populated data") } - commitsExistInDB := true - unknownCommit := "" - for _, commit := range commits { - if !strings.Contains(currentCommits, commit) { - commitsExistInDB = false - unknownCommit = commit - } + storedCommits := decodeCommits(currentCommits) + if len(storedCommits) != len(commits) { + log.Info("new sources were added or removed", zap.String("current commits", currentCommits), zap.String("given commits", encodeCommits(commits))) + return false, nil } - if commitsExistInDB { - log.Info("git commits did not change. Finishing") - return true, nil + + unknownCommits, found := detectUnknownCommit(commits, storedCommits) + if found { + log.Info("detected unknown commits", zap.String("current commits", currentCommits), zap.String("unknown commits", encodeCommits(unknownCommits))) + return false, nil } - log.Info("found at least one changed commit", zap.String("current commits", currentCommits), zap.String("not found hash", unknownCommit)) + + log.Info("git commits did not change. Finishing") return false, nil } @@ -504,7 +509,7 @@ func IsDataInDB(session neo4j.Session, log *zap.Logger, commits []string) (bool, func SaveCommitsMetadata(session neo4j.Session, commits []string) error { _, err := session.WriteTransaction(func(transaction neo4j.Transaction) (interface{}, error) { contentMetadata := "CREATE (n:ContentMetadata:published { commits: '%s', timestamp: '%s'}) RETURN *" - q := fmt.Sprintf(contentMetadata, strings.Join(commits, " "), time.Now()) + q := fmt.Sprintf(contentMetadata, encodeCommits(commits), time.Now()) result, err := transaction.Run(q, nil) if err != nil { return nil, errors.Wrapf(err, "while running %s", q) @@ -599,6 +604,34 @@ func currentCommits(session neo4j.Session) (string, error) { return commit, errors.Wrap(result.Err(), "while executing neo4j transaction") } +func detectUnknownCommit(newCommits []string, storedCommits []string) ([]string, bool) { + var out []string + indexed := indexStringSlice(storedCommits) + for _, commit := range newCommits { + if _, found := indexed[commit]; found { + continue + } + out = append(out, commit) + } + return out, len(out) > 0 +} + +func decodeCommits(in string) []string { + return strings.SplitN(in, commitEncodeSep, maxStoredCommits) +} + +func encodeCommits(in []string) string { + return strings.Join(in, commitEncodeSep) +} + +func indexStringSlice(slice []string) map[string]struct{} { + set := make(map[string]struct{}, len(slice)) + for _, s := range slice { + set[s] = struct{}{} + } + return set +} + func warmup(session neo4j.Session) error { _, err := session.Run("CALL apoc.warmup.run(true, true, true)", map[string]interface{}{}) return errors.Wrap(err, "while warming up the data")