Skip to content

Commit

Permalink
Persist activated bundle etag to store
Browse files Browse the repository at this point in the history
Currently etag from the HTTP response of activated bundles is not
persisted to store. Hence if OPA restarts and an activated bundle
loaded from the disk store is up-to-date, OPA may still download
the same version of the bundle and activate it. With this change,
OPA should include the right etag in the bundle download request
thereby avoiding unnecessary bundle download and activation.

Fixes: open-policy-agent#4544

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar authored and rokkiter committed Apr 18, 2022
1 parent a2c8499 commit 4a12289
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 23 deletions.
10 changes: 10 additions & 0 deletions bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Bundle struct {
WasmModules []WasmModuleFile
PlanModules []PlanModuleFile
Patch Patch
Etag string
}

// Patch contains an array of objects wherein each object represents the patch operation to be
Expand Down Expand Up @@ -342,6 +343,7 @@ type Reader struct {
processAnnotations bool
files map[string]FileInfo // files in the bundle signature payload
sizeLimitBytes int64
etag string
}

// NewReader is deprecated. Use NewCustomReader instead.
Expand Down Expand Up @@ -406,6 +408,12 @@ func (r *Reader) WithSizeLimitBytes(n int64) *Reader {
return r
}

// WithBundleEtag sets the given etag value on the bundle
func (r *Reader) WithBundleEtag(etag string) *Reader {
r.etag = etag
return r
}

// Read returns a new Bundle loaded from the reader.
func (r *Reader) Read() (Bundle, error) {

Expand Down Expand Up @@ -584,6 +592,8 @@ func (r *Reader) Read() (Bundle, error) {
}
}

bundle.Etag = r.etag

return bundle, nil
}

Expand Down
17 changes: 17 additions & 0 deletions bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ func TestReadWithSizeLimit(t *testing.T) {
}
}

func TestReadWithBundleEtag(t *testing.T) {

files := [][2]string{
{"/.manifest", `{"revision": "quickbrownfaux"}`},
}

buf := archive.MustWriteTarGz(files)
bundle, err := NewReader(buf).WithBundleEtag("foo").Read()
if err != nil {
t.Fatal(err)
}

if bundle.Etag != "foo" {
t.Fatalf("Expected bundle etag foo but got %v\n", bundle.Etag)
}
}

