Implement basic Storage Indexer #814

Merged · 34 commits · Jun 7, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Prepare stub for Storage Indexer. Disable fetching packages from Package Storage v1. [#811](https://github.com/elastic/package-registry/pull/811)
* Support input packages. [#809](https://github.com/elastic/package-registry/pull/809)
* Implement storage indexer. [#814](https://github.com/elastic/package-registry/pull/814)

### Deprecated

25 changes: 20 additions & 5 deletions main.go
@@ -16,12 +16,12 @@ import (
"syscall"
"time"

gstorage "cloud.google.com/go/storage"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"go.uber.org/zap"

"go.elastic.co/apm"
"go.elastic.co/apm/module/apmgorilla"
"go.uber.org/zap"

ucfgYAML "github.com/elastic/go-ucfg/yaml"

@@ -45,7 +45,10 @@ var (
dryRun bool
configPath string

featureStorageIndexer bool
featureStorageIndexer bool
storageIndexerBucketInternal string
storageIndexerBucketPublic string
storageIndexerWatchInterval time.Duration

defaultConfig = Config{
CacheTimeIndex: 10 * time.Second,
@@ -64,8 +67,12 @@ func init() {
// This flag is experimental and might be removed in the future or renamed
flag.BoolVar(&dryRun, "dry-run", false, "Runs a dry-run of the registry without starting the web service (experimental).")
flag.BoolVar(&packages.ValidationDisabled, "disable-package-validation", false, "Disable package content validation.")
// This flag is a technical preview and might be removed in the future or renamed
// The following storage related flags are technical preview and might be removed in the future or renamed
flag.BoolVar(&featureStorageIndexer, "feature-storage-indexer", false, "Enable storage indexer to include packages from Package Storage v2 (technical preview).")
flag.StringVar(&storageIndexerBucketInternal, "storage-indexer-bucket-internal", "", "Path to the internal Package Storage bucket (with gs:// prefix).")
flag.StringVar(&storageIndexerBucketPublic, "storage-indexer-bucket-public", "", "Path to the public Package Storage bucket (with gs:// prefix).")
flag.DurationVar(&storageIndexerWatchInterval, "storage-indexer-watch-interval", 1*time.Minute, "Watch interval for the storage indexer to pick up new revisions (technical preview).")

}

type Config struct {
@@ -131,7 +138,15 @@ func initServer(logger *zap.Logger) *http.Server {

var indexers []Indexer
if featureStorageIndexer {
indexers = append(indexers, storage.NewIndexer())
storageClient, err := gstorage.NewClient(ctx)
if err != nil {
logger.Fatal("can't initialize storage client", zap.Error(err))
}
indexers = append(indexers, storage.NewIndexer(storageClient, storage.IndexerOptions{
PackageStorageBucketInternal: storageIndexerBucketInternal,
PackageStorageBucketPublic: storageIndexerBucketPublic,
WatchInterval: storageIndexerWatchInterval,
}))
} else {
indexers = append(indexers, packages.NewFileSystemIndexer(packagesBasePaths...))
indexers = append(indexers, packages.NewZipFileSystemIndexer(packagesBasePaths...))
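The three new options are plain command-line flags, so the watch interval accepts standard Go duration strings (for example `-storage-indexer-watch-interval=2m`). Below is a minimal, standalone sketch of the same flag definitions, purely to illustrate parsing and defaults; it is not code from this PR:

```go
// Hypothetical standalone demo of the new flags and their defaults.
// The real registry wires these values into storage.NewIndexer as shown above.
package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	bucketInternal := flag.String("storage-indexer-bucket-internal", "", "Path to the internal Package Storage bucket (with gs:// prefix).")
	bucketPublic := flag.String("storage-indexer-bucket-public", "", "Path to the public Package Storage bucket (with gs:// prefix).")
	watchInterval := flag.Duration("storage-indexer-watch-interval", 1*time.Minute, "Watch interval for the storage indexer (technical preview).")
	flag.Parse()

	fmt.Println("internal bucket:", *bucketInternal)
	fmt.Println("public bucket:  ", *bucketPublic)
	fmt.Println("watch interval: ", *watchInterval)
}
```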
12 changes: 8 additions & 4 deletions storage/cursor.go
@@ -8,10 +8,12 @@ import (
"context"
"encoding/json"
"io/ioutil"
"log"

"cloud.google.com/go/storage"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/elastic/package-registry/util"
)

type cursor struct {
@@ -27,12 +29,13 @@ func (c *cursor) String() string {
}

func loadCursor(ctx context.Context, storageClient *storage.Client, bucketName, rootStoragePath string) (*cursor, error) {
log.Println("Load cursor file")
logger := util.Logger()
logger.Debug("load cursor file")

rootedCursorStoragePath := joinObjectPaths(rootStoragePath, cursorStoragePath)
objectReader, err := storageClient.Bucket(bucketName).Object(rootedCursorStoragePath).NewReader(ctx)
if err == storage.ErrObjectNotExist {
log.Printf("Cursor file doesn't exist, most likely a first run (path: %s)", rootedCursorStoragePath)
logger.Debug("cursor file doesn't exist, most likely a first run", zap.String("path", rootedCursorStoragePath))
return new(cursor), nil
}
if err != nil {
@@ -51,6 +54,7 @@ func loadCursor(ctx context.Context, storageClient *storage.Client, bucketName,
return nil, errors.Wrapf(err, "can't unmarshal the cursor file")
}

log.Printf("Loaded cursor file: %s", c.String())
logger.Debug("cursor file doesn't exist, most likely a first run", zap.String("path", rootedCursorStoragePath))
logger.Debug("loaded cursor file", zap.String("cursor", c.String()))
return &c, nil
}
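The cursor itself is a tiny JSON document. Judging by the fake-server fixture below (`{"current":"1"}`) and the `aCursor.Current` field used in index.go, it can be decoded with a sketch like this; the field name is inferred from the diff, not copied from cursor.go:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cursorDoc mirrors the payload the fake test server stores at the cursor path.
// The real type lives in storage/cursor.go; only the "current" field is visible in this diff.
type cursorDoc struct {
	Current string `json:"current"`
}

func main() {
	raw := []byte(`{"current":"1"}`)

	var c cursorDoc
	if err := json.Unmarshal(raw, &c); err != nil {
		panic(err)
	}
	fmt.Println("current revision:", c.Current) // current revision: 1
}
```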
55 changes: 41 additions & 14 deletions storage/fakestorage_test.go
@@ -30,24 +30,39 @@ func prepareFakeServer(t *testing.T, indexPath string) *fakestorage.Server {
indexContent, err := ioutil.ReadFile(indexPath)
require.NoError(t, err, "index file must be populated")

const firstRevision = "1"
serverObjects := prepareServerObjects(t, firstRevision, indexContent)
return fakestorage.NewServer(serverObjects)
}

func updateFakeServer(t *testing.T, server *fakestorage.Server, revision, indexPath string) {
indexContent, err := ioutil.ReadFile(indexPath)
require.NoError(t, err, "index file must be populated")

serverObjects := prepareServerObjects(t, revision, indexContent)

for _, so := range serverObjects {
server.CreateObject(so)
}
}

func prepareServerObjects(t *testing.T, revision string, indexContent []byte) []fakestorage.Object {
var index searchIndexAll
err = json.Unmarshal(indexContent, &index)
err := json.Unmarshal(indexContent, &index)
require.NoError(t, err, "index file must be valid")
require.NotEmpty(t, index.Packages, "index file must contain some package entries")

const firstRevision = "1"

var serverObjects []fakestorage.Object
// Add cursor and index file
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketInternal, Name: cursorStoragePath,
},
Content: []byte(`{"cursor":"` + firstRevision + `"}`),
Content: []byte(`{"current":"` + revision + `"}`),
})
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketInternal, Name: joinObjectPaths(v2MetadataStoragePath, firstRevision, searchIndexAllFile),
BucketName: fakePackageStorageBucketInternal, Name: joinObjectPaths(v2MetadataStoragePath, revision, searchIndexAllFile),
},
Content: indexContent,
})
@@ -56,13 +71,15 @@ func prepareFakeServer(t *testing.T, indexPath string) *fakestorage.Server {
nameVersion := fmt.Sprintf("%s-%s", aPackage.PackageManifest.Name, aPackage.PackageManifest.Version)

// Add fake static resources: docs, img
for _, asset := range aPackage.Assets {
if !strings.HasPrefix(asset, "docs") &&
!strings.HasPrefix(asset, "img") {
for _, asset := range aPackage.PackageManifest.Assets {
assetPath, err := filepath.Rel(filepath.Join("/package", aPackage.PackageManifest.Name, aPackage.PackageManifest.Version), asset)
require.NoError(t, err, "relative path expected")
if !strings.HasPrefix(assetPath, "docs") &&
!strings.HasPrefix(assetPath, "img") {
continue
}

path := joinObjectPaths(artifactsStaticStoragePath, nameVersion, asset)
path := joinObjectPaths(artifactsStaticStoragePath, nameVersion, assetPath)
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketPublic, Name: path,
Expand All @@ -71,23 +88,31 @@ func prepareFakeServer(t *testing.T, indexPath string) *fakestorage.Server {
})
}

// Add fake .zip.sig
path := joinObjectPaths(artifactsPackagesStoragePath, nameVersion+".zip.sig")
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketPublic, Name: path,
},
Content: []byte(filepath.Base(path)),
})

// Add fake .zip package
path := joinObjectPaths(artifactsPackagesStoragePath, nameVersion+".zip")
path = joinObjectPaths(artifactsPackagesStoragePath, nameVersion+".zip")
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketPublic, Name: path,
},
Content: []byte(filepath.Base(path)),
})
}

t.Logf("Prepared %d packages with total %d server objects.", len(index.Packages), len(serverObjects))
return fakestorage.NewServer(serverObjects)
return serverObjects
}

func TestPrepareFakeServer(t *testing.T) {
// given
indexFile := "testdata/search-index-all-1.json"
indexFile := "testdata/search-index-all-full.json"
testIndexFile, err := os.ReadFile(indexFile)
require.NoErrorf(t, err, "index file should be present in testdata")

@@ -100,11 +125,13 @@ func TestPrepareFakeServer(t *testing.T) {
require.NotNil(t, client, "client should be initialized")

aCursor := readObject(t, client.Bucket(fakePackageStorageBucketInternal).Object(cursorStoragePath))
assert.Equal(t, []byte(`{"cursor":"1"}`), aCursor)
assert.Equal(t, []byte(`{"current":"1"}`), aCursor)
anIndex := readObject(t, client.Bucket(fakePackageStorageBucketInternal).Object(joinObjectPaths(v2MetadataStoragePath, "1", searchIndexAllFile)))
assert.Equal(t, testIndexFile, anIndex)
packageZip := readObject(t, client.Bucket(fakePackageStorageBucketPublic).Object(joinObjectPaths(artifactsPackagesStoragePath, "1password-1.1.1.zip")))
assert.NotZero(t, len(packageZip), ".zip package must have fake content")
packageSig := readObject(t, client.Bucket(fakePackageStorageBucketPublic).Object(joinObjectPaths(artifactsPackagesStoragePath, "1password-1.1.1.zip.sig")))
assert.NotZero(t, len(packageSig), ".zip.sig must have fake content")

// check few static files
readme := readObject(t, client.Bucket(fakePackageStorageBucketPublic).Object(joinObjectPaths(artifactsStaticStoragePath, "1password-1.1.1", "docs/README.md")))
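For context, a hypothetical test combining these helpers with the indexer from main.go might look like the sketch below. `server.Client()`, `server.Stop()`, and `CreateObject` come from fsouza/fake-gcs-server, and `NewIndexer`/`IndexerOptions` match the main.go diff above; the test name, the `gs://` bucket prefixes, and everything else are assumptions rather than lines from this PR:

```go
package storage

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// Illustrative test: exercises the fake-server helpers together with the
// indexer constructor. fakePackageStorageBucket* are the constants used
// elsewhere in this test package.
func TestIndexerAgainstFakeStorage(t *testing.T) {
	server := prepareFakeServer(t, "testdata/search-index-all-full.json")
	defer server.Stop()

	indexer := NewIndexer(server.Client(), IndexerOptions{
		PackageStorageBucketInternal: "gs://" + fakePackageStorageBucketInternal,
		PackageStorageBucketPublic:   "gs://" + fakePackageStorageBucketPublic,
		WatchInterval:                time.Second,
	})
	require.NotNil(t, indexer)

	// Publish a second revision; the watcher should notice it within WatchInterval.
	updateFakeServer(t, server, "2", "testdata/search-index-all-full.json")
}
```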
145 changes: 60 additions & 85 deletions storage/index.go
@@ -4,102 +4,77 @@

package storage

import (
"context"
"encoding/json"
"io/ioutil"

"cloud.google.com/go/storage"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/elastic/package-registry/packages"
"github.com/elastic/package-registry/util"
)

type searchIndexAll struct {
Packages []packageIndex `json:"packages"`
}

type packageIndex struct {
PackageManifest packageManifest `json:"package"`
DataStreamManifests []dataStreamManifest `json:"data_streams"`

Assets []string `json:"assets"`
PackageManifest packages.Package `json:"package_manifest"`
}

type packageManifest struct {
FormatVersion string `json:"format_version,omitempty"`
Name string `json:"name,omitempty"`
Title string `json:"title,omitempty"`
Version string `json:"version,omitempty"`
Release string `json:"release,omitempty"`
License string `json:"license,omitempty"`
Description string `json:"description,omitempty"`
Type string `json:"type,omitempty"`
Icons []image `json:"icons,omitempty"`
Screenshots []image `json:"screenshots,omitempty"`
Conditions *struct {
Kibana *struct {
Version string `json:"version,omitempty"`
} `json:"kibana,omitempty"`
} `json:"conditions,omitempty"`
Owner *struct {
Github string `json:"github,omitempty"`
} `json:"owner,omitempty"`
Categories []string `json:"categories,omitempty"`

PolicyTemplates []struct {
Name string `json:"name,omitempty"`
Title string `json:"title,omitempty"`
Categories []string `json:"categories,omitempty"`
DataStreams []string `json:"data_streams,omitempty"`
Description string `json:"description,omitempty"`
Icons []image `json:"icons,omitempty"`
Input struct {
Title string `json:"title,omitempty"`
Type string `json:"type,omitempty"`
Description string `json:"description,omitempty"`
InputGroup string `json:"input_group,omitempty"`
TemplatePath string `json:"template_path,omitempty"`
Vars []variable `json:"vars,omitempty"`
} `json:"input,omitempty"`
Screenshots []image `json:"screenshots,omitempty"`
Vars []variable `json:"vars,omitempty"`
} `json:"policy_templates,omitempty"`
func loadSearchIndexAll(ctx context.Context, storageClient *storage.Client, bucketName, rootStoragePath string, aCursor cursor) (*searchIndexAll, error) {
indexFile := searchIndexAllFile

logger := util.Logger()
logger.Debug("load search-index-all index", zap.String("index.file", indexFile))

content, err := loadIndexContent(ctx, storageClient, indexFile, bucketName, rootStoragePath, aCursor)
if err != nil {
return nil, errors.Wrap(err, "can't load search-index-all content")
}

var sia searchIndexAll
if content == nil {
return &sia, nil
}

err = json.Unmarshal(content, &sia)
if err != nil {
return nil, errors.Wrap(err, "can't unmarshal search-index-all")
}
return &sia, nil
}

type image struct {
Src string `json:"src,omitempty"`
Title string `json:"title,omitempty"`
Size string `json:"size,omitempty"`
Type string `json:"type,omitempty"`
func loadIndexContent(ctx context.Context, storageClient *storage.Client, indexFile, bucketName, rootStoragePath string, aCursor cursor) ([]byte, error) {
logger := util.Logger()
logger.Debug("load index content", zap.String("index.file", indexFile))

rootedIndexStoragePath := buildIndexStoragePath(rootStoragePath, aCursor, indexFile)
objectReader, err := storageClient.Bucket(bucketName).Object(rootedIndexStoragePath).NewReader(ctx)
if err != nil {
return nil, errors.Wrapf(err, "can't read the index file (path: %s)", rootedIndexStoragePath)
}
defer objectReader.Close()

b, err := ioutil.ReadAll(objectReader)
if err != nil {
return nil, errors.Wrapf(err, "ioutil.ReadAll failed")
}

return b, nil
}

type variable struct {
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Multi bool `json:"multi,omitempty"`
Required bool `json:"required,omitempty"`
ShowUser bool `json:"show_user,omitempty"`
Default interface{} `json:"default,omitempty"`
func buildIndexStoragePath(rootStoragePath string, aCursor cursor, indexFile string) string {
return joinObjectPaths(rootStoragePath, v2MetadataStoragePath, aCursor.Current, indexFile)
}

type dataStreamManifest struct {
Title string `json:"title,omitempty"`
Type string `type:"type,omitempty"`
Dataset string `json:"dataset,omitempty"`
Hidden bool `json:"hidden,omitempty"`
IlmPolicy string `json:"ilm_policy,omitempty"`
DatasetIsPrefix bool `json:"dataset_is_prefix,omitempty"`
Release string `json:"release,omitempty"`
Streams []struct {
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Enabled bool `json:"enabled,omitempty"`
Input string `json:"input,omitempty"`
TemplatePath string `json:"template_path,omitempty"`
Vars []variable `json:"vars,omitempty"`
} `json:"streams,omitempty" `
Elasticsearch *struct {
IndexTemplate *struct {
Settings map[string]interface{} `json:"settings,omitempty"`
Mappings map[string]interface{} `json:"mappings,omitempty"`
}
IngestPipeline *struct {
Name string `json:"name,omitempty"`
} `json:"ingest_pipeline,omitempty"`
Privileges *struct {
Indices []string `json:"indices,omitempty"`
} `json:"privileges,omitempty"`
} `json:"elasticsearch,omitempty"`
func transformSearchIndexAllToPackages(sia searchIndexAll) (packages.Packages, error) {
var transformedPackages packages.Packages
for i := range sia.Packages {
transformedPackages = append(transformedPackages, &sia.Packages[i].PackageManifest)
}
return transformedPackages, nil
}
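Putting the helpers in this file together with `loadCursor` from cursor.go, a refresh presumably composes as in the sketch below; the `refreshIndex` wrapper is illustrative and not part of this PR (the real call sites live in the indexer, which is outside this diff view):

```go
package storage

import (
	"context"

	"cloud.google.com/go/storage"

	"github.com/elastic/package-registry/packages"
)

// refreshIndex is an illustrative composition of the helpers above.
// Error wrapping is trimmed for brevity.
func refreshIndex(ctx context.Context, storageClient *storage.Client, bucketName, rootStoragePath string) (packages.Packages, error) {
	// Read the cursor to learn the current index revision.
	cur, err := loadCursor(ctx, storageClient, bucketName, rootStoragePath)
	if err != nil {
		return nil, err
	}
	// Load search-index-all for that revision.
	index, err := loadSearchIndexAll(ctx, storageClient, bucketName, rootStoragePath, *cur)
	if err != nil {
		return nil, err
	}
	// Expose the entries as registry packages.
	return transformSearchIndexAllToPackages(*index)
}
```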