Skip to content

Commit

Permalink
plugins, runtime: Add visibility for server init
Browse files Browse the repository at this point in the history
- Plugins can now access a channel which receives a message when the OPA
  server is fully initialized and ready to receive traffic.
- Added ServerInitialized() and ServerInitializedChannel() to plugin
  manager.
- Runtime now calls ServerInitialized() when server listeners are
  initialized.

Fixes open-policy-agent#3701

Signed-off-by: Grant Shively <gshively@godaddy.com>
Signed-off-by: Dolev Farhi <farhi.dolev@gmail.com>
  • Loading branch information
gshively11 authored and dolevf committed Nov 4, 2021
1 parent cb6b3f9 commit 69307f4
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
19 changes: 19 additions & 0 deletions plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ type Manager struct {
registeredCacheTriggers []func(*cache.Config)
logger logging.Logger
consoleLogger logging.Logger
serverInitialized chan struct{}
serverInitializedOnce sync.Once
}

type managerContextKey string
Expand Down Expand Up @@ -286,6 +288,7 @@ func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*M
pluginStatusListeners: map[string]StatusListener{},
maxErrors: -1,
interQueryBuiltinCacheConfig: interQueryBuiltinCacheConfig,
serverInitialized: make(chan struct{}),
}

if m.logger == nil {
Expand Down Expand Up @@ -759,6 +762,22 @@ func (m *Manager) ConsoleLogger() logging.Logger {
return m.consoleLogger
}

// ServerInitialized signals a channel indicating that the OPA
// server has finished initialization.
func (m *Manager) ServerInitialized() {
m.serverInitializedOnce.Do(func() { close(m.serverInitialized) })
}

// ServerInitializedChannel returns a receive-only channel that
// is closed when the OPA server has finished initialization.
// Be aware that the socket of the server listener may not be
// open by the time this channel is closed. There is a very
// small window where the socket may still be closed, due to
// a race condition.
func (m *Manager) ServerInitializedChannel() <-chan struct{} {
return m.serverInitialized
}

// RegisterCacheTrigger accepts a func that receives new inter-query cache config generated by
// a reconfigure of the plugin manager, so that it can be propagated to existing inter-query caches.
func (m *Manager) RegisterCacheTrigger(trigger func(*cache.Config)) {
Expand Down
33 changes: 33 additions & 0 deletions plugins/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,39 @@ func TestPluginManagerConsoleLogger(t *testing.T) {
}
}

func TestPluginManagerServerInitialized(t *testing.T) {
// Verify that ServerInitializedChannel is closed when
// ServerInitialized is called.
m1, err := New([]byte{}, "test1", inmem.New())
if err != nil {
t.Fatal(err)
}
initChannel1 := m1.ServerInitializedChannel()
m1.ServerInitialized()
// Verify that ServerInitialized is idempotent and will not panic
m1.ServerInitialized()
select {
case <-initChannel1:
break
default:
t.Fatal("expected ServerInitializedChannel to be closed")
}

// Verify that ServerInitializedChannel is open when
// ServerInitialized is not called.
m2, err := New([]byte{}, "test2", inmem.New())
if err != nil {
t.Fatal(err)
}
initChannel2 := m2.ServerInitializedChannel()
select {
case <-initChannel2:
t.Fatal("expected ServerInitializedChannel to be open and have no messages")
default:
break
}
}

type myAuthPluginMock struct{}

func (m *myAuthPluginMock) NewClient(c rest.Config) (*http.Client, error) {
Expand Down
4 changes: 4 additions & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,13 @@ func (rt *Runtime) Serve(ctx context.Context) error {
signalc := make(chan os.Signal, 1)
signal.Notify(signalc, syscall.SIGINT, syscall.SIGTERM)

// Note that there is a small chance the socket of the server listener is still
// closed by the time this block is executed, due to the serverLoop above
// executing in a goroutine.
rt.serverInitMtx.Lock()
rt.serverInitialized = true
rt.serverInitMtx.Unlock()
rt.Manager.ServerInitialized()

logrus.Debug("Server initialized.")

Expand Down
30 changes: 30 additions & 0 deletions runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,36 @@ func TestCheckAuthIneffective(t *testing.T) {

}

func TestServerInitialized(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond)
defer cancel() // NOTE(sr): The timeout will have been reached by the time `done` is closed.
var output bytes.Buffer

params := NewParams()
params.Output = &output
params.Addrs = &[]string{":0"}
params.GracefulShutdownPeriod = 1
rt, err := NewRuntime(ctx, params)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
logrus.SetOutput(rt.Params.Output)

initChannel := rt.Manager.ServerInitializedChannel()
done := make(chan struct{})
go func() {
rt.StartServer(ctx)
close(done)
}()
<-done
select {
case <-initChannel:
return
default:
t.Fatal("expected ServerInitializedChannel to be closed")
}
}

func getTestServer(update interface{}, statusCode int) (baseURL string, teardownFn func()) {
mux := http.NewServeMux()
ts := httptest.NewServer(mux)
Expand Down

0 comments on commit 69307f4

Please sign in to comment.