From 9b802da3d748f98c28ca961d7062ba0d68424e0a Mon Sep 17 00:00:00 2001 From: PlanetScale Actions Bot <60239337+planetscale-actions-bot@users.noreply.github.com> Date: Wed, 12 Jul 2023 04:00:01 -0400 Subject: [PATCH] Enable Tcp keep alive and provide keep alive period setting (#13434) (#2573) * tcp keep alive and period settings * added e2e test for tcp keep alive * added unit test and removed e2e test due to cgo dependency * addressed review comments * introduce a marker on conn to know if keepalive is on, used for testing --------- Signed-off-by: Harshit Gangal Co-authored-by: Harshit Gangal --- go/flags/endtoend/vtgate.txt | 1 + go/mysql/auth_server_clientcert_test.go | 4 +- go/mysql/client_test.go | 10 +-- go/mysql/conn.go | 31 ++++++++ go/mysql/fakesqldb/server.go | 6 +- go/mysql/handshake_test.go | 4 +- go/mysql/server.go | 62 ++++++++------- go/mysql/server_flaky_test.go | 75 +++++++++++++------ .../endtoend/vtgate/queries/misc/misc_test.go | 8 +- go/vt/vtgate/plugin_mysql_server.go | 5 ++ 10 files changed, 138 insertions(+), 68 deletions(-) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 6b59edcee06..fdfdc86c8ab 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -111,6 +111,7 @@ Usage of vtgate: --max_payload_size int The threshold for query payloads in bytes. A payload greater than this threshold will result in a failure to handle the query. --message_stream_grace_period duration the amount of time to give for a vttablet to resume if it ends a message stream, usually because of a reparent. (default 30s) --min_number_serving_vttablets int The minimum number of vttablets for each replicating tablet_type (e.g. replica, rdonly) that will be continue to be used even with replication lag above discovery_low_replication_lag, but still below discovery_high_replication_lag_minimum_serving. (default 2) + --mysql-server-keepalive-period duration TCP period between keep-alives --mysql-server-pool-conn-read-buffers If set, the server will pool incoming connection read buffers --mysql_allow_clear_text_without_tls If set, the server will allow the use of a clear text password over non-SSL connections. --mysql_auth_server_impl string Which auth server implementation to use. Options: none, ldap, clientcert, static, vault. (default "static") diff --git a/go/mysql/auth_server_clientcert_test.go b/go/mysql/auth_server_clientcert_test.go index 4528ee5dbf4..28ed19fd9c5 100644 --- a/go/mysql/auth_server_clientcert_test.go +++ b/go/mysql/auth_server_clientcert_test.go @@ -45,7 +45,7 @@ func TestValidCert(t *testing.T) { authServer := newAuthServerClientCert() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed: %v", err) defer l.Close() host := l.Addr().(*net.TCPAddr).IP.String() @@ -114,7 +114,7 @@ func TestNoCert(t *testing.T) { authServer := newAuthServerClientCert() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed: %v", err) defer l.Close() host := l.Addr().(*net.TCPAddr).IP.String() diff --git a/go/mysql/client_test.go b/go/mysql/client_test.go index f9db5cee523..ddbb7f19f06 100644 --- a/go/mysql/client_test.go +++ b/go/mysql/client_test.go @@ -149,7 +149,7 @@ func TestTLSClientDisabled(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() @@ -221,7 +221,7 @@ func TestTLSClientPreferredDefault(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() @@ -294,7 +294,7 @@ func TestTLSClientRequired(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() @@ -341,7 +341,7 @@ func TestTLSClientVerifyCA(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() @@ -424,7 +424,7 @@ func TestTLSClientVerifyIdentity(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() diff --git a/go/mysql/conn.go b/go/mysql/conn.go index 3dfc96398c9..4b22e99e480 100644 --- a/go/mysql/conn.go +++ b/go/mysql/conn.go @@ -199,6 +199,10 @@ type Conn struct { // See: ConnParams.EnableQueryInfo enableQueryInfo bool + // keepAliveOn marks when keep alive is active on the connection. + // This is currently used for testing. + keepAliveOn bool + // mu protects the fields below mu sync.Mutex // cancel keep the cancel function for the current executing query. @@ -254,10 +258,21 @@ func newConn(conn net.Conn) *Conn { // the server is shutting down, and has the ability to control buffer // size for reads. func newServerConn(conn net.Conn, listener *Listener) *Conn { + // Enable KeepAlive on TCP connections and change keep-alive period if provided. + enabledKeepAlive := false + if tcpConn, ok := conn.(*net.TCPConn); ok { + if err := setTcpConnProperties(tcpConn, listener.connKeepAlivePeriod); err != nil { + log.Errorf("error in setting tcp properties: %v", err) + } else { + enabledKeepAlive = true + } + } + c := &Conn{ conn: conn, listener: listener, PrepareData: make(map[uint32]*PrepareData), + keepAliveOn: enabledKeepAlive, } if listener.connReadBufferSize > 0 { @@ -275,6 +290,22 @@ func newServerConn(conn net.Conn, listener *Listener) *Conn { return c } +func setTcpConnProperties(conn *net.TCPConn, keepAlivePeriod time.Duration) error { + if err := conn.SetKeepAlive(true); err != nil { + return vterrors.Wrapf(err, "unable to enable keepalive on tcp connection") + } + + if keepAlivePeriod <= 0 { + return nil + } + + if err := conn.SetKeepAlivePeriod(keepAlivePeriod); err != nil { + return vterrors.Wrapf(err, "unable to set keepalive period on tcp connection") + } + + return nil +} + // startWriterBuffering starts using buffered writes. This should // be terminated by a call to endWriteBuffering. func (c *Conn) startWriterBuffering() { diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index f43f63c0d53..746f82aed2a 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -188,7 +188,7 @@ func New(t testing.TB) *DB { authServer := mysql.NewAuthServerNone() // Start listening. - db.listener, err = mysql.NewListener("unix", socketFile, authServer, db, 0, 0, false, false) + db.listener, err = mysql.NewListener("unix", socketFile, authServer, db, 0, 0, false, false, 0) if err != nil { t.Fatalf("NewListener failed: %v", err) } @@ -382,7 +382,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R if db.shouldClose.Load() { c.Close() - //log error + // log error if err := callback(&sqltypes.Result{}); err != nil { log.Errorf("callback failed : %v", err) } @@ -393,7 +393,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R // The driver may send this at connection time, and we don't want it to // interfere. if key == "set names utf8" || strings.HasPrefix(key, "set collation_connection = ") { - //log error + // log error if err := callback(&sqltypes.Result{}); err != nil { log.Errorf("callback failed : %v", err) } diff --git a/go/mysql/handshake_test.go b/go/mysql/handshake_test.go index b6532f830b3..c2b27d6f6d4 100644 --- a/go/mysql/handshake_test.go +++ b/go/mysql/handshake_test.go @@ -45,7 +45,7 @@ func TestClearTextClientAuth(t *testing.T) { defer authServer.close() // Create the listener. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed: %v", err) defer l.Close() host := l.Addr().(*net.TCPAddr).IP.String() @@ -99,7 +99,7 @@ func TestSSLConnection(t *testing.T) { defer authServer.close() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed: %v", err) defer l.Close() host := l.Addr().(*net.TCPAddr).IP.String() diff --git a/go/mysql/server.go b/go/mysql/server.go index 92be26161ee..1ebebb56342 100644 --- a/go/mysql/server.go +++ b/go/mysql/server.go @@ -194,6 +194,9 @@ type Listener struct { // connBufferPooling configures if vtgate server pools connection buffers connBufferPooling bool + // connKeepAlivePeriod is period between tcp keep-alives. + connKeepAlivePeriod time.Duration + // shutdown indicates that Shutdown method was called. shutdown atomic.Bool @@ -216,15 +219,17 @@ func NewFromListener( connReadTimeout time.Duration, connWriteTimeout time.Duration, connBufferPooling bool, + keepAlivePeriod time.Duration, ) (*Listener, error) { cfg := ListenerConfig{ - Listener: l, - AuthServer: authServer, - Handler: handler, - ConnReadTimeout: connReadTimeout, - ConnWriteTimeout: connWriteTimeout, - ConnReadBufferSize: connBufferSize, - ConnBufferPooling: connBufferPooling, + Listener: l, + AuthServer: authServer, + Handler: handler, + ConnReadTimeout: connReadTimeout, + ConnWriteTimeout: connWriteTimeout, + ConnReadBufferSize: connBufferSize, + ConnBufferPooling: connBufferPooling, + ConnKeepAlivePeriod: keepAlivePeriod, } return NewListenerWithConfig(cfg) } @@ -238,6 +243,7 @@ func NewListener( connWriteTimeout time.Duration, proxyProtocol bool, connBufferPooling bool, + keepAlivePeriod time.Duration, ) (*Listener, error) { listener, err := net.Listen(protocol, address) if err != nil { @@ -245,24 +251,25 @@ func NewListener( } if proxyProtocol { proxyListener := &proxyproto.Listener{Listener: listener} - return NewFromListener(proxyListener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling) + return NewFromListener(proxyListener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling, keepAlivePeriod) } - return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling) + return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling, keepAlivePeriod) } // ListenerConfig should be used with NewListenerWithConfig to specify listener parameters. type ListenerConfig struct { // Protocol-Address pair and Listener are mutually exclusive parameters - Protocol string - Address string - Listener net.Listener - AuthServer AuthServer - Handler Handler - ConnReadTimeout time.Duration - ConnWriteTimeout time.Duration - ConnReadBufferSize int - ConnBufferPooling bool + Protocol string + Address string + Listener net.Listener + AuthServer AuthServer + Handler Handler + ConnReadTimeout time.Duration + ConnWriteTimeout time.Duration + ConnReadBufferSize int + ConnBufferPooling bool + ConnKeepAlivePeriod time.Duration } // NewListenerWithConfig creates new listener using provided config. There are @@ -280,15 +287,16 @@ func NewListenerWithConfig(cfg ListenerConfig) (*Listener, error) { } return &Listener{ - authServer: cfg.AuthServer, - handler: cfg.Handler, - listener: l, - ServerVersion: servenv.AppVersion.MySQLVersion(), - connectionID: 1, - connReadTimeout: cfg.ConnReadTimeout, - connWriteTimeout: cfg.ConnWriteTimeout, - connReadBufferSize: cfg.ConnReadBufferSize, - connBufferPooling: cfg.ConnBufferPooling, + authServer: cfg.AuthServer, + handler: cfg.Handler, + listener: l, + ServerVersion: servenv.AppVersion.MySQLVersion(), + connectionID: 1, + connReadTimeout: cfg.ConnReadTimeout, + connWriteTimeout: cfg.ConnWriteTimeout, + connReadBufferSize: cfg.ConnReadBufferSize, + connBufferPooling: cfg.ConnBufferPooling, + connKeepAlivePeriod: cfg.ConnKeepAlivePeriod, }, nil } diff --git a/go/mysql/server_flaky_test.go b/go/mysql/server_flaky_test.go index ad8c324c96e..963a0aa7bf8 100644 --- a/go/mysql/server_flaky_test.go +++ b/go/mysql/server_flaky_test.go @@ -263,7 +263,7 @@ func TestConnectionFromListener(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:") require.NoError(t, err, "net.Listener failed") - l, err := NewFromListener(listener, authServer, th, 0, 0, false) + l, err := NewFromListener(listener, authServer, th, 0, 0, false, 0) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -292,7 +292,7 @@ func TestConnectionWithoutSourceHost(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -325,7 +325,7 @@ func TestConnectionWithSourceHost(t *testing.T) { } defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -358,7 +358,7 @@ func TestConnectionUseMysqlNativePasswordWithSourceHost(t *testing.T) { } defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -396,7 +396,7 @@ func TestConnectionUnixSocket(t *testing.T) { os.Remove(unixSocket.Name()) - l, err := NewListener("unix", unixSocket.Name(), authServer, th, 0, 0, false, false) + l, err := NewListener("unix", unixSocket.Name(), authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -422,7 +422,7 @@ func TestClientFoundRows(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -471,7 +471,7 @@ func TestConnCounts(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed") defer l.Close() go l.Accept() @@ -503,12 +503,12 @@ func TestConnCounts(t *testing.T) { // Test after closing connections. time.Sleep lets it work, but seems flakey. c.Close() - //time.Sleep(10 * time.Millisecond) - //checkCountsForUser(t, user, 1) + // time.Sleep(10 * time.Millisecond) + // checkCountsForUser(t, user, 1) c2.Close() - //time.Sleep(10 * time.Millisecond) - //checkCountsForUser(t, user, 0) + // time.Sleep(10 * time.Millisecond) + // checkCountsForUser(t, user, 0) } func checkCountsForUser(t *testing.T, user string, expected int64) { @@ -528,7 +528,7 @@ func TestServer(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) l.SlowConnectWarnThreshold.Store(time.Nanosecond.Nanoseconds()) defer l.Close() @@ -628,7 +628,7 @@ func TestServerStats(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) l.SlowConnectWarnThreshold.Store(time.Nanosecond.Nanoseconds()) defer l.Close() @@ -702,7 +702,7 @@ func TestClearTextServer(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() go l.Accept() @@ -775,7 +775,7 @@ func TestDialogServer(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) l.AllowClearTextWithoutTLS.Store(true) defer l.Close() @@ -818,7 +818,7 @@ func TestTLSServer(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() @@ -870,7 +870,7 @@ func TestTLSServer(t *testing.T) { // Run a 'select rows' command with results. conn, err := Connect(context.Background(), params) - //output, ok := runMysql(t, params, "select rows") + // output, ok := runMysql(t, params, "select rows") require.NoError(t, err) results, err := conn.ExecuteFetch("select rows", 1000, true) require.NoError(t, err) @@ -916,7 +916,7 @@ func TestTLSRequired(t *testing.T) { // Below, we are enabling --ssl-verify-server-cert, which adds // a check that the common name of the certificate matches the // server host name we connect to. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() @@ -1005,7 +1005,7 @@ func TestCachingSha2PasswordAuthWithTLS(t *testing.T) { defer authServer.close() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed: %v", err) defer l.Close() host := l.Addr().(*net.TCPAddr).IP.String() @@ -1099,7 +1099,7 @@ func TestCachingSha2PasswordAuthWithMoreData(t *testing.T) { defer authServer.close() // Create the listener, so we can get its host. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed: %v", err) defer l.Close() host := l.Addr().(*net.TCPAddr).IP.String() @@ -1168,7 +1168,7 @@ func TestCachingSha2PasswordAuthWithoutTLS(t *testing.T) { defer authServer.close() // Create the listener. - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err, "NewListener failed: %v", err) defer l.Close() host := l.Addr().(*net.TCPAddr).IP.String() @@ -1210,7 +1210,7 @@ func TestErrorCodes(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() go l.Accept() @@ -1388,7 +1388,7 @@ func TestListenerShutdown(t *testing.T) { UserData: "userData1", }} defer authServer.close() - l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", authServer, th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() go l.Accept() @@ -1461,7 +1461,7 @@ func TestServerFlush(t *testing.T) { th := &testHandler{} - l, err := NewListener("tcp", "127.0.0.1:", NewAuthServerNone(), th, 0, 0, false, false) + l, err := NewListener("tcp", "127.0.0.1:", NewAuthServerNone(), th, 0, 0, false, false, 0) require.NoError(t, err) defer l.Close() go l.Accept() @@ -1503,3 +1503,30 @@ func TestServerFlush(t *testing.T) { require.NoError(t, err) assert.Nil(t, row) } + +func TestTcpKeepAlive(t *testing.T) { + th := &testHandler{} + l, err := NewListener("tcp", "127.0.0.1:", NewAuthServerNone(), th, 0, 0, false, false, 0) + require.NoError(t, err) + defer l.Close() + go l.Accept() + + host, port := getHostPort(t, l.Addr()) + params := &ConnParams{ + Host: host, + Port: port, + } + + // on connect, the tcp method should be called. + c, err := Connect(context.Background(), params) + require.NoError(t, err) + defer c.Close() + require.True(t, th.lastConn.keepAliveOn, "tcp property method not called") + + // close the connection + th.lastConn.Close() + + // now calling this method should fail. + err = setTcpConnProperties(th.lastConn.conn.(*net.TCPConn), 0) + require.ErrorContains(t, err, "unable to enable keepalive on tcp connection") +} diff --git a/go/test/endtoend/vtgate/queries/misc/misc_test.go b/go/test/endtoend/vtgate/queries/misc/misc_test.go index 667e59ed1ea..710f4934786 100644 --- a/go/test/endtoend/vtgate/queries/misc/misc_test.go +++ b/go/test/endtoend/vtgate/queries/misc/misc_test.go @@ -24,7 +24,6 @@ import ( "testing" _ "github.com/go-sql-driver/mysql" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -37,7 +36,7 @@ func start(t *testing.T) (utils.MySQLCompare, func()) { require.NoError(t, err) deleteAll := func() { - tables := []string{"t1"} + tables := []string{"t1", "uks.unsharded"} for _, table := range tables { _, _ = mcmp.ExecAndIgnore("delete from " + table) } @@ -126,7 +125,7 @@ func TestQueryTimeoutWithTables(t *testing.T) { // unsharded utils.Exec(t, mcmp.VtConn, "insert /*vt+ QUERY_TIMEOUT_MS=1000 */ into uks.unsharded(id1) values (1),(2),(3),(4),(5)") for i := 0; i < 12; i++ { - utils.Exec(t, mcmp.VtConn, "insert /*vt+ QUERY_TIMEOUT_MS=1000 */ into uks.unsharded(id1) select id1+5 from uks.unsharded") + utils.Exec(t, mcmp.VtConn, "insert /*vt+ QUERY_TIMEOUT_MS=2000 */ into uks.unsharded(id1) select id1+5 from uks.unsharded") } utils.Exec(t, mcmp.VtConn, "select count(*) from uks.unsharded where id1 > 31") @@ -304,12 +303,11 @@ func TestPrepareStatements(t *testing.T) { assert.ErrorContains(t, err, "VT09011: Unknown prepared statement handler (prep_art) given to DEALLOCATE PREPARE") } +// TestBuggyOuterJoin validates inconsistencies around outer joins, adding these tests to stop regressions. func TestBuggyOuterJoin(t *testing.T) { - // We found a couple of inconsistencies around outer joins, adding these tests to stop regressions mcmp, closer := start(t) defer closer() mcmp.Exec("insert into t1(id1, id2) values (1,2), (42,5), (5, 42)") - mcmp.Exec("select t1.id1, t2.id1 from t1 left join t1 as t2 on t2.id1 = t2.id2") } diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index 1775709bbc0..88841c670a4 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -62,6 +62,7 @@ var ( mysqlSslServerCA string mysqlTLSMinVersion string + mysqlKeepAlivePeriod time.Duration mysqlConnReadTimeout time.Duration mysqlConnWriteTimeout time.Duration mysqlQueryTimeout time.Duration @@ -94,6 +95,7 @@ func registerPluginFlags(fs *pflag.FlagSet) { fs.DurationVar(&mysqlConnWriteTimeout, "mysql_server_write_timeout", mysqlConnWriteTimeout, "connection write timeout") fs.DurationVar(&mysqlQueryTimeout, "mysql_server_query_timeout", mysqlQueryTimeout, "mysql query timeout") fs.BoolVar(&mysqlConnBufferPooling, "mysql-server-pool-conn-read-buffers", mysqlConnBufferPooling, "If set, the server will pool incoming connection read buffers") + fs.DurationVar(&mysqlKeepAlivePeriod, "mysql-server-keepalive-period", mysqlKeepAlivePeriod, "TCP period between keep-alives") fs.StringVar(&mysqlDefaultWorkloadName, "mysql_default_workload", mysqlDefaultWorkloadName, "Default session workload (OLTP, OLAP, DBA)") } @@ -506,6 +508,7 @@ func initMySQLProtocol() { mysqlConnWriteTimeout, mysqlProxyProtocol, mysqlConnBufferPooling, + mysqlKeepAlivePeriod, ) if err != nil { log.Exitf("mysql.NewListener failed: %v", err) @@ -556,6 +559,7 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys mysqlConnWriteTimeout, false, mysqlConnBufferPooling, + mysqlKeepAlivePeriod, ) switch err := err.(type) { @@ -587,6 +591,7 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys mysqlConnWriteTimeout, false, mysqlConnBufferPooling, + mysqlKeepAlivePeriod, ) return listener, listenerErr default: