Skip to content

Commit

Permalink
fix(persistence): connection pool leak due to schema migrations
Browse files Browse the repository at this point in the history
Previously, a separate connection pool was created to execute schema
migrations. This pool was not properly shut down though, leading to open
connections and unused goroutines. This commit reuses the connection
pool created for entgo, thereby preventing any resource leaks.

Signed-off-by: Michael Adler <michael.adler@siemens.com>
  • Loading branch information
michaeladler authored and stormc committed Aug 4, 2023
1 parent 2e54f45 commit 3f42f23
Show file tree
Hide file tree
Showing 22 changed files with 242 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Send HTTP status code 404 when attempting to access the file server while it is disabled
- Configure TLS for Southbound API (if requested via CLI)
- Connection pool leak due to schema migrations (SQLite, MySQL)

### Changed

Expand Down
19 changes: 19 additions & 0 deletions api/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package api

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <michael.adler@siemens.com>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
2 changes: 1 addition & 1 deletion api/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)

t.Cleanup(db.Shutdown)
t.Cleanup(func() {
{
list, _ := db.QueryJobs(context.Background(), persistence.FilterParams{}, persistence.SortParams{}, persistence.PaginationParams{Limit: 100})
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tsenart/vegeta/v12 v12.11.0
github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869
go.uber.org/goleak v1.2.1
golang.org/x/term v0.10.0
gopkg.in/go-playground/colors.v1 v1.2.0
gopkg.in/yaml.v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZE
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
Expand Down
1 change: 1 addition & 0 deletions internal/handler/job/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)
t.Cleanup(db.Shutdown)

require.NoError(t, err)
t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/job/definition/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)

t.Cleanup(db.Shutdown)
t.Cleanup(func() {
{
list, err := db.QueryJobs(context.Background(), persistence.FilterParams{}, persistence.SortParams{}, persistence.PaginationParams{Limit: 100})
Expand Down
19 changes: 19 additions & 0 deletions internal/handler/job/definition/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package definition

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <michael.adler@siemens.com>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
19 changes: 19 additions & 0 deletions internal/handler/job/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package job

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <michael.adler@siemens.com>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
1 change: 1 addition & 0 deletions internal/handler/job/status/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)
t.Cleanup(db.Shutdown)

require.NoError(t, err)
t.Cleanup(func() {
Expand Down
19 changes: 19 additions & 0 deletions internal/handler/job/status/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package status

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <michael.adler@siemens.com>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
1 change: 1 addition & 0 deletions internal/handler/job/tags/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func newInMemoryDB(t *testing.T) persistence.Storage {
db := &entgo.SQLite{}
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)
t.Cleanup(db.Shutdown)

t.Cleanup(func() {
{
Expand Down
19 changes: 19 additions & 0 deletions internal/handler/job/tags/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package tags

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <michael.adler@siemens.com>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
9 changes: 5 additions & 4 deletions internal/handler/workflow/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ func TestCreateWorkflow(t *testing.T) {
}

func newInMemoryDB(t *testing.T) persistence.Storage {
var sqlite entgo.SQLite
err := sqlite.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
var db entgo.SQLite
err := db.Initialize(context.Background(), "file:wfx?mode=memory&cache=shared&_fk=1")
require.NoError(t, err)
t.Cleanup(db.Shutdown)
t.Cleanup(func() {
_ = sqlite.DeleteWorkflow(context.Background(), "wfx.workflow.dau.direct")
_ = db.DeleteWorkflow(context.Background(), "wfx.workflow.dau.direct")
})
return &sqlite
return &db
}
19 changes: 19 additions & 0 deletions internal/handler/workflow/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package workflow

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <michael.adler@siemens.com>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
19 changes: 19 additions & 0 deletions internal/persistence/entgo/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package entgo

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <michael.adler@siemens.com>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
26 changes: 17 additions & 9 deletions internal/persistence/entgo/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,30 @@ func (wrapper *MySQL) Initialize(ctx context.Context, options string) error {

db := sql.OpenDB(connector)
if err := db.PingContext(ctx); err != nil {
log.Error().Err(err).Msg("Failed to ping PostgreSQL database")
log.Error().Err(err).Msg("Failed to ping MySQL database")
_ = db.Close()
return fault.Wrap(err)
}

log.Info().Msg("Applying migrations")
src, err := iofs.New(mysqlMigrations, "migrations/mysql")
if err != nil {
return fault.Wrap(err)
}
{
log.Info().Msg("Applying migrations")
src, err := iofs.New(mysqlMigrations, "migrations/mysql")
if err != nil {
return fault.Wrap(err)
}

{ // run migrations
drv, err := mysql.WithInstance(db, &mysql.Config{})
conn, err := db.Conn(ctx)
if err != nil {
return fault.Wrap(err)
}
if err := runMigrations(src, cfg.DBName, drv); err != nil {
defer conn.Close()

m, err := mysql.WithConnection(ctx, conn, &mysql.Config{})
if err != nil {
return fault.Wrap(err)
}

if err := runMigrations(src, cfg.DBName, m); err != nil {
return fault.Wrap(err)
}
}
Expand Down
16 changes: 16 additions & 0 deletions internal/persistence/entgo/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,27 @@ import (
"testing"

"github.com/siemens/wfx/internal/persistence/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func TestMySQL_Initialize(t *testing.T) {
defer goleak.VerifyNone(t)
db := setupMySQL(t)
db.Shutdown()
}

func TestMain_InitializeFail(t *testing.T) {
dsn := "foo:bar@tcp(localhost)/wfx"
var mysql MySQL
err := mysql.Initialize(context.Background(), dsn)
assert.NotNil(t, err)
}

func TestMySQL(t *testing.T) {
db := setupMySQL(t)
t.Cleanup(db.Shutdown)
for _, testFn := range tests.AllTests {
name := runtime.FuncForPC(reflect.ValueOf(testFn).Pointer()).Name()
name = strings.TrimPrefix(filepath.Ext(name), ".")
Expand Down
8 changes: 8 additions & 0 deletions internal/persistence/entgo/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,18 @@ import (

"github.com/siemens/wfx/internal/persistence/tests"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func TestPostgreSQL_Initialize(t *testing.T) {
defer goleak.VerifyNone(t)
db := setupPostgreSQL(t)
db.Shutdown()
}

func TestPostgreSQL(t *testing.T) {
db := setupPostgreSQL(t)
t.Cleanup(db.Shutdown)
for _, testFn := range tests.AllTests {
name := runtime.FuncForPC(reflect.ValueOf(testFn).Pointer()).Name()
name = strings.TrimPrefix(filepath.Ext(name), ".")
Expand Down
54 changes: 32 additions & 22 deletions internal/persistence/entgo/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ package entgo
import (
"context"
"embed"
"net/url"

"entgo.io/ent/dialect"
"entgo.io/ent/dialect/sql"
"github.com/Southclaws/fault"
"github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
Expand All @@ -36,34 +39,41 @@ func init() {
persistence.RegisterStorage("sqlite", &SQLite{})
}

func (wrapper *SQLite) Initialize(_ context.Context, options string) error {
log.Debug().
Str("dsn", options).
Msg("Initializing SQLite storage")

src, err := iofs.New(sqliteMigrations, "migrations/sqlite")
func (instance *SQLite) Initialize(_ context.Context, dsn string) error {
log.Debug().Str("dsn", dsn).Msg("Connecting to SQLite")
drv, err := sql.Open(dialect.SQLite, dsn)
if err != nil {
log.Error().Err(err).Msg("Failed opening connection to SQLite")
return fault.Wrap(err)
}
client := ent.NewClient(ent.Driver(drv))
log.Debug().Msg("Connected to SQLite")
instance.Database = Database{client: client}

var sqlite sqlite3.Sqlite
driver, err := sqlite.Open(options)
if err != nil {
return fault.Wrap(err)
}
{
// run schema migrations
src, err := iofs.New(sqliteMigrations, "migrations/sqlite")
if err != nil {
return fault.Wrap(err)
}

if err := runMigrations(src, "wfx", driver); err != nil {
return fault.Wrap(err)
}
purl, err := url.Parse(dsn)
if err != nil {
return fault.Wrap(err)
}

log.Debug().Msg("Connecting to SQLite")
client, err := ent.Open("sqlite3", options)
if err != nil {
log.Error().Err(err).Msg("Failed opening connection to sqlite")
return fault.Wrap(err)
}
log.Debug().Msg("Connected to SQLite")
driver, err := sqlite3.WithInstance(drv.DB(), &sqlite3.Config{
MigrationsTable: sqlite3.DefaultMigrationsTable,
DatabaseName: purl.Path,
NoTxWrap: false,
})
if err != nil {
return fault.Wrap(err)
}

wrapper.Database = Database{client: client}
if err := runMigrations(src, "wfx", driver); err != nil {
return fault.Wrap(err)
}
}
return nil
}
Loading

0 comments on commit 3f42f23

Please sign in to comment.