After closing the client, the cnxPool isn't closed; the closed client holds its fds forever and leaks many goroutines #493

Closed
XuanYang-cn opened this issue Mar 26, 2021 · 0 comments · Fixed by #494


Expected behavior

After the client is closed, goroutines do not leak and file descriptors (fds) are released.

Actual behavior

1. Panic because the process runs out of fds. The system allows 1024 open fds per process by default, and pulsar-client holds all of them.

INFO[0002] [Connecting to broker]                        remote_addr="pulsar://127.0.0.1:6650"
WARN[0002] [Failed to connect to broker.]                error="dial tcp 127.0.0.1:6650: socket: too many open files" remote_addr="pulsar://127.0.0.1:6650"
INFO[0002] [Connection closed]                           remote_addr="pulsar://127.0.0.1:6650"
panic: connection error

goroutine 37 [running]:
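
To check which limit applies on your machine, a small sketch like the following can help. It assumes a Unix-like system; `openFileLimit` is a helper name made up for this example. Note that newer Go versions may raise the soft limit at startup when the `os` package is imported, so the value printed can differ from `ulimit -n`.

```go
package main

import (
	"fmt"
	"syscall"
)

// openFileLimit returns the soft and hard limits on open file descriptors
// (RLIMIT_NOFILE) for the current process. Unix-like systems only.
func openFileLimit() (soft, hard uint64, err error) {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return 0, 0, err
	}
	// The field types differ between platforms, so convert explicitly.
	return uint64(rl.Cur), uint64(rl.Max), nil
}

func main() {
	soft, hard, err := openFileLimit()
	if err != nil {
		fmt.Println("getrlimit failed:", err)
		return
	}
	fmt.Printf("open file limit: soft=%d hard=%d\n", soft, hard)
}
```

Once the number of leaked connections reaches the soft limit, every new `dial` fails with "too many open files", which matches the log above.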

2. Many goroutines leak.

If you change 10000 to 500 (which sets up only 1000 TCP connections), you can see the massive goroutine leak in pprof.

All 1000 connections are leaking:

goroutine profile: total 4507
1000 @ 0x445285 0x43da9b 0x475235 0x4e34e5 0x4e4525 0x4e4503 0x70082f 0x71446e 0x4dfa67 0x9ff8d9 0x9ff6f3 0x9fef58 0x47ac81
#	0x475234	internal/poll.runtime_pollWait+0x54								/usr/local/go/src/runtime/netpoll.go:220
#	0x4e34e4	internal/poll.(*pollDesc).wait+0x44								/usr/local/go/src/internal/poll/fd_poll_runtime.go:87
#	0x4e4524	internal/poll.(*pollDesc).waitRead+0x1a4							/usr/local/go/src/internal/poll/fd_poll_runtime.go:92
#	0x4e4502	internal/poll.(*FD).Read+0x182									/usr/local/go/src/internal/poll/fd_unix.go:159
#	0x70082e	net.(*netFD).Read+0x4e										/usr/local/go/src/net/fd_posix.go:55
#	0x71446d	net.(*conn).Read+0x8d										/usr/local/go/src/net/net.go:182
#	0x4dfa66	io.ReadAtLeast+0x86										/usr/local/go/src/io/io.go:314
#	0x9ff8d8	github.com/apache/pulsar-client-go/pulsar/internal.(*connectionReader).readAtLeast+0xf8		*********/mod/github.com/apache/pulsar-client-go@v0.4.0/pulsar/internal/connection_reader.go:120
#	0x9ff6f2	github.com/apache/pulsar-client-go/pulsar/internal.(*connectionReader).readSingleCommand+0x512	**********/mod/github.com/apache/pulsar-client-go@v0.4.0/pulsar/internal/connection_reader.go:69
#	0x9fef57	github.com/apache/pulsar-client-go/pulsar/internal.(*connectionReader).readFromConnection+0x57	**********/mod/github.com/apache/pulsar-client-go@v0.4.0/pulsar/internal/connection_reader.go:45

1000 @ 0x445285 0x45538f 0x9fa6b8 0xa07645 0x47ac81
#	0x9fa6b7	github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run+0x297		**********/mod/github.com/apache/pulsar-client-go@v0.4.0/pulsar/internal/connection.go:376
#	0xa07644	github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1+0x84	**********/mod/github.com/apache/pulsar-client-go@v0.4.0/pulsar/internal/connection.go:215

1000 @ 0x445285 0x45538f 0x9fa8db 0x47ac81
#	0x9fa8da	github.com/apache/pulsar-client-go/pulsar/internal.(*connection).runPingCheck+0xba	*********/mod/github.com/apache/pulsar-client-go@v0.4.0/pulsar/internal/connection.go:397

1000 @ 0x445285 0x45538f 0xa0799f 0x47ac81
#	0xa0799e	github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run.func2+0xde	*********/mod/github.com/apache/pulsar-client-go@v0.4.0/pulsar/internal/connection.go:361

500 @ 0x445285 0x41170f 0x41138b 0xa498e5 0x47ac81
#	0xa498e4	github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).failTimeoutMessages+0xc4	**********/mod/github.com/apache/pulsar-client-go@v0.4.0/pulsar/producer_partition.go:455
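
If you don't want to read a full pprof dump, a rough check is to compare `runtime.NumGoroutine()` before and after the code under suspicion. The sketch below is only an illustration: `goroutineDelta` is a made-up helper, and the `leaky` and `clean` functions stand in for "create and close a client".

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// goroutineDelta runs fn and reports how many more goroutines exist
// afterwards. It polls for up to about 500ms so that goroutines which are
// in the middle of exiting are not counted as leaks.
func goroutineDelta(fn func()) int {
	before := runtime.NumGoroutine()
	fn()
	after := runtime.NumGoroutine()
	for i := 0; i < 50 && after > before; i++ {
		time.Sleep(10 * time.Millisecond)
		after = runtime.NumGoroutine()
	}
	return after - before
}

func main() {
	stop := make(chan struct{})

	// leaky starts a goroutine that blocks until stop is closed,
	// similar to a connection read loop that is never shut down.
	leaky := func() {
		go func() { <-stop }()
	}

	// clean starts a goroutine and waits for it to finish.
	clean := func() {
		done := make(chan struct{})
		go func() { close(done) }()
		<-done
	}

	fmt.Println("leaky delta:", goroutineDelta(leaky))
	fmt.Println("clean delta:", goroutineDelta(clean))
	close(stop)
}
```

A positive delta that keeps growing with the number of created clients is the same signal the profile above shows in more detail.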

Steps to reproduce

package main

import (
	"github.com/apache/pulsar-client-go/pulsar"

	"fmt"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	go func() {
		fmt.Println(http.ListenAndServe("localhost:9876", nil))
	}()

	newClient := func() {
		opts := pulsar.ClientOptions{
			URL: "pulsar://127.0.0.1:6650",
		}

		client, err := pulsar.NewClient(opts)
		if err != nil {
			panic(err)
		}

		channelName := "insert"
		pp, err := client.CreateProducer(pulsar.ProducerOptions{Topic: channelName})
		if err != nil {
			panic(err)
		}
		fmt.Println("producer: ", pp)

		receiveChannel := make(chan pulsar.ConsumerMessage, 1024)
		pc, err := client.Subscribe(pulsar.ConsumerOptions{
			Topic:                       channelName,
			SubscriptionName:            channelName,
			Type:                        pulsar.KeyShared,
			SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
			MessageChannel:              receiveChannel,
		})
		if err != nil {
			panic(err)
		}
		fmt.Println("consumer: ", pc)
		pp.Close()
		pc.Close()

		client.Close()

	}

	sc := make(chan os.Signal, 1)

	go func() {
		for i := 0; i < 10000; i++ {
			newClient()
		}
	}()

	signal.Notify(sc,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)

	sig := <-sc
	fmt.Println("Get signal to exit", sig.String())
}

How can we reproduce the issue

System configuration

Pulsar version: latest
pulsar-client-go: 0.4.0

It's easy to reproduce and easy to fix: just add one line to the Close() function in client_impl.go:

c.cnxPool.Close()
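
To illustrate why that one line matters, here is a toy model of the situation. It is not the actual pulsar-client-go code: `connPool`, `get`, `size` and `client` are hypothetical names, and the blocked goroutine stands in for the per-connection read loop and ping check seen in the profile.

```go
package main

import (
	"fmt"
	"sync"
)

// connPool is a hypothetical stand-in for the client's connection pool.
type connPool struct {
	mu    sync.Mutex
	conns map[string]chan struct{} // broker address -> stop signal
	wg    sync.WaitGroup
}

func newConnPool() *connPool {
	return &connPool{conns: make(map[string]chan struct{})}
}

// get registers a connection for addr (if there is none yet) and starts
// its background goroutine.
func (p *connPool) get(addr string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.conns[addr]; ok {
		return
	}
	stop := make(chan struct{})
	p.conns[addr] = stop
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		<-stop // blocks until the pool is closed
	}()
}

// size reports how many connections the pool currently holds.
func (p *connPool) size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.conns)
}

// Close stops every connection and waits for its goroutine to exit.
func (p *connPool) Close() {
	p.mu.Lock()
	for addr, stop := range p.conns {
		close(stop)
		delete(p.conns, addr)
	}
	p.mu.Unlock()
	p.wg.Wait()
}

// client is a hypothetical stand-in for the Pulsar client.
type client struct {
	pool *connPool
}

// Close releases the pool. Without this call the pool's connections and
// goroutines outlive the client, which is the leak described here.
func (c *client) Close() {
	c.pool.Close()
}

func main() {
	c := &client{pool: newConnPool()}
	c.pool.get("pulsar://127.0.0.1:6650")
	fmt.Println("connections before Close:", c.pool.size())
	c.Close()
	fmt.Println("connections after Close:", c.pool.size())
}
```

In this model, dropping the `c.pool.Close()` call leaves one blocked goroutine per connection for every client that was created and closed.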