Skip to content

Commit

Permalink
refactor template cache
Browse files Browse the repository at this point in the history
Co-authored-by: Shiming Zhang <wzshiming@foxmail.com>
  • Loading branch information
carlory and wzshiming committed Nov 26, 2022
1 parent d2e609c commit 67c6917
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 69 deletions.
12 changes: 6 additions & 6 deletions pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type NodeController struct {
nodesSets *stringSets
nodeHeartbeatTemplate string
nodeStatusTemplate string
funcMap template.FuncMap
renderer *renderer
logger logger.Logger
nodeHeartbeatInterval time.Duration
nodeHeartbeatParallelism int
Expand Down Expand Up @@ -111,15 +111,15 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
lockNodeParallelism: conf.LockNodeParallelism,
nodeChan: make(chan string),
}
n.funcMap = template.FuncMap{
funcMap = template.FuncMap{
"NodeIP": func() string {
return n.nodeIP
},
}
for k, v := range conf.FuncMap {
n.funcMap[k] = v
funcMap[k] = v
}

n.renderer = newRenderer(funcMap)
return n, nil
}

Expand Down Expand Up @@ -343,7 +343,7 @@ func (c *NodeController) LockNode(ctx context.Context, nodeName string) error {
}

func (c *NodeController) configureNode(node *corev1.Node) ([]byte, error) {
patch, err := toTemplateJson(c.nodeStatusTemplate, node, c.funcMap)
patch, err := c.renderer.renderToJson(c.nodeStatusTemplate, node)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -380,7 +380,7 @@ func (c *NodeController) configureNode(node *corev1.Node) ([]byte, error) {
}

func (c *NodeController) configureHeartbeatNode(node *corev1.Node) ([]byte, error) {
patch, err := toTemplateJson(c.nodeHeartbeatTemplate, node, c.funcMap)
patch, err := c.renderer.renderToJson(c.nodeHeartbeatTemplate, node)
if err != nil {
return nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/kwok/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type PodController struct {
ipPool *ipPool
podStatusTemplate string
logger logger.Logger
funcMap template.FuncMap
renderer *renderer
lockPodChan chan *corev1.Pod
lockPodParallelism int
deletePodChan chan *corev1.Pod
Expand Down Expand Up @@ -116,7 +116,7 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
deletePodChan: make(chan *corev1.Pod),
deletePodParallelism: conf.DeletePodParallelism,
}
n.funcMap = template.FuncMap{
funcMap = template.FuncMap{
"NodeIP": func() string {
return n.nodeIP
},
Expand All @@ -125,8 +125,9 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
},
}
for k, v := range conf.FuncMap {
n.funcMap[k] = v
funcMap[k] = v
}
n.renderer = newRenderer(funcMap)
return n, nil
}

Expand Down Expand Up @@ -350,7 +351,7 @@ func (c *PodController) configurePod(pod *corev1.Pod) ([]byte, error) {
c.ipPool.Use(pod.Status.PodIP)
}

patch, err := configurePod(pod, c.podStatusTemplate, c.funcMap)
patch, err := c.computePatchData(pod, c.podStatusTemplate)
if err != nil {
return nil, err
}
Expand All @@ -363,8 +364,8 @@ func (c *PodController) configurePod(pod *corev1.Pod) ([]byte, error) {
})
}

func configurePod(pod *corev1.Pod, temp string, funcMap template.FuncMap) ([]byte, error) {
patch, err := toTemplateJson(temp, pod, funcMap)
func (c *PodController) computePatchData(pod *corev1.Pod, temp string) ([]byte, error) {
patch, err := c.renderer.renderToJson(temp, pod)
if err != nil {
return nil, err
}
Expand Down
89 changes: 89 additions & 0 deletions pkg/kwok/controllers/renderer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"bytes"
"encoding/json"
"fmt"
"strings"
"sync"
"text/template"

"sigs.k8s.io/yaml"
)

type renderer struct {
cache sync.Map
bufferPool sync.Pool
funcMap template.FuncMap
}

func newRenderer(funcMap template.FuncMap) *renderer {
return &renderer{
funcMap: funcMap,
cache: sync.Map{},
bufferPool: sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 4*1024))
},
},
}
}

// renderToJson renders the template with the given text and original object.
func (r *renderer) renderToJson(text string, original interface{}) ([]byte, error) {
text = strings.TrimSpace(text)
v, ok := r.cache.Load(text)
if !ok {
temp, err := template.New("_").Funcs(r.funcMap).Parse(text)
if err != nil {
return nil, err
}
r.cache.Store(text, temp)
v = temp
}
temp := v.(*template.Template)
buf := r.bufferPool.Get().(*bytes.Buffer)
defer r.bufferPool.Put(buf)

buf.Reset()
err := json.NewEncoder(buf).Encode(original)
if err != nil {
return nil, err
}

var data interface{}
decoder := json.NewDecoder(buf)
decoder.UseNumber()
err = decoder.Decode(&data)
if err != nil {
return nil, err
}

buf.Reset()
err = temp.Execute(buf, data)
if err != nil {
return nil, err
}

out, err := yaml.YAMLToJSON(buf.Bytes())
if err != nil {
return nil, fmt.Errorf("%w: %s", err, buf.String())
}
return out, nil
}
82 changes: 82 additions & 0 deletions pkg/kwok/controllers/renderer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"html/template"
"testing"
)

func TestRenderToJson(t *testing.T) {

testCases := []struct {
name string
funcMap template.FuncMap
templText string
original interface{}
expected string
}{
{
name: "basic",
funcMap: template.FuncMap{},
original: map[string]interface{}{"k": "v1"},
templText: `{"k":{{ .k }}}`,
expected: `{"k":"v1"}`,
},
{
name: "basic with yaml format",
funcMap: template.FuncMap{},
original: map[string]interface{}{"k": "v1"},
templText: `k: {{ .k }}`,
expected: `{"k":"v1"}`,
},
{
name: "with funcMap",
funcMap: template.FuncMap{
"Foo": func() string {
return "foo"
},
},
original: map[string]interface{}{"k": "v1"},
templText: `{"foo":{{ Foo }},"k":{{ .k }}}`,
expected: `{"foo":"foo","k":"v1"}`,
},
{
name: "with whitespace",
funcMap: template.FuncMap{
"Foo": func() string {
return "foo"
},
},
original: map[string]interface{}{"k": "v1"},
templText: ` {"foo":{{ Foo }},"k":{{ .k }}} `,
expected: `{"foo":"foo","k":"v1"}`,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
r := newRenderer(tc.funcMap)
actual, err := r.renderToJson(tc.templText, tc.original)
if err != nil {
t.Fatal(err)
}
if string(actual) != tc.expected {
t.Fatalf("expected %s, got %s", tc.expected, actual)
}
})
}
}
57 changes: 0 additions & 57 deletions pkg/kwok/controllers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@ limitations under the License.
package controllers

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"text/template"
"time"

"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/yaml"
)

func parseCIDR(s string) (*net.IPNet, error) {
Expand Down Expand Up @@ -122,57 +116,6 @@ func (i *ipPool) Use(ip string) {
i.used[ip] = struct{}{}
}

func toTemplateJson(text string, original interface{}, funcMap template.FuncMap) ([]byte, error) {
text = strings.TrimSpace(text)
v, ok := templateCache.Load(text)
if !ok {
temp, err := template.New("_").Funcs(funcMap).Parse(text)
if err != nil {
return nil, err
}
templateCache.Store(text, temp)
v = temp
}
temp := v.(*template.Template)
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)

buf.Reset()
err := json.NewEncoder(buf).Encode(original)
if err != nil {
return nil, err
}

var data interface{}
decoder := json.NewDecoder(buf)
decoder.UseNumber()
err = decoder.Decode(&data)
if err != nil {
return nil, err
}

buf.Reset()
err = temp.Execute(buf, data)
if err != nil {
return nil, err
}

out, err := yaml.YAMLToJSON(buf.Bytes())
if err != nil {
return nil, fmt.Errorf("%w: %s", err, buf.String())
}
return out, nil
}

var (
templateCache = sync.Map{}
bufferPool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}
)

type parallelTasks struct {
wg sync.WaitGroup
bucket chan struct{}
Expand Down

0 comments on commit 67c6917

Please sign in to comment.