Skip to content

Commit

Permalink
Add fswatch library to watch and batch filesystem events, use in allo…
Browse files Browse the repository at this point in the history
…cator (#2792)

This pull refactors the fsnotify code in allocator/main out to a
shared library, and in that shared library implements a batched
notification processor.

Closes #1816: This takes a slightly different approach than specified
in the issue, instead choosing to just delay processing until after a
batch processing period. I chose 1s - it's far longer than necessary,
but still much shorter than it takes for the secret changes to
propagate to the container anyways.

I considered the approach in #1816 of trying to parse the actual
events, but it's too fiddly to get exactly right: e.g. maybe you only
refresh on "write", but then "chmod" could make the file readable
whereas it wasn't before, "rename" could expose a file that wasn't
there before, etc.
  • Loading branch information
zmerlynn authored Nov 9, 2022
1 parent dbb00ea commit fc9727d
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 59 deletions.
86 changes: 27 additions & 59 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"agones.dev/agones/pkg/client/informers/externalversions"
"agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/fswatch"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
gw_runtime "github.com/grpc-ecosystem/grpc-gateway/runtime"
Expand All @@ -49,7 +50,6 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"gopkg.in/fsnotify.v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -224,70 +224,38 @@ func main() {
h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime)

if !h.tlsDisabled {
watcherTLS, err := fsnotify.NewWatcher()
cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() {
tlsCert, err := readTLSCert()
if err != nil {
logger.WithError(err).Error("could not load TLS certs; keeping old one")
return
}
h.tlsMutex.Lock()
defer h.tlsMutex.Unlock()
h.tlsCert = tlsCert
logger.Info("TLS certs updated")
})
if err != nil {
logger.WithError(err).Fatal("could not create watcher for tls certs")
}
defer watcherTLS.Close() // nolint: errcheck
if err := watcherTLS.Add(tlsDir); err != nil {
logger.WithError(err).Fatalf("cannot watch folder %s for secret changes", tlsDir)
logger.WithError(err).Fatal("could not create watcher for TLS certs")
}

// Watching for the events in certificate directory for updating certificates, when there is a change
go func() {
for {
select {
// watch for events
case event := <-watcherTLS.Events:
tlsCert, err := readTLSCert()
if err != nil {
logger.WithError(err).Error("could not load TLS cert; keeping old one")
} else {
h.tlsMutex.Lock()
h.tlsCert = tlsCert
h.tlsMutex.Unlock()
}
logger.Infof("Tls directory change event %v", event)

// watch for errors
case err := <-watcherTLS.Errors:
logger.WithError(err).Error("error watching for TLS directory")
}
}
}()
defer cancelTLS()

if !h.mTLSDisabled {
// creates a new file watcher for client certificate folder
watcher, err := fsnotify.NewWatcher()
cancelCert, err := fswatch.Watch(logger, certDir, time.Second, func() {
h.certMutex.Lock()
defer h.certMutex.Unlock()
caCertPool, err := getCACertPool(certDir)
if err != nil {
logger.WithError(err).Error("could not load CA certs; keeping old ones")
return
}
h.caCertPool = caCertPool
logger.Info("CA certs updated")
})
if err != nil {
logger.WithError(err).Fatal("could not create watcher for client certs")
}
defer watcher.Close() // nolint: errcheck
if err := watcher.Add(certDir); err != nil {
logger.WithError(err).Fatalf("cannot watch folder %s for secret changes", certDir)
logger.WithError(err).Fatal("could not create watcher for CA certs")
}

go func() {
for {
select {
// watch for events
case event := <-watcher.Events:
h.certMutex.Lock()
caCertPool, err := getCACertPool(certDir)
if err != nil {
logger.WithError(err).Error("could not load CA certs; keeping old ones")
} else {
h.caCertPool = caCertPool
}
logger.Infof("Certificate directory change event %v", event)
h.certMutex.Unlock()

// watch for errors
case err := <-watcher.Errors:
logger.WithError(err).Error("error watching for certificate directory")
}
}
}()
defer cancelCert()
}
}

Expand Down
81 changes: 81 additions & 0 deletions pkg/util/fswatch/fswatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2022 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package fswatch provides Watch(), a utility function to watch a filesystem path.
package fswatch

import (
"time"

"github.com/sirupsen/logrus"
"gopkg.in/fsnotify.v1"
)

// Watch watches the filesystem path `path`. When anything changes, changes are
// batched for the period `batchFor`, then `processEvent` is called.
//
// Errors reported by the underlying watcher are logged on `logger` (annotated
// with a "path" field); they do not terminate the watch.
//
// Returns a cancel() function to terminate the watch. cancel() closes an
// internal channel, so it must be called at most once (a second call would
// panic on the double close). If the watcher cannot be created, or `path`
// cannot be added to it, a nil cancel function and the error are returned.
func Watch(logger *logrus.Entry, path string, batchFor time.Duration, processEvent func()) (func(), error) {
	logger = logger.WithField("path", path)
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	cancelChan := make(chan struct{})
	cancel := func() {
		close(cancelChan)
		// Best effort: there is nothing useful to do if closing the watcher fails.
		_ = watcher.Close()
	}
	if err := watcher.Add(path); err != nil {
		// Release the watcher created above before reporting the failure.
		cancel()
		return nil, err
	}

	// The callback must log its own argument. The enclosing `err` is always nil
	// by this point, so capturing it would hide the real watcher error.
	go batchWatch(batchFor, watcher.Events, watcher.Errors, cancelChan, processEvent, func(watchErr error) {
		logger.WithError(watchErr).Error("error watching path")
	})
	return cancel, nil
}

// batchWatch: watch for events; when an event occurs, keep draining events for duration `batchFor`, then call processEvent().
// Intended for batching of rapid-fire events where we want to process the batch once, like filesystem update notifications.
//
// Values received on `errors` are handed to onError() and do not affect batching.
// A closed `events` or `errors` channel is ignored from then on.
// batchWatch returns once a receive on `cancelChan` succeeds (e.g. when it is closed).
func batchWatch(batchFor time.Duration, events chan fsnotify.Event, errors chan error, cancelChan chan struct{}, processEvent func(), onError func(error)) {
	// Pattern adapted from https://blog.gopheracademy.com/advent-2013/day-24-channel-buffering-patterns/
	//
	// The timer is created lazily, once per batch. Starting from an
	// already-fired timer and calling Reset can leave a stale tick in the
	// channel (Go < 1.23), which would run the first batch immediately
	// instead of after `batchFor`.
	var (
		timer   *time.Timer
		timerCh <-chan time.Time // nil while no batch is pending
	)
	defer func() {
		// Do not leave a pending timer behind when cancelled mid-batch.
		if timer != nil {
			timer.Stop()
		}
	}()

	for {
		select {
		// start a timer when an event occurs, otherwise ignore event
		case _, ok := <-events:
			if !ok {
				// A closed channel is always ready; nil it so it can never be selected again.
				events = nil
				continue
			}
			if timerCh == nil {
				timer = time.NewTimer(batchFor)
				timerCh = timer.C
			}

		// on timer, run the batch; nil channels are silently ignored
		case <-timerCh:
			timerCh = nil
			processEvent()

		// handle errors
		case err, ok := <-errors:
			if !ok {
				// Closed channel: stop selecting on it rather than reporting nil errors.
				errors = nil
				continue
			}
			onError(err)

		// on cancel, abort
		case <-cancelChan:
			return
		}
	}
}
66 changes: 66 additions & 0 deletions pkg/util/fswatch/fswatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2022 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fswatch

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"gopkg.in/fsnotify.v1"
)

// TestBatchWatch feeds bursts of events (and errors) into batchWatch and checks
// that each burst results in a single processEvent() call, and that every error
// is forwarded to the error callback.
func TestBatchWatch(t *testing.T) {
	eventChan := make(chan fsnotify.Event)
	errorChan := make(chan error)
	cancelChan := make(chan struct{})
	defer close(cancelChan) // stops the batchWatch goroutine when the test ends

	eventOut := make(chan struct{}, 1) // only allow one event
	// errorCount is written on the batchWatch goroutine and read by the test
	// only after a receive from eventOut.
	errorCount := 0

	go batchWatch(time.Second, eventChan, errorChan, cancelChan, func() {
		select {
		case eventOut <- struct{}{}:
			// capacity
		default:
			// This callback runs on the batchWatch goroutine, where FailNow is
			// not permitted; record the failure without aborting the goroutine.
			assert.Fail(t, "second event written - did not want")
		}
	}, func(error) {
		errorCount++
	})

	// drainEventAndErrors waits up to 2s for one batched event, then checks
	// the number of errors seen so far.
	drainEventAndErrors := func(wantErrors int) {
		timeout := time.NewTimer(2 * time.Second)
		defer timeout.Stop()
		select {
		case <-eventOut:
		case <-timeout.C:
			assert.FailNow(t, "no event in 2s")
		}
		assert.Equal(t, wantErrors, errorCount)
	}

	// A burst of events only: expect one batched callback and no errors.
	for i := 0; i < 10; i++ {
		eventChan <- fsnotify.Event{}
	}
	drainEventAndErrors(0)

	// Errors interleaved with events: still one batched callback, ten errors.
	for i := 0; i < 10; i++ {
		errorChan <- errors.New("some error")
		eventChan <- fsnotify.Event{}
	}
	drainEventAndErrors(10)
}

0 comments on commit fc9727d

Please sign in to comment.