Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Expire idle connections no longer acquired during lifetime" #958

Merged
merged 1 commit into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ func (ch *clickhouse) dial(ctx context.Context) (conn *connect, err error) {
if err != nil {
return nil, err
}
go result.conn.closeAfterMaxLifeTime()
return result.conn, nil
}

Expand Down
45 changes: 5 additions & 40 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"log"
"net"
"os"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -129,8 +128,6 @@ type connect struct {
readTimeout time.Duration
blockBufferSize uint8
maxCompressionBuffer int

rwLock sync.Mutex
}

func (c *connect) settings(querySettings Settings) []proto.Setting {
Expand All @@ -151,7 +148,8 @@ func (c *connect) settings(querySettings Settings) []proto.Setting {
}

func (c *connect) isBad() bool {
if c.isClosed() {
switch {
case c.closed:
return true
}

Expand All @@ -165,48 +163,15 @@ func (c *connect) isBad() bool {
return false
}

// closeAfterMaxLifeTime closes the connection if it has been used for longer than ConnMaxLifeTime
func (c *connect) closeAfterMaxLifeTime() {
t := time.NewTimer(c.opt.ConnMaxLifetime)
defer t.Stop()

// check if connection should be closed after duration of ConnMaxLifeTime
// if connection is closed, return
for {
select {
case <-t.C:
c.close()
return
default:
if c.isClosed() {
return
}

time.Sleep(time.Second)
}
}
}

func (c *connect) isClosed() bool {
c.rwLock.Lock()
defer c.rwLock.Unlock()

return c.closed
}

func (c *connect) close() error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

if c.closed {
return nil
}

c.closed = true
c.buffer = nil
c.reader = nil
if err := c.conn.Close(); err != nil {
c.debugf("[close] %s", err)
return err
}
return nil
}
Expand Down Expand Up @@ -271,10 +236,10 @@ func (c *connect) sendData(block *proto.Block, name string) error {
if err := c.flush(); err != nil {
if errors.Is(err, syscall.EPIPE) {
c.debugf("[send data] pipe is broken, closing connection")
c.close()
c.closed = true
} else if errors.Is(err, io.EOF) {
c.debugf("[send data] unexpected EOF, closing connection")
c.close()
c.closed = true
} else {
c.debugf("[send data] unexpected error: %v", err)
}
Expand Down
6 changes: 0 additions & 6 deletions conn_handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ import (
)

func (c *connect) handshake(database, username, password string) error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

defer c.buffer.Reset()
c.debugf("[handshake] -> %s", proto.ClientHandshake{})
// set a read deadline - alternative to context.Read operation will fail if no data is received after deadline.
Expand Down Expand Up @@ -86,9 +83,6 @@ func (c *connect) handshake(database, username, password string) error {
}

func (c *connect) sendAddendum() error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

if c.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY {
c.buffer.PutString("") // todo quota key support
}
Expand Down
3 changes: 0 additions & 3 deletions conn_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ import (
// Connection::ping
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
func (c *connect) ping(ctx context.Context) (err error) {
c.rwLock.Lock()
defer c.rwLock.Unlock()

// set a read deadline - alternative to context.Read operation will fail if no data is received after deadline.
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
defer c.conn.SetReadDeadline(time.Time{})
Expand Down
5 changes: 0 additions & 5 deletions conn_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ func (c *connect) process(ctx context.Context, on *onProcess) error {
return ctx.Err()
default:
}
c.rwLock.Lock()
packet, err := c.reader.ReadByte()
c.rwLock.Unlock()
if err != nil {
return err
}
Expand All @@ -84,9 +82,6 @@ func (c *connect) process(ctx context.Context, on *onProcess) error {
}

func (c *connect) handle(ctx context.Context, packet byte, on *onProcess) error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

switch packet {
case proto.ServerData, proto.ServerTotals, proto.ServerExtremes:
block, err := c.readData(ctx, packet, true)
Expand Down
3 changes: 0 additions & 3 deletions conn_send_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import (
// Connection::sendQuery
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
func (c *connect) sendQuery(body string, o *QueryOptions) error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

c.debugf("[send query] compression=%q %s", c.compression, body)
c.buffer.PutByte(proto.ClientQuery)
q := proto.Query{
Expand Down
55 changes: 0 additions & 55 deletions tests/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"os"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -306,57 +305,3 @@ func TestEmptyDatabaseConfig(t *testing.T) {
err = anotherConn.Ping(context.Background())
require.NoError(t, err)
}

func TestConnectionExpiresIdleConnection(t *testing.T) {
runInDocker, _ := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_DOCKER", "true"))
if !runInDocker {
t.Skip("Skip test in cloud environment. This test is not stable in cloud environment, due to race conditions.")
}

// given
ctx := context.Background()
testEnv, err := GetTestEnvironment(testSet)
require.NoError(t, err)

baseConn, err := testClientWithDefaultSettings(testEnv)
require.NoError(t, err)

expectedConnections := getActiveConnections(t, baseConn)

// when the client is configured to expire idle connections after 1/10 of a second
opts := clientOptionsFromEnv(testEnv, clickhouse.Settings{})
opts.MaxIdleConns = 20
opts.MaxOpenConns = 20
opts.ConnMaxLifetime = time.Second / 10
conn, err := clickhouse.Open(&opts)
require.NoError(t, err)

// run 1000 queries in parallel
var wg sync.WaitGroup
const selectToRunAtOnce = 1000
for i := 0; i < selectToRunAtOnce; i++ {
wg.Add(1)
go func() {
defer wg.Done()
r, err := conn.Query(ctx, "SELECT 1")
require.NoError(t, err)

r.Close()
}()
}
wg.Wait()

// then we expect that all connections will be closed when they are idle
// retrying for 10 seconds to make sure that the connections are closed
assert.Eventuallyf(t, func() bool {
return getActiveConnections(t, baseConn) == expectedConnections
}, time.Second*10, opts.ConnMaxLifetime, "expected connections to be reset back to %d", expectedConnections)
}

func getActiveConnections(t *testing.T, client clickhouse.Conn) (conns int64) {
ctx := context.Background()
r := client.QueryRow(ctx, "SELECT sum(value) as conns FROM system.metrics WHERE metric LIKE '%Connection'")
require.NoError(t, r.Err())
require.NoError(t, r.Scan(&conns))
return conns
}