Skip to content

Commit

Permalink
storage: Optimized read mode for default data storage
Browse files Browse the repository at this point in the history
A new optimized read mode has been added to the default in-memory store, where data written to the store is eagerly converted to AST values (the data format used during evaluation). This pre-converted data is faster to read, and won’t cause memory spikes during load; but comes with slower data writes (affects startup and bundle load/update time) and a larger lowest overall memory footprint for OPA. Can be enabled for `opa run`, `opa eval`, and `opa bench` by setting the `—optimize-store-for-read-speed`. See http://localhost:8888/docs/edge/policy-performance/#storage-optimization.

Implements: #4147

Signed-off-by: Johan Fylling <johan.dev@fylling.se>
Co-authored-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
johanfylling and ashutosh-narkar authored Oct 30, 2024
1 parent 1b797d9 commit 6af5e79
Show file tree
Hide file tree
Showing 24 changed files with 3,013 additions and 1,079 deletions.
21 changes: 21 additions & 0 deletions ast/term.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,11 @@ func (arr *Array) Elem(i int) *Term {
return arr.elems[i]
}

// Set sets the element i of arr.
func (arr *Array) Set(i int, v *Term) {
arr.set(i, v)
}

// rehash updates the cached hash of arr.
func (arr *Array) rehash() {
arr.hash = 0
Expand All @@ -1306,6 +1311,7 @@ func (arr *Array) set(i int, v *Term) {
arr.ground = arr.ground && v.IsGround()
arr.elems[i] = v
arr.hashs[i] = v.Value.Hash()
arr.rehash()
}

// Slice returns a slice of arr starting from i index to j. -1
Expand Down Expand Up @@ -2560,6 +2566,8 @@ func (obj *object) insert(k, v *Term) {
}

curr.value = v

obj.rehash()
return
}
}
Expand All @@ -2584,6 +2592,19 @@ func (obj *object) insert(k, v *Term) {
}
}

func (obj *object) rehash() {
// obj.keys is considered truth, from which obj.hash and obj.elems are recalculated.

obj.hash = 0
obj.elems = make(map[int]*objectElem, len(obj.keys))

for _, elem := range obj.keys {
hash := elem.key.Hash()
obj.hash += hash + elem.value.Hash()
obj.elems[hash] = elem
}
}

func filterObject(o Value, filter Value) (Value, error) {
if filter.Compare(Null{}) == 0 {
return o, nil
Expand Down
63 changes: 48 additions & 15 deletions bundle/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,25 @@ func metadataPath(name string) storage.Path {
return append(BundlesBasePath, name, "manifest", "metadata")
}

func read(ctx context.Context, store storage.Store, txn storage.Transaction, path storage.Path) (interface{}, error) {
value, err := store.Read(ctx, txn, path)
if err != nil {
return nil, err
}

if astValue, ok := value.(ast.Value); ok {
value, err = ast.JSON(astValue)
if err != nil {
return nil, err
}
}

return value, nil
}

// ReadBundleNamesFromStore will return a list of bundle names which have had their metadata stored.
func ReadBundleNamesFromStore(ctx context.Context, store storage.Store, txn storage.Transaction) ([]string, error) {
value, err := store.Read(ctx, txn, BundlesBasePath)
value, err := read(ctx, store, txn, BundlesBasePath)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,7 +169,7 @@ func eraseWasmModulesFromStore(ctx context.Context, store storage.Store, txn sto
// ReadWasmMetadataFromStore will read Wasm module resolver metadata from the store.
func ReadWasmMetadataFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, name string) ([]WasmResolver, error) {
path := wasmEntrypointsPath(name)
value, err := store.Read(ctx, txn, path)
value, err := read(ctx, store, txn, path)
if err != nil {
return nil, err
}
Expand All @@ -176,7 +192,7 @@ func ReadWasmMetadataFromStore(ctx context.Context, store storage.Store, txn sto
// ReadWasmModulesFromStore will write Wasm module resolver metadata from the store.
func ReadWasmModulesFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, name string) (map[string][]byte, error) {
path := wasmModulePath(name)
value, err := store.Read(ctx, txn, path)
value, err := read(ctx, store, txn, path)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -205,7 +221,7 @@ func ReadWasmModulesFromStore(ctx context.Context, store storage.Store, txn stor
// If the bundle is not activated, this function will return
// storage NotFound error.
func ReadBundleRootsFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, name string) ([]string, error) {
value, err := store.Read(ctx, txn, rootsPath(name))
value, err := read(ctx, store, txn, rootsPath(name))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -235,7 +251,7 @@ func ReadBundleRevisionFromStore(ctx context.Context, store storage.Store, txn s
}

func readRevisionFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, path storage.Path) (string, error) {
value, err := store.Read(ctx, txn, path)
value, err := read(ctx, store, txn, path)
if err != nil {
return "", err
}
Expand All @@ -256,7 +272,7 @@ func ReadBundleMetadataFromStore(ctx context.Context, store storage.Store, txn s
}

func readMetadataFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, path storage.Path) (map[string]interface{}, error) {
value, err := store.Read(ctx, txn, path)
value, err := read(ctx, store, txn, path)
if err != nil {
return nil, suppressNotFound(err)
}
Expand All @@ -277,7 +293,7 @@ func ReadBundleEtagFromStore(ctx context.Context, store storage.Store, txn stora
}

func readEtagFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, path storage.Path) (string, error) {
value, err := store.Read(ctx, txn, path)
value, err := read(ctx, store, txn, path)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -544,14 +560,7 @@ func activateDeltaBundles(opts *ActivateOpts, bundles map[string]*Bundle) error
return err
}

bs, err := json.Marshal(value)
if err != nil {
return fmt.Errorf("corrupt manifest data: %w", err)
}

var manifest Manifest

err = util.UnmarshalJSON(bs, &manifest)
manifest, err := valueToManifest(value)
if err != nil {
return fmt.Errorf("corrupt manifest data: %w", err)
}
Expand Down Expand Up @@ -585,6 +594,30 @@ func activateDeltaBundles(opts *ActivateOpts, bundles map[string]*Bundle) error
return nil
}

func valueToManifest(v interface{}) (Manifest, error) {
if astV, ok := v.(ast.Value); ok {
var err error
v, err = ast.JSON(astV)
if err != nil {
return Manifest{}, err
}
}

var manifest Manifest

bs, err := json.Marshal(v)
if err != nil {
return Manifest{}, err
}

err = util.UnmarshalJSON(bs, &manifest)
if err != nil {
return Manifest{}, err
}

return manifest, nil
}

// erase bundles by name and roots. This will clear all policies and data at its roots and remove its
// manifest from storage.
func eraseBundles(ctx context.Context, store storage.Store, txn storage.Transaction, parserOpts ast.ParserOptions, names map[string]struct{}, roots map[string]struct{}) (map[string]*ast.Module, error) {
Expand Down
Loading

0 comments on commit 6af5e79

Please sign in to comment.