Skip to content

Commit

Permalink
Simplify custom type autoloading with pgxpool
Browse files Browse the repository at this point in the history
Provide some backwards-compatible configuration options for pgxpool
which streamlines the use of the bulk loading of custom types:
- AutoLoadTypes: a list of type (or class) names to automatically
  load for each connection, automatically also loading any other
  types these depend on.
- ReuseTypeMaps: if enabled, pgxpool will cache the typemap information,
  avoiding the need to perform any further queries as new connections
  are created.

ReuseTypeMaps is disabled by default as in some situations, a
connection string might resolve to a pool of servers which do not share
the same type name -> OID mapping.
  • Loading branch information
nicois committed Jun 21, 2024
1 parent 06c0451 commit 90826ee
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 9 deletions.
66 changes: 59 additions & 7 deletions pgxpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/puddle/v2"
)

var defaultMaxConns = int32(4)
var defaultMinConns = int32(0)
var defaultMaxConnLifetime = time.Hour
var defaultMaxConnIdleTime = time.Minute * 30
var defaultHealthCheckPeriod = time.Minute
var (
defaultMaxConns = int32(4)
defaultMinConns = int32(0)
defaultMaxConnLifetime = time.Hour
defaultMaxConnIdleTime = time.Minute * 30
defaultHealthCheckPeriod = time.Minute
)

type connResource struct {
conn *pgx.Conn
Expand Down Expand Up @@ -100,6 +103,11 @@ type Pool struct {

closeOnce sync.Once
closeChan chan struct{}

autoLoadTypes []string
reuseTypeMap bool
autoLoadMutex *sync.Mutex
autoLoadTypeInfos []*pgtype.DerivedTypeInfo
}

// Config is the configuration struct for creating a pool. It must be created by [ParseConfig] and then it can be
Expand Down Expand Up @@ -147,6 +155,15 @@ type Config struct {
// HealthCheckPeriod is the duration between checks of the health of idle connections.
HealthCheckPeriod time.Duration

// AutoLoadTypes is a list of user-defined types which should automatically be loaded
// as each new connection is created. This will also load any related types, directly
// or indirectly required to handle these types.
AutoLoadTypes []string

// ReuseTypeMaps, if enabled, will reuse the typemap information being used by AutoLoadTypes.
// This removes the need to query the database each time a new connection is created.
ReuseTypeMaps bool

createdByParseConfig bool // Used to enforce created by ParseConfig rule.
}

Expand Down Expand Up @@ -185,6 +202,8 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
config: config,
beforeConnect: config.BeforeConnect,
afterConnect: config.AfterConnect,
autoLoadTypes: config.AutoLoadTypes,
reuseTypeMap: config.ReuseTypeMaps,
beforeAcquire: config.BeforeAcquire,
afterRelease: config.AfterRelease,
beforeClose: config.BeforeClose,
Expand All @@ -196,6 +215,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
healthCheckPeriod: config.HealthCheckPeriod,
healthCheckChan: make(chan struct{}, 1),
closeChan: make(chan struct{}),
autoLoadMutex: new(sync.Mutex),
}

if t, ok := config.ConnConfig.Tracer.(AcquireTracer); ok {
Expand Down Expand Up @@ -237,6 +257,19 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
}
}

if len(p.autoLoadTypes) > 0 {
types, err := p.loadTypes(ctx, conn, p.autoLoadTypes)
if err != nil {
conn.Close(ctx)
panic(err)
}
if err = conn.TypeMap().RegisterDerivedTypes(types); err != nil {
conn.Close(ctx)

panic(err)
}
}

jitterSecs := rand.Float64() * config.MaxConnLifetimeJitter.Seconds()
maxAgeTime := time.Now().Add(config.MaxConnLifetime).Add(time.Duration(jitterSecs) * time.Second)

Expand Down Expand Up @@ -388,6 +421,27 @@ func (p *Pool) Close() {
})
}

// loadTypes is used internally to autoload the custom types for a connection,
// potentially reusing previously-loaded typemap information.
func (p *Pool) loadTypes(ctx context.Context, conn *pgx.Conn, typeNames []string) ([]*pgtype.DerivedTypeInfo, error) {
if p.reuseTypeMap {
p.autoLoadMutex.Lock()
defer p.autoLoadMutex.Unlock()
if p.autoLoadTypeInfos != nil {
return p.autoLoadTypeInfos, nil
}
types, err := pgx.LoadDerivedTypes(ctx, conn, typeNames)
if err != nil {
return nil, err
}
p.autoLoadTypeInfos = types
return types, err
}
// Avoid needing to acquire the mutex and allow connections to initialise in parallel
// if we have chosen to not reuse the type mapping
return pgx.LoadDerivedTypes(ctx, conn, typeNames)
}

func (p *Pool) isExpired(res *puddle.Resource[*connResource]) bool {
return time.Now().After(res.Value().maxAgeTime)
}
Expand Down Expand Up @@ -482,7 +536,6 @@ func (p *Pool) checkMinConns() error {
func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

errs := make(chan error, targetResources)

for i := 0; i < targetResources; i++ {
Expand All @@ -495,7 +548,6 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in
errs <- err
}()
}

var firstError error
for i := 0; i < targetResources; i++ {
err := <-errs
Expand Down
27 changes: 25 additions & 2 deletions pgxpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,31 @@ func TestPoolBeforeConnect(t *testing.T) {
assert.EqualValues(t, "pgx", str)
}

func TestAutoLoadTypes(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err)

db1, err := pgxpool.NewWithConfig(ctx, config)
require.NoError(t, err)
defer db1.Close()
db1.Exec(ctx, "DROP DOMAIN IF EXISTS autoload_uint64; CREATE DOMAIN autoload_uint64 as numeric(20,0)")
defer db1.Exec(ctx, "DROP DOMAIN autoload_uint64")

config.AutoLoadTypes = []string{"autoload_uint64"}
db2, err := pgxpool.NewWithConfig(ctx, config)
require.NoError(t, err)

var n uint64
err = db2.QueryRow(ctx, "select 12::autoload_uint64").Scan(&n)
require.NoError(t, err)
assert.EqualValues(t, uint64(12), n)
}

func TestPoolAfterConnect(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -676,7 +701,6 @@ func TestPoolQuery(t *testing.T) {
stats = pool.Stat()
assert.EqualValues(t, 0, stats.AcquiredConns())
assert.EqualValues(t, 1, stats.TotalConns())

}

func TestPoolQueryRow(t *testing.T) {
Expand Down Expand Up @@ -1104,7 +1128,6 @@ func TestConnectEagerlyReachesMinPoolSize(t *testing.T) {
}

t.Fatal("did not reach min pool size")

}

func TestPoolSendBatchBatchCloseTwice(t *testing.T) {
Expand Down

0 comments on commit 90826ee

Please sign in to comment.