Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use JSON decoder tokenizer to parse packages from storage indexer #881

Merged
merged 5 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Bugfixes

* Reduce peak memory footprint of recycling indices from storage. [#881](https://github.com/elastic/package-registry/pull/881)

### Added

### Deprecated
Expand Down
14 changes: 7 additions & 7 deletions storage/fakestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ var FakeIndexerOptions = IndexerOptions{
WatchInterval: 0,
}

// PrepareFakeServer reads the index file at indexPath and returns a fake
// storage server populated with the objects built from that index at
// revision "1". It fails the test immediately if the index file cannot be read.
// The testing.TB parameter lets benchmarks as well as tests use this helper.
func PrepareFakeServer(t *testing.T, indexPath string) *fakestorage.Server {
func PrepareFakeServer(tb testing.TB, indexPath string) *fakestorage.Server {
indexContent, err := ioutil.ReadFile(indexPath)
require.NoError(t, err, "index file must be populated")
require.NoError(tb, err, "index file must be populated")

const firstRevision = "1"
serverObjects := prepareServerObjects(t, firstRevision, indexContent)
serverObjects := prepareServerObjects(tb, firstRevision, indexContent)
return fakestorage.NewServer(serverObjects)
}

Expand All @@ -41,11 +41,11 @@ func updateFakeServer(t *testing.T, server *fakestorage.Server, revision, indexP
}
}

func prepareServerObjects(t *testing.T, revision string, indexContent []byte) []fakestorage.Object {
func prepareServerObjects(tb testing.TB, revision string, indexContent []byte) []fakestorage.Object {
var index searchIndexAll
err := json.Unmarshal(indexContent, &index)
require.NoError(t, err, "index file must be valid")
require.NotEmpty(t, index.Packages, "index file must contain some package entries")
require.NoError(tb, err, "index file must be valid")
require.NotEmpty(tb, index.Packages, "index file must contain some package entries")

var serverObjects []fakestorage.Object
// Add cursor and index file
Expand All @@ -61,6 +61,6 @@ func prepareServerObjects(t *testing.T, revision string, indexContent []byte) []
},
Content: indexContent,
})
t.Logf("Prepared %d packages with total %d server objects.", len(index.Packages), len(serverObjects))
tb.Logf("Prepared %d packages with total %d server objects.", len(index.Packages), len(serverObjects))
return serverObjects
}
49 changes: 46 additions & 3 deletions storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,53 @@ func loadSearchIndexAll(ctx context.Context, storageClient *storage.Client, buck
}
defer objectReader.Close()

// Use the decoder as a tokenizer to parse the list of packages as a stream,
// so the whole document does not need to be held in memory at once. This helps
// reduce memory usage.
// Using `Unmarshal(doc, &sia)` would require reading the whole document first.
// Using `dec.Decode(&sia)` would also make the decoder keep the whole document
// in memory.
// `jsoniter` seemed to be slightly faster but used more memory for our use case,
// and we are optimizing for memory use.
var sia searchIndexAll
err = json.NewDecoder(objectReader).Decode(&sia)
if err != nil {
return nil, errors.Wrapf(err, "can't decode the index file (path: %s)", rootedIndexStoragePath)
dec := json.NewDecoder(objectReader)
for dec.More() {
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
// Read everything till the "packages" key in the map.
token, err := dec.Token()
if err != nil {
return nil, errors.Wrapf(err, "unexpected error while reading index file")
}
if key, ok := token.(string); !ok || key != "packages" {
continue
}

// Read the opening array now.
token, err = dec.Token()
if err != nil {
return nil, errors.Wrapf(err, "unexpected error while reading index file")
}
if delim, ok := token.(json.Delim); !ok || delim != '[' {
return nil, errors.Errorf("expected opening array, found %v", token)
}

// Read the array of packages one by one.
for dec.More() {
var p packageIndex
err = dec.Decode(&p)
if err != nil {
return nil, errors.Wrapf(err, "unexpected error parsing package from index file (token: %v)", token)
}
sia.Packages = append(sia.Packages, p)
}

// Read the closing array delimiter.
token, err = dec.Token()
if err != nil {
return nil, errors.Wrapf(err, "unexpected error while reading index file")
}
if delim, ok := token.(json.Delim); !ok || delim != ']' {
return nil, errors.Errorf("expected closing array, found %v", token)
}
}
return &sia, nil
}
Expand Down
13 changes: 13 additions & 0 deletions storage/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ func TestInit(t *testing.T) {
require.NoError(t, err)
}

// BenchmarkInit measures the cost of initializing an Indexer against a fake
// storage server populated from testdata/search-index-all-full.json.
func BenchmarkInit(b *testing.B) {
	// given
	fs := PrepareFakeServer(b, "testdata/search-index-all-full.json")
	defer fs.Stop()
	storageClient := fs.Client()

	// Report allocations per operation and keep the fixture setup above
	// (reading the index file and starting the fake server) out of the
	// measured region, so the numbers reflect Init only.
	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		indexer := NewIndexer(storageClient, FakeIndexerOptions)
		err := indexer.Init(context.Background())
		require.NoError(b, err)
	}
}

func TestGet_ListAllPackages(t *testing.T) {
// given
fs := PrepareFakeServer(t, "testdata/search-index-all-full.json")
Expand Down