Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: memory leak and mutex usage to have isolation of concurrent pipeline #93

Merged
merged 3 commits into from
Jul 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions .github/workflows/k6.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: K6 🛠️
on:
pull_request:
types:
- ready_for_review
push:
branches:
- main
workflow_dispatch:
permissions:
contents: read
jobs:
k6-load-script:
name: "K6 Load test"
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
goVersion: [ '1.18' ]
steps:
- name: Checkout project
uses: actions/checkout@v3
- name: Setup go
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.goVersion }}
check-latest: true
- name: Install k6
run: |
curl https://github.com/loadimpact/k6/releases/download/v0.39.0/k6-v0.39.0-linux64.tar.gz -L | tar xvz --strip-components 1
- name: Start application and run K6
run: |
go run main.go serve --config tests/webhooks.tests.yml &
./k6 run tests/k6-load-script.js
8 changes: 5 additions & 3 deletions internal/server/v1alpha1/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func webhookService(s *Server, spec *config.WebhookSpec, r *http.Request) (err e
if r.Body == nil {
return errRequestBodyMissing
}
defer r.Body.Close()

data, err := io.ReadAll(r.Body)
if err != nil {
return err
Expand All @@ -110,9 +112,9 @@ func webhookService(s *Server, spec *config.WebhookSpec, r *http.Request) (err e
NewTemplateData(storage.Formatting.Template).
WithRequest(r).
WithPayload(data).
WithSpec(spec).
WithStorage(storage).
WithConfig().
WithData("Spec", spec).
WithData("Storage", storage).
WithData("Config", config.Current()).
Render()
if err != nil {
return err
Expand Down
30 changes: 30 additions & 0 deletions pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"text/template"

"github.com/rs/zerolog/log"
Expand All @@ -20,6 +21,7 @@ const ctxPipeline contextKey = "pipeline"
func newFactory(f IFactory) *Factory {
return &Factory{
ctx: context.Background(),
mu: sync.RWMutex{},
Name: f.Name(),
Fn: f.Func(),
Config: make(map[string]interface{}),
Expand All @@ -28,6 +30,28 @@ func newFactory(f IFactory) *Factory {
}
}

// DeepCopy creates a deep copy of the pipeline.
func (f *Factory) DeepCopy() *Factory {
deepCopy := &Factory{
ctx: f.ctx,
mu: sync.RWMutex{},
Name: f.Name,
Fn: f.Fn,
Config: make(map[string]interface{}),
Inputs: make([]*Var, len(f.Inputs)),
Outputs: make([]*Var, len(f.Outputs)),
}

copy(deepCopy.Inputs, f.Inputs)
copy(deepCopy.Outputs, f.Outputs)

for k, v := range f.Config {
deepCopy.Config[k] = v
}

return deepCopy
}

// GetVar returns the variable with the given name from the given slice.
// @param list the Var slice to search in
// @param name the name of the variable to search for
Expand Down Expand Up @@ -85,6 +109,9 @@ func (f *Factory) withPipelineInput(name string, value interface{}) {
// @param value the value of the input variable
// @return the factory
func (f *Factory) WithInput(name string, value interface{}) *Factory {
f.mu.Lock()
defer f.mu.Unlock()

f.Inputs, _ = f.with(f.Inputs, name, value)
return f
}
Expand All @@ -101,6 +128,9 @@ func (f *Factory) WithID(id string) *Factory {
// @param config the config of the factory
// @return the factory
func (f *Factory) WithConfig(config map[string]interface{}) *Factory {
f.mu.Lock()
defer f.mu.Unlock()

if id, ok := config["id"]; ok {
f.WithID(id.(string))
delete(config, "id")
Expand Down
7 changes: 7 additions & 0 deletions pkg/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,10 @@ func (suite *testSuiteFactory) TestGoTempalteValue() {
ret := goTemplateValue("{{ .test }}", map[string]interface{}{"test": "testValue"})
suite.Equal("testValue", ret)
}

func (suite *testSuiteFactory) TestFactoryDeepCopy() {
var factory = newFactory(&fakeFactory{})
factory.WithConfig(map[string]interface{}{"name": "test"})

suite.NotSame(factory, factory.DeepCopy())
}
6 changes: 5 additions & 1 deletion pkg/factory/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewPipeline() *Pipeline {
func (p *Pipeline) DeepCopy() *Pipeline {
deepCopy := NewPipeline().WantResult(p.WantedResult)
for _, f := range p.factories {
deepCopy.AddFactory(f)
deepCopy.AddFactory(f.DeepCopy())
}
for k, v := range p.Inputs {
deepCopy.WithInput(k, v)
Expand Down Expand Up @@ -109,6 +109,10 @@ func (p *Pipeline) Run() *Factory {
return p.factories[len(p.factories)-1]
}

// Clean up the pipeline
p.Inputs = make(map[string]interface{})
p.Outputs = make(map[string]map[string]interface{})

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/factory/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,5 @@ func (suite *testSuitePipeline) TestPipelineDeepCopy() {
pipeline.WantResult("test")

var pipeline2 = pipeline.DeepCopy()
suite.Equal(pipeline, pipeline2)
suite.NotSame(pipeline, pipeline2)
}
2 changes: 2 additions & 0 deletions pkg/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Factory struct {
ID string
// Fn is the factory function
Fn RunFunc
// Protect following fields
mu sync.RWMutex
// Config is the configuration for the factory function
Config map[string]interface{}
// Inputs is the inputs of the factory
Expand Down
34 changes: 11 additions & 23 deletions pkg/formatting/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import (
"bytes"
"fmt"
"net/http"
"sync"
"text/template"

"github.com/rs/zerolog/log"

"atomys.codes/webhooked/internal/config"
)

type TemplateData struct {
tmplString string
data map[string]interface{}

mu sync.RWMutex // protect following field amd template parsing
data map[string]interface{}
}

// NewTemplateData returns a new TemplateData instance. It takes the template
Expand All @@ -24,12 +25,16 @@ func NewTemplateData(tmplString string) *TemplateData {
return &TemplateData{
tmplString: tmplString,
data: make(map[string]interface{}),
mu: sync.RWMutex{},
}
}

// WithData adds a key-value pair to the data map. The key is the name of the
// variable and the value is the value of the variable.
func (d *TemplateData) WithData(name string, data interface{}) *TemplateData {
d.mu.Lock()
defer d.mu.Unlock()

d.data[name] = data
return d
}
Expand All @@ -48,31 +53,14 @@ func (d *TemplateData) WithPayload(payload []byte) *TemplateData {
return d
}

// WithSpec adds a webhookspec to the data map. The key of spec is "Spec".
func (d *TemplateData) WithSpec(spec *config.WebhookSpec) *TemplateData {
d.WithData("Spec", spec)
return d
}

// WithStorage adds a storage spec to the data map.
// The key of storage is "Storage".
func (d *TemplateData) WithStorage(spec *config.StorageSpec) *TemplateData {
d.WithData("Storage", spec)
return d
}

// WithConfig adds the current config to the data map.
// The key of config is "Config".
func (d *TemplateData) WithConfig() *TemplateData {
d.WithData("Config", config.Current())
return d
}

// Render returns the rendered template string. It takes the template string
// from the TemplateData instance and the data stored in the TemplateData
// instance. It returns an error if the template string is invalid or when
// rendering the template fails.
func (d *TemplateData) Render() (string, error) {
d.mu.RLock()
defer d.mu.RUnlock()

log.Debug().Msgf("rendering template: %s", d.tmplString)

t := template.New("formattingTmpl").Funcs(funcMap())
Expand Down
36 changes: 3 additions & 33 deletions pkg/formatting/format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,6 @@ func Test_WithPayload(t *testing.T) {
assert.JSONEq(`{"test":"test"}`, tmpl.data["Payload"].(string))
}

func Test_WithSpec(t *testing.T) {
assert := assert.New(t)

tmpl := NewTemplateData("").WithSpec(&config.WebhookSpec{Name: "test", Formatting: &config.FormattingSpec{}})
assert.NotNil(tmpl)
assert.Equal("", tmpl.tmplString)
assert.Equal(1, len(tmpl.data))
assert.Equal("test", tmpl.data["Spec"].(*config.WebhookSpec).Name)
}

func Test_WithStorage(t *testing.T) {
assert := assert.New(t)

tmpl := NewTemplateData("").WithStorage(&config.StorageSpec{Type: "testing"})
assert.NotNil(tmpl)
assert.Equal("", tmpl.tmplString)
assert.Equal(1, len(tmpl.data))
assert.Equal("testing", tmpl.data["Storage"].(*config.StorageSpec).Type)
}

func Test_WithConfig(t *testing.T) {
assert := assert.New(t)

tmpl := NewTemplateData("").WithConfig()
assert.NotNil(tmpl)
assert.Equal("", tmpl.tmplString)
assert.Equal(1, len(tmpl.data))
assert.NotNil(tmpl.data["Config"])
}

func Test_Render(t *testing.T) {
assert := assert.New(t)

Expand Down Expand Up @@ -123,9 +93,9 @@ func Test_Render(t *testing.T) {
`).
WithPayload([]byte(`{"test": "test"}`)).
WithRequest(req).
WithSpec(&config.WebhookSpec{Name: "test", EntrypointURL: "/webhooks/test", Formatting: &config.FormattingSpec{}}).
WithStorage(&config.StorageSpec{Type: "testing", Specs: map[string]interface{}{}}).
WithConfig()
WithData("Spec", &config.WebhookSpec{Name: "test", EntrypointURL: "/webhooks/test", Formatting: &config.FormattingSpec{}}).
WithData("Storage", &config.StorageSpec{Type: "testing", Specs: map[string]interface{}{}}).
WithData("Config", config.Current())
assert.NotNil(tmpl)

str, err = tmpl.Render()
Expand Down
35 changes: 35 additions & 0 deletions tests/k6-load-script.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import http from 'k6/http';

export const options = {
stages: [
{ duration: '5s', target: 10 },
{ duration: '10s', target: 200 },
{ duration: '10s', target: 1000 },
{ duration: '10s', target: 1000 },
{ duration: '10s', target: 100 },
{ duration: '10m', target: 100 },
{ duration: '10s', target: 10 },
{ duration: '5s', target: 0 },
],
thresholds: {
http_req_failed: ['rate<0.0001'],
http_req_duration: ['p(95)<20', 'p(99.9) < 100'],
},
};

export default function () {
const url = 'http://localhost:8080/v1alpha1/webhooks/example';
const payload = JSON.stringify({
data: {},
timestamp: Date.now(),
});

const params = {
headers: {
'Content-Type': 'application/json',
'X-Hook-Secret': 'test'
},
};

http.post(url, payload, params);
}