func testReadBundle(t *testing.T, baseDir string) {
module := `package example`

Expand Down
59 changes: 59 additions & 0 deletions bundle/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func ManifestStoragePath(name string) storage.Path {
return append(BundlesBasePath, name, "manifest")
}

// EtagStoragePath is the storage path used for the given named bundle etag.
func EtagStoragePath(name string) storage.Path {
return append(BundlesBasePath, name, "etag")
}

func namedBundlePath(name string) storage.Path {
return append(BundlesBasePath, name)
}
Expand Down Expand Up @@ -79,6 +84,11 @@ func WriteManifestToStore(ctx context.Context, store storage.Store, txn storage.
return write(ctx, store, txn, ManifestStoragePath(name), manifest)
}

// WriteEtagToStore will write the bundle etag into the storage. This function is called when the bundle is activated.
func WriteEtagToStore(ctx context.Context, store storage.Store, txn storage.Transaction, name, etag string) error {
return write(ctx, store, txn, EtagStoragePath(name), etag)
}

func write(ctx context.Context, store storage.Store, txn storage.Transaction, path storage.Path, value interface{}) error {
if err := util.RoundTrip(&value); err != nil {
return err
Expand All @@ -104,6 +114,14 @@ func EraseManifestFromStore(ctx context.Context, store storage.Store, txn storag
return suppressNotFound(err)
}

// eraseBundleEtagFromStore will remove the bundle etag from storage. This function is called
// when the bundle is deactivated.
func eraseBundleEtagFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, name string) error {
path := EtagStoragePath(name)
err := store.Write(ctx, txn, storage.RemoveOp, path, nil)
return suppressNotFound(err)
}

func suppressNotFound(err error) error {
if err == nil || storage.IsNotFound(err) {
return nil
Expand Down Expand Up @@ -249,6 +267,27 @@ func readMetadataFromStore(ctx context.Context, store storage.Store, txn storage
return data, nil
}

// ReadBundleEtagFromStore returns the etag for the specified bundle.
// If the bundle is not activated, this function will return
// storage NotFound error.
func ReadBundleEtagFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, name string) (string, error) {
return readEtagFromStore(ctx, store, txn, EtagStoragePath(name))
}

func readEtagFromStore(ctx context.Context, store storage.Store, txn storage.Transaction, path storage.Path) (string, error) {
value, err := store.Read(ctx, txn, path)
if err != nil {
return "", err
}

str, ok := value.(string)
if !ok {
return "", fmt.Errorf("corrupt bundle etag")
}

return str, nil
}

// ActivateOpts defines options for the Activate API call.
type ActivateOpts struct {
Ctx context.Context
Expand Down Expand Up @@ -373,6 +412,10 @@ func activateBundles(opts *ActivateOpts) error {
return err
}

if err := writeEtagToStore(opts, name, b.Etag); err != nil {
return err
}

if err := writeWasmModulesToStore(opts.Ctx, opts.Store, opts.Txn, name, b); err != nil {
return err
}
Expand Down Expand Up @@ -426,6 +469,10 @@ func activateDeltaBundles(opts *ActivateOpts, bundles map[string]*Bundle) error
if err := writeManifestToStore(opts, name, b.Manifest); err != nil {
return err
}

if err := writeEtagToStore(opts, name, b.Etag); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -453,6 +500,10 @@ func eraseBundles(ctx context.Context, store storage.Store, txn storage.Transact
return nil, err
}

if err := eraseBundleEtagFromStore(ctx, store, txn, name); suppressNotFound(err) != nil {
return nil, err
}

if err := eraseWasmModulesFromStore(ctx, store, txn, name); suppressNotFound(err) != nil {
return nil, err
}
Expand Down Expand Up @@ -532,6 +583,14 @@ func writeManifestToStore(opts *ActivateOpts, name string, manifest Manifest) er
return nil
}

func writeEtagToStore(opts *ActivateOpts, name, etag string) error {
if err := WriteEtagToStore(opts.Ctx, opts.Store, opts.Txn, name, etag); err != nil {
return err
}

return nil
}

func writeData(ctx context.Context, store storage.Store, txn storage.Transaction, roots []string, data map[string]interface{}) error {
for _, root := range roots {
path, ok := storage.ParsePathEscaped("/" + root)
Expand Down
23 changes: 17 additions & 6 deletions bundle/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func TestBundleLifecycle(t *testing.T) {
Parsed: ast.MustParseModule(mod2),
},
},
Etag: "foo",
},
"bundle2": {
Manifest: Manifest{
Expand Down Expand Up @@ -318,13 +319,15 @@ func TestBundleLifecycle(t *testing.T) {
"manifest": {
"revision": "",
"roots": ["a"]
}
},
"etag": "foo"
},
"bundle2": {
"manifest": {
"revision": "",
"roots": ["b", "c"]
}
},
"etag": ""
}
}
}
Expand Down Expand Up @@ -415,6 +418,7 @@ func TestDeltaBundleLifecycle(t *testing.T) {
Parsed: ast.MustParseModule(mod1),
},
},
Etag: "foo",
},
"bundle2": {
Manifest: Manifest{
Expand Down Expand Up @@ -534,13 +538,15 @@ func TestDeltaBundleLifecycle(t *testing.T) {
Roots: &[]string{"a"},
},
Patch: Patch{Data: []PatchOperation{p1, p2, p3, p4, p5, p6}},
Etag: "bar",
},
"bundle2": {
Manifest: Manifest{
Revision: "delta-2",
Roots: &[]string{"b", "c"},
},
Patch: Patch{Data: []PatchOperation{p7}},
Etag: "baz",
},
"bundle3": {
Manifest: Manifest{
Expand Down Expand Up @@ -608,19 +614,22 @@ func TestDeltaBundleLifecycle(t *testing.T) {
"manifest": {
"revision": "delta-1",
"roots": ["a"]
}
},
"etag": "bar"
},
"bundle2": {
"manifest": {
"revision": "delta-2",
"roots": ["b", "c"]
}
},
"etag": "baz"
},
"bundle3": {
"manifest": {
"revision": "",
"roots": ["d"]
}
},
"etag": ""
}
}
}
Expand Down Expand Up @@ -659,6 +668,7 @@ func TestDeltaBundleActivate(t *testing.T) {
Roots: &[]string{"a"},
},
Patch: Patch{Data: []PatchOperation{p1}},
Etag: "foo",
},
}

Expand Down Expand Up @@ -722,7 +732,8 @@ func TestDeltaBundleActivate(t *testing.T) {
"manifest": {
"revision": "delta",
"roots": ["a"]
}
},
"etag": "foo"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ func (d *Downloader) download(ctx context.Context, m metrics.Metrics) (*download
loader = bundle.NewTarballLoaderWithBaseURL(resp.Body, baseURL)
}

reader := bundle.NewCustomReader(loader).WithMetrics(m).WithBundleVerificationConfig(d.bvc)
etag := resp.Header.Get("ETag")
reader := bundle.NewCustomReader(loader).WithMetrics(m).WithBundleVerificationConfig(d.bvc).
WithBundleEtag(etag)
if d.sizeLimitBytes != nil {
reader = reader.WithSizeLimitBytes(*d.sizeLimitBytes)
}
Expand Down Expand Up @@ -337,7 +339,7 @@ func (d *Downloader) download(ctx context.Context, m metrics.Metrics) (*download
return &downloaderResponse{
b: &b,
raw: &buf,
etag: resp.Header.Get("ETag"),
etag: etag,
longPoll: isLongPollSupported(resp.Header),
}, nil
}
Expand Down
34 changes: 34 additions & 0 deletions download/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,40 @@ func TestEtagCachingLifecycle(t *testing.T) {
}
}

func TestOneShotWithBundleEtag(t *testing.T) {

ctx := context.Background()
fixture := newTestFixture(t)
fixture.d = New(Config{}, fixture.client, "/bundles/test/bundle1").WithCallback(fixture.oneShot)
fixture.server.expEtag = "some etag value"
defer fixture.server.stop()

// check etag on the downloader is empty
if fixture.d.etag != "" {
t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag)
}

// simulate successful bundle activation and check updated etag on the downloader
fixture.server.expCode = 0
err := fixture.d.oneShot(ctx)
if err != nil {
t.Fatal("Unexpected:", err)
}

if fixture.d.etag != fixture.server.expEtag {
t.Fatalf("Expected downloader ETag %v but got %v", fixture.server.expEtag, fixture.d.etag)
}

if fixture.updates[0].Bundle == nil {
// 200 response on first request, bundle should be present
t.Errorf("Expected bundle in response")
}

if fixture.updates[0].Bundle.Etag != fixture.server.expEtag {
t.Fatalf("Expected bundle ETag %v but got %v", fixture.server.expEtag, fixture.updates[0].Bundle.Etag)
}
}

func TestFailureAuthn(t *testing.T) {

ctx := context.Background()
Expand Down
43 changes: 39 additions & 4 deletions plugins/bundle/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (p *Plugin) Start(ctx context.Context) error {

p.loadAndActivateBundlesFromDisk(ctx)

p.initDownloaders()
p.initDownloaders(ctx)
for name, dl := range p.downloaders {
p.log(name).Info("Starting bundle loader.")
dl.Start(ctx)
Expand Down Expand Up @@ -222,8 +222,16 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) {
} else {
p.log(name).Info("Bundle loader configuration changed. Restarting bundle loader.")
}
p.downloaders[name] = p.newDownloader(name, source)

downloader := p.newDownloader(name, source)

etag := p.readBundleEtagFromStore(ctx, name)
downloader.SetCache(etag)

p.downloaders[name] = downloader
p.etags[name] = etag
p.downloaders[name].Start(ctx)

readyNow = false
}
}
Expand Down Expand Up @@ -303,11 +311,38 @@ func (p *Plugin) Config() *Config {
return &p.config
}

func (p *Plugin) initDownloaders() {
func (p *Plugin) initDownloaders(ctx context.Context) {

// Initialize a downloader for each bundle configured.
for name, source := range p.config.Bundles {
p.downloaders[name] = p.newDownloader(name, source)
downloader := p.newDownloader(name, source)

etag := p.readBundleEtagFromStore(ctx, name)
downloader.SetCache(etag)

p.downloaders[name] = downloader
p.etags[name] = etag
}
}

func (p *Plugin) readBundleEtagFromStore(ctx context.Context, name string) string {
var etag string
err := storage.Txn(ctx, p.manager.Store, storage.TransactionParams{}, func(txn storage.Transaction) error {
var loadErr error
etag, loadErr = bundle.ReadBundleEtagFromStore(ctx, p.manager.Store, txn, name)
if loadErr != nil && !storage.IsNotFound(loadErr) {
p.log(name).Error("Failed to load bundle etag from store: %v", loadErr)
return loadErr
}
return nil
})
if err != nil {
// TODO: This probably shouldn't panic. But OPA shouldn't
// continue in a potentially inconsistent state.
panic(errors.New("Unable to load bundle etag from store: " + err.Error()))
}

return etag
}

func (p *Plugin) loadAndActivateBundlesFromDisk(ctx context.Context) {
Expand Down
Loading

0 comments on commit 4a12289

Please sign in to comment.