naffka.go
package naffka

import (
	"fmt"
	"log"
	"sync"
	"time"

	sarama "github.com/Shopify/sarama"
	"github.com/matrix-org/naffka/storage"
	"github.com/matrix-org/naffka/types"
)

// Naffka is an implementation of the sarama Kafka API designed to run within
// a single Go process. It implements both the sarama.SyncProducer and the
// sarama.Consumer interfaces, which means it can act as a drop-in replacement
// for Kafka in tests or in single-instance deployments.
// Naffka does not support multiple partitions.
type Naffka struct {
	db          storage.Database
	topicsMutex sync.Mutex
	topics      map[string]*topic
}

// New creates a new Naffka instance.
func New(db storage.Database) (*Naffka, error) {
	n := &Naffka{db: db, topics: map[string]*topic{}}
	maxOffsets, err := db.MaxOffsets()
	if err != nil {
		return nil, err
	}
	for topicName, offset := range maxOffsets {
		n.topics[topicName] = &topic{
			db:         db,
			topicName:  topicName,
			nextOffset: offset + 1,
		}
	}
	return n, nil
}
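
// The sketch below illustrates the intended drop-in usage described above:
// the same Naffka value acts as both a sarama.SyncProducer and a
// sarama.Consumer. It is an illustration only, not part of the Naffka API,
// and it assumes the caller already has a storage.Database implementation
// (no concrete backend constructor is shown because those live in the
// storage package).
func exampleProduceConsume(db storage.Database) error {
	naff, err := New(db)
	if err != nil {
		return err
	}
	// Produce a message through the sarama.SyncProducer side of the API.
	if _, _, err := naff.SendMessage(&sarama.ProducerMessage{
		Topic: "example",
		Value: sarama.StringEncoder("hello"),
	}); err != nil {
		return err
	}
	// Consume it back through the sarama.Consumer side of the API, replaying
	// the topic from the oldest stored offset.
	pc, err := naff.ConsumePartition("example", 0, sarama.OffsetOldest)
	if err != nil {
		return err
	}
	msg := <-pc.Messages()
	log.Printf("received %q at offset %d", msg.Value, msg.Offset)
	return pc.Close()
}
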
// SendMessage implements sarama.SyncProducer
func (n *Naffka) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
	err = n.SendMessages([]*sarama.ProducerMessage{msg})
	return msg.Partition, msg.Offset, err
}

// SendMessages implements sarama.SyncProducer
func (n *Naffka) SendMessages(msgs []*sarama.ProducerMessage) error {
	byTopic := map[string][]*sarama.ProducerMessage{}
	for _, msg := range msgs {
		byTopic[msg.Topic] = append(byTopic[msg.Topic], msg)
	}
	var topicNames []string
	for topicName := range byTopic {
		topicNames = append(topicNames, topicName)
	}
	now := time.Now()
	topics := n.getTopics(topicNames)
	for topicName := range byTopic {
		if err := topics[topicName].send(now, byTopic[topicName]); err != nil {
			return err
		}
	}
	return nil
}

func (n *Naffka) getTopics(topicNames []string) map[string]*topic {
	n.topicsMutex.Lock()
	defer n.topicsMutex.Unlock()
	result := map[string]*topic{}
	for _, topicName := range topicNames {
		t := n.topics[topicName]
		if t == nil {
			// If the topic doesn't already exist then create it.
			t = &topic{db: n.db, topicName: topicName}
			n.topics[topicName] = t
		}
		result[topicName] = t
	}
	return result
}

// Topics implements sarama.Consumer
func (n *Naffka) Topics() ([]string, error) {
	n.topicsMutex.Lock()
	defer n.topicsMutex.Unlock()
	var result []string
	for topic := range n.topics {
		result = append(result, topic)
	}
	return result, nil
}

// Partitions implements sarama.Consumer
func (n *Naffka) Partitions(topic string) ([]int32, error) {
	// Naffka stores a single partition per topic, so this always returns a single partition ID.
	return []int32{0}, nil
}

// ConsumePartition implements sarama.Consumer
// Note: offset is *inclusive*, i.e. it will include the message with that offset.
func (n *Naffka) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
	if partition != 0 {
		return nil, fmt.Errorf("Unknown partition ID %d", partition)
	}
	topics := n.getTopics([]string{topic})
	return topics[topic].consume(offset), nil
}
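
// Because the offset argument to ConsumePartition is inclusive, a caller that
// has already processed messages up to some offset would typically resume
// from the next one. The helper below is illustrative only and not part of
// the Naffka API; lastProcessed stands for an offset the caller has persisted
// elsewhere.
func exampleResume(n *Naffka, topic string, lastProcessed int64) (sarama.PartitionConsumer, error) {
	return n.ConsumePartition(topic, 0, lastProcessed+1)
}
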
// HighWaterMarks implements sarama.Consumer
func (n *Naffka) HighWaterMarks() map[string]map[int32]int64 {
	n.topicsMutex.Lock()
	defer n.topicsMutex.Unlock()
	result := map[string]map[int32]int64{}
	for topicName, topic := range n.topics {
		result[topicName] = map[int32]int64{
			0: topic.highwaterMark(),
		}
	}
	return result
}

// Close implements sarama.SyncProducer and sarama.Consumer
func (n *Naffka) Close() error {
	return nil
}

const channelSize = 1024

// partitionConsumer ensures that all messages written to a particular
// topic, from a given offset onwards, are sent in order to a channel.
// Implements sarama.PartitionConsumer
type partitionConsumer struct {
	topic    *topic
	messages chan *sarama.ConsumerMessage
	// Whether the consumer is in "catchup" mode or not.
	// See the "catchup" function for details.
	// Reads and writes to this field are protected by the topic mutex.
	catchingUp bool
}

// AsyncClose implements sarama.PartitionConsumer
func (c *partitionConsumer) AsyncClose() {
}

// Close implements sarama.PartitionConsumer
func (c *partitionConsumer) Close() error {
	// TODO: Add support for performing a clean shutdown of the consumer.
	return nil
}

// Messages implements sarama.PartitionConsumer
func (c *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
	return c.messages
}

// Errors implements sarama.PartitionConsumer
func (c *partitionConsumer) Errors() <-chan *sarama.ConsumerError {
	// TODO: Add option to pass consumer errors to an errors channel.
	return nil
}

// HighWaterMarkOffset implements sarama.PartitionConsumer
func (c *partitionConsumer) HighWaterMarkOffset() int64 {
	return c.topic.highwaterMark()
}
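
// Sketch of how a caller might use the high water mark: because it is the
// offset that the next produced message will receive (one past the newest
// stored message), a consumer has caught up once the message it just read
// sits immediately below it. This helper is illustrative only and is not
// part of the Naffka API.
func exampleCaughtUp(pc sarama.PartitionConsumer, lastRead *sarama.ConsumerMessage) bool {
	return lastRead.Offset+1 >= pc.HighWaterMarkOffset()
}
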
// catchup makes the consumer go into "catchup" mode, where messages are read
// from the database instead of directly from producers.
// Once the consumer is up to date, i.e. there are no new messages in the
// database, the consumer goes back into normal mode where new messages are
// written directly to the channel.
// Must be called while holding the c.topic.mutex lock.
func (c *partitionConsumer) catchup(fromOffset int64) {
	// If we're already in catchup mode or up to date, this is a no-op.
	if c.catchingUp || fromOffset == c.topic.nextOffset {
		return
	}
	c.catchingUp = true
	// Due to the checks above there can only be one of these goroutines
	// running at a time.
	go func() {
		for {
			// Check if we're up to date yet. If we are, exit catchup mode.
			c.topic.mutex.Lock()
			nextOffset := c.topic.nextOffset
			if fromOffset == nextOffset {
				c.catchingUp = false
				c.topic.mutex.Unlock()
				return
			}
			c.topic.mutex.Unlock()
			// Limit the number of messages we request from the database to the
			// capacity of the channel.
			if nextOffset > fromOffset+int64(cap(c.messages)) {
				nextOffset = fromOffset + int64(cap(c.messages))
			}
			// Fetch the messages from the database.
			msgs, err := c.topic.db.FetchMessages(c.topic.topicName, fromOffset, nextOffset)
			if err != nil {
				// TODO: Add option to write consumer errors to an errors channel
				// as an alternative to logging the errors.
				log.Print("Error reading messages: ", err)
				// Wait before retrying.
				// TODO: Maybe use an exponential backoff scheme here.
				// TODO: This timeout should take account of all the other goroutines
				// that might be doing the same thing. (If there are 10000 consumers
				// then we don't want to end up retrying every millisecond.)
				time.Sleep(10 * time.Second)
				continue
			}
			if len(msgs) == 0 {
				// This should only happen if the database is corrupted and has lost the
				// messages between the requested offsets.
				log.Fatalf("Corrupt database returned no messages between %d and %d", fromOffset, nextOffset)
			}
			// Pass the messages into the consumer channel, blocking on each write
			// until the channel has enough space for the message.
			for i := range msgs {
				c.messages <- msgs[i].ConsumerMessage(c.topic.topicName)
			}
			// Update the offset for the next loop iteration.
			fromOffset = msgs[len(msgs)-1].Offset + 1
		}
	}()
}

// notifyNewMessage tells the consumer about a new message.
// Must be called while holding the c.topic.mutex lock.
func (c *partitionConsumer) notifyNewMessage(cmsg *sarama.ConsumerMessage) {
	// If we're in "catchup" mode then the catchup routine will send the
	// message later, since cmsg has already been written to the database.
	if c.catchingUp {
		return
	}
	// Otherwise, let's try writing the message directly to the channel.
	select {
	case c.messages <- cmsg:
	default:
		// The messages channel has filled up, so let's go into catchup
		// mode. Once the channel starts being read from again, messages
		// will be read from the database.
		c.catchup(cmsg.Offset)
	}
}

type topic struct {
	db        storage.Database
	topicName string
	mutex     sync.Mutex
	consumers []*partitionConsumer
	// nextOffset is the offset that will be assigned to the next message in
	// this topic, i.e. one greater than the last message offset.
	nextOffset int64
}

// send writes messages to a topic.
func (t *topic) send(now time.Time, pmsgs []*sarama.ProducerMessage) error {
	var err error
	// Encode the message keys and values.
	msgs := make([]types.Message, len(pmsgs))
	for i := range msgs {
		if pmsgs[i].Key != nil {
			msgs[i].Key, err = pmsgs[i].Key.Encode()
			if err != nil {
				return err
			}
		}
		if pmsgs[i].Value != nil {
			msgs[i].Value, err = pmsgs[i].Value.Encode()
			if err != nil {
				return err
			}
		}
		pmsgs[i].Timestamp = now
		msgs[i].Timestamp = now
		msgs[i].Headers = pmsgs[i].Headers
	}
	// Take the lock before assigning the offsets.
	t.mutex.Lock()
	defer t.mutex.Unlock()
	offset := t.nextOffset
	for i := range msgs {
		pmsgs[i].Offset = offset
		msgs[i].Offset = offset
		offset++
	}
	// Store the messages while we hold the lock.
	err = t.db.StoreMessages(t.topicName, msgs)
	if err != nil {
		return err
	}
	t.nextOffset = offset
	// Now notify the consumers about the messages.
	for _, msg := range msgs {
		cmsg := msg.ConsumerMessage(t.topicName)
		for _, c := range t.consumers {
			c.notifyNewMessage(cmsg)
		}
	}
	return nil
}

func (t *topic) consume(offset int64) *partitionConsumer {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	c := &partitionConsumer{
		topic: t,
	}
	// Handle the special offsets.
	if offset == sarama.OffsetNewest {
		offset = t.nextOffset
	}
	if offset == sarama.OffsetOldest {
		offset = 0
	}
	c.messages = make(chan *sarama.ConsumerMessage, channelSize)
	t.consumers = append(t.consumers, c)
	// If we're not streaming from the latest offset, we need to go into
	// "catchup" mode.
	if offset != t.nextOffset {
		c.catchup(offset)
	}
	return c
}
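
// A small illustrative sketch (not part of the Naffka API) of the difference
// between the special offsets handled above: a consumer opened at
// sarama.OffsetNewest only receives messages produced after ConsumePartition
// returns, whereas sarama.OffsetOldest replays the topic from offset 0 via
// catchup mode before switching to live delivery.
func exampleTailTopic(n *Naffka) (sarama.PartitionConsumer, error) {
	return n.ConsumePartition("example", 0, sarama.OffsetNewest)
}
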
func (t *topic) highwaterMark() int64 {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	return t.nextOffset
}