Skip to content

Commit

Permalink
Fail fast when bundle and discovery plugins faced with unrecoverable …
Browse files Browse the repository at this point in the history
…errors.

Signed-off-by: Pushpalanka Jayawardhana <pushpalanka.jayawardhana@zalando.de>
  • Loading branch information
Pushpalanka committed Aug 18, 2024
1 parent 526b67e commit f2b5ca6
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 76 deletions.
129 changes: 85 additions & 44 deletions filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,61 +477,78 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes []
// Start asynchronously starts the policy engine's plugins that download
// policies, report status, etc.
//
// It registers status listeners, calls opa.manager.Start(ctx) and then blocks
// in a select until one of three things happens: done is closed (returns
// nil), an error arrives on failed (the instance is closed and that error is
// returned), or ctx is done (every plugin whose state is not plugins.StateOK
// is logged, the instance is closed and a timeout error is returned).
// Within this function, timeout is only interpolated into the returned error
// messages; the wait itself is bounded by ctx.
//
// NOTE(review): this span looks like a rendered diff in which removed and
// added lines are interleaved (for example, failed is declared twice and some
// listener bodies are not closed) — confirm against the real file before
// relying on the exact statement order.
func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error {

discoveryPlugin := discovery.Lookup(opa.manager)
bundlePlugin := bundle.Lookup(opa.manager)

done := make(chan struct{})
defer close(done)
failed := make(chan error)
defer close(failed)
// Capacity 1: one error can be queued without a receiver being ready.
failed := make(chan error, 1)

discoveryPlugin.RegisterListener("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...))
}
// NOTE(review): this sync.Once is declared locally and used exactly once
// right below, so it does not guard against anything beyond this call.
var registerDiscoveryListenerOnce sync.Once
registerDiscoveryListenerOnce.Do(func() {
discoveryPlugin.RegisterListener("skipper-instance-startup-discovery", func(status bundle.Status) {
handleStatusErrors(status, failed, "discovery plugin")
})
})

bundlePlugin.Register("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...))
defer discoveryPlugin.Unregister("skipper-instance-startup-discovery")

// Register listener for "bundle" status
var registerBundleListenerOnce sync.Once
opa.manager.RegisterPluginStatusListener("skipper-instance-startup-plugin", func(status map[string]*plugins.Status) {
// Only act once the manager reports a status entry keyed "bundle".
if _, exists := status["bundle"]; exists {
bundlePlugin := bundle.Lookup(opa.manager)
if bundlePlugin != nil {
registerBundleListenerOnce.Do(func() {
bundlePlugin.Register("skipper-instance-startup-bundle", func(status bundle.Status) {
handleStatusErrors(status, failed, "bundle plugin")
})
})
// NOTE(review): if this defer sits inside the status-listener closure, it
// runs when that callback returns rather than when Start returns — confirm
// that unregistering this early is intended.
defer bundlePlugin.Unregister("skipper-instance-startup-bundle")
}
}
})
defer bundlePlugin.Unregister("startuplistener")
defer opa.manager.UnregisterPluginStatusListener("skipper-instance-startup-plugin")

opa.manager.RegisterPluginStatusListener("startuplistener", func(status map[string]*plugins.Status) {
for _, pluginstatus := range status {
if pluginstatus != nil && pluginstatus.State != plugins.StateOK {
// Register listener for general plugin status checks
opa.manager.RegisterPluginStatusListener("generalStartUpListener", func(status map[string]*plugins.Status) {
// Bail out while any reported plugin is in a state other than StateOK.
for _, pluginStatus := range status {
if pluginStatus != nil && pluginStatus.State != plugins.StateOK {
return
}
}
// Every reported plugin is nil or StateOK: signal the waiting select.
// NOTE(review): done is also closed by the deferred close near the top of
// this span — verify the two cannot both run.
close(done)
})
defer opa.manager.UnregisterPluginStatusListener("startuplistener")
defer opa.manager.UnregisterPluginStatusListener("generalStartUpListener")

err := opa.manager.Start(ctx)
if err != nil {
return err
}

select {
case <-done:
return nil
case err := <-failed:
opa.Close(ctx)

return err
case <-ctx.Done():
// The context ended before success or failure was reported.
timeoutErr := ctx.Err()

// Log each plugin that is still not in StateOK to aid diagnosis.
for pluginName, status := range opa.manager.PluginStatus() {
if status != nil && status.State != plugins.StateOK {
opa.Logger().WithFields(map[string]interface{}{
"plugin_name": pluginName,
"plugin_state": status.State,
"error_message": status.Message,
}).Error("Open policy agent plugin did not start in time")
}).Error("Open policy agent plugin: %v did not start in time", pluginName)
}
}
opa.Close(ctx)
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err)
if timeoutErr != nil {
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, timeoutErr)
}
return fmt.Errorf("one or more open policy agent plugins failed to start in %v", timeout)

case <-done:
return nil
case err := <-failed:
opa.Close(ctx)

return err
}
}

Expand All @@ -542,25 +559,6 @@ func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) {
})
}

