Skip to content

Commit

Permalink
add MarkUnusable func to PoolConn
Browse files Browse the repository at this point in the history
  • Loading branch information
nange committed Nov 8, 2017
1 parent db29d27 commit 3444e54
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 4 deletions.
10 changes: 9 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ type PoolConn struct {
net.Conn
hp *heapPool
updatedtime time.Time
unusable bool
}

func (pc *PoolConn) Close() error {
pc.updatedtime = time.Now()
if pc.unusable {
return pc.close()
}

pc.updatedtime = time.Now()
if err := pc.hp.put(pc); err != nil {
log.Printf("put conn failed:%v\n", err)
pc.hp = nil
Expand All @@ -23,6 +27,10 @@ func (pc *PoolConn) Close() error {
return nil
}

func (pc *PoolConn) MarkUnusable() {
pc.unusable = true
}

func (pc *PoolConn) close() error {
return pc.Conn.Close()
}
11 changes: 10 additions & 1 deletion heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewHeapPool(config *PoolConfig) (Pool, error) {
maxIdle: maxIdle,
idletime: idletime,
maxLifetime: maxLifetime,
cleanerCh: make(chan struct{}, 1),
cleanerCh: make(chan struct{}),
factory: config.Factory,
}

Expand Down Expand Up @@ -126,6 +126,9 @@ func (hp *heapPool) Close() {
hp.mu.Lock()
defer hp.mu.Unlock()

if hp.freeConn == nil {
return
}
hp.cleanerCh <- struct{}{}
hp.factory = nil
for hp.freeConn.Len() > 0 {
Expand All @@ -140,6 +143,9 @@ func (hp *heapPool) put(conn *PoolConn) error {
hp.mu.Lock()
defer hp.mu.Unlock()

if hp.freeConn == nil {
return ErrClosed
}
if hp.freeConn.Len() >= hp.maxCap {
return errors.New("pool have been filled")
}
Expand All @@ -151,6 +157,9 @@ func (hp *heapPool) Len() int {
hp.mu.Lock()
defer hp.mu.Unlock()

if hp.freeConn == nil {
return 0
}
return hp.freeConn.Len()
}

Expand Down
31 changes: 29 additions & 2 deletions heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,15 @@ func TestPriorityQueue(t *testing.T) {
if pc1.updatedtime.Sub(pc2.updatedtime) > 0 {
t.Errorf("priority is invalid, older conn should first out")
}
pc1.Close()
pc2.Close()

p.Close()
}

func TestPoolConcurrent(t *testing.T) {
p, _ := newHeapPool()
for i := 0; i < 100; i++ {
for i := 0; i < MaxCap+10; i++ {
conn, err := p.Get()
if err != nil {
t.Errorf("Get error: %s", err)
Expand All @@ -93,7 +96,7 @@ func TestPoolConcurrent(t *testing.T) {

time.Sleep(5 * time.Second)
if p.Len() != MaxCap {
t.Errorf("Pool length should equals MaxCap, but get:%v", p.Len())
t.Errorf("Pool length should equals:, but get:%v", MaxCap, p.Len())
}

time.Sleep(time.Minute)
Expand All @@ -103,6 +106,30 @@ func TestPoolConcurrent(t *testing.T) {
p.Close()
}

func TestPoolConcurrent2(t *testing.T) {
p, _ := newHeapPool()
for i := 0; i < MaxCap; i++ {
conn, err := p.Get()
if err != nil {
t.Errorf("Get error: %s", err)
}
go func(conn net.Conn, i int) {
time.Sleep(time.Second)
if i >= MaxCap-10 {
conn.(*PoolConn).MarkUnusable()
}
conn.Close()
}(conn, i)
}

time.Sleep(5 * time.Second)
if p.Len() != MaxCap-10 {
t.Errorf("Pool length should equals:%v, but get:%v", MaxCap-10, p.Len())
}

p.Close()
}

func newHeapPool() (Pool, error) {
return NewHeapPool(&PoolConfig{
InitialCap: InitialCap,
Expand Down

0 comments on commit 3444e54

Please sign in to comment.