Skip to content

Commit 5116708

Browse files
fix(kafka): 调整客户端一批只会发送一条消息 (#17)
2 parents fd7de33 + 4f34623 commit 5116708

File tree

15 files changed

+34
-75
lines changed

15 files changed

+34
-75
lines changed

.github/workflows/go-fmt.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
run: |
3535
go install mvdan.cc/gofumpt@latest && \
3636
go install golang.org/x/tools/cmd/goimports@latest && \
37-
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.54.2
37+
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.62.0
3838
3939
- name: Check
4040
run: |

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/ecodeclub/mq-api
22

3-
go 1.21
3+
go 1.24.2
44

55
require (
66
github.com/ecodeclub/ekit v0.0.8

internal/pkg/validator/validator_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ func TestIsValidTopic(t *testing.T) {
3535
}
3636

3737
for _, topic := range validTopics {
38-
topic := topic
3938
t.Run(topic, func(t *testing.T) {
4039
t.Parallel()
4140
assert.True(t, validator.IsValidTopic(topic), "Expected topic to be valid: %s", topic)
@@ -61,7 +60,6 @@ func TestIsValidTopic(t *testing.T) {
6160
}
6261

6362
for _, topic := range invalidTopics {
64-
topic := topic
6563
t.Run(topic, func(t *testing.T) {
6664
t.Parallel()
6765
assert.False(t, validator.IsValidTopic(topic), "Expected topic to be invalid: %s", topic)

kafka/balancer_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ func TestSpecifiedPartitionBalancer(t *testing.T) {
8080
}
8181

8282
for _, tc := range testCases {
83-
tc := tc
8483
t.Run(tc.name, func(t *testing.T) {
8584
t.Parallel()
8685

kafka/producer.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ import (
3030
kafkago "github.com/segmentio/kafka-go"
3131
)
3232

33+
const (
34+
defaultBatchSize = 1
35+
)
36+
3337
type Producer struct {
3438
topic string
3539
writer *kafkago.Writer
@@ -45,9 +49,10 @@ func NewProducer(address []string, topic string, balancer kafkago.Balancer) *Pro
4549
topic: topic,
4650
locker: &sync.RWMutex{},
4751
writer: &kafkago.Writer{
48-
Addr: kafkago.TCP(address...),
49-
Topic: topic,
50-
Balancer: balancer,
52+
Addr: kafkago.TCP(address...),
53+
Topic: topic,
54+
Balancer: balancer,
55+
BatchSize: defaultBatchSize,
5156
},
5257
closed: false,
5358
closeOnce: &sync.Once{},

memory/consumer.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package memory
1616

1717
import (
1818
"context"
19-
"log"
2019
"sync"
2120
"time"
2221

@@ -67,9 +66,7 @@ func (c *Consumer) eventLoop() {
6766
for {
6867
select {
6968
case <-ticker.C:
70-
log.Printf("消费者 %s 开始消费数据", c.name)
7169
c.consumeAndReport()
72-
log.Printf("消费者 %s 结束消费数据", c.name)
7370
case event, ok := <-c.receiveCh:
7471
if !ok {
7572
return
@@ -84,7 +81,6 @@ func (c *Consumer) consumeAndReport() {
8481
for idx, record := range c.partitionRecords {
8582
msgs := c.partitions[record.Index].getBatch(record.Offset, limit)
8683
for _, msg := range msgs {
87-
log.Printf("消费者 %s 消费数据 %v", c.name, msg)
8884
c.msgCh <- msg
8985
}
9086
record.Offset += len(msgs)
@@ -98,7 +94,6 @@ func (c *Consumer) consumeAndReport() {
9894
}
9995
err := <-errCh
10096
if err != nil {
101-
log.Printf("上报偏移量失败:%v", err)
10297
return
10398
}
10499
close(errCh)
@@ -111,14 +106,12 @@ func (c *Consumer) handle(event *Event) {
111106
// 服务端发起的重新加入事件
112107
case RejoinEvent:
113108
// 消费者上报消费进度
114-
log.Printf("消费者 %s开始上报消费进度", c.name)
115109
c.reportCh <- &Event{
116110
Type: RejoinAckEvent,
117111
Data: c.partitionRecords,
118112
}
119113
// 设置消费进度
120114
partitionInfo := <-c.receiveCh
121-
log.Printf("消费者 %s接收到分区信息 %v", c.name, partitionInfo)
122115
c.partitionRecords, _ = partitionInfo.Data.([]PartitionRecord)
123116
// 返回设置完成的信号
124117
c.reportCh <- &Event{
@@ -155,13 +148,11 @@ func (c *Consumer) Close() error {
155148
Type: ExitGroupEvent,
156149
Data: c.closeCh,
157150
}
158-
log.Printf("消费者 %s 准备关闭", c.name)
159151
// 等待服务端退出完成
160152
<-c.closeCh
161153
// 关闭资源
162154
close(c.receiveCh)
163155
close(c.msgCh)
164-
log.Printf("消费者 %s 关闭成功", c.name)
165156
})
166157

167158
return nil

memory/consumergroup.go

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package memory
1616

1717
import (
1818
"fmt"
19-
"log"
2019
"sync"
2120
"sync/atomic"
2221
"time"
@@ -105,16 +104,13 @@ func (c *ConsumerGroup) eventHandler(name string, event *Event) {
105104
var err error
106105
err = c.reportOffset(data.Records)
107106
data.ErrChan <- err
108-
log.Printf("消费者%s上报offset成功", name)
109107
case RejoinAckEvent:
110108
// consumer响应重平衡信号返回的数据,返回的是当前所有分区的偏移量
111109
records, _ := event.Data.([]PartitionRecord)
112110
// 不管上报成不成功
113111
_ = c.reportOffset(records)
114-
log.Printf("消费者%s成功接受到重平衡信号,并上报offset", name)
115112
c.balanceCh <- struct{}{}
116113
case PartitionNotifyAckEvent:
117-
log.Printf("消费者%s 成功设置分区信息", name)
118114
c.balanceCh <- struct{}{}
119115
}
120116
}
@@ -128,12 +124,9 @@ func (c *ConsumerGroup) exitGroup(name string, closeCh chan struct{}) {
128124
time.Sleep(defaultSleepTime)
129125
continue
130126
}
131-
log.Printf("消费者 %s 准备退出消费组", name)
132127
c.consumers.Delete(name)
133128
c.reBalance()
134-
log.Printf("给消费者 %s 发送退出确认信号", name)
135129
close(closeCh)
136-
log.Printf("消费者 %s 成功退出消费组", name)
137130
if !atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable) {
138131
atomic.CompareAndSwapInt32(&c.status, StatusStopping, StatusStop)
139132
}
@@ -167,7 +160,7 @@ func (c *ConsumerGroup) Close() {
167160
}
168161

169162
func (c *ConsumerGroup) close() {
170-
c.consumers.Range(func(key string, value *Consumer) bool {
163+
c.consumers.Range(func(_ string, value *Consumer) bool {
171164
ch := make(chan struct{})
172165
value.receiveCh <- &Event{
173166
Type: CloseEvent,
@@ -180,39 +173,30 @@ func (c *ConsumerGroup) close() {
180173

181174
// reBalance 单独使用该方法是并发不安全的
182175
func (c *ConsumerGroup) reBalance() {
183-
log.Println("开始重平衡")
184176
// 通知每一个消费者进行偏移量的上报
185177
length := 0
186178
consumers := make([]string, 0, consumerCap)
187-
log.Println("开始给每个消费者,重平衡信号")
188179
c.consumers.Range(func(key string, value *Consumer) bool {
189-
log.Printf("开始通知消费者%s", key)
190180
value.receiveCh <- &Event{
191181
Type: RejoinEvent,
192182
}
193183
consumers = append(consumers, key)
194184
length++
195-
log.Printf("通知消费者%s成功", key)
196185
return true
197186
})
198187
number := 0
199-
log.Println("xxxxxxxxxx长度", length)
200188
// 等待所有消费者都接收到信号,并上报自己offset
201189
for length > 0 {
202190
<-c.balanceCh
203191
number++
204192
if number != length {
205-
log.Println("xxxxxxxxxx number", number)
206193
continue
207194
}
208195
// 接收到所有信号
209-
log.Println("所有消费者已经接受到重平衡请求,并上报了消费进度")
210196
consumerMap := c.consumerPartitionAssigner.AssignPartition(consumers, len(c.partitions))
211197
// 通知所有消费者分配
212-
log.Println("开始分配分区")
213198
for consumerName, partitions := range consumerMap {
214199
// 查找消费者所属的channel
215-
log.Printf("消费者 %s 消费 %v 分区", consumerName, partitions)
216200
consumer, ok := c.consumers.Load(consumerName)
217201
if ok {
218202
// 往每个消费者的receive_channel发送partition的信息
@@ -232,7 +216,6 @@ func (c *ConsumerGroup) reBalance() {
232216

233217
}
234218
}
235-
log.Println("重平衡结束")
236219
return
237220
}
238221
}
@@ -250,7 +233,7 @@ func (c *ConsumerGroup) JoinGroup() (*Consumer, error) {
250233
}
251234

252235
var length int
253-
c.consumers.Range(func(key string, value *Consumer) bool {
236+
c.consumers.Range(func(_ string, _ *Consumer) bool {
254237
length++
255238
return true
256239
})
@@ -269,7 +252,6 @@ func (c *ConsumerGroup) JoinGroup() (*Consumer, error) {
269252
c.consumers.Store(name, consumer)
270253
go c.consumerEventsHandler(name, reportCh)
271254
go consumer.eventLoop()
272-
log.Printf("新建消费者 %s", name)
273255
// 重平衡分配分区
274256
c.reBalance()
275257
atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable)
@@ -287,10 +269,3 @@ func (c *ConsumerGroup) consumerEventsHandler(name string, reportCh chan *Event)
287269
}
288270
}
289271
}
290-
291-
func min(i, j int) int {
292-
if i < j {
293-
return i
294-
}
295-
return j
296-
}

memory/consumergroup_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,13 @@ func TestConsumerGroup_Close(t *testing.T) {
7171
wg.Wait()
7272
// consumerGroup中没有消费者
7373
var flag atomic.Bool
74-
cg.consumers.Range(func(key string, value *Consumer) bool {
74+
cg.consumers.Range(func(_ string, _ *Consumer) bool {
7575
flag.Store(true)
7676
return true
7777
})
7878
assert.False(t, flag.Load())
7979
// 所有加入的消费者都是关闭状态
80-
cg.consumers.Range(func(key string, value *Consumer) bool {
80+
cg.consumers.Range(func(_ string, value *Consumer) bool {
8181
assert.True(t, value.closed)
8282
return true
8383
})

memory/consumerpartitionassigner/equaldivide/balancer_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ func TestBalancer_AssignPartition(t *testing.T) {
6363
},
6464
}
6565
for _, tc := range testcases {
66-
tc := tc
6766
t.Run(tc.name, func(t *testing.T) {
6867
t.Parallel()
6968
actualVal := balancer.AssignPartition(tc.consumers, tc.partition)

memory/mq.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package memory
1717
import (
1818
"context"
1919
"fmt"
20-
"log"
20+
"log/slog"
2121
"sync"
2222

2323
"github.com/ecodeclub/mq-api/internal/pkg/validator"
@@ -133,7 +133,7 @@ func (m *MQ) Close() error {
133133
m.topics.Range(func(key string, value *Topic) bool {
134134
err := value.Close()
135135
if err != nil {
136-
log.Printf("topic: %s关闭失败 %v", key, err)
136+
slog.Error("topic关闭失败", slog.String("topic", key), slog.String("error", err.Error()))
137137
}
138138
return true
139139
})
@@ -155,7 +155,7 @@ func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error {
155155
if ok {
156156
err := topic.Close()
157157
if err != nil {
158-
log.Printf("topic: %s关闭失败 %v", t, err)
158+
slog.Error("topic关闭失败", slog.String("error", err.Error()))
159159
continue
160160
}
161161
m.topics.Delete(t)

0 commit comments

Comments
 (0)