Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugins, runtime: Add visibility for server init #3706

Merged
merged 1 commit into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ type Manager struct {
registeredCacheTriggers []func(*cache.Config)
logger logging.Logger
consoleLogger logging.Logger
serverInitialized chan struct{}
serverInitializedOnce sync.Once
}

type managerContextKey string
Expand Down Expand Up @@ -286,6 +288,7 @@ func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*M
pluginStatusListeners: map[string]StatusListener{},
maxErrors: -1,
interQueryBuiltinCacheConfig: interQueryBuiltinCacheConfig,
serverInitialized: make(chan struct{}),
}

if m.logger == nil {
Expand Down Expand Up @@ -759,6 +762,22 @@ func (m *Manager) ConsoleLogger() logging.Logger {
return m.consoleLogger
}

// ServerInitialized signals a channel indicating that the OPA
// server has finished initialization.
func (m *Manager) ServerInitialized() {
m.serverInitializedOnce.Do(func() { close(m.serverInitialized) })
}

// ServerInitializedChannel returns a receive-only channel that
// is closed when the OPA server has finished initialization.
// Be aware that the socket of the server listener may not be
// open by the time this channel is closed. There is a very
// small window where the socket may still be closed, due to
// a race condition.
func (m *Manager) ServerInitializedChannel() <-chan struct{} {
return m.serverInitialized
}

// RegisterCacheTrigger accepts a func that receives new inter-query cache config generated by
// a reconfigure of the plugin manager, so that it can be propagated to existing inter-query caches.
func (m *Manager) RegisterCacheTrigger(trigger func(*cache.Config)) {
Expand Down
33 changes: 33 additions & 0 deletions plugins/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,39 @@ func TestPluginManagerConsoleLogger(t *testing.T) {
}
}

func TestPluginManagerServerInitialized(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It would be great if you could add a comment as to what you're trying to test in the 2 test cases below. It's not clear to me at least.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I somehow missed you weren't calling ServerInitialized() in the second case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I definitely was calling it lol, and it was passing because I was erroneously using return above it instead of break. So good catch 😁

// Verify that ServerInitializedChannel is closed when
// ServerInitialized is called.
m1, err := New([]byte{}, "test1", inmem.New())
if err != nil {
t.Fatal(err)
}
initChannel1 := m1.ServerInitializedChannel()
m1.ServerInitialized()
// Verify that ServerInitialized is idempotent and will not panic
m1.ServerInitialized()
select {
case <-initChannel1:
break
default:
t.Fatal("expected ServerInitializedChannel to be closed")
}

// Verify that ServerInitializedChannel is open when
// ServerInitialized is not called.
m2, err := New([]byte{}, "test2", inmem.New())
if err != nil {
t.Fatal(err)
}
initChannel2 := m2.ServerInitializedChannel()
select {
case <-initChannel2:
t.Fatal("expected ServerInitializedChannel to be open and have no messages")
default:
break
}
}

type myAuthPluginMock struct{}

func (m *myAuthPluginMock) NewClient(c rest.Config) (*http.Client, error) {
Expand Down
4 changes: 4 additions & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,13 @@ func (rt *Runtime) Serve(ctx context.Context) error {
signalc := make(chan os.Signal, 1)
signal.Notify(signalc, syscall.SIGINT, syscall.SIGTERM)

// Note that there is a small chance the socket of the server listener is still
// closed by the time this block is executed, due to the serverLoop above
// executing in a goroutine.
rt.serverInitMtx.Lock()
rt.serverInitialized = true
rt.serverInitMtx.Unlock()
gshively11 marked this conversation as resolved.
Show resolved Hide resolved
rt.Manager.ServerInitialized()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a race condition here because there's no guarantee that the socket will have been opened.

The serverLoop() call on L430 (above) eventually calls down into net.Listen here and at that's what actually calls socket() and listen() at the OS level (so requests sent in the meantime will get network error.) That said, the window is extremely small; so perhaps that's fine. The solution would be to have this code wait until the server was actually listening for connections but that would require some more invasive changes to the listener interface (e.g., the serverLoop() function could return a channel or something to do indicate the listener was opened...)

@gshively11 if you agree w/ the above and are okay with the potential race condition, just include a comment here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good catch. I think we're probably OK with this race condition, I'll add the comment. If it does end up being an issue, a modification as you described shouldn't be a breaking change, so it should be easy to implement if we need it.


logrus.Debug("Server initialized.")

Expand Down
30 changes: 30 additions & 0 deletions runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,36 @@ func TestCheckAuthIneffective(t *testing.T) {

}

func TestServerInitialized(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond)
defer cancel() // NOTE(sr): The timeout will have been reached by the time `done` is closed.
var output bytes.Buffer

params := NewParams()
params.Output = &output
params.Addrs = &[]string{":0"}
params.GracefulShutdownPeriod = 1
rt, err := NewRuntime(ctx, params)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
logrus.SetOutput(rt.Params.Output)

initChannel := rt.Manager.ServerInitializedChannel()
done := make(chan struct{})
go func() {
rt.StartServer(ctx)
close(done)
}()
<-done
select {
case <-initChannel:
return
default:
t.Fatal("expected ServerInitializedChannel to be closed")
}
}

func getTestServer(update interface{}, statusCode int) (baseURL string, teardownFn func()) {
mux := http.NewServeMux()
ts := httptest.NewServer(mux)
Expand Down