Skip to content

Commit

Permalink
clean expired cache entries periodically
Browse files Browse the repository at this point in the history
Signed-off-by: Rudrakh Panigrahi <rudrakh97@gmail.com>
  • Loading branch information
rudrakhp committed Nov 10, 2023
1 parent 89855df commit cb43b1e
Show file tree
Hide file tree
Showing 13 changed files with 303 additions and 71 deletions.
3 changes: 2 additions & 1 deletion internal/wasm/sdk/internal/wasm/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package wasm_test

import (
"context"
"github.com/open-policy-agent/opa/logging"
"math/rand"
"strings"
"testing"
Expand Down Expand Up @@ -177,7 +178,7 @@ func ensurePoolResults(t *testing.T, ctx context.Context, testPool *wasm.Pool, p
toRelease = append(toRelease, vm)

cfg, _ := cache.ParseCachingConfig(nil)
result, err := vm.Eval(ctx, 0, input, metrics.New(), rand.New(rand.NewSource(0)), time.Now(), cache.NewInterQueryCache(cfg), builtins.NDBCache{}, nil, nil)
result, err := vm.Eval(ctx, 0, input, metrics.New(), rand.New(rand.NewSource(0)), time.Now(), cache.NewInterQueryCache(ctx, logging.NewNoOpLogger(), cfg), builtins.NDBCache{}, nil, nil)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
Expand Down
3 changes: 2 additions & 1 deletion rego/rego_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/open-policy-agent/opa/ast/location"
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/internal/storage/mock"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
Expand Down Expand Up @@ -2155,7 +2156,7 @@ func TestEvalWithInterQueryCache(t *testing.T) {

// add an inter-query cache
config, _ := cache.ParseCachingConfig(nil)
interQueryCache := cache.NewInterQueryCache(config)
interQueryCache := cache.NewInterQueryCache(context.Background(), logging.NewNoOpLogger(), config)

ctx := context.Background()
_, err := New(Query(query), InterQueryBuiltinCache(interQueryCache)).Eval(ctx)
Expand Down
5 changes: 3 additions & 2 deletions rego/rego_wasmtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/open-policy-agent/opa/ast"
sdk_errors "github.com/open-policy-agent/opa/internal/wasm/sdk/opa/errors"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/storage/inmem"
"github.com/open-policy-agent/opa/topdown"
"github.com/open-policy-agent/opa/topdown/cache"
Expand Down Expand Up @@ -325,7 +326,7 @@ func TestEvalWasmWithInterQueryCache(t *testing.T) {

// add an inter-query cache
config, _ := cache.ParseCachingConfig(nil)
interQueryCache := cache.NewInterQueryCache(config)
interQueryCache := cache.NewInterQueryCache(context.Background(), logging.NewNoOpLogger(), config)

ctx := context.Background()
_, err := New(Target("wasm"), Query(query), InterQueryBuiltinCache(interQueryCache)).Eval(ctx)
Expand Down Expand Up @@ -367,7 +368,7 @@ func TestEvalWasmWithHTTPAllowNet(t *testing.T) {

// add an inter-query cache
config, _ := cache.ParseCachingConfig(nil)
interQueryCache := cache.NewInterQueryCache(config)
interQueryCache := cache.NewInterQueryCache(context.Background(), logging.NewNoOpLogger(), config)

ctx := context.Background()
// StrictBuiltinErrors(true) has no effect when target is 'wasm'
Expand Down
10 changes: 9 additions & 1 deletion runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,18 @@ func (rt *Runtime) Serve(ctx context.Context) error {
rt.server = rt.server.WithUnixSocketPermission(rt.Params.UnixSocketPerm)
}

rt.server, err = rt.server.Init(ctx)
ctx, cancel := context.WithCancel(ctx)
rt.server, err = rt.server.Init(ctx, rt.logger)
if err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to initialize server.")
cancel()
return err
}

if rt.Params.Watch {
if err := rt.startWatcher(ctx, rt.Params.Paths, rt.onReloadLogger); err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to open watch.")
cancel()
return err
}
}
Expand All @@ -594,12 +597,14 @@ func (rt *Runtime) Serve(ctx context.Context) error {
100*time.Millisecond,
time.Second*time.Duration(rt.Params.ReadyTimeout)); err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to wait for plugins activation.")
cancel()
return err
}

loops, err := rt.server.Listeners()
if err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to create listeners.")
cancel()
return err
}

Expand Down Expand Up @@ -630,11 +635,14 @@ func (rt *Runtime) Serve(ctx context.Context) error {
for {
select {
case <-ctx.Done():
cancel()
return rt.gracefulServerShutdown(rt.server)
case <-signalc:
cancel()
return rt.gracefulServerShutdown(rt.server)
case err := <-errc:
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Listener failed.")
cancel()
os.Exit(1)
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/opa.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (opa *OPA) configure(ctx context.Context, bs []byte, ready chan struct{}, b

opa.state.manager = manager
opa.state.queryCache.Clear()
opa.state.interQueryBuiltinCache = cache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig())
opa.state.interQueryBuiltinCache = cache.NewInterQueryCache(ctx, opa.logger, manager.InterQueryBuiltinCacheConfig())
opa.config = bs

return nil
Expand Down
4 changes: 3 additions & 1 deletion server/authorizer/authorizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package authorizer

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -15,6 +16,7 @@ import (
"testing"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/server/identifier"
"github.com/open-policy-agent/opa/server/types"
"github.com/open-policy-agent/opa/storage/inmem"
Expand Down Expand Up @@ -499,7 +501,7 @@ func TestInterQueryCache(t *testing.T) {
}

config, _ := cache.ParseCachingConfig(nil)
interQueryCache := cache.NewInterQueryCache(config)
interQueryCache := cache.NewInterQueryCache(context.Background(), logging.NewNoOpLogger(), config)

basic := NewBasic(&mockHandler{}, compiler, inmem.New(), InterQueryCache(interQueryCache), Decision(func() ast.Ref {
return ast.MustParseRef("data.system.authz.allow")
Expand Down
8 changes: 4 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func New() *Server {

// Init initializes the server. This function MUST be called before starting any loops
// from s.Listeners().
func (s *Server) Init(ctx context.Context) (*Server, error) {
s.initRouters()
func (s *Server) Init(ctx context.Context, logger logging.Logger) (*Server, error) {
s.initRouters(ctx, logger)

txn, err := s.store.NewTransaction(ctx, storage.WriteParams)
if err != nil {
Expand Down Expand Up @@ -706,7 +706,7 @@ func (s *Server) initHandlerCompression(handler http.Handler) (http.Handler, err
return compressHandler, nil
}

func (s *Server) initRouters() {
func (s *Server) initRouters(ctx context.Context, logger logging.Logger) {
mainRouter := s.router
if mainRouter == nil {
mainRouter = mux.NewRouter()
Expand All @@ -715,7 +715,7 @@ func (s *Server) initRouters() {
diagRouter := mux.NewRouter()

// authorizer, if configured, needs the iCache to be set up already
s.interQueryBuiltinCache = iCache.NewInterQueryCache(s.manager.InterQueryBuiltinCacheConfig())
s.interQueryBuiltinCache = iCache.NewInterQueryCache(ctx, logger, s.manager.InterQueryBuiltinCacheConfig())
s.manager.RegisterCacheTrigger(s.updateCacheConfig)

// Add authorization handler. This must come BEFORE authentication handler
Expand Down
14 changes: 7 additions & 7 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3395,7 +3395,7 @@ func TestQueryPostBasic(t *testing.T) {
WithAddresses([]string{"localhost:8182"}).
WithStore(f.server.store).
WithManager(f.server.manager).
Init(context.Background())
Init(context.Background(), logging.NewNoOpLogger())

setup := []tr{
{http.MethodPost, "/query", `{"query": "a=data.k.x with data.k as {\"x\" : 7}"}`, 200, `{"result":[{"a":7}]}`},
Expand Down Expand Up @@ -3909,7 +3909,7 @@ func TestAuthorization(t *testing.T) {
WithStore(store).
WithManager(m).
WithAuthorization(AuthorizationBasic).
Init(ctx)
Init(ctx, logging.NewNoOpLogger())

if err != nil {
panic(err)
Expand Down Expand Up @@ -4040,7 +4040,7 @@ allow {
WithStore(store).
WithManager(m).
WithAuthorization(AuthorizationBasic).
Init(ctx)
Init(ctx, logging.NewNoOpLogger())

if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -4225,7 +4225,7 @@ func TestQueryBindingIterationError(t *testing.T) {
panic(err)
}

server, err := New().WithStore(mock).WithManager(m).WithAddresses([]string{":8182"}).Init(ctx)
server, err := New().WithStore(mock).WithManager(m).WithAddresses([]string{":8182"}).Init(ctx, logging.NewNoOpLogger())
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -4288,7 +4288,7 @@ func newFixture(t *testing.T, opts ...func(*Server)) *fixture {
if err := m.Start(ctx); err != nil {
t.Fatal(err)
}
server, err = server.Init(ctx)
server, err = server.Init(ctx, logging.NewNoOpLogger())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -4318,7 +4318,7 @@ func newFixtureWithConfig(t *testing.T, config string, opts ...func(*Server)) *f
if err := m.Start(ctx); err != nil {
t.Fatal(err)
}
server, err = server.Init(ctx)
server, err = server.Init(ctx, logging.NewNoOpLogger())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -4349,7 +4349,7 @@ func newFixtureWithStore(t *testing.T, store storage.Store, opts ...func(*Server
for _, opt := range opts {
opt(server)
}
server, err = server.Init(ctx)
server, err = server.Init(ctx, logging.NewNoOpLogger())
if err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit cb43b1e

Please sign in to comment.