Skip to content

Commit

Permalink
apply second patch of fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Kuziemko committed Dec 20, 2021
1 parent 2513868 commit c7c58b0
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 28 deletions.
59 changes: 48 additions & 11 deletions cmd/populator/cmd/register/ocf_manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package register

import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"io/ioutil"
"net/url"
"os"
"os/exec"
"path"
Expand Down Expand Up @@ -81,9 +84,14 @@ func runDBPopulateWithSources(ctx context.Context, sources []string) (err error)
}()

// run server with merge file list from various sources
seenFiles := make(map[string]struct{})
var fileList []string
var commits []string
for _, src := range sourcesInfo {
err = filesAlreadyExists(seenFiles, src.files)
if err != nil {
return errors.Wrap(err, "while validating the source files")
}
fileList = append(fileList, src.files...)
commits = append(commits, strings.TrimSpace(string(src.gitHash)))
}
Expand Down Expand Up @@ -121,9 +129,6 @@ func runDBPopulateWithSources(ctx context.Context, sources []string) (err error)

for _, src := range sourcesInfo {
err = runDBPopulate(ctx, cfg, session, log, src)
if rErr := os.RemoveAll(src.dir); rErr != nil {
err = multierror.Append(err, rErr)
}
if err != nil {
return errors.Wrap(err, "while populating db")
}
Expand Down Expand Up @@ -152,22 +157,25 @@ func removeDuplicateSources(sources []string) []string {
return result
}

func getSourcesInfo(ctx context.Context, cfg dbpopulator.Config, log *zap.Logger, sources []string, parentDir string) ([]sourceInfo, error) {
func getSourcesInfo(ctx context.Context, cfg dbpopulator.Config, log *zap.Logger, sources []string, parent string) ([]sourceInfo, error) {
var sourcesInfo []sourceInfo
for _, source := range sources {
parent, err := ioutil.TempDir(parentDir, "*-hub")
if err != nil {
return nil, errors.Wrap(err, "while creating temporary directory")
encodePath := path.Clean(encodePath(source))
dstDir := path.Join(parent, encodePath)
if encodePath == "." {
tempDir, err := createTempDirName("-local")
if err != nil {
return nil, errors.Wrap(err, "while creating temporary directory for local source")
}
dstDir = path.Join(parent, tempDir)
}

dstDir := path.Join(parent, "hub")

err = getter.Download(ctx, source, dstDir)
err := getter.Download(ctx, source, dstDir)
if err != nil {
return nil, errors.Wrap(err, "while downloading Hub manifests")
}

log.Info("Populating downloaded manifests...", zap.String("path", cfg.ManifestsPath))
log.Info("Populating downloaded manifests...", zap.String("source", source), zap.String("path", cfg.ManifestsPath))
rootDir := path.Join(dstDir, cfg.ManifestsPath)
files, err := io.ListYAMLs(rootDir)
if err != nil {
Expand All @@ -188,6 +196,35 @@ func getSourcesInfo(ctx context.Context, cfg dbpopulator.Config, log *zap.Logger
return sourcesInfo, nil
}

// encodePath returns the given source path escaped with url.QueryEscape, so
// that the result can be used as a single path element.
func encodePath(path string) string {
	escaped := url.QueryEscape(path)
	return escaped
}

// createTempDirName builds a random directory name: 16 bytes read from
// crypto/rand, hex-encoded (32 characters), followed by suffix.
// It only generates the name; it does not create anything on disk.
// The error from the random source is returned unchanged.
func createTempDirName(suffix string) (string, error) {
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return "", err
	}
	return hex.EncodeToString(raw[:]) + suffix, nil
}

// filesAlreadyExists registers every file in container under its path with
// the root directory stripped (see getPathWithoutRootDir).
//
// It returns an error for the first file whose stripped path is already
// present in container. Paths registered before the duplicate was found stay
// in container. It returns nil when all paths were new.
func filesAlreadyExists(container map[string]struct{}, files []string) error {
	for i := range files {
		key := getPathWithoutRootDir(files[i])
		_, seen := container[key]
		if seen {
			return fmt.Errorf("duplicate path for: %s", key)
		}
		container[key] = struct{}{}
	}
	return nil
}

// getPathWithoutRootDir drops the leading root-directory components from
// fullPath and returns the remaining components joined with path.Join.
//
// fullPath is split on os.PathSeparator and the first four resulting elements
// are discarded. For an absolute path the first element is the empty string
// before the leading separator.
// NOTE(review): the depth of four presumably mirrors the layout of the
// download directory built by the caller; confirm before changing it.
//
// When fullPath has four or fewer elements nothing remains and the empty
// string is returned, instead of panicking on an out-of-range slice.
func getPathWithoutRootDir(fullPath string) string {
	const rootDirDepth = 4

	parts := strings.Split(fullPath, string(os.PathSeparator))
	if len(parts) <= rootDirDepth {
		// path.Join with no elements yields "" as well; returning early
		// avoids slicing past the end for short paths.
		return ""
	}
	return path.Join(parts[rootDirDepth:]...)
}

func runDBPopulate(ctx context.Context, cfg dbpopulator.Config, session neo4j.Session, log *zap.Logger, source sourceInfo) (err error) {
start := time.Now()
err = retry.Do(func() error {
Expand Down
8 changes: 7 additions & 1 deletion cmd/populator/docs/populator_register-ocf-manifests.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ APP_JSON_PUBLISH_ADDR=http://{HOST_IP} ./populator register ocf-manifests --sour
```
Replace `HOST_IP` with your computer IP

You can run the populator with multiple sources by repeating the `source` flag.
The following example demonstrates this feature:
```shell
./populator register ocf-manifests --source {PATH_TO_THE_MAIN_DIRECTORY_OF_THE_SOURCE_1} --source {PATH_TO_THE_MAIN_DIRECTORY_OF_THE_SOURCE_2}
```

## Configuration

You can set the following environment variables to configure the Hub database populator:
Expand All @@ -65,6 +71,6 @@ You can set the following environment variables to configure the Hub database po
| APP_NEO4J_PASSWORD | yes | | Neo4j admin password |
| APP_JSON_PUBLISH_ADDR | yes | | Address on which populator will serve JSON files |
| APP_JSON_PUBLISH_PORT | no | `8080` | Port number on which populator will be listening |
| APP_MANIFESTS_PATH | no | `manifests` | Path to a directory in a repository where manifests are stored |
| APP_MANIFESTS_PATH | no | ` ` | Path to a directory in a repository where manifests are stored. When multiple sources are given, the same path is used for each of them. |
| APP_UPDATE_ON_GIT_COMMIT | no | `false` | Flag to make populator populate data only when there are new changes in a repository |
| APP_LOGGER_DEV_MODE | no | `false` | Enable development mode logging |
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ spec:
value: {{ printf "http://%s.%s" (include "hub.fullname" .) .Release.Namespace }}
- name: APP_JSON_PUBLISH_PORT
value: "{{ .Values.populator.port }}"
- name: MANIFESTS_SOURCES
value: "{{ include "populator.manifestSources" . }}"
- name: APP_MANIFESTS_PATH
value: "{{ .Values.populator.manifestsPath}}"
- name: APP_UPDATE_ON_GIT_COMMIT
value: "{{ .Values.populator.updateOnGitCommit}}"
- name: MANIFESTS_SOURCES
value: "{{ include "populator.manifestSources" . }}"

command: ["/bin/sh", "-c"]
args: {{ .Values.populator.args }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ populator:
pullPolicy: IfNotPresent
port: 8081
updateOnGitCommit: true
manifestsPath: manifests
manifestsLocations:
- local: false
# It's public repository
repository: github.com/capactio/hub-manifests
branch: main
# sshKey is a base64 encoded private key used by populator to download manifests. It has read only access
#sshKey:
args: ["while true; do /app register ocf-manifests --source $MANIFESTS_PATHS; sleep 600;done"]
args: ["while true; do /app register ocf-manifests --source $MANIFESTS_SOURCES; sleep 600;done"]
2 changes: 1 addition & 1 deletion pkg/sdk/dbpopulator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Config struct {

// ManifestsPath is a path to a directory in a repository where
// manifests are stored
ManifestsPath string `envconfig:"default=manifests"`
ManifestsPath string `envconfig:"optional"`

// UpdateOnGitCommit makes populator populate new data
// only when a git commit changed in the source repository
Expand Down
57 changes: 45 additions & 12 deletions pkg/sdk/dbpopulator/populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ call apoc.periodic.iterate("MATCH (n:to_remove) return n", "DETACH DELETE n", {b
yield batches, total return batches, total
`

const (
// commitEncodeSep is the separator placed between commit hashes when a
// list of commits is joined into one string (encodeCommits) and split
// back into a list (decodeCommits).
commitEncodeSep = ","
// maxStoredCommits is the limit passed to strings.SplitN in decodeCommits,
// i.e. the maximum number of substrings produced when decoding.
maxStoredCommits = 1000
)

// Populate imports Public Hub manifests into a Neo4j database.
func Populate(ctx context.Context, log *zap.Logger, session neo4j.Session, paths []string, rootDir string, publishPath string) (bool, error) {
err := populate(ctx, log, session, paths, rootDir, publishPath)
Expand Down Expand Up @@ -483,19 +488,19 @@ func IsDataInDB(session neo4j.Session, log *zap.Logger, commits []string) (bool,
return false, errors.Wrap(err, "while getting commit hash of populated data")
}

commitsExistInDB := true
unknownCommit := ""
for _, commit := range commits {
if !strings.Contains(currentCommits, commit) {
commitsExistInDB = false
unknownCommit = commit
}
storedCommits := decodeCommits(currentCommits)
if len(storedCommits) != len(commits) {
log.Info("new sources were added or removed", zap.String("current commits", currentCommits), zap.String("given commits", encodeCommits(commits)))
return false, nil
}
if commitsExistInDB {
log.Info("git commits did not change. Finishing")
return true, nil

unknownCommits, found := detectUnknownCommit(commits, storedCommits)
if found {
log.Info("detected unknown commits", zap.String("current commits", currentCommits), zap.String("unknown commits", encodeCommits(unknownCommits)))
return false, nil
}
log.Info("found at least one changed commit", zap.String("current commits", currentCommits), zap.String("not found hash", unknownCommit))

log.Info("git commits did not change. Finishing")
return false, nil
}

Expand All @@ -504,7 +509,7 @@ func IsDataInDB(session neo4j.Session, log *zap.Logger, commits []string) (bool,
func SaveCommitsMetadata(session neo4j.Session, commits []string) error {
_, err := session.WriteTransaction(func(transaction neo4j.Transaction) (interface{}, error) {
contentMetadata := "CREATE (n:ContentMetadata:published { commits: '%s', timestamp: '%s'}) RETURN *"
q := fmt.Sprintf(contentMetadata, strings.Join(commits, " "), time.Now())
q := fmt.Sprintf(contentMetadata, encodeCommits(commits), time.Now())
result, err := transaction.Run(q, nil)
if err != nil {
return nil, errors.Wrapf(err, "while running %s", q)
Expand Down Expand Up @@ -599,6 +604,34 @@ func currentCommits(session neo4j.Session) (string, error) {
return commit, errors.Wrap(result.Err(), "while executing neo4j transaction")
}

// detectUnknownCommit reports which of newCommits are absent from
// storedCommits.
//
// It returns the missing commits in the order they appear in newCommits, and
// a flag that is true when at least one commit is missing. When every commit
// is known the returned slice is nil and the flag is false.
func detectUnknownCommit(newCommits []string, storedCommits []string) ([]string, bool) {
	known := indexStringSlice(storedCommits)

	var unknown []string
	for i := range newCommits {
		if _, ok := known[newCommits[i]]; !ok {
			unknown = append(unknown, newCommits[i])
		}
	}
	return unknown, len(unknown) != 0
}

// decodeCommits splits a string produced by encodeCommits back into the list
// of commits, using commitEncodeSep as the separator.
//
// An empty input decodes to a nil slice. Without this guard strings.SplitN
// would return a one-element slice holding an empty string, which would be
// counted as a stored commit and would not round-trip with encodeCommits.
//
// At most maxStoredCommits elements are returned; if the input holds more,
// the last element contains the unsplit remainder.
func decodeCommits(in string) []string {
	if in == "" {
		return nil
	}
	return strings.SplitN(in, commitEncodeSep, maxStoredCommits)
}

// encodeCommits joins the given commits into a single string, placing
// commitEncodeSep between consecutive elements.
func encodeCommits(in []string) string {
	encoded := strings.Join(in, commitEncodeSep)
	return encoded
}

// indexStringSlice builds a set from slice: the returned map has one key for
// every distinct string in slice. The map is never nil, even for an empty or
// nil input.
func indexStringSlice(slice []string) map[string]struct{} {
	index := make(map[string]struct{}, len(slice))
	for i := 0; i < len(slice); i++ {
		index[slice[i]] = struct{}{}
	}
	return index
}

func warmup(session neo4j.Session) error {
_, err := session.Run("CALL apoc.warmup.run(true, true, true)", map[string]interface{}{})
return errors.Wrap(err, "while warming up the data")
Expand Down

0 comments on commit c7c58b0

Please sign in to comment.