From 3e13930d8d42b455dd0ef06442c57226c1278cb1 Mon Sep 17 00:00:00 2001 From: Jeffsky Date: Thu, 6 Apr 2023 20:48:07 +0800 Subject: [PATCH] fix: connection leak when concurrent updates --- justfile | 8 ++++++++ pkg/mysql/client.go | 1 + pkg/mysql/conn.go | 18 ++++++++++-------- pkg/mysql/execute_handle.go | 3 +++ pkg/mysql/server.go | 2 +- pkg/runtime/runtime.go | 7 ------- pkg/util/misc/misc.go | 8 ++++++++ 7 files changed, 31 insertions(+), 16 deletions(-) diff --git a/justfile b/justfile index 0e13f626..9c547ca2 100644 --- a/justfile +++ b/justfile @@ -17,3 +17,11 @@ cli-raw: fix: @imports-formatter . @license-eye header fix + + +sysbench MODE="run": + sysbench oltp_read_write --mysql-user=root --mysql-password=123456 --mysql-host=127.0.0.1 --mysql-port=3306 --mysql-db=employees_0000 --histogram=on --report-interval=1 --time=300 --db-ps-mode=disable --threads=64 --tables=250 --table_size=25000 --report-interval=1 --percentile=95 --skip-trx=on --mysql-ignore-errors=1062 --forced-shutdown=1 {{MODE}} + + +sysbench2 MODE="run": + sysbench oltp_read_write --mysql-user=arana --mysql-password=123456 --mysql-host=127.0.0.1 --mysql-port=13306 --mysql-db=employees --histogram=on --report-interval=1 --time=300 --db-ps-mode=disable --threads=8 --tables=250 --table_size=25000 --report-interval=1 --percentile=95 --skip-trx=on --mysql-ignore-errors=1062 --forced-shutdown=1 {{MODE}} diff --git a/pkg/mysql/client.go b/pkg/mysql/client.go index 7372c593..d2d62e01 100644 --- a/pkg/mysql/client.go +++ b/pkg/mysql/client.go @@ -986,6 +986,7 @@ func (conn *BackendConnection) writeHandshakeResponse41(capabilities uint32, scr // Sanity-check the length. if pos != len(data) { + conn.c.recycleWritePacket() return err2.NewSQLError(mysql.CRMalformedPacket, mysql.SSUnknownSQLState, "writeHandshakeResponse41: only packed %v bytes, out of %v allocated", pos, len(data)) } diff --git a/pkg/mysql/conn.go b/pkg/mysql/conn.go index 4298d9a4..260d70c5 100644 --- a/pkg/mysql/conn.go +++ b/pkg/mysql/conn.go @@ -337,6 +337,7 @@ func (c *Conn) readEphemeralPacket() ([]byte, error) { if length < mysql.MaxPacketSize { c.currentEphemeralBuffer = bufPool.Get(length) if _, err := io.ReadFull(r, *c.currentEphemeralBuffer); err != nil { + defer c.recycleReadPacket() return nil, errors.Wrapf(err, "io.ReadFull(packet body of length %v) failed", length) } return *c.currentEphemeralBuffer, nil @@ -371,12 +372,12 @@ func (c *Conn) readEphemeralPacket() ([]byte, error) { // readEphemeralPacketDirect attempts to read a packet from the socket directly. // It needs to be used for the first handshake packet the server receives, -// so we do't buffer the SSL negotiation packet. As a shortcut, only +// so we don't buffer the SSL negotiation packet. As a shortcut, only // packets smaller than MaxPacketSize can be read here. // This function usually shouldn't be used - use readEphemeralPacket. func (c *Conn) readEphemeralPacketDirect() ([]byte, error) { if c.currentEphemeralPolicy != ephemeralUnused { - panic(errors.Errorf("readEphemeralPacketDirect: unexpected currentEphemeralPolicy: %v", c.currentEphemeralPolicy)) + panic(fmt.Sprintf("readEphemeralPacketDirect: unexpected currentEphemeralPolicy: %v!", c.currentEphemeralPolicy)) } var r io.Reader = c.conn @@ -396,6 +397,7 @@ func (c *Conn) readEphemeralPacketDirect() ([]byte, error) { if length < mysql.MaxPacketSize { c.currentEphemeralBuffer = bufPool.Get(length) if _, err := io.ReadFull(r, *c.currentEphemeralBuffer); err != nil { + defer c.recycleReadPacket() return nil, errors.Wrapf(err, "io.ReadFull(packet body of length %v) failed", length) } return *c.currentEphemeralBuffer, nil @@ -409,7 +411,7 @@ func (c *Conn) readEphemeralPacketDirect() ([]byte, error) { func (c *Conn) recycleReadPacket() { if c.currentEphemeralPolicy != ephemeralRead { // Programming error. - panic(errors.Errorf("trying to call recycleReadPacket while currentEphemeralPolicy is %d", c.currentEphemeralPolicy)) + panic(fmt.Sprintf("trying to call recycleReadPacket while currentEphemeralPolicy is %d!", c.currentEphemeralPolicy)) } if c.currentEphemeralBuffer != nil { // We are using the pool, put the buffer back in. @@ -603,7 +605,7 @@ func (c *Conn) writePacket(data []byte) error { func (c *Conn) startEphemeralPacket(length int) []byte { if c.currentEphemeralPolicy != ephemeralUnused { - panic("startEphemeralPacket cannot be used while a packet is already started.") + panic(fmt.Sprintf("startEphemeralPacket cannot be used while a packet is already started, actual is %v!", c.currentEphemeralPolicy)) } c.currentEphemeralPolicy = ephemeralWrite @@ -620,11 +622,11 @@ func (c *Conn) writeEphemeralPacket() error { switch c.currentEphemeralPolicy { case ephemeralWrite: if err := c.writePacket(*c.currentEphemeralBuffer); err != nil { - return errors.WithStack(errors.Wrapf(err, "conn %v", c.ID())) + return errors.Wrapf(err, "conn %v", c.ID()) } case ephemeralUnused, ephemeralRead: // Programming error. - panic(errors.Errorf("conn %v: trying to call writeEphemeralPacket while currentEphemeralPolicy is %v", c.ID(), c.currentEphemeralPolicy)) + panic(fmt.Sprintf("conn %v: trying to call writeEphemeralPacket while currentEphemeralPolicy is %v!", c.ID(), c.currentEphemeralPolicy)) } return nil @@ -635,7 +637,7 @@ func (c *Conn) writeEphemeralPacket() error { func (c *Conn) recycleWritePacket() { if c.currentEphemeralPolicy != ephemeralWrite { // Programming error. - panic(errors.Errorf("trying to call recycleWritePacket while currentEphemeralPolicy is %d", c.currentEphemeralPolicy)) + panic(fmt.Sprintf("trying to call recycleWritePacket while currentEphemeralPolicy is %d!", c.currentEphemeralPolicy)) } // Release our reference so the buffer can be gced bufPool.Put(c.currentEphemeralBuffer) @@ -772,7 +774,7 @@ func (c *Conn) writeErrorPacket(errorCode uint16, sqlState string, format string sqlState = mysql.SSUnknownSQLState } if len(sqlState) != 5 { - panic("sqlState has to be 5 characters long") + panic(fmt.Sprintf("sqlState has to be 5 characters long, actual is %d!", len(sqlState))) } pos = writeEOFString(data, pos, sqlState) _ = writeEOFString(data, pos, errorMessage) diff --git a/pkg/mysql/execute_handle.go b/pkg/mysql/execute_handle.go index 083b9108..bcfc36d8 100644 --- a/pkg/mysql/execute_handle.go +++ b/pkg/mysql/execute_handle.go @@ -35,6 +35,7 @@ import ( "github.com/arana-db/arana/pkg/security" "github.com/arana-db/arana/pkg/trace" "github.com/arana-db/arana/pkg/util/log" + "github.com/arana-db/arana/pkg/util/misc" ) func (l *Listener) handleInitDB(c *Conn, ctx *proto.Context) error { @@ -117,6 +118,8 @@ func (l *Listener) handleQuery(c *Conn, ctx *proto.Context) error { statusFlag |= mysql.ServerMoreResultsExists } + _ = misc.TryClose(result) + if err := c.writeOKPacket(affected, insertId, statusFlag, warn); err != nil { log.Errorf("failed to write OK packet into client %v: %v", ctx.C.ID(), err) return err diff --git a/pkg/mysql/server.go b/pkg/mysql/server.go index 15012985..10266f80 100644 --- a/pkg/mysql/server.go +++ b/pkg/mysql/server.go @@ -246,7 +246,7 @@ func (l *Listener) handshake(c *Conn) error { return err } - c.recycleReadPacket() + defer c.recycleReadPacket() handshake, err := l.parseClientHandshakePacket(true, response) if err != nil { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 79fc1f9c..6e23b279 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "io" "sync" "time" ) @@ -413,12 +412,6 @@ func (pi *defaultRuntime) Exec(ctx context.Context, db string, query string, arg if err != nil { return nil, perrors.WithStack(err) } - - if closer, ok := res.(io.Closer); ok { - defer func() { - _ = closer.Close() - }() - } return res, nil } diff --git a/pkg/util/misc/misc.go b/pkg/util/misc/misc.go index 86c89959..8a20dfab 100644 --- a/pkg/util/misc/misc.go +++ b/pkg/util/misc/misc.go @@ -18,6 +18,7 @@ package misc import ( + "io" "regexp" "sync" ) @@ -48,3 +49,10 @@ func ParseTable(input string) (db, tbl string, err error) { tbl = mat[2] return } + +func TryClose(i interface{}) error { + if c, ok := i.(io.Closer); ok { + return c.Close() + } + return nil +}