Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e27675a

Browse files
committedFeb 23, 2023
Fix race
1 parent 150097a commit e27675a

File tree

2 files changed

+39
-19
lines changed

2 files changed

+39
-19
lines changed
 

‎pulsar/client_impl_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -1163,53 +1163,53 @@ func TestAutoCloseIdleConnection(t *testing.T) {
11631163
topic := "TestAutoCloseIdleConnection"
11641164

11651165
// create consumer
1166-
consumer, err := cli.Subscribe(ConsumerOptions{
1166+
consumer1, err := cli.Subscribe(ConsumerOptions{
11671167
Topic: topic,
11681168
SubscriptionName: "my-sub",
11691169
})
11701170
assert.Nil(t, err)
11711171

11721172
// create producer
1173-
producer, err := cli.CreateProducer(ProducerOptions{
1173+
producer1, err := cli.CreateProducer(ProducerOptions{
11741174
Topic: topic,
11751175
DisableBatching: false,
11761176
})
11771177
assert.Nil(t, err)
11781178

1179-
testSendAndReceive(t, producer, consumer)
1179+
testSendAndReceive(t, producer1, consumer1)
11801180

11811181
pool := cli.(*client).cnxPool
11821182

1183-
producer.Close()
1184-
consumer.Close()
1183+
producer1.Close()
1184+
consumer1.Close()
11851185

11861186
assert.NotEqual(t, 0, internal.GetConnectionsCount(&pool))
11871187

11881188
internal.StartCleanConnectionsTask(&pool, 2*time.Second) // Enable auto idle connections release manually
11891189

1190-
time.Sleep(4 * time.Second) // Need to wait at least 2 * ConnectionMaxIdleTime
1190+
time.Sleep(6 * time.Second) // Need to wait at least 3 * ConnectionMaxIdleTime
11911191

11921192
assert.Equal(t, 0, internal.GetConnectionsCount(&pool))
11931193

11941194
// create consumer
1195-
consumer, err = cli.Subscribe(ConsumerOptions{
1195+
consumer2, err := cli.Subscribe(ConsumerOptions{
11961196
Topic: topic,
11971197
SubscriptionName: "my-sub",
11981198
})
11991199
assert.Nil(t, err)
12001200

12011201
// create producer
1202-
producer, err = cli.CreateProducer(ProducerOptions{
1202+
producer2, err := cli.CreateProducer(ProducerOptions{
12031203
Topic: topic,
12041204
DisableBatching: false,
12051205
})
12061206
assert.Nil(t, err)
12071207

12081208
// Ensure the client still works
1209-
testSendAndReceive(t, producer, consumer)
1209+
testSendAndReceive(t, producer2, consumer2)
12101210

1211-
producer.Close()
1212-
consumer.Close()
1211+
producer2.Close()
1212+
consumer2.Close()
12131213

12141214
cli.Close()
12151215
}

‎pulsar/internal/connection.go

+28-8
Original file line numberDiff line numberDiff line change
@@ -926,15 +926,35 @@ func (c *connection) ResetLastActive() {
926926
c.lastActive = time.Now()
927927
}
928928

929-
func (c *connection) CheckIdle(maxIdleTime time.Duration) bool {
930-
c.Lock()
931-
defer c.Unlock()
929+
func (c *connection) isIdle() bool {
930+
c.pendingLock.Lock()
931+
if len(c.pendingReqs) != 0 {
932+
return false
933+
}
934+
c.pendingLock.Unlock()
935+
936+
c.listenersLock.RLock()
937+
if len(c.listeners) != 0 {
938+
return false
939+
}
940+
c.listenersLock.RUnlock()
932941

933-
if len(c.pendingReqs) != 0 ||
934-
len(c.incomingRequestsCh) != 0 ||
935-
len(c.writeRequestsCh) != 0 ||
936-
len(c.listeners) != 0 ||
937-
len(c.consumerHandlers) != 0 {
942+
c.consumerHandlersLock.Lock()
943+
if len(c.consumerHandlers) != 0 {
944+
return false
945+
}
946+
c.consumerHandlersLock.Unlock()
947+
948+
if len(c.incomingRequestsCh) != 0 || len(c.writeRequestsCh) != 0 {
949+
return false
950+
}
951+
return true
952+
}
953+
954+
func (c *connection) CheckIdle(maxIdleTime time.Duration) bool {
955+
// We don't need to lock here because this method should only be
956+
// called in a single goroutine of the connectionPool
957+
if !c.isIdle() {
938958
c.lastActive = time.Now()
939959
}
940960
return time.Since(c.lastActive) > maxIdleTime

0 commit comments

Comments
 (0)
Please sign in to comment.