-
Notifications
You must be signed in to change notification settings - Fork 1
/
subscriber.go
247 lines (189 loc) · 6.45 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package tankgo
import (
"context"
"encoding/binary"
"fmt"
"net"
"strconv"
"sync"
"time"
tbinary "github.com/TheBestCo/tankgo/binary"
"github.com/TheBestCo/tankgo/message"
)
// Subscriber is the contract for a TANK consumer client.
//
// NOTE(review): the previous Connect signature (ctx, broker) did not match
// (*TankSubscriber).Connect defined in this file, so the concrete type could
// not satisfy this interface; the signature is aligned with the
// implementation here.
type Subscriber interface {
	// Connect establishes the connection to broker; a zero connectTimeout or
	// bufsize selects DefaultConTimeout / DefaultBufSize respectively.
	Connect(ctx context.Context, broker string, connectTimeout time.Duration, bufsize int) error
	// Subscribe issues a consume request and streams matching logs on the
	// returned channel; errors are reported on the error channel.
	Subscribe(r *message.ConsumeRequest, maxConcurrentReads int) (<-chan message.Log, <-chan error)
	// Reset drops the current connection and re-dials the same broker.
	Reset(ctx context.Context) error
	// GetTopicsHighWaterMark returns the high-water-mark sequence number per topic.
	GetTopicsHighWaterMark(r *message.ConsumeRequest) (map[string]uint64, error)
	// Ping verifies the broker responds with a ping message.
	Ping() error
	// Close terminates the underlying connection.
	Close() error
}
// Logger is the minimal logging interface accepted by this package;
// *log.Logger and compatible third-party loggers satisfy it.
type Logger interface {
	// Printf logs a formatted message, following fmt.Printf verbs.
	Printf(format string, v ...interface{})
}
// Writable is the interface of every serializable message: implementations
// encode themselves into the provided write buffer.
type Writable interface {
	// WriteToBuffer serializes the message into c.
	WriteToBuffer(c *tbinary.WriteBuffer) error
}
// Readable is the interface of every deserializable message.
// The method is unexported, so only types inside this package can satisfy it.
type Readable interface {
	// readFromBuffer decodes payloadSize bytes from rb into the receiver.
	readFromBuffer(rb *tbinary.ReadBuffer, payloadSize uint32) error
}
// TankSubscriber is the concrete TANK consumer. It owns a single TCP
// connection to the broker plus independently-locked read and write buffers,
// so concurrent reads and writes do not serialize against each other.
// The zero value is not usable; call Connect first.
type TankSubscriber struct {
	con *net.TCPConn
	// read buffer (synchronized on rlock)
	rlock *sync.Mutex
	readBuffer tbinary.ReadBuffer
	// write buffer (synchronized on wlock)
	wlock *sync.Mutex
	writeBuffer tbinary.WriteBuffer
}
// Connect initializes a TankSubscriber with the underlying connection to broker.
// The supplied context can cancel the dial if the connection is not established
// within the configured timeout.
// A connectTimeout of 0 falls back to DefaultConTimeout (500 ms); a bufsize of
// 0 falls back to DefaultBufSize (100 KB).
func (s *TankSubscriber) Connect(ctx context.Context, broker string, connectTimeout time.Duration, bufsize int) error {
	// Resolve first so an invalid broker address fails before dialing.
	addr, err := net.ResolveTCPAddr("tcp", broker)
	if err != nil {
		return err
	}

	timeout := connectTimeout
	if timeout == 0 {
		timeout = DefaultConTimeout
	}

	dialer := net.Dialer{Timeout: timeout}
	rawConn, err := dialer.DialContext(ctx, "tcp", addr.String())
	if err != nil {
		return err
	}

	tcpConn, ok := rawConn.(*net.TCPConn)
	if !ok {
		return fmt.Errorf("cannot create new tcp connection")
	}

	size := bufsize
	if size == 0 {
		size = DefaultBufSize
	}

	// Replace the receiver wholesale so stale buffers/locks never survive.
	*s = TankSubscriber{
		con:         tcpConn,
		rlock:       &sync.Mutex{},
		wlock:       &sync.Mutex{},
		readBuffer:  tbinary.NewReadBuffer(tcpConn, binary.LittleEndian, size),
		writeBuffer: tbinary.NewWriteBuffer(tcpConn, binary.LittleEndian),
	}

	return nil
}
// Reset resets TankSubscriber's underlying connection and read/write buffers.
// It's used when the subscriber wants to early drop the existing connection
// with TANK and all data from a previous request, because a new request will
// follow or will use a different context value.
// Reset returns an error if no connection has been established yet.
func (s *TankSubscriber) Reset(ctx context.Context) error {
	// Guard: RemoteAddr on a nil connection would panic.
	if s.con == nil {
		return fmt.Errorf("cannot reset: no existing connection")
	}
	// Re-dial the same remote using the package default timeout
	// (same 500 ms value Connect falls back to).
	d := net.Dialer{Timeout: DefaultConTimeout}
	conn, err := d.DialContext(ctx, "tcp", s.con.RemoteAddr().String())
	if err != nil {
		return err
	}
	tcpConn, ok := conn.(*net.TCPConn)
	if !ok {
		return fmt.Errorf("cannot create new tcp connection")
	}
	// Close the old connection only after the replacement dial succeeded.
	if err = s.con.Close(); err != nil {
		return err
	}
	s.con = tcpConn
	s.readBuffer.Reset(s.con)
	s.writeBuffer.Reset(s.con)
	return nil
}
// Subscribe to TANK server based on the provided consume request. It returns a
// message log channel of maxConcurrentReads size and an error channel.
// The error channel is buffered (size 1) and always receives exactly one value:
// either the request error (with a nil log channel) or the final result of the
// background consume. The log channel is closed when consumption finishes.
func (s *TankSubscriber) Subscribe(r *message.ConsumeRequest, maxConcurrentReads int) (<-chan message.Log, <-chan error) {
	errChan := make(chan error, 1)

	bh, err := s.sendSubscribeRequest(r)
	if err != nil {
		errChan <- err
		return nil, errChan
	}

	// Map "topic/partition" -> base sequence number for the decoder.
	topicPartitionBaseSeq := make(map[string]uint64)
	for _, t := range r.Topics {
		for _, p := range t.Partitions {
			topicPartitionBaseSeq[t.Name+"/"+strconv.Itoa(int(p.PartitionID))] = p.ABSSequenceNumber
		}
	}

	m := message.ConsumeResponse{TopicPartitionBaseSeq: topicPartitionBaseSeq}
	msgChan := make(chan message.Log, maxConcurrentReads)

	// Consume from the stream in the background. A single goroutine suffices:
	// the buffered errChan send cannot block, and the deferred close fires
	// after the result is delivered — same ordering the previous
	// two-goroutine/done-channel version guaranteed.
	go func() {
		defer close(msgChan)
		errChan <- m.Consume(&s.readBuffer, bh.PayloadSize, msgChan)
	}()

	return msgChan, errChan
}
// GetTopicsHighWaterMark returns a map with the HighWaterMark values per topic
// based on the request.
// Because there is no dedicated TANK request for just the HighWaterMark of a
// topic, the request should look exactly like a regular consume request.
// If another consume request will follow this call, Reset must be called in
// between, or TANK will respond with an error.
func (s *TankSubscriber) GetTopicsHighWaterMark(r *message.ConsumeRequest) (map[string]uint64, error) {
	if _, err := s.sendSubscribeRequest(r); err != nil {
		return map[string]uint64{}, err
	}

	// Build the "topic/partition" -> base sequence number map the decoder needs.
	baseSeqByPartition := make(map[string]uint64)
	for _, topic := range r.Topics {
		for _, part := range topic.Partitions {
			key := topic.Name + "/" + strconv.Itoa(int(part.PartitionID))
			baseSeqByPartition[key] = part.ABSSequenceNumber
		}
	}

	resp := message.ConsumeResponse{TopicPartitionBaseSeq: baseSeqByPartition}
	marks, err := resp.GetTopicsLatestSequenceNumber(&s.readBuffer)
	if err != nil {
		return map[string]uint64{}, err
	}
	return marks, nil
}
// Close terminates the underlying broker connection, if one exists.
// Calling Close on a never-connected subscriber is a no-op.
func (s *TankSubscriber) Close() error {
	if s.con == nil {
		return nil
	}
	return s.con.Close()
}
// Ping reads the next basic header from the stream and verifies it is a ping
// response from the broker.
func (s *TankSubscriber) Ping() error {
	header, err := s.readBasicHeader()
	// Check the read error BEFORE inspecting the header: on failure the header
	// is zero-valued, and the old order masked the real error behind a
	// misleading "expected ping response" message.
	if err != nil {
		return err
	}
	if header.MessageType != message.TypePing {
		return fmt.Errorf("expected ping response, got :%#v", header.MessageType)
	}
	return nil
}
// readBasicHeader reads one basic header from the stream, holding the read
// lock for the duration of the read.
func (s *TankSubscriber) readBasicHeader() (message.BasicHeader, error) {
	s.rlock.Lock()
	defer s.rlock.Unlock()

	var header message.BasicHeader
	if err := header.ReadHeader(&s.readBuffer, message.SizeOfBasicHeader); err != nil {
		return message.BasicHeader{}, err
	}
	return header, nil
}
// sendSubscribeRequest serializes w to the broker under the write lock, then
// reads the response's basic header and verifies it is a consume response.
func (s *TankSubscriber) sendSubscribeRequest(w Writable) (message.BasicHeader, error) {
	s.wlock.Lock()
	defer s.wlock.Unlock()

	if err := w.WriteToBuffer(&s.writeBuffer); err != nil {
		return message.BasicHeader{}, tbinary.NewProtocolError(err, "error during write message")
	}
	// Distinct message for the flush step: the old text repeated
	// "error during write message", misattributing flush failures.
	if err := s.writeBuffer.Flush(); err != nil {
		return message.BasicHeader{}, tbinary.NewProtocolError(err, "error during flush message")
	}

	header, err := s.readBasicHeader()
	if err != nil {
		return header, err
	}
	if header.MessageType != message.TypeConsume {
		return header, fmt.Errorf("expected TypeConsume in header message type, got: %#v", header.MessageType)
	}
	return header, nil
}
// Package-level defaults applied by Connect when the caller passes zero values.
const (
	// DefaultConTimeout is the dial timeout used when connectTimeout is 0.
	DefaultConTimeout = time.Millisecond * 500
	// DefaultBufSize is the read-buffer size (100 KB) used when bufsize is 0.
	DefaultBufSize = 1024 * 100
)