Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add constructor for pre-existing DynamicWatcher #111

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 52 additions & 20 deletions pkg/templates/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ type TemplateResolver struct {
// If caching is disabled, this will act as a temporary cache for objects during the execution of the
// ResolveTemplate call.
tempCallCache client.ObjectCache
// When a pre-existing DynamicWatcher is used, let the caller fully manage the QueryBatch.
skipBatchManagement bool
}

type CacheCleanUpFunc func() error
Expand Down Expand Up @@ -269,6 +271,34 @@ func NewResolverWithCaching(
return resolver, channel, err
}

// NewResolverWithDynamicWatcher creates a new caching TemplateResolver instance, using the provided dependency-watcher.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please document that batch management is disabled?

// The caller is responsible for managing the given DynamicWatcher, including starting and stopping it. The caller must
// start a query batch on the DynamicWatcher for the "watcher" object before calling ResolveTemplate.
//
// - dynWatcher is an already running DynamicWatcher from kubernetes-dependency-watches.
//
// - config is the Config instance for configuring optional values for template processing.
func NewResolverWithDynamicWatcher(dynWatcher client.DynamicWatcher, config Config) (*TemplateResolver, error) {
if (config.StartDelim != "" && config.StopDelim == "") || (config.StartDelim == "" && config.StopDelim != "") {
return nil, fmt.Errorf("the configurations StartDelim and StopDelim cannot be set independently")
}

// It's only required to check config.StartDelim since it's invalid to set these independently
if config.StartDelim == "" {
config.StartDelim = defaultStartDelim
config.StopDelim = defaultStopDelim
}

return &TemplateResolver{
config: config,
dynamicClient: nil,
kubeConfig: nil,
dynamicWatcher: dynWatcher,
tempCallCache: nil,
skipBatchManagement: true,
}, nil
}

// HasTemplate performs a simple check for the template start delimiter or the "$ocm_encrypted" prefix
// (checkForEncrypted must be set to true) to indicate if the input byte slice has a template. If the startDelim
// argument is an empty string, the default start delimiter of "{{" will be used.
Expand Down Expand Up @@ -557,30 +587,32 @@ func (t *TemplateResolver) ResolveTemplate(
if t.dynamicWatcher != nil {
watcher := *options.Watcher

err := t.dynamicWatcher.StartQueryBatch(watcher)
if err != nil {
if !errors.Is(err, client.ErrQueryBatchInProgress) {
return resolvedResult, err
}
if !t.skipBatchManagement {
err := t.dynamicWatcher.StartQueryBatch(watcher)
if err != nil {
if !errors.Is(err, client.ErrQueryBatchInProgress) {
return resolvedResult, err
}

if !options.DisableAutoCacheCleanUp {
return resolvedResult, fmt.Errorf(
"ResolveTemplate cannot be called with the same watchedObject in parallel: %w", err,
)
if !options.DisableAutoCacheCleanUp {
return resolvedResult, fmt.Errorf(
"ResolveTemplate cannot be called with the same watchedObject in parallel: %w", err,
)
}
}
}

if options.DisableAutoCacheCleanUp {
resolvedResult.CacheCleanUp = func() error {
return t.dynamicWatcher.EndQueryBatch(*options.Watcher)
}
} else {
defer func() {
err := t.dynamicWatcher.EndQueryBatch(watcher)
if err != nil && !errors.Is(err, client.ErrQueryBatchNotStarted) {
klog.Errorf("failed to end the query batch for %s: %v", watcher, err)
if options.DisableAutoCacheCleanUp {
resolvedResult.CacheCleanUp = func() error {
return t.dynamicWatcher.EndQueryBatch(*options.Watcher)
}
}()
} else {
defer func() {
err := t.dynamicWatcher.EndQueryBatch(watcher)
if err != nil && !errors.Is(err, client.ErrQueryBatchNotStarted) {
klog.Errorf("failed to end the query batch for %s: %v", watcher, err)
}
}()
}
}

for i, contextTransformer := range options.ContextTransformers {
Expand Down
108 changes: 108 additions & 0 deletions pkg/templates/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stolostron/kubernetes-dependency-watches/client"
yaml "gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func TestNewResolver(t *testing.T) {
Expand Down Expand Up @@ -55,6 +56,28 @@ func TestNewResolverWithCaching(t *testing.T) {
}
}

func TestNewResolverWithDynamicWatcher(t *testing.T) {
t.Parallel()

dynWatcher, err := client.New(k8sConfig, fakeReconciler{}, &client.Options{EnableCache: true})
if err != nil {
t.Fatalf("No error was expected: %v", err)
}

resolver, err := NewResolverWithDynamicWatcher(dynWatcher, Config{})
if err != nil {
t.Fatalf("No error was expected: %v", err)
}

if resolver.config.StartDelim != "{{" || resolver.config.StopDelim != "}}" {
t.Fatalf(
"Expected delimiters: {{ and }} got: %s and %s",
resolver.config.StartDelim,
resolver.config.StopDelim,
)
}
}

func TestNewResolverFailures(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -473,6 +496,91 @@ func TestResolveTemplateWithCachingListQuery(t *testing.T) {
}
}

type fakeReconciler struct{}

func (r fakeReconciler) Reconcile(_ context.Context, _ client.ObjectIdentifier) (reconcile.Result, error) {
return reconcile.Result{}, nil
}

func TestResolveTemplateWithPreexistingWatcher(t *testing.T) {
t.Parallel()

ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

fr := fakeReconciler{}

dynWatcher, err := client.New(k8sConfig, fr, &client.Options{EnableCache: true})
if err != nil {
t.Fatalf("No error was expected: %v", err)
}

resolver, err := NewResolverWithDynamicWatcher(dynWatcher, Config{})
if err != nil {
t.Fatalf("No error was expected: %v", err)
}

tmplStr := `data1: '{{ (lookup "v1" "ConfigMap" "testns" "testcm-enva").data ` +
`| mustToRawJson | toLiteral }}'`

tmplStrBytes, err := yamlToJSON([]byte(tmplStr))
if err != nil {
t.Fatalf("No error was expected: %v", err)
}

watcher := client.ObjectIdentifier{
Version: "v1",
Kind: "ConfigMap",
Namespace: "testns",
Name: "watcher",
}

_, err = resolver.ResolveTemplate(tmplStrBytes, nil, &ResolveOptions{Watcher: &watcher})
if err == nil || !strings.Contains(err.Error(), "DynamicWatcher must be started") {
t.Fatalf("Expected error requiring the DynamicWatcher to be started, got: %v", err)
}

go func() {
_ = dynWatcher.Start(ctx)
}()

<-dynWatcher.Started()

err = dynWatcher.StartQueryBatch(watcher)
if err != nil {
t.Fatalf("No error was expected: %v", err)
}

defer func() {
err := dynWatcher.EndQueryBatch(watcher)
if err != nil {
t.Fatalf("No error was expected: %v", err)
}
}()

// Before resolving the template, do a get on that object through the DynamicWatcher
// Then the test will validate that there is only one watch.
configMapGVK := schema.GroupVersionKind{Version: "v1", Kind: "ConfigMap"}

_, err = dynWatcher.Get(watcher, configMapGVK, "testns", "testcm-enva")
if err != nil {
t.Fatalf("No error was expected: %v", err)
}

result, err := resolver.ResolveTemplate(tmplStrBytes, nil, &ResolveOptions{Watcher: &watcher})
if err != nil {
t.Fatalf("No error was expected: %v", err)
}

if string(result.ResolvedJSON) != `{"data1":{"cmkey1":"cmkey1Val"}}` {
t.Fatalf("Unexpected template: %s", string(result.ResolvedJSON))
}

if resolver.GetWatchCount() != 1 {
t.Fatalf("Expected a watch count of 1 but got: %d", resolver.GetWatchCount())
}
}

func TestResolveTemplateDefaultConfig(t *testing.T) {
t.Parallel()

Expand Down