From 171beadc5e7a56510d22fafb1d1611163a6a2058 Mon Sep 17 00:00:00 2001 From: eastfisher Date: Tue, 20 Oct 2020 19:07:35 +0800 Subject: [PATCH] add error close in PooledBackendConn (#15) --- pkg/proxy/backend/backend.go | 2 +- pkg/proxy/backend/connpool.go | 10 +++++++ pkg/proxy/driver/domain.go | 11 +++++++- pkg/proxy/driver/queryctx_exec.go | 2 +- tests/proxy/backend/connpool_test.go | 42 ++++++++++++++++++++++++++++ 5 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 tests/proxy/backend/connpool_test.go diff --git a/pkg/proxy/backend/backend.go b/pkg/proxy/backend/backend.go index 3145fb2e..bd061f4f 100644 --- a/pkg/proxy/backend/backend.go +++ b/pkg/proxy/backend/backend.go @@ -114,7 +114,7 @@ func (b *BackendImpl) initConnPools() error { return nil } -func (b *BackendImpl) GetConn(ctx context.Context) (driver.BackendConn, error) { +func (b *BackendImpl) GetConn(ctx context.Context) (driver.SimpleBackendConn, error) { if b.closed.Get() { return nil, ErrBackendClosed } diff --git a/pkg/proxy/backend/connpool.go b/pkg/proxy/backend/connpool.go index 55d94473..dbc8745b 100644 --- a/pkg/proxy/backend/connpool.go +++ b/pkg/proxy/backend/connpool.go @@ -2,11 +2,13 @@ package backend import ( "context" + "fmt" "time" "github.com/pingcap-incubator/weir/pkg/proxy/backend/client" "github.com/pingcap-incubator/weir/pkg/proxy/driver" "github.com/pingcap-incubator/weir/pkg/util/pool" + "github.com/pingcap/errors" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -87,6 +89,14 @@ func (cw *backendPooledConnWrapper) PutBack() { cw.pool.Put(w) } +func (cw *backendPooledConnWrapper) ErrorClose() error { + cw.pool.Put(nil) + if err := cw.Conn.Close(); err != nil { + return errors.WithMessage(err, fmt.Sprintf("close backend conn error, addr: %s, username: %s", cw.addr, cw.username)) + } + return nil +} + func (cw *backendPooledConnWrapper) Close() error { return cw.Conn.Close() } diff --git a/pkg/proxy/driver/domain.go b/pkg/proxy/driver/domain.go index 1383fa92..0b08a2fc 100644 --- a/pkg/proxy/driver/domain.go +++ b/pkg/proxy/driver/domain.go @@ -18,12 +18,21 @@ type Namespace interface { } type PooledBackendConn interface { + // PutBack put conn back to pool PutBack() + + // ErrorClose close conn and connpool create a new conn + // call this function when conn is broken. + ErrorClose() error BackendConn } -type BackendConn interface { +type SimpleBackendConn interface { Close() error + BackendConn +} + +type BackendConn interface { Ping() error UseDB(dbName string) error GetDB() string diff --git a/pkg/proxy/driver/queryctx_exec.go b/pkg/proxy/driver/queryctx_exec.go index b8176e85..1a4ccb8e 100644 --- a/pkg/proxy/driver/queryctx_exec.go +++ b/pkg/proxy/driver/queryctx_exec.go @@ -276,7 +276,7 @@ func (q *QueryCtxImpl) initTxnConn(ctx context.Context) error { func (q *QueryCtxImpl) postUseTxnConn(err error) { if err != nil { if q.txnConn != nil { - if errClose := q.txnConn.Close(); errClose != nil { + if errClose := q.txnConn.ErrorClose(); errClose != nil { logutil.BgLogger().Error("close txn conn error", zap.Error(errClose), zap.String("namespace", q.ns.Name())) } q.txnConn = nil diff --git a/tests/proxy/backend/connpool_test.go b/tests/proxy/backend/connpool_test.go new file mode 100644 index 00000000..7f64c89d --- /dev/null +++ b/tests/proxy/backend/connpool_test.go @@ -0,0 +1,42 @@ +package backend + +import ( + "context" + "testing" + "time" + + "github.com/pingcap-incubator/weir/pkg/proxy/backend" + "github.com/stretchr/testify/require" +) + +func TestConnPool_ErrorClose_Success(t *testing.T) { + cfg := backend.ConnPoolConfig{ + Config: backend.Config{ + Addr:"127.0.0.1:3306", + UserName:"root", + Password:"123456", + }, + Capacity:1, // pool size is set to 1 + IdleTimeout:0, + } + pool := backend.NewConnPool(&cfg) + err := pool.Init() + require.NoError(t, err) + + ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second) + defer cancelFunc() + + conn1, err := pool.GetConn(ctx) + require.NoError(t, err) + + // conn is closed, and another conn is created by pool + err = conn1.ErrorClose() + require.NoError(t, err) + + conn2, err := pool.GetConn(ctx) + require.NoError(t, err) + + err = conn2.ErrorClose() + require.NoError(t, err) +} +