Skip to content

Commit

Permalink
backend: Add bun
Browse files Browse the repository at this point in the history
  • Loading branch information
monstermunchkin committed Nov 15, 2024
1 parent 64ea1e6 commit 8a450d8
Show file tree
Hide file tree
Showing 19 changed files with 506 additions and 1,161 deletions.
21 changes: 0 additions & 21 deletions backend/postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,12 @@ Configuration for the PostgreSQL connection pool of the microservice.
* postgres user to access the database
* `POSTGRES_DB` default: `postgres`
* database to access
* `POSTGRES_MAX_RETRIES` default: `5`
* Maximum number of retries before giving up
* `POSTGRES_RETRY_STATEMENT_TIMEOUT` default: `false`
* Whether to retry queries cancelled because of statement_timeout
* `POSTGRES_MIN_RETRY_BACKOFF` default: `250ms`
* Minimum backoff between each retry
* `POSTGRES_MAX_RETRY_BACKOFF` default: `4s`
* Maximum backoff between each retry
* `POSTGRES_DIAL_TIMEOUT` default: `5s`
* Dial timeout for establishing new connections
* `POSTGRES_READ_TIMEOUT` default: `30s`
* Timeout for socket reads. If reached, commands will fail with a timeout instead of blocking
* `POSTGRES_WRITE_TIMEOUT` default: `30s`
* Timeout for socket writes. If reached, commands will fail with a timeout instead of blocking.
* `POSTGRES_POOL_SIZE` default: `100`
* Maximum number of socket connections
* `POSTGRES_MIN_IDLE_CONNECTIONS` default: `10`
* Minimum number of idle connections which is useful when establishing new connection is slow
* `POSTGRES_MAX_CONN_AGE` default: `30m`
* Connection age at which client retires (closes) the connection
* `POSTGRES_POOL_TIMEOUT` default: `31s`
* Time for which client waits for free connection if all connections are busy before returning an error
* `POSTGRES_IDLE_TIMEOUT` default: `5m`
* Amount of time after which client closes idle connections
* `POSTGRES_IDLE_CHECK_FREQUENCY` default: `1m`
* Frequency of idle checks made by idle connections reaper
* `POSTGRES_HEALTH_CHECK_TABLE_NAME` default: `healthcheck`
* Name of the Table that is created to try if database is writeable
* `POSTGRES_HEALTH_CHECK_RESULT_TTL` default: `10s`
Expand All @@ -53,7 +33,6 @@ Prometheus metrics exposed.
* `pace_postgres_query_total{database}` Collects stats about the number of postgres queries made
* `pace_postgres_query_failed{database}` Collects stats about the number of postgres queries failed
* `pace_postgres_query_duration_seconds{database}` Collects performance metrics for each postgres query
* `pace_postgres_query_rows_total{database}` Collects stats about the number of rows returned by a postgres query
* `pace_postgres_query_affected_total{database}` Collects stats about the number of rows affected by a postgres query
* `pace_postgres_connection_pool_hits{database}` Collects number of times free connection was found in the pool
* `pace_postgres_connection_pool_misses{database}` Collects number of times free connection was NOT found in the pool
Expand Down
10 changes: 6 additions & 4 deletions backend/postgres/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"io"
"net"

"github.com/go-pg/pg"
"github.com/uptrace/bun/driver/pgdriver"
)

var ErrNotUnique = errors.New("not unique")
Expand All @@ -25,14 +25,16 @@ func IsErrConnectionFailed(err error) bool {
}

// go-pg has similar check for integrity violation issues, here we check network issues
pgErr, ok := err.(pg.Error)
if ok {
var pgErr pgdriver.Error

if errors.As(err, &pgErr) {
code := pgErr.Field('C')
// We check on error codes of Class 08 — Connection Exception.
// https://www.postgresql.org/docs/10/errcodes-appendix.html
if code[0:2] == "08" {
if len(code) > 2 && code[0:2] == "08" {
return true
}
}

return false
}
19 changes: 4 additions & 15 deletions backend/postgres/errors_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package postgres_test

import (
"context"
"errors"
"fmt"
"io"
"testing"

"github.com/go-pg/pg"
"github.com/stretchr/testify/require"

pbpostgres "github.com/pace/bricks/backend/postgres"
Expand All @@ -19,13 +19,10 @@ func TestIsErrConnectionFailed(t *testing.T) {
})

t.Run("connection failed (net.Error)", func(t *testing.T) {
db := pbpostgres.CustomConnectionPool(&pg.Options{}) // invalid connection
_, err := db.Exec("")
require.True(t, pbpostgres.IsErrConnectionFailed(err))
})
ctx := context.Background()

t.Run("connection failed (pg.Error)", func(t *testing.T) {
err := error(mockPGError{m: map[byte]string{'C': "08000"}})
db := pbpostgres.NewDB(ctx, pbpostgres.WithHost("foobar")) // invalid connection
_, err := db.Exec("")
require.True(t, pbpostgres.IsErrConnectionFailed(err))
})

Expand All @@ -34,11 +31,3 @@ func TestIsErrConnectionFailed(t *testing.T) {
require.False(t, pbpostgres.IsErrConnectionFailed(err))
})
}

type mockPGError struct {
m map[byte]string
}

func (err mockPGError) Field(k byte) string { return err.m[k] }
func (err mockPGError) IntegrityViolation() bool { return false }
func (err mockPGError) Error() string { return fmt.Sprintf("%+v", err.m) }
91 changes: 91 additions & 0 deletions backend/postgres/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright © 2024 by PACE Telematics GmbH. All rights reserved.

package postgres

import (
"context"
"database/sql"
"time"

"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/uptrace/bun"
)

type queryExecutor interface {
Exec(ctx context.Context, dest ...interface{}) (sql.Result, error)
}

// HealthCheck checks the state of a postgres connection. It must not be changed
// after it was registered as a health check.
type HealthCheck struct {
state servicehealthcheck.ConnectionState

createTableQueryExecutor queryExecutor
deleteQueryExecutor queryExecutor
dropTableQueryExecutor queryExecutor
insertQueryExecutor queryExecutor
selectQueryExecutor queryExecutor
}

type healthcheck struct {
bun.BaseModel

OK bool `bun:"column:ok"`
}

// NewHealthCheck creates a new HealthCheck instance.
func NewHealthCheck(db *bun.DB) *HealthCheck {
return &HealthCheck{
createTableQueryExecutor: db.NewCreateTable().Model((*healthcheck)(nil)).ModelTableExpr(cfg.HealthCheckTableName),
deleteQueryExecutor: db.NewDelete().ModelTableExpr(cfg.HealthCheckTableName).Where("TRUE"),
dropTableQueryExecutor: db.NewDropTable().ModelTableExpr(cfg.HealthCheckTableName).IfExists(),
insertQueryExecutor: db.NewInsert().ModelTableExpr(cfg.HealthCheckTableName).Model(&healthcheck{OK: true}),
selectQueryExecutor: db.NewRaw("SELECT 1;"),
}
}

// Init initializes the test table
func (h *HealthCheck) Init(ctx context.Context) error {
_, err := h.createTableQueryExecutor.Exec(ctx)
return err
}

// HealthCheck performs the read test on the database. If enabled, it performs a
// write test as well.
func (h *HealthCheck) HealthCheck(ctx context.Context) servicehealthcheck.HealthCheckResult {
if time.Since(h.state.LastChecked()) <= cfg.HealthCheckResultTTL {
// the last result of the Health Check is still not outdated
return h.state.GetState()
}

// Readcheck
if _, err := h.selectQueryExecutor.Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// writecheck - add Data to configured Table
if _, err := h.insertQueryExecutor.Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// and while we're at it, check delete as well (so as not to clutter the database
// because UPSERT is impractical here
if _, err := h.deleteQueryExecutor.Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// If no error occurred set the State of this Health Check to healthy
h.state.SetHealthy()

return h.state.GetState()
}

// CleanUp drops the test table.
func (h *HealthCheck) CleanUp(ctx context.Context) error {
_, err := h.dropTableQueryExecutor.Exec(ctx)

return err
}
65 changes: 0 additions & 65 deletions backend/postgres/health_postgres.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ package postgres

import (
"context"
"database/sql"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/go-pg/pg/orm"
"github.com/stretchr/testify/require"

http2 "github.com/pace/bricks/http"
"github.com/pace/bricks/maintenance/errors"
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/pace/bricks/maintenance/log"
"github.com/stretchr/testify/require"
)

func setup() *http.Response {
Expand Down Expand Up @@ -52,7 +53,7 @@ type testPool struct {
err error
}

func (t *testPool) Exec(ctx context.Context, query interface{}, params ...interface{}) (res orm.Result, err error) {
func (t *testPool) Exec(ctx context.Context, dest ...any) (sql.Result, error) {
return nil, t.err
}

Expand All @@ -63,16 +64,25 @@ func TestHealthCheckCaching(t *testing.T) {
cfg.HealthCheckResultTTL = time.Minute
requiredErr := errors.New("TestHealthCheckCaching")
pool := &testPool{err: requiredErr}
h := &HealthCheck{Pool: pool}
h := &HealthCheck{
createTableQueryExecutor: pool,
deleteQueryExecutor: pool,
dropTableQueryExecutor: pool,
insertQueryExecutor: pool,
selectQueryExecutor: pool,
}

res := h.HealthCheck(ctx)
// get the error for the first time
require.Equal(t, servicehealthcheck.Err, res.State)
require.Equal(t, "TestHealthCheckCaching", res.Msg)

res = h.HealthCheck(ctx)
pool.err = nil
// getting the cached error
require.Equal(t, servicehealthcheck.Err, res.State)
require.Equal(t, "TestHealthCheckCaching", res.Msg)

// Resetting the TTL to get a uncached result
cfg.HealthCheckResultTTL = 0
res = h.HealthCheck(ctx)
Expand Down
Loading

0 comments on commit 8a450d8

Please sign in to comment.