Skip to content

Commit 75d2df3

Browse files
authored
[PIP-165] Auto release idle connections (#963)
### Motivation The go implementation of PIP-165:apache/pulsar#15516 ### Modifications * Add new configuration `ConnectionMaxIdleTime` to `ClientOptions` * Add a goroutine to `ConnectionPool` to period check and release idle connections.
1 parent 48b7d01 commit 75d2df3

File tree

6 files changed

+244
-3
lines changed

6 files changed

+244
-3
lines changed

pulsar/client.go

+4
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ type ClientOptions struct {
143143
// Default prometheus.DefaultRegisterer
144144
MetricsRegisterer prometheus.Registerer
145145

146+
// Release the connection if it is not used for more than ConnectionMaxIdleTime.
147+
// Default is 60 seconds, negative such as -1 to disable.
148+
ConnectionMaxIdleTime time.Duration
149+
146150
EnableTransaction bool
147151

148152
// Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput.

pulsar/client_impl.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ const (
3434
defaultOperationTimeout = 30 * time.Second
3535
defaultKeepAliveInterval = 30 * time.Second
3636
defaultMemoryLimitBytes = 64 * 1024 * 1024
37+
defaultConnMaxIdleTime = 180 * time.Second
38+
minConnMaxIdleTime = 60 * time.Second
3739
)
3840

3941
type client struct {
@@ -56,6 +58,16 @@ func newClient(options ClientOptions) (Client, error) {
5658
logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
5759
}
5860

61+
connectionMaxIdleTime := options.ConnectionMaxIdleTime
62+
if connectionMaxIdleTime == 0 {
63+
connectionMaxIdleTime = defaultConnMaxIdleTime
64+
} else if connectionMaxIdleTime > 0 && connectionMaxIdleTime < minConnMaxIdleTime {
65+
return nil, newError(InvalidConfiguration, fmt.Sprintf("Connection max idle time should be at least %f "+
66+
"seconds", minConnMaxIdleTime.Seconds()))
67+
} else {
68+
logger.Debugf("Disable auto release idle connections")
69+
}
70+
5971
if options.URL == "" {
6072
return nil, newError(InvalidConfiguration, "URL is required for client")
6173
}
@@ -143,7 +155,7 @@ func newClient(options ClientOptions) (Client, error) {
143155

144156
c := &client{
145157
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
146-
maxConnectionsPerHost, logger, metrics),
158+
maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
147159
log: logger,
148160
metrics: metrics,
149161
memLimit: internal.NewMemoryLimitController(memLimitBytes),

pulsar/client_impl_test.go

+115
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"crypto/tls"
2323
"fmt"
24+
"log"
2425
"net/http"
2526
"net/http/httptest"
2627
"os"
@@ -1123,3 +1124,117 @@ func TestServiceUrlTLSWithTLSTransportWithBasicAuth(t *testing.T) {
11231124
func TestWebServiceUrlTLSWithTLSTransportWithBasicAuth(t *testing.T) {
11241125
testTLSTransportWithBasicAuth(t, webServiceURLTLS)
11251126
}
1127+
1128+
func TestConfigureConnectionMaxIdleTime(t *testing.T) {
1129+
_, err := NewClient(ClientOptions{
1130+
URL: serviceURL,
1131+
ConnectionMaxIdleTime: 1 * time.Second,
1132+
})
1133+
1134+
assert.Error(t, err, "Should be failed when the connectionMaxIdleTime is less than minConnMaxIdleTime")
1135+
1136+
cli, err := NewClient(ClientOptions{
1137+
URL: serviceURL,
1138+
ConnectionMaxIdleTime: -1, // Disabled
1139+
})
1140+
1141+
assert.Nil(t, err)
1142+
cli.Close()
1143+
1144+
cli, err = NewClient(ClientOptions{
1145+
URL: serviceURL,
1146+
ConnectionMaxIdleTime: 60 * time.Second,
1147+
})
1148+
1149+
assert.Nil(t, err)
1150+
cli.Close()
1151+
}
1152+
1153+
func testSendAndReceive(t *testing.T, producer Producer, consumer Consumer) {
1154+
// send 10 messages
1155+
for i := 0; i < 10; i++ {
1156+
if _, err := producer.Send(context.Background(), &ProducerMessage{
1157+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
1158+
}); err != nil {
1159+
log.Fatal(err)
1160+
}
1161+
}
1162+
1163+
// receive 10 messages
1164+
for i := 0; i < 10; i++ {
1165+
msg, err := consumer.Receive(context.Background())
1166+
if err != nil {
1167+
log.Fatal(err)
1168+
}
1169+
1170+
expectMsg := fmt.Sprintf("hello-%d", i)
1171+
assert.Equal(t, []byte(expectMsg), msg.Payload())
1172+
// ack message
1173+
err = consumer.Ack(msg)
1174+
if err != nil {
1175+
return
1176+
}
1177+
}
1178+
}
1179+
1180+
func TestAutoCloseIdleConnection(t *testing.T) {
1181+
cli, err := NewClient(ClientOptions{
1182+
URL: serviceURL,
1183+
ConnectionMaxIdleTime: -1, // Disable auto release connections first, we will enable it manually later
1184+
})
1185+
1186+
assert.Nil(t, err)
1187+
1188+
topic := "TestAutoCloseIdleConnection"
1189+
1190+
// create consumer
1191+
consumer1, err := cli.Subscribe(ConsumerOptions{
1192+
Topic: topic,
1193+
SubscriptionName: "my-sub",
1194+
})
1195+
assert.Nil(t, err)
1196+
1197+
// create producer
1198+
producer1, err := cli.CreateProducer(ProducerOptions{
1199+
Topic: topic,
1200+
DisableBatching: false,
1201+
})
1202+
assert.Nil(t, err)
1203+
1204+
testSendAndReceive(t, producer1, consumer1)
1205+
1206+
pool := cli.(*client).cnxPool
1207+
1208+
producer1.Close()
1209+
consumer1.Close()
1210+
1211+
assert.NotEqual(t, 0, internal.GetConnectionsCount(&pool))
1212+
1213+
internal.StartCleanConnectionsTask(&pool, 2*time.Second) // Enable auto idle connections release manually
1214+
1215+
time.Sleep(6 * time.Second) // Need to wait at least 3 * ConnectionMaxIdleTime
1216+
1217+
assert.Equal(t, 0, internal.GetConnectionsCount(&pool))
1218+
1219+
// create consumer
1220+
consumer2, err := cli.Subscribe(ConsumerOptions{
1221+
Topic: topic,
1222+
SubscriptionName: "my-sub",
1223+
})
1224+
assert.Nil(t, err)
1225+
1226+
// create producer
1227+
producer2, err := cli.CreateProducer(ProducerOptions{
1228+
Topic: topic,
1229+
DisableBatching: false,
1230+
})
1231+
assert.Nil(t, err)
1232+
1233+
// Ensure the client still works
1234+
testSendAndReceive(t, producer2, consumer2)
1235+
1236+
producer2.Close()
1237+
consumer2.Close()
1238+
1239+
cli.Close()
1240+
}

pulsar/internal/connection.go

+48
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ type connection struct {
170170
metrics *Metrics
171171

172172
keepAliveInterval time.Duration
173+
174+
lastActive time.Time
173175
}
174176

175177
// connectionOptions defines configurations for creating connection.
@@ -927,6 +929,52 @@ func (c *connection) UnregisterListener(id uint64) {
927929
delete(c.listeners, id)
928930
}
929931

932+
func (c *connection) ResetLastActive() {
933+
c.Lock()
934+
defer c.Unlock()
935+
c.lastActive = time.Now()
936+
}
937+
938+
func (c *connection) isIdle() bool {
939+
{
940+
c.pendingLock.Lock()
941+
defer c.pendingLock.Unlock()
942+
if len(c.pendingReqs) != 0 {
943+
return false
944+
}
945+
}
946+
947+
{
948+
c.listenersLock.RLock()
949+
defer c.listenersLock.RUnlock()
950+
if len(c.listeners) != 0 {
951+
return false
952+
}
953+
}
954+
955+
{
956+
c.consumerHandlersLock.Lock()
957+
defer c.consumerHandlersLock.Unlock()
958+
if len(c.consumerHandlers) != 0 {
959+
return false
960+
}
961+
}
962+
963+
if len(c.incomingRequestsCh) != 0 || len(c.writeRequestsCh) != 0 {
964+
return false
965+
}
966+
return true
967+
}
968+
969+
func (c *connection) CheckIdle(maxIdleTime time.Duration) bool {
970+
// We don't need to lock here because this method should only be
971+
// called in a single goroutine of the connectionPool
972+
if !c.isIdle() {
973+
c.lastActive = time.Now()
974+
}
975+
return time.Since(c.lastActive) > maxIdleTime
976+
}
977+
930978
// Close closes the connection by
931979
// closing underlying socket connection and closeCh.
932980
// This also triggers callbacks to the ConnectionClosed listeners.

pulsar/internal/connection_pool.go

+31-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type connectionPool struct {
4747
maxConnectionsPerHost int32
4848
roundRobinCnt int32
4949
keepAliveInterval time.Duration
50+
closeCh chan struct{}
5051

5152
metrics *Metrics
5253
log log.Logger
@@ -60,8 +61,9 @@ func NewConnectionPool(
6061
keepAliveInterval time.Duration,
6162
maxConnectionsPerHost int,
6263
logger log.Logger,
63-
metrics *Metrics) ConnectionPool {
64-
return &connectionPool{
64+
metrics *Metrics,
65+
connectionMaxIdleTime time.Duration) ConnectionPool {
66+
p := &connectionPool{
6567
connections: make(map[string]*connection),
6668
tlsOptions: tlsOptions,
6769
auth: auth,
@@ -70,7 +72,10 @@ func NewConnectionPool(
7072
keepAliveInterval: keepAliveInterval,
7173
log: logger,
7274
metrics: metrics,
75+
closeCh: make(chan struct{}),
7376
}
77+
go p.checkAndCleanIdleConnections(connectionMaxIdleTime)
78+
return p
7479
}
7580

7681
func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
@@ -109,6 +114,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
109114
p.Unlock()
110115
conn.start()
111116
} else {
117+
conn.ResetLastActive()
112118
// we already have a connection
113119
p.Unlock()
114120
}
@@ -119,6 +125,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
119125

120126
func (p *connectionPool) Close() {
121127
p.Lock()
128+
close(p.closeCh)
122129
for k, c := range p.connections {
123130
delete(p.connections, k)
124131
c.Close()
@@ -134,3 +141,25 @@ func (p *connectionPool) getMapKey(addr *url.URL) string {
134141
idx := cnt % p.maxConnectionsPerHost
135142
return fmt.Sprint(addr.Host, '-', idx)
136143
}
144+
145+
func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) {
146+
if maxIdleTime < 0 {
147+
return
148+
}
149+
for {
150+
select {
151+
case <-p.closeCh:
152+
return
153+
case <-time.After(maxIdleTime):
154+
p.Lock()
155+
for k, c := range p.connections {
156+
if c.CheckIdle(maxIdleTime) {
157+
c.log.Debugf("Closed connection due to inactivity.")
158+
delete(p.connections, k)
159+
c.Close()
160+
}
161+
}
162+
p.Unlock()
163+
}
164+
}
165+
}

pulsar/internal/helper.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package internal
19+
20+
import "time"
21+
22+
// These method should only be used by tests
23+
24+
func StartCleanConnectionsTask(p *ConnectionPool, connectionMaxIdleTime time.Duration) {
25+
go (*p).(*connectionPool).checkAndCleanIdleConnections(connectionMaxIdleTime)
26+
}
27+
28+
func GetConnectionsCount(p *ConnectionPool) int {
29+
pool := (*p).(*connectionPool)
30+
pool.Lock()
31+
defer pool.Unlock()
32+
return len(pool.connections)
33+
}

0 commit comments

Comments
 (0)