-
Notifications
You must be signed in to change notification settings - Fork 789
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Writer with zstd: probable memory leak #922
Comments
here is code type KafkaRawMSG struct {
	// Topic names the destination topic; PublishLog uses it to pick the writer.
	Topic string
	// KeyBytes is sent as the Kafka message key.
	KeyBytes []byte
	// ValueBytes is sent as the Kafka message value (the log content).
	ValueBytes []byte
}
// Ungeziefer maps a topic name to the writer dedicated to that topic.
type Ungeziefer map[string]*kafka.Writer

// InitKafka creates one writer per entry of KafkaTopicMap and returns them
// keyed by the entry's topic value. If two entries name the same topic, the
// writer created later replaces the earlier one.
func InitKafka() Ungeziefer {
	writers := make(Ungeziefer)
	for name, topic := range KafkaTopicMap {
		kafkaLogger.Debugf("Now spawn kafka %s writer %s", name, topic)
		writers[topic] = newKafkaWriter(topic)
	}
	return writers
}
// PublishLog drains KafkaPublishChan, sending each message with the writer
// registered for its topic. It returns only when KafkaPublishChan is closed.
//
// A message whose topic has no writer is reported through SendError and
// skipped, instead of calling WriteMessages on a nil *kafka.Writer.
//
// Logging cost is kept independent of payload size. The payload is only
// handed to Debugf (which presumably formats lazily, i.e. only when debug
// output is enabled — confirm for kafkaLogger). The Info and Warn lines
// report sizes rather than contents.
func (u Ungeziefer) PublishLog() {
	for msg := range KafkaPublishChan {
		w, ok := u[msg.Topic]
		if !ok || w == nil {
			err := fmt.Errorf("no kafka writer configured for topic %q", msg.Topic)
			kafkaLogger.Warnf("Failed to publish a message: %v", err)
			SendError(err, "Failed to publish a kafka message")
			continue
		}

		// Pass the byte slices straight to Debugf rather than converting them
		// with string(...) up front, which copied every payload even when
		// debug logging was disabled.
		kafkaLogger.Debugf("Now publish log content to kafka, key:%s, value:%s, channel approximately size %d",
			msg.KeyBytes, msg.ValueBytes, len(KafkaPublishChan))

		// Payload size, not unsafe.Sizeof(msg): the latter is only the size of
		// the struct header (string + two slice headers).
		size := len(msg.KeyBytes) + len(msg.ValueBytes)

		err := w.WriteMessages(context.Background(), kafka.Message{Key: msg.KeyBytes, Value: msg.ValueBytes})
		if err != nil {
			kafkaLogger.Warnf("Failed to publish a message to topic %s, message size: %d bytes: %v", msg.Topic, size, err)
			SendError(err, "Failed to publish a kafka message")
			continue
		}
		// newKafkaWriter sets Async: true, so a nil error only means the writer
		// accepted the message, not that the broker acknowledged it.
		kafkaLogger.Infof(" [x] Sent message to topic %s, size: %d bytes", msg.Topic, size)
	}
}
/*
new kafka writer
*/
func newKafkaWriter(topic string) *kafka.Writer {
d := &kafka.Dialer{SASLMechanism: plain.Mechanism{Username: common.CONF.KafkaLQ.User,
Password: common.CONF.KafkaLQ.Password}}
w := kafka.NewWriter(kafka.WriterConfig{
Dialer: d,
Brokers: []string{fmt.Sprintf("%s:%d", common.CONF.KafkaLQ.Host, common.CONF.KafkaLQ.Port)},
Topic: topic,
BatchBytes: 56 * 1024 * 1024, // MB,
CompressionCodec: zstd.NewCompressionCodec(),
BatchTimeout: 8 * time.Millisecond,
Async: true,
})
return w
} cli.Prepare : adapter.RabbitPublishChan = make(chan adapter.RabbitMessage, 888)
// Excerpt from cli.Prepare: start one publisher goroutine over the per-topic writers.
// NOTE(review): PublishLog only returns once KafkaPublishChan is closed, and no
// visible code closes the writers created by InitKafka — confirm shutdown handling.
franz := adapter.InitKafka()
go franz.PublishLog() |
After improving the writer usage:
here is code type Kavka struct {
	// writer is shared by all topics; each kafka.Message sent through it carries its own Topic.
	writer *kafka.Writer
}
// NewKavka returns a Kavka whose shared writer is built by newWriter.
func NewKavka() *Kavka {
	k := new(Kavka)
	k.writer = newWriter()
	return k
}
// newWriter builds one writer usable for every topic.
//
// The Writer has no Topic, so each message must name its own. It
// authenticates with SASL/PLAIN using the credentials in
// common.CONF.KafkaLQ. Batches are zstd-compressed, a request is capped at
// 56 MiB (BatchBytes), and incomplete batches are flushed after 8ms. Writes
// are asynchronous, and topics are never auto-created.
func newWriter() *kafka.Writer {
	lq := common.CONF.KafkaLQ
	return &kafka.Writer{
		Addr:         kafka.TCP(fmt.Sprintf("%s:%d", lq.Host, lq.Port)),
		Compression:  kafka.Zstd,
		BatchTimeout: 8 * time.Millisecond,
		BatchBytes:   56 * 1024 * 1024, // 56 MiB
		Async:        true,
		Transport: &kafka.Transport{
			SASL: plain.Mechanism{
				Username: lq.User,
				Password: lq.Password,
			},
		},
		AllowAutoTopicCreation: false,
	}
}
func (k *Kavka) PublishLog() {
for msg := range KafkaPublishChan {
err := k.writer.WriteMessages(context.Background(), kafka.Message{Topic: msg.Topic, Key: msg.KeyBytes, Value: msg.ValueBytes})
if err != nil {
kafkaLogger.Warnf("Failed to publish a message: %s, message size: %d bytes", msg, unsafe.Sizeof(msg), err)
SendError(err, "Failed to publish a kafka message")
} else {
kafkaLogger.Infof(" [x] Sent %#s\n", msg)
}
}
} cli.Prepare adapter.RabbitPublishChan = make(chan adapter.RabbitMessage, 888)
// Start the single-writer publisher; PublishLog returns only once
// KafkaPublishChan is closed.
go adapter.NewKavka().PublishLog() |
Is this related to klauspost/compress#370 and #543? @achille-roussel, we would appreciate your help. |
Could you confirm the versions of kafka-go and klauspost/compress that your program uses? If you're not sure, could you share the go.mod and go.sum files of your application? |
I have the same problem. Frequent calls to batch.Read in a goroutine seem to cause memory leaks due to zstd decompression. @achille-roussel
go.mod:
go 1.17
require (
github.com/segmentio/kafka-go v0.4.32-0.20220531181313-4788faf01026
)
require (
github.com/klauspost/compress v1.14.2 // indirect
) goroutine profile: total 19
7 @ 0x438056 0x405565 0x40511d 0x773808 0x467e21
# 0x773807 github.com/klauspost/compress/zstd.(*Decoder).startStreamDecoder+0x227 /home/go/pkg/mod/github.com/klauspost/compress@v1.14.2/zstd/decoder.go:553
7 @ 0x438056 0x40640c 0x405e78 0x76a214 0x467e21
# 0x76a213 github.com/klauspost/compress/zstd.(*blockDec).startDecoder+0x93 /home/go/pkg/mod/github.com/klauspost/compress@v1.14.2/zstd/blockdec.go:211
1 @ 0x438056 0x43075e 0x462889 0x49a4b2 0x49b81a 0x49b808 0x6375a9 0x647945 0x7042ad 0x58b863 0x58c42f 0x58c687 0x6b1699 0x700079 0x70007a 0x705665 0x7099a5 0x467e21
# 0x462888 internal/poll.runtime_pollWait+0x88 /home/go/go/src/runtime/netpoll.go:229
# 0x49a4b1 internal/poll.(*pollDesc).wait+0x31 /home/go/go/src/internal/poll/fd_poll_runtime.go:84
# 0x49b819 internal/poll.(*pollDesc).waitRead+0x259 /home/go/go/src/internal/poll/fd_poll_runtime.go:89
# 0x49b807 internal/poll.(*FD).Read+0x247 /home/go/go/src/internal/poll/fd_unix.go:167
# 0x6375a8 net.(*netFD).Read+0x28 /home/go/go/src/net/fd_posix.go:56
# 0x647944 net.(*conn).Read+0x44 /home/go/go/src/net/net.go:183
# 0x7042ac net/http.(*connReader).Read+0x16c /home/go/go/src/net/http/server.go:780
# 0x58b862 bufio.(*Reader).fill+0x102 /home/go/go/src/bufio/bufio.go:101
# 0x58c42e bufio.(*Reader).ReadSlice+0x2e /home/go/go/src/bufio/bufio.go:360
# 0x58c686 bufio.(*Reader).ReadLine+0x26 /home/go/go/src/bufio/bufio.go:389
# 0x6b1698 net/textproto.(*Reader).readLineSlice+0x98 /home/go/go/src/net/textproto/reader.go:57
# 0x700078 net/textproto.(*Reader).ReadLine+0x78 /home/go/go/src/net/textproto/reader.go:38
# 0x700079 net/http.readRequest+0x79 /home/go/go/src/net/http/request.go:1029
# 0x705664 net/http.(*conn).readRequest+0x224 /home/go/go/src/net/http/server.go:966
# 0x7099a4 net/http.(*conn).serve+0x864 /home/go/go/src/net/http/server.go:1855
1 @ 0x438056 0x43075e 0x462889 0x49a4b2 0x49d80c 0x49d7f9 0x638d55 0x6509c8 0x64fb9d 0x70e654 0x70e27d 0x80a905 0x80a8d9 0x467e21
# 0x462888 internal/poll.runtime_pollWait+0x88 /home/go/go/src/runtime/netpoll.go:229
# 0x49a4b1 internal/poll.(*pollDesc).wait+0x31 /home/go/go/src/internal/poll/fd_poll_runtime.go:84
# 0x49d80b internal/poll.(*pollDesc).waitRead+0x22b /home/go/go/src/internal/poll/fd_poll_runtime.go:89
# 0x49d7f8 internal/poll.(*FD).Accept+0x218 /home/go/go/src/internal/poll/fd_unix.go:402
# 0x638d54 net.(*netFD).accept+0x34 /home/go/go/src/net/fd_unix.go:173
# 0x6509c7 net.(*TCPListener).accept+0x27 /home/go/go/src/net/tcpsock_posix.go:140
# 0x64fb9c net.(*TCPListener).Accept+0x3c /home/go/go/src/net/tcpsock.go:262
# 0x70e653 net/http.(*Server).Serve+0x393 /home/go/go/src/net/http/server.go:3001
# 0x70e27c net/http.(*Server).ListenAndServe+0x7c /home/go/go/src/net/http/server.go:2930
# 0x80a904 net/http.ListenAndServe+0x44 /home/go/go/src/net/http/server.go:3184
# 0x80a8d8 main.main.func1+0x18 /home/go/src/kafka-comsume/main.go:20
1 @ 0x438056 0x4475cc 0x80a845 0x437c87 0x467e21
# 0x80a844 main.main+0x244 /home/go/src/kafka-comsume/main.go:35
# 0x437c86 runtime.main+0x226 /home/go/go/src/runtime/proc.go:255
1 @ 0x462425 0x7b2fb5 0x7b2dcd 0x7aff4b 0x8088da 0x80948e 0x70ab6f 0x70c469 0x70e0db 0x709c48 0x467e21
# 0x462424 runtime/pprof.runtime_goroutineProfileWithLabels+0x24 /home/go/go/src/runtime/mprof.go:746
# 0x7b2fb4 runtime/pprof.writeRuntimeProfile+0xb4 /home/go/go/src/runtime/pprof/pprof.go:724
# 0x7b2dcc runtime/pprof.writeGoroutine+0x4c /home/go/go/src/runtime/pprof/pprof.go:684
# 0x7aff4a runtime/pprof.(*Profile).WriteTo+0x14a /home/go/go/src/runtime/pprof/pprof.go:331
# 0x8088d9 net/http/pprof.handler.ServeHTTP+0x499 /home/go/go/src/net/http/pprof/pprof.go:253
# 0x80948d net/http/pprof.Index+0x12d /home/go/go/src/net/http/pprof/pprof.go:371
# 0x70ab6e net/http.HandlerFunc.ServeHTTP+0x2e /home/go/go/src/net/http/server.go:2046
# 0x70c468 net/http.(*ServeMux).ServeHTTP+0x148 /home/go/go/src/net/http/server.go:2424
# 0x70e0da net/http.serverHandler.ServeHTTP+0x43a /home/go/go/src/net/http/server.go:2878
# 0x709c47 net/http.(*conn).serve+0xb07 /home/go/go/src/net/http/server.go:1929
1 @ 0x467e21 70MB leaked heap profile: 13: 73770816 [408: 577151792] @ heap/1048576
7: 73400320 [8: 83886080] @ 0x787c97 0x773c6c 0x467e21
# 0x787c96 github.com/klauspost/compress/zstd.(*frameDec).initAsync+0xb6 /home/go/pkg/mod/github.com/klauspost/compress@v1.14.2/zstd/framedec.go:365
# 0x773c6b github.com/klauspost/compress/zstd.(*Decoder).startStreamDecoder+0x68b /home/go/pkg/mod/github.com/klauspost/compress@v1.14.2/zstd/decoder.go:517
2: 278528 [2: 278528] @ 0x769fa9 0x78767c 0x773d1b 0x467e21
# 0x769fa8 github.com/klauspost/compress/zstd.(*blockDec).reset+0x228 /home/go/pkg/mod/github.com/klauspost/compress@v1.14.2/zstd/blockdec.go:177
# 0x78767b github.com/klauspost/compress/zstd.(*frameDec).next+0x5b /home/go/pkg/mod/github.com/klauspost/compress@v1.14.2/zstd/framedec.go:281
# 0x773d1a github.com/klauspost/compress/zstd.(*Decoder).startStreamDecoder+0x73a /home/go/pkg/mod/github.com/klauspost/compress@v1.14.2/zstd/decoder.go:533
1: 90112 [1: 90112] @ 0x40e04e 0x40ddfc 0x55beea 0x444ea3 0x444df1 0x444df1 0x444df1 0x444df1 0x437c46 0x467e21
# 0x55bee9 github.com/russross/blackfriday/v2.init+0x89 /home/go/pkg/mod/github.com/russross/blackfriday/v2@v2.1.0/entities.go:4
# 0x444ea2 runtime.doInit+0x122 /home/go/go/src/runtime/proc.go:6498
# 0x444df0 runtime.doInit+0x70 /home/go/go/src/runtime/proc.go:6475
# 0x444df0 runtime.doInit+0x70 /home/go/go/src/runtime/proc.go:6475
# 0x444df0 runtime.doInit+0x70 /home/go/go/src/runtime/proc.go:6475
# 0x444df0 runtime.doInit+0x70 /home/go/go/src/runtime/proc.go:6475
# 0x437c45 runtime.main+0x1e5 /home/go/go/src/runtime/proc.go:238 |
@naughtyGitCat we took a closer look at the issue and might need a few more details from your side:
@lizhifengones we believe your issue (which is on the reader path) is due to using klauspost/compress v1.14.2 (as listed in your go.mod). You need version 1.15.1 at least. Details on the reasons are in klauspost/compress#264 |
Thanks for your attention. During our diagnosis, agents with low throughput seem to use less memory, while high-throughput agents use more. Some agents produce large single messages, so we chose zstd to compress them, and under high throughput zstd shows the memory-usage problem. Maybe I can initialize two writers: one uncompressed (or using a codec with lower memory usage) and one with zstd. Extremely large messages would go through the zstd writer, and most messages through the uncompressed writer. |
From the details you shared, it seems that the system might be behaving as expected: it has a lot of work to do and sometimes needs to hold a lot of data, which in turn requires allocating large buffers when performing compression. Do you know how much memory you would expect the application to use? |
Since I use two writers (none/zstd) to send msgs, the memory usage is down to 300MB more or less on 700+ agents. Meanwhile, some of the older version agents do not use this policy, mem is up to 3GB more or less Thanks for your attention, maybe this can be a usage suggestion or example on big msg trans situation
// PublishLog drains KafkaPublishChan and routes each message by its encoded
// size (calcMessageSize). Messages above zstdThreshold go through zstdWriter;
// all others go through rawWriter (presumably uncompressed — confirm its
// config). It returns only when KafkaPublishChan is closed.
//
// Log lines report the topic and computed size rather than the payload, so
// logging cost does not grow with message size.
func (k *Kafka) PublishLog() {
	// Only messages larger than this pay for zstd compression.
	const zstdThreshold = 1024 * 1024

	for msg := range KafkaPublishChan {
		kafkaMsg := kafka.Message{Topic: msg.Topic, Key: msg.KeyBytes, Value: msg.ValueBytes}
		size := calcMessageSize(kafkaMsg)

		var err error
		if size > zstdThreshold {
			err = k.zstdWriter.WriteMessages(context.Background(), kafkaMsg)
		} else {
			err = k.rawWriter.WriteMessages(context.Background(), kafkaMsg)
		}
		if err != nil {
			// Report the real encoded size; unsafe.Sizeof(msg) was only the
			// size of the struct header.
			kafkaLogger.Warnf("Failed to publish a message to topic %s, message size: %d bytes: %v", msg.Topic, size, err)
			SendError(err, "Failed to publish a kafka message")
			continue
		}
		kafkaLogger.Infof(" [x] Sent message to topic %s, size: %d bytes", msg.Topic, size)
	}
}
// calcMessageSize reproduces the message-size formula from kafka-go v0.4.31
// (message.go). The result is a fixed framing overhead plus the key and the
// value, each with a 4-byte length prefix. Headers are not counted.
func calcMessageSize(msg kafka.Message) int {
	const lengthPrefix = 4
	// 4 + 1 + 1 + 8 fixed bytes; presumably CRC, magic byte, attributes and
	// timestamp of the legacy message format — confirm against kafka-go.
	const fixedOverhead = 4 + 1 + 1 + 8

	keyBytes := lengthPrefix + len(msg.Key)
	valueBytes := lengthPrefix + len(msg.Value)
	return fixedOverhead + keyBytes + valueBytes
} |
The text was updated successfully, but these errors were encountered: