fix: tests; feat: db.close panic recover; feat: update do and packages
ElrondfromRussia committed May 23, 2024
1 parent 66048b1 commit 5c8d75d
Showing 6 changed files with 203 additions and 253 deletions.
64 changes: 64 additions & 0 deletions conn_getters.go
@@ -0,0 +1,64 @@
package dbpool

import (
	"context"
	"errors"
	"time"

	"github.com/jmoiron/sqlx"
	"github.com/jmoiron/sqlx/reflectx"
)

///////////////////////////////////////////////////////// Mapper functions

// JsonMapperFunc - mapper that maps struct fields by their `json` tags and leaves untagged field names unchanged
func JsonMapperFunc() *reflectx.Mapper {
	return reflectx.NewMapperFunc("json", func(s string) string { return s })
}

/////////////////////////////////////////////////////////

// GetConnectionByParams - get *sqlx.DB from the cache (if it exists, using the default JSON mapper func) or create a new one and put it into the cache
func GetConnectionByParams(Ctx context.Context, connCache *SafeDbMapCache,
	duration time.Duration, driver, connString string) (*sqlx.DB, error) {

	return GetConnectionWithMapper(Ctx, connCache, duration, driver, connString, JsonMapperFunc())
}

// GetConnectionWithMapper - get *sqlx.DB from the cache (if it exists, using the given mapperFunc) or create a new one and put it into the cache
func GetConnectionWithMapper(
	Ctx context.Context,
	connCache *SafeDbMapCache,
	duration time.Duration,
	driver, connString string,
	mapperFunc *reflectx.Mapper) (*sqlx.DB, error) {

	conn, ok := connCache.Get(connString)
	if ok && conn != nil {
		// ping to check that the cached connection is still alive
		err := conn.PingContext(Ctx)
		if err != nil {
			return nil, err
		}

		return conn, nil
	}

	// create a new connection
	db, err := sqlx.ConnectContext(Ctx, driver, connString)
	if err != nil {
		return nil, err
	}

	db.SetConnMaxLifetime(duration)
	db.Mapper = mapperFunc

	// add the connection to connCache
	connCache.Set(connString, db, duration)

	conn, ok = connCache.Get(connString)
	if !ok || conn == nil {
		return nil, errors.New("no conn in connCache")
	}

	return conn, nil
}
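
For orientation, a minimal usage sketch of the new getters (not part of the commit). The import path, driver name, and DSN are assumptions; the cache constructor and GC come from dbpool.go below.

package main

import (
	"context"
	"log"
	"time"

	dbpool "github.com/NGRsoftlab/ngr-dbpool" // import path is an assumption
	_ "github.com/lib/pq"                     // any database/sql driver works; Postgres chosen for the example
)

func main() {
	// Cache with a 5-minute default TTL and a 10-minute cleanup interval.
	cache := dbpool.New(5*time.Minute, 10*time.Minute)
	cache.StartGC()
	defer cache.ClearAll()

	ctx := context.Background()

	// The first call dials the database and stores the *sqlx.DB in the cache;
	// later calls with the same connString reuse (and ping) the cached handle.
	db, err := dbpool.GetConnectionByParams(ctx, cache, 5*time.Minute,
		"postgres", "host=localhost user=app dbname=app sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	var one int
	if err := db.GetContext(ctx, &one, "SELECT 1"); err != nil {
		log.Fatal(err)
	}
	log.Println("query result:", one)
}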
149 changes: 80 additions & 69 deletions dbpool.go
@@ -1,23 +1,23 @@
package dbpool

import (
. "github.com/NGRsoftlab/ngr-logging"

"errors"
"sync"
"time"

. "github.com/NGRsoftlab/ngr-logging"

"github.com/jmoiron/sqlx"
)

/////// Safe db pool map with string in key ///////////
////////////////////////////////////////////////////////// Safe db pool map with string in key

type PoolItem struct {
Expiration int64
Duration time.Duration
Created time.Time

Db *sqlx.DB
DB *sqlx.DB
}

type SafeDbMapCache struct {
@@ -46,30 +46,24 @@ func New(defaultExpiration, cleanupInterval time.Duration) *SafeDbMapCache {
return &cache
}

// Set - setting *sqlx.DB value by key
func (c *SafeDbMapCache) Set(key string, value *sqlx.DB, duration time.Duration) {
var expiration int64

if duration == 0 {
duration = c.defaultExpiration
}

if duration > 0 {
expiration = time.Now().Add(duration).UnixNano()
// closePoolDB - close the wrapped connector DB, recovering from any panic
func (p *PoolItem) closePoolDB() {
defer panicPoolRecover()
err := p.DB.Close()
if err != nil {
Logger.Warningf("db connection close error: %s", err.Error())
}
}

c.Lock()

defer c.Unlock()

c.pool[key] = PoolItem{
Db: value,
Expiration: expiration,
Duration: duration,
Created: time.Now(),
// panicPoolRecover - recover from a panic inside dbpool functions
func panicPoolRecover() {
if r := recover(); r != nil {
Logger.Warning("Recovered in dbpool function: ", r)
}
}

////////////////////////////////////////////////////////// Get-Set

// Get - getting *sqlx.DB value by key
func (c *SafeDbMapCache) Get(key string) (*sqlx.DB, bool) {
// changed from RLock to Lock because of line 99 operation (updating creation time)
@@ -84,77 +78,60 @@ func (c *SafeDbMapCache) Get(key string) (*sqlx.DB, bool) {
}

if item.Expiration > 0 {

// cache expired
if time.Now().UnixNano() > item.Expiration {
return nil, false
}
}

////TODO: set new timeout (?????? - think about it)
// TODO: set new timeout (?????? - think about it)
var newExpiration int64
if item.Duration > 0 {
newExpiration = time.Now().Add(item.Duration).UnixNano()
}

c.pool[key] = PoolItem{
Db: item.Db,
DB: item.DB,
Expiration: newExpiration,
Duration: item.Duration,
Created: time.Now(),
}

return item.Db, true
return item.DB, true
}

// Delete - delete *sqlx.DB value by key
// Return false if key not found
func (c *SafeDbMapCache) Delete(key string) error {
c.Lock()
defer c.Unlock()

connector, found := c.pool[key]
// Set - setting *sqlx.DB value by key
func (c *SafeDbMapCache) Set(key string, value *sqlx.DB, duration time.Duration) {
var expiration int64

if !found {
return errors.New("key not found")
if duration == 0 {
duration = c.defaultExpiration
}

err := connector.Db.Close()
if err != nil {
Logger.Warningf("db connection close error: %s", err.Error())
if duration > 0 {
expiration = time.Now().Add(duration).UnixNano()
}

delete(c.pool, key)

return nil
}

// StartGC - start Garbage Collection
func (c *SafeDbMapCache) StartGC() {
go c.GC()
}

// GC - Garbage Collection cycle
func (c *SafeDbMapCache) GC() {
for {
<-time.After(c.cleanupInterval)
c.Lock()

if c.pool == nil {
return
}
defer c.Unlock()

if keys := c.ExpiredKeys(); len(keys) != 0 {
c.clearItems(keys)
}
c.pool[key] = PoolItem{
DB: value,
Expiration: expiration,
Duration: duration,
Created: time.Now(),
}
}

////////////////////////////////////////////////////////// Items

// GetItems - returns item list.
func (c *SafeDbMapCache) GetItems() (items []string) {
c.RLock()
defer c.RUnlock()

for k, _ := range c.pool {
for k := range c.pool {
items = append(items, k)
}

@@ -184,17 +161,54 @@ func (c *SafeDbMapCache) clearItems(keys []string) {
connector, ok := c.pool[k]

if ok {
err := connector.Db.Close()
if err != nil {
Logger.Warningf("db connection close error: %s", err.Error())
}
connector.closePoolDB()
}

delete(c.pool, k)
}
}

// ClearAll - removes all items.
////////////////////////////////////////////////////////// Cleaning

// StartGC - start Garbage Collection
func (c *SafeDbMapCache) StartGC() {
go c.GC()
}

// GC - Garbage Collection cycle
func (c *SafeDbMapCache) GC() {
for {
<-time.After(c.cleanupInterval)

if c.pool == nil {
return
}

if keys := c.ExpiredKeys(); len(keys) != 0 {
c.clearItems(keys)
}
}
}

// Delete - delete *sqlx.DB value by key. Returns an error if the key is not found
func (c *SafeDbMapCache) Delete(key string) error {
c.Lock()
defer c.Unlock()

connector, found := c.pool[key]

if !found {
return errors.New("key not found")
}

connector.closePoolDB()

delete(c.pool, key)

return nil
}

// ClearAll - remove all items.
func (c *SafeDbMapCache) ClearAll() {
c.Lock()
defer c.Unlock()
@@ -203,10 +217,7 @@ func (c *SafeDbMapCache) ClearAll() {
connector, ok := c.pool[k]

if ok {
err := connector.Db.Close()
if err != nil {
Logger.Warningf("db connection close error: %s", err.Error())
}
connector.closePoolDB()
}

delete(c.pool, k)
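The "db.close panic recover" part of the commit message refers to the closePoolDB/panicPoolRecover pair above: Close is wrapped so that a panic (for example, on a nil handle) is logged instead of killing the GC goroutine. Below is a standalone sketch of the same pattern, using the standard log package instead of the project's Logger; the names here are illustrative, not from the repository.

package main

import (
	"log"

	"github.com/jmoiron/sqlx"
)

// closeQuietly mirrors the closePoolDB idea: recover from any panic raised
// while closing, and log close errors instead of propagating them.
func closeQuietly(db *sqlx.DB) {
	defer func() {
		if r := recover(); r != nil {
			log.Println("recovered while closing db:", r)
		}
	}()

	if err := db.Close(); err != nil {
		log.Printf("db connection close error: %s", err)
	}
}

func main() {
	var db *sqlx.DB  // deliberately nil: calling Close on it panics
	closeQuietly(db) // the panic is recovered and logged instead of crashing
}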