// waitFunc polls fun until it reports true or ctx is done.
//
// fun is evaluated once immediately; if it is already true no ticker is
// created. Otherwise fun is re-evaluated on every tick of interval. When ctx
// finishes first, the returned error wraps ctx.Err().
func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error {
	satisfied := fun()
	if satisfied {
		return nil
	}

	poll := time.NewTicker(interval)
	defer poll.Stop()

	for !satisfied {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timed out while starting: %w", ctx.Err())
		case <-poll.C:
			satisfied = fun()
		}
	}
	return nil
}

func configLabelsInfo(opaConfig config.Config) func(*plugins.Manager) {
info := ast.NewObject()
labels := ast.NewObject()
Expand Down Expand Up @@ -826,3 +824,46 @@ func (l *QuietLogger) Error(fmt string, a ...interface{}) {
// Warn forwards a warning message and its format arguments to the wrapped
// target logger.
//
// The variadic arguments are expanded with a... so that each one is matched
// against its own verb in fmt; passing the slice itself would hand the target
// a single []interface{} operand and garble any format with more than one verb.
func (l *QuietLogger) Warn(fmt string, a ...interface{}) {
	l.target.Warn(fmt, a...)
}

// temporaryClientErrorHTTPCodes is the set of 4xx HTTP status codes that are
// treated as temporary.
var temporaryClientErrorHTTPCodes = map[int64]struct{}{
	408: {}, // request timeout
	429: {}, // too many requests
}

// isTemporaryError reports whether code is listed in
// temporaryClientErrorHTTPCodes.
func isTemporaryError(code int64) bool {
	if _, listed := temporaryClientErrorHTTPCodes[code]; listed {
		return true
	}
	return false
}

// handleStatusErrors inspects a bundle status update and, when it describes an
// error that should abort startup, sends a formatted error on failed.
//
// Only statuses whose Code is "bundle_error" are considered. An error is sent
// when the HTTP code is empty, cannot be parsed as an integer, or is a 4xx
// code that isTemporaryError does not list. Nothing is sent for 5xx codes,
// for listed temporary 4xx codes, or for codes below 400.
//
// The send on failed is a plain channel send, so it blocks if the channel
// cannot accept a value.
func handleStatusErrors(
	status bundle.Status,
	failed chan error,
	prefix string,
) {
	if status.Code != "bundle_error" {
		return
	}

	// No HTTP code at all: the failure did not come from a server reply.
	if status.HTTPCode == "" {
		failed <- formatStatusError(prefix, status)
		return
	}

	httpCode, parseErr := status.HTTPCode.Int64()
	switch {
	case parseErr != nil:
		failed <- formatStatusError(prefix, status)
	case httpCode >= 500:
		// Do not fail for 5xx errors and keep retrying
	case httpCode >= 400 && !isTemporaryError(httpCode):
		// Fail for error codes in the range 400-500 excluding temporary errors
		failed <- formatStatusError(prefix, status)
	}
}

// formatStatusError builds the error reported for a failed bundle status,
// prefixed with the name of the plugin that produced it and listing the
// status' Name, Code, Message, HTTPCode and Errors fields.
func formatStatusError(prefix string, status bundle.Status) error {
	const layout = "%s failed: Name: %s, Code: %s, Message: %s, HTTPCode: %s, Errors: %v"

	return fmt.Errorf(
		layout,
		prefix,
		status.Name,
		status.Code,
		status.Message,
		status.HTTPCode,
		status.Errors,
	)
}
155 changes: 150 additions & 5 deletions filters/openpolicyagent/openpolicyagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/opentracing/opentracing-go"
"io"
"net/http"
"net/http/httptest"
"os"
"strconv"
"testing"
Expand All @@ -20,7 +22,6 @@ import (
opaconf "github.com/open-policy-agent/opa/config"
opasdktest "github.com/open-policy-agent/opa/sdk/test"
"github.com/open-policy-agent/opa/storage/inmem"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/filtertest"
Expand Down Expand Up @@ -242,7 +243,7 @@ func TestRegistry(t *testing.T) {
assert.Error(t, err, "should not work after close")
}

func TestOpaEngineStartFailureWithTimeout(t *testing.T) {
func TestOpaEngineStartFailureWithWrongBundle(t *testing.T) {
_, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery-with-wrong-bundle")

registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
Expand All @@ -258,7 +259,7 @@ func TestOpaEngineStartFailureWithTimeout(t *testing.T) {

err = engine.Start(ctx, cfg.startupTimeout)
assert.True(t, engine.stopped)
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s")
assert.Contains(t, err.Error(), "bundle plugin failed: Name: bundles/non-existing-bundle, Code: bundle_error, Message: server replied with Not Found, HTTPCode: 404")
}

func TestOpaActivationSuccessWithDiscovery(t *testing.T) {
Expand Down Expand Up @@ -339,7 +340,7 @@ func TestOpaActivationTimeOutWithDiscoveryPointingWrongBundle(t *testing.T) {

instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
assert.Nil(t, instance)
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded")
assert.Contains(t, err.Error(), "bundle plugin failed: Name: bundles/non-existing-bundle, Code: bundle_error, Message: server replied with Not Found, HTTPCode: 404")
assert.Equal(t, 0, len(registry.instances))
}

Expand All @@ -353,7 +354,7 @@ func TestOpaActivationTimeOutWithDiscoveryParsingError(t *testing.T) {

instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
assert.Nil(t, instance)
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded")
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded")
assert.Equal(t, 0, len(registry.instances))
}

Expand Down Expand Up @@ -696,3 +697,147 @@ func TestBodyExtractionUnknownBody(t *testing.T) {
f1()
f2()
}

// TestOpaActivationFailureWithInvalidDiscovery checks that creating an
// instance against the "bundles/invalid-discovery" control-plane fixture
// fails with the expected discovery-plugin error message.
func TestOpaActivationFailureWithInvalidDiscovery(t *testing.T) {
	_, config := mockControlPlaneWithDiscoveryBundle("bundles/invalid-discovery")

	registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

	cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config))
	if !assert.NoError(t, err) {
		// cfg is dereferenced below; stop here instead of panicking on nil.
		return
	}

	_, err = registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
	// EqualError reports a normal test failure when err is nil, whereas calling
	// err.Error() directly would panic.
	assert.EqualError(t, err, "discovery plugin failed: Name: discovery, Code: bundle_error, Message: server replied with Not Found, HTTPCode: 404, Errors: []")
}

// TestDiscoveryRetryErrors runs testDiscoveryBundleError once per HTTP status
// in the table. The 429, 408 and 500 entries expect the startup-timeout
// error; the 403 entry expects the discovery-plugin failure message.
func TestDiscoveryRetryErrors(t *testing.T) {
	const startupTimedOut = "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"

	cases := []struct {
		name          string
		statusCode    int
		expectedError string
	}{
		{name: "TemporaryError429", statusCode: http.StatusTooManyRequests, expectedError: startupTimedOut},
		{name: "TemporaryError408", statusCode: http.StatusRequestTimeout, expectedError: startupTimedOut},
		{name: "Retry5xx", statusCode: http.StatusInternalServerError, expectedError: startupTimedOut},
		{name: "Failure403BundleError", statusCode: http.StatusForbidden, expectedError: "discovery plugin failed: Name: discovery, Code: bundle_error, Message: server replied with Forbidden, HTTPCode: 403"},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			testDiscoveryBundleError(t, tc.statusCode, tc.expectedError)
		})
	}
}

// TestResourceBundleRetryErrors runs testResourceBundleError once per HTTP
// status in the table. The 429, 408 and 500 entries expect the
// startup-timeout error; the 403 entry expects the bundle-plugin failure
// message.
func TestResourceBundleRetryErrors(t *testing.T) {
	const startupTimedOut = "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"

	cases := []struct {
		name          string
		statusCode    int
		expectedError string
	}{
		{name: "TemporaryError429", statusCode: http.StatusTooManyRequests, expectedError: startupTimedOut},
		{name: "TemporaryError408", statusCode: http.StatusRequestTimeout, expectedError: startupTimedOut},
		{name: "Retry5xx", statusCode: http.StatusInternalServerError, expectedError: startupTimedOut},
		{name: "Failure403BundleError", statusCode: http.StatusForbidden, expectedError: "bundle plugin failed: Name: test, Code: bundle_error, Message: server replied with Forbidden, HTTPCode: 403"},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			testResourceBundleError(t, tc.statusCode, tc.expectedError)
		})
	}
}

// testDiscoveryBundleError starts a mock server that answers
// "/bundles/discovery" with statusCode, points a discovery configuration at
// it, and asserts that creating an instance (1s startup timeout) yields no
// instance and an error containing expectedError.
func testDiscoveryBundleError(t *testing.T, statusCode int, expectedError string) {
	mockDiscoveryBundleServer := mockBundleServerWithStatus("/bundles/discovery", statusCode)
	defer mockDiscoveryBundleServer.Close()

	// Create the plugin configuration
	config := []byte(fmt.Sprintf(`{
"services": {
"test": {
"url": %q
}
},
"labels": {
"environment": "envValue"
},
"discovery": {
"name": "discovery",
"resource": "/bundles/discovery",
"service": "test"
}
}`, mockDiscoveryBundleServer.URL))

	registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

	cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second))
	if !assert.NoError(t, err) {
		// cfg is dereferenced below; stop here instead of panicking on nil.
		return
	}

	instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
	assert.Nil(t, instance)
	// Guard the message check so an unexpected nil error is reported as a test
	// failure rather than a nil-pointer panic in err.Error().
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), expectedError)
	}
}

// testResourceBundleError starts a mock server that answers
// "/bundles/test.tgz" with statusCode, serves a discovery bundle that points
// the "test" bundle at that server, and asserts that creating an instance
// (1s startup timeout) yields no instance and an error containing
// expectedError.
func testResourceBundleError(t *testing.T, statusCode int, expectedError string) {
	mockServer := mockBundleServerWithStatus("/bundles/test.tgz", statusCode)
	defer mockServer.Close()

	// Create the OPA control plane with the discovery bundle
	opaControlPlane := opasdktest.MustNewServer(
		opasdktest.MockBundle("/bundles/discovery", map[string]string{
			"data.json": fmt.Sprintf(`{
"discovery": {
"bundles": {
"test": {
"persist": false,
"resource": "bundles/test.tgz",
"service": "styra-bundles"
}
},
"services": {
"styra-bundles": {
"url": "%s"
}
}
}
}`, mockServer.URL),
		}),
	)
	// Shut the control-plane server down with the test so it does not leak
	// into later tests.
	defer opaControlPlane.Stop()

	config := []byte(fmt.Sprintf(`{
"services": {
"discovery": {
"url": %q
}
},
"labels": {
"environment": "envValue"
},
"discovery": {
"name": "discovery",
"resource": "/bundles/discovery",
"service": "discovery"
}
}`, opaControlPlane.URL()))

	registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

	cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second))
	if !assert.NoError(t, err) {
		// cfg is dereferenced below; stop here instead of panicking on nil.
		return
	}

	instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
	assert.Nil(t, instance)
	// Guard the message check so an unexpected nil error is reported as a test
	// failure rather than a nil-pointer panic in err.Error().
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), expectedError)
	}
}

// mockBundleServerWithStatus starts an httptest server whose only registered
// route, path, always replies with statusCode and the matching standard
// status text as the body. The caller is responsible for closing the server.
func mockBundleServerWithStatus(path string, statusCode int) *httptest.Server {
	replyWithStatus := http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
		http.Error(rw, http.StatusText(statusCode), statusCode)
	})

	mux := http.NewServeMux()
	mux.Handle(path, replyWithStatus)

	return httptest.NewServer(mux)
}
Loading

0 comments on commit f2b5ca6

Please sign in to comment.