-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpool.go
131 lines (119 loc) · 2.39 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package ch_pool
import (
"errors"
"github.com/jmoiron/sqlx"
"sync"
"time"
)
var (
errInvalidConfig = errors.New("invalid pool config")
errPoolClosed = errors.New("pool closed")
)
type factory func()(*sqlx.DB, error)
type Pool interface {
GetConn() (*sqlx.DB, error) // get connect resource
Release(*sqlx.DB) error // release connect resource
Close(*sqlx.DB) error // close connect resource
Shutdown() error // close pool
}
type ChPool struct {
sync.Mutex
pool chan *sqlx.DB
maxOpen int // max open connect num
numOpen int // now pool has open connect num
minOpen int // min open connect num
closed bool // pool is close
maxLifetime time.Duration // connect max life time
factory factory // the factory of create connect
}
func NewChPool(minOpen, maxOpen int, maxLifetime time.Duration, factory factory) (*ChPool, error) {
if maxOpen <= 0 || minOpen > maxOpen {
return nil, errInvalidConfig
}
p := &ChPool{
maxOpen: maxOpen,
minOpen: minOpen,
maxLifetime: maxLifetime,
factory: factory,
pool: make(chan *sqlx.DB, maxOpen),
}
for i := 0; i < minOpen; i++ {
dbConn, err := factory()
if err != nil {
continue
}
p.numOpen++
p.pool <- dbConn
}
return p, nil
}
func (p *ChPool) GetConn()(dbConn *sqlx.DB, err error) {
if p.closed {
err = errPoolClosed
return
}
for {
dbConn, err = p.getOrCreate()
// todo max Lift time处理
return
}
}
func (p *ChPool) getOrCreate()(dbConn *sqlx.DB, err error) {
select {
case dbConn = <-p.pool:
return
default:
}
p.Lock()
if p.numOpen >= p.maxOpen {
dbConn = <-p.pool
p.Unlock()
return
}
// 新建连接
dbConn, err = p.factory()
if err != nil {
p.Unlock()
return
}
p.numOpen++
p.Unlock()
return
}
// 释放单个资源到连接池
func (p *ChPool) Release(dbConn *sqlx.DB) error {
if p.closed {
return errPoolClosed
}
if p.numOpen > p.minOpen {
err := p.Close(dbConn)
return err
}
p.Lock()
p.pool <- dbConn
p.Unlock()
return nil
}
// 关闭单个资源
func (p *ChPool) Close(dbConn *sqlx.DB) error {
p.Lock()
err := dbConn.Close()
p.numOpen--
p.Unlock()
return err
}
// 关闭连接池,释放所有资源
func (p *ChPool) Shutdown() error {
if p.closed {
return errPoolClosed
}
p.Lock()
close(p.pool)
for dbConn := range p.pool {
_ = dbConn.Close()
p.numOpen--
}
p.closed = true
p.Unlock()
return nil
}