Skip to content

Commit

Permalink
feat: updates encryption config file watch logic to polling
Browse files Browse the repository at this point in the history
Signed-off-by: Nilekh Chaudhari <1626598+nilekhc@users.noreply.github.com>

fix (#2)

Signed-off-by: Monis Khan <mok@microsoft.com>
  • Loading branch information
nilekhc committed Oct 30, 2023
1 parent 0957869 commit e95b7c6
Show file tree
Hide file tree
Showing 6 changed files with 523 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,24 @@ func (h *kmsv2PluginProbe) isKMSv2ProviderHealthyAndMaybeRotateDEK(ctx context.C

// loadConfig parses the encryption configuration file at filepath and returns the parsed config and hash of the file.
func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfiguration, string, error) {
data, contentHash, err := loadDataAndHash(filepath)
if err != nil {
return nil, "", fmt.Errorf("error while loading file: %w", err)
}

configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
if err != nil {
return nil, "", fmt.Errorf("error decoding encryption provider configuration file %q: %w", filepath, err)
}
config, ok := configObj.(*apiserverconfig.EncryptionConfiguration)
if !ok {
return nil, "", fmt.Errorf("got unexpected config type: %v", gvk)
}

return config, contentHash, validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
}

func loadDataAndHash(filepath string) ([]byte, string, error) {
f, err := os.Open(filepath)
if err != nil {
return nil, "", fmt.Errorf("error opening encryption provider configuration file %q: %w", filepath, err)
Expand All @@ -518,16 +536,14 @@ func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfig
return nil, "", fmt.Errorf("encryption provider configuration file %q is empty", filepath)
}

configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
if err != nil {
return nil, "", fmt.Errorf("error decoding encryption provider configuration file %q: %w", filepath, err)
}
config, ok := configObj.(*apiserverconfig.EncryptionConfiguration)
if !ok {
return nil, "", fmt.Errorf("got unexpected config type: %v", gvk)
}
return data, computeEncryptionConfigHash(data), nil
}

return config, computeEncryptionConfigHash(data), validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
// GetEncryptionConfigHash reads the encryption configuration file at filepath and returns the hash of the file.
// It does not attempt to decode or load the config, and serves as a cheap check to determine if the file has changed.
func GetEncryptionConfigHash(filepath string) (string, error) {
_, contentHash, err := loadDataAndHash(filepath)
return contentHash, err
}

// prefixTransformersAndProbes creates the set of transformers and KMS probes based on the given resource config.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2177,3 +2177,48 @@ func logLines(logs string) []string {
}
return lines
}

func TestGetEncryptionConfigHash(t *testing.T) {
t.Parallel()

tests := []struct {
name string
filepath string
wantHash string
wantErr string
}{
{
name: "empty config file content",
filepath: "testdata/invalid-configs/kms/invalid-content.yaml",
wantHash: "",
wantErr: `encryption provider configuration file "testdata/invalid-configs/kms/invalid-content.yaml" is empty`,
},
{
name: "missing file",
filepath: "testdata/invalid-configs/kms/file-that-does-not-exist.yaml",
wantHash: "",
wantErr: `error opening encryption provider configuration file "testdata/invalid-configs/kms/file-that-does-not-exist.yaml": open testdata/invalid-configs/kms/file-that-does-not-exist.yaml: no such file or directory`,
},
{
name: "valid file",
filepath: "testdata/valid-configs/secret-box-first.yaml",
wantHash: "c638c0327dbc3276dd1fcf3e67895d19ebca16b91ae0d19af24ef0759b8e0f66",
wantErr: ``,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

got, err := GetEncryptionConfigHash(tt.filepath)
if errString(err) != tt.wantErr {
t.Errorf("GetEncryptionConfigHash() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.wantHash {
t.Errorf("GetEncryptionConfigHash() got = %v, want %v", got, tt.wantHash)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"context"
"fmt"
"net/http"
"sync"
"time"

"github.com/fsnotify/fsnotify"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
Expand All @@ -35,6 +35,9 @@ import (
// workqueueKey is the dummy key used to process change in encryption config file.
const workqueueKey = "key"

// EncryptionConfigFileChangePollDuration is exposed so that integration tests can crank up the reload speed.
var EncryptionConfigFileChangePollDuration = time.Minute

// DynamicKMSEncryptionConfigContent which can dynamically handle changes in encryption config file.
type DynamicKMSEncryptionConfigContent struct {
name string
Expand All @@ -53,6 +56,10 @@ type DynamicKMSEncryptionConfigContent struct {

// identity of the api server
apiServerID string

// can be swapped during testing
getEncryptionConfigHash func(ctx context.Context, filepath string) (string, error)
loadEncryptionConfig func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error)
}

func init() {
Expand All @@ -73,77 +80,57 @@ func NewDynamicEncryptionConfiguration(
dynamicTransformers: dynamicTransformers,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
apiServerID: apiServerID,
getEncryptionConfigHash: func(_ context.Context, filepath string) (string, error) {
return encryptionconfig.GetEncryptionConfigHash(filepath)
},
loadEncryptionConfig: encryptionconfig.LoadEncryptionConfig,
}
encryptionConfig.queue.Add(workqueueKey) // to avoid missing any file changes that occur in between the initial load and Run

return encryptionConfig
}

// Run starts the controller and blocks until stopCh is closed.
// Run starts the controller and blocks until ctx is canceled.
func (d *DynamicKMSEncryptionConfigContent) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer d.queue.ShutDown()

klog.InfoS("Starting controller", "name", d.name)
defer klog.InfoS("Shutting down controller", "name", d.name)

// start worker for processing content
go wait.UntilWithContext(ctx, d.runWorker, time.Second)

// start the loop that watches the encryption config file until stopCh is closed.
go wait.UntilWithContext(ctx, func(ctx context.Context) {
if err := d.watchEncryptionConfigFile(ctx); err != nil {
// if there is an error while setting up or handling the watches, this will ensure that we will process the config file.
defer d.queue.Add(workqueueKey)
klog.ErrorS(err, "Failed to watch encryption config file, will retry later")
}
}, time.Second)

<-ctx.Done()
}
var wg sync.WaitGroup

func (d *DynamicKMSEncryptionConfigContent) watchEncryptionConfigFile(ctx context.Context) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("error creating fsnotify watcher: %w", err)
}
defer watcher.Close()

if err = watcher.Add(d.filePath); err != nil {
return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
}

for {
select {
case event := <-watcher.Events:
if err := d.handleWatchEvent(event, watcher); err != nil {
return err
}
case err := <-watcher.Errors:
return fmt.Errorf("received fsnotify error: %w", err)
case <-ctx.Done():
return nil
}
}
}

func (d *DynamicKMSEncryptionConfigContent) handleWatchEvent(event fsnotify.Event, watcher *fsnotify.Watcher) error {
// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
defer d.queue.Add(workqueueKey)
wg.Add(1)
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
defer d.queue.ShutDown()
<-ctx.Done()
}()

// return if file has not been removed or renamed.
if event.Op&(fsnotify.Remove|fsnotify.Rename) == 0 {
return nil
}
wg.Add(1)
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
d.runWorker(ctx)
}()

if err := watcher.Remove(d.filePath); err != nil {
klog.V(2).InfoS("Failed to remove file watch, it may have been deleted", "file", d.filePath, "err", err)
}
if err := watcher.Add(d.filePath); err != nil {
return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
}
// this function polls changes in the encryption config file by placing a dummy key in the queue.
// the 'runWorker' function then picks up this dummy key and processes the changes.
// the goroutine terminates when 'ctx' is canceled.
_ = wait.PollUntilContextCancel(
ctx,
EncryptionConfigFileChangePollDuration,
true,
func(ctx context.Context) (bool, error) {
// add dummy item to the queue to trigger file content processing.
d.queue.Add(workqueueKey)

// return false to continue polling.
return false, nil
},
)

return nil
wg.Wait()
}

// runWorker to process file content
Expand All @@ -161,6 +148,12 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex
}
defer d.queue.Done(key)

d.processWorkItem(serverCtx, key)

return true
}

func (d *DynamicKMSEncryptionConfigContent) processWorkItem(serverCtx context.Context, workqueueKey interface{}) {
var (
updatedEffectiveConfig bool
err error
Expand Down Expand Up @@ -188,25 +181,25 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex
metrics.RecordEncryptionConfigAutomaticReloadFailure(d.apiServerID)
utilruntime.HandleError(fmt.Errorf("error processing encryption config file %s: %v", d.filePath, err))
// add dummy item back to the queue to trigger file content processing.
d.queue.AddRateLimited(key)
d.queue.AddRateLimited(workqueueKey)
}
}()

encryptionConfiguration, configChanged, err = d.processEncryptionConfig(ctx)
if err != nil {
return true
return
}
if !configChanged {
return true
return
}

if len(encryptionConfiguration.HealthChecks) != 1 {
err = fmt.Errorf("unexpected number of healthz checks: %d. Should have only one", len(encryptionConfiguration.HealthChecks))
return true
return
}
// get healthz checks for all new KMS plugins.
if err = d.validateNewTransformersHealth(ctx, encryptionConfiguration.HealthChecks[0], encryptionConfiguration.KMSCloseGracePeriod); err != nil {
return true
return
}

// update transformers.
Expand All @@ -223,26 +216,37 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex
klog.V(2).InfoS("Loaded new kms encryption config content", "name", d.name)

updatedEffectiveConfig = true
return true
}

// loadEncryptionConfig processes the next set of content from the file.
func (d *DynamicKMSEncryptionConfigContent) processEncryptionConfig(ctx context.Context) (
encryptionConfiguration *encryptionconfig.EncryptionConfiguration,
_ *encryptionconfig.EncryptionConfiguration,
configChanged bool,
err error,
_ error,
) {
// this code path will only execute if reload=true. So passing true explicitly.
encryptionConfiguration, err = encryptionconfig.LoadEncryptionConfig(ctx, d.filePath, true, d.apiServerID)
contentHash, err := d.getEncryptionConfigHash(ctx, d.filePath)
if err != nil {
return nil, false, err
}

// check if encryptionConfig is different from the current. Do nothing if they are the same.
if contentHash == d.lastLoadedEncryptionConfigHash {
klog.V(4).InfoS("Encryption config has not changed (before load)", "name", d.name)
return nil, false, nil
}

// this code path will only execute if reload=true. So passing true explicitly.
encryptionConfiguration, err := d.loadEncryptionConfig(ctx, d.filePath, true, d.apiServerID)
if err != nil {
return nil, false, err
}

// check if encryptionConfig is different from the current (again to avoid TOCTOU). Do nothing if they are the same.
if encryptionConfiguration.EncryptionFileContentHash == d.lastLoadedEncryptionConfigHash {
klog.V(4).InfoS("Encryption config has not changed", "name", d.name)
klog.V(4).InfoS("Encryption config has not changed (after load)", "name", d.name)
return nil, false, nil
}

return encryptionConfiguration, true, nil
}

Expand Down
Loading

0 comments on commit e95b7c6

Please sign in to comment.