package goka

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

// TopicManager provides an interface to create/check topics and their partitions
type TopicManager interface {
	// EnsureTableExists checks that a table (log-compacted topic) exists, or creates one if possible
	EnsureTableExists(topic string, npar int) error
	// EnsureStreamExists checks that a stream topic exists, or creates one if possible
	EnsureStreamExists(topic string, npar int) error
	// EnsureTopicExists checks that a topic exists, or creates one if possible,
	// enforcing the given configuration
	EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error
	// Partitions returns the partitions of a topic that are assigned to the running
	// instance, i.e. it doesn't necessarily represent all partitions of the topic.
	Partitions(topic string) ([]int32, error)
	// GetOffset returns the offset of a topic/partition for the given time
	// (or sarama.OffsetNewest/sarama.OffsetOldest).
	GetOffset(topic string, partitionID int32, time int64) (int64, error)
	// Close closes the topic manager
	Close() error
}

type topicManager struct {
	admin              sarama.ClusterAdmin
	client             sarama.Client
	topicManagerConfig *TopicManagerConfig
}

// NewTopicManager creates a new topic manager using the sarama library
func NewTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig) (TopicManager, error) {
	if !saramaConfig.Version.IsAtLeast(sarama.V0_10_0_0) {
		return nil, fmt.Errorf("goka's topic manager needs kafka version v0.10.0.0 or higher to function. Version is %s", saramaConfig.Version.String())
	}

	client, err := sarama.NewClient(brokers, saramaConfig)
	if err != nil {
		return nil, fmt.Errorf("Error creating the kafka client: %v", err)
	}
	return newTopicManager(brokers, saramaConfig, topicManagerConfig, client, checkBroker)
}
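
// exampleEnsureStream is a hypothetical usage sketch: it assumes a Kafka broker
// reachable at "localhost:9092" and shows the typical flow of building a sarama
// config and a TopicManagerConfig, creating the manager, ensuring a stream topic
// exists, and closing the manager again. The topic name and partition count are
// illustrative only.
func exampleEnsureStream() error {
	saramaCfg := DefaultConfig()     // goka's default sarama configuration
	tmCfg := NewTopicManagerConfig() // defaults: replication 2, retention 1h

	tm, err := NewTopicManager([]string{"localhost:9092"}, saramaCfg, tmCfg)
	if err != nil {
		return fmt.Errorf("creating topic manager: %w", err)
	}
	defer tm.Close()

	// Creates the stream topic with 8 partitions if it does not exist yet,
	// otherwise verifies its partition count and configuration.
	return tm.EnsureStreamExists("example-stream", 8)
}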

func newTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig, client sarama.Client, check checkFunc) (*topicManager, error) {
	if client == nil {
		return nil, errors.New("cannot create topic manager with nil client")
	}

	if topicManagerConfig == nil {
		return nil, errors.New("cannot create topic manager with nil config")
	}

	activeBrokers := client.Brokers()
	if len(activeBrokers) == 0 {
		return nil, errors.New("no brokers active in current client")
	}

	broker := activeBrokers[0]
	err := check(broker, saramaConfig)
	if err != nil {
		return nil, err
	}

	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		return nil, fmt.Errorf("error creating cluster admin: %v", err)
	}

	return &topicManager{
		admin:              admin,
		client:             client,
		topicManagerConfig: topicManagerConfig,
	}, nil
}

type checkFunc func(broker Broker, config *sarama.Config) error

func checkBroker(broker Broker, config *sarama.Config) error {
	if config == nil {
		config = DefaultConfig()
	}

	err := broker.Open(config)
	if err != nil {
		return fmt.Errorf("error opening broker connection: %v", err)
	}

	connected, err := broker.Connected()
	if err != nil {
		return fmt.Errorf("cannot connect to broker %s: %v", broker.Addr(), err)
	}

	if !connected {
		return fmt.Errorf("cannot connect to broker %s: not connected", broker.Addr())
	}

	return nil
}

func (m *topicManager) Close() error {
	return m.client.Close()
}

func (m *topicManager) Partitions(topic string) ([]int32, error) {
	// refresh metadata, otherwise we might get an outdated number of partitions.
	// we cannot call it for that specific topic only,
	// otherwise we'd create it if auto.create.topics.enable==true, which we want to avoid
	if err := m.client.RefreshMetadata(); err != nil {
		return nil, fmt.Errorf("error refreshing metadata %v", err)
	}

	topics, err := m.client.Topics()
	if err != nil {
		return nil, err
	}

	for _, tpc := range topics {
		// topic exists, let's list the partitions.
		if tpc == topic {
			return m.client.Partitions(topic)
		}
	}
	return nil, errTopicNotFound
}

func (m *topicManager) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
	return m.client.GetOffset(topic, partitionID, time)
}
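
// exampleNewestOffset is a hypothetical sketch showing the common call pattern
// for GetOffset: the time argument takes a millisecond timestamp or one of the
// sarama sentinels sarama.OffsetNewest / sarama.OffsetOldest.
func exampleNewestOffset(tm TopicManager, topic string) (int64, error) {
	// Resolve the offset of the next message that would be produced to partition 0.
	return tm.GetOffset(topic, 0, sarama.OffsetNewest)
}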

func (m *topicManager) createTopic(topic string, npar, rfactor int, config map[string]string) error {
	m.topicManagerConfig.Logger.Debugf("creating topic %s with npar=%d, rfactor=%d, config=%#v", topic, npar, rfactor, config)
	topicDetail := &sarama.TopicDetail{}
	topicDetail.NumPartitions = int32(npar)
	topicDetail.ReplicationFactor = int16(rfactor)
	topicDetail.ConfigEntries = make(map[string]*string)

	for k, v := range config {
		// ConfigEntries is a map to `*string`, so we have to make a copy of the value
		// here, otherwise all entries would end up pointing to the same value,
		// since `v` has the same address in every iteration
		value := v
		topicDetail.ConfigEntries[k] = &value
	}

	err := m.admin.CreateTopic(topic, topicDetail, false)
	if err != nil {
		return fmt.Errorf("error creating topic %s, npar=%d, rfactor=%d, config=%#v: %v",
			topic, npar, rfactor, config, err)
	}

	return m.waitForCreated(topic)
}

func (m *topicManager) handleConfigMismatch(message string) error {
	switch m.topicManagerConfig.MismatchBehavior {
	case TMConfigMismatchBehaviorWarn:
		m.topicManagerConfig.Logger.Printf("Warning: %s", message)
		return nil
	case TMConfigMismatchBehaviorFail:
		return fmt.Errorf("%s", message)
	// mismatches are ignored by default
	default:
		return nil
	}
}

func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[string]string) error {
	partitions, err := m.Partitions(topic)
	if err != nil {
		if err != errTopicNotFound {
			return fmt.Errorf("error checking topic: %v", err)
		}
	}

	// no topic yet, let's create it
	if len(partitions) == 0 {
		// (or not)
		if m.topicManagerConfig.NoCreate {
			return fmt.Errorf("topic %s does not exist but the manager is configured with NoCreate, so it will not attempt to create it", topic)
		}

		return m.createTopic(topic,
			npar,
			rfactor,
			config)
	}

	// the topic exists, let's check its configuration

	// partition count does not match
	if len(partitions) != npar {
		return m.handleConfigMismatch(fmt.Sprintf("partition count mismatch for topic %s. Need %d, but existing topic has %d", topic, npar, len(partitions)))
	}

	// check additional config values via the cluster admin if our current version supports it
	if m.adminSupported() {
		cfgMap, err := m.getTopicConfigMap(topic)
		if err != nil {
			return err
		}

		// check for all user-passed config values whether they're as expected
		for key, value := range config {
			entry, ok := cfgMap[key]
			if !ok {
				return m.handleConfigMismatch(fmt.Sprintf("config for topic %s did not contain requested key %s", topic, key))
			}
			if entry.Value != value {
				return m.handleConfigMismatch(fmt.Sprintf("unexpected config value for topic %s. Expected %s=%s. Got %s=%s", topic, key, value, key, entry.Value))
			}
		}

		// check whether the number of replicas matches what we expect
		topicMinReplicas, err := m.getTopicMinReplicas(topic)
		if err != nil {
			return err
		}
		if topicMinReplicas != rfactor {
			return m.handleConfigMismatch(fmt.Sprintf("unexpected replication factor for topic %s. Expected %d, got %d",
				topic,
				rfactor,
				topicMinReplicas))
		}
	}
	return nil
}

func (m *topicManager) waitForCreated(topic string) error {
	// no timeout defined -> no check
	if m.topicManagerConfig.CreateTopicTimeout == 0 {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), m.topicManagerConfig.CreateTopicTimeout)
	defer cancel()
	for ctx.Err() == nil {
		_, err := m.Partitions(topic)
		switch err {
		case nil:
			return nil
		case errTopicNotFound:
			time.Sleep(time.Second)
		default:
			return fmt.Errorf("error checking topic: %w", err)
		}
	}
	return fmt.Errorf("waiting for topic %s to be created timed out", topic)
}

func (m *topicManager) adminSupported() bool {
	return m.client.Config().Version.IsAtLeast(sarama.V0_11_0_0)
}

func (m *topicManager) getTopicConfigMap(topic string) (map[string]sarama.ConfigEntry, error) {
	cfg, err := m.admin.DescribeConfig(sarama.ConfigResource{
		Type: sarama.TopicResource,
		Name: topic,
	})

	// the topic exists but its config cannot be described -- this means the cluster is somehow unstable
	if err != nil {
		return nil, fmt.Errorf("Error getting config for topic %s: %w", topic, err)
	}

	// remap the config values to a map
	cfgMap := make(map[string]sarama.ConfigEntry, len(cfg))
	for _, cfgEntry := range cfg {
		cfgMap[cfgEntry.Name] = cfgEntry
	}
	return cfgMap, nil
}

func (m *topicManager) getTopicMinReplicas(topic string) (int, error) {
	topicsMeta, err := m.admin.DescribeTopics([]string{topic})
	if err != nil {
		return 0, fmt.Errorf("Error describing topic %s: %w", topic, err)
	}
	if len(topicsMeta) != 1 {
		return 0, fmt.Errorf("cannot find meta data for topic %s", topic)
	}

	topicMeta := topicsMeta[0]

	var replicasMin int
	for _, part := range topicMeta.Partitions {
		if replicasMin == 0 || len(part.Replicas) < replicasMin {
			replicasMin = len(part.Replicas)
		}
	}
	return replicasMin, nil
}

func (m *topicManager) EnsureStreamExists(topic string, npar int) error {
	return m.ensureExists(
		topic,
		npar,
		m.topicManagerConfig.Stream.Replication,
		map[string]string{
			"cleanup.policy": m.topicManagerConfig.streamCleanupPolicy(),
			"retention.ms":   fmt.Sprintf("%d", m.topicManagerConfig.Stream.Retention.Milliseconds()),
		})
}

func (m *topicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error {
	return m.ensureExists(
		topic,
		npar,
		rfactor,
		config)
}

func (m *topicManager) EnsureTableExists(topic string, npar int) error {
	return m.ensureExists(
		topic,
		npar,
		m.topicManagerConfig.Table.Replication,
		map[string]string{
			"cleanup.policy": m.topicManagerConfig.tableCleanupPolicy(),
		})
}

// TMConfigMismatchBehavior configures how configuration mismatches of a topic
// (replication, num partitions, compaction) should be treated
type TMConfigMismatchBehavior int

const (
	// TMConfigMismatchBehaviorIgnore ignores mismatching config values
	TMConfigMismatchBehaviorIgnore TMConfigMismatchBehavior = 0

	// TMConfigMismatchBehaviorWarn warns if the topic is configured differently than requested
	TMConfigMismatchBehaviorWarn TMConfigMismatchBehavior = 1

	// TMConfigMismatchBehaviorFail makes checking the topic fail if the configuration differs from what was requested
	TMConfigMismatchBehaviorFail TMConfigMismatchBehavior = 2
)
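
// exampleStrictTopicChecks is a hypothetical sketch showing how the mismatch
// behavior is selected: with TMConfigMismatchBehaviorFail, the Ensure* calls
// return an error instead of silently accepting a differently configured topic.
// NoCreate additionally prevents the manager from creating missing topics,
// which is useful when topics are provisioned externally.
func exampleStrictTopicChecks() *TopicManagerConfig {
	tmCfg := NewTopicManagerConfig()
	tmCfg.MismatchBehavior = TMConfigMismatchBehaviorFail
	tmCfg.NoCreate = true
	return tmCfg
}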

// TopicManagerConfig contains options for creating tables and stream topics.
type TopicManagerConfig struct {
	Logger logger
	Table  struct {
		Replication int
		// CleanupPolicy allows overriding the default cleanup policy for tables.
		// Defaults to 'compact' if not set
		CleanupPolicy string
	}
	Stream struct {
		Replication int
		Retention   time.Duration
		// CleanupPolicy allows overriding the default cleanup policy for streams.
		// Defaults to 'delete' if not set
		CleanupPolicy string
	}

	// CreateTopicTimeout is the timeout the topic manager waits for a topic to be created.
	// Set to 0 to turn off checking topic creation.
	// Defaults to 10 seconds
	CreateTopicTimeout time.Duration

	// MismatchBehavior configures how configuration mismatches of a topic
	// (replication, num partitions, compaction) should be treated
	MismatchBehavior TMConfigMismatchBehavior

	// NoCreate, if set to true, makes the topic manager not attempt to create topics.
	// This can be used if topic creation should be done externally.
	NoCreate bool
}

func (tmc *TopicManagerConfig) streamCleanupPolicy() string {
	if tmc.Stream.CleanupPolicy != "" {
		return tmc.Stream.CleanupPolicy
	}
	return "delete"
}

func (tmc *TopicManagerConfig) tableCleanupPolicy() string {
	if tmc.Table.CleanupPolicy != "" {
		return tmc.Table.CleanupPolicy
	}
	return "compact"
}

// NewTopicManagerConfig provides a default configuration for auto-creation
// with a replication factor of 2 and a retention time of 1 hour.
// Use this function rather than creating a TopicManagerConfig from scratch to
// initialize the config with reasonable defaults.
func NewTopicManagerConfig() *TopicManagerConfig {
	cfg := new(TopicManagerConfig)
	cfg.Table.Replication = 2
	cfg.Stream.Replication = 2
	cfg.Stream.Retention = 1 * time.Hour

	cfg.MismatchBehavior = TMConfigMismatchBehaviorIgnore
	cfg.Logger = defaultLogger.Prefix("topic_manager")

	cfg.CreateTopicTimeout = 10 * time.Second
	return cfg
}
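
// exampleCustomTopicManagerConfig is a hypothetical sketch showing how the
// defaults from NewTopicManagerConfig are typically adjusted; the chosen
// values (replication 3, 7-day retention, combined table cleanup policy,
// longer creation timeout) are illustrative only.
func exampleCustomTopicManagerConfig() *TopicManagerConfig {
	tmCfg := NewTopicManagerConfig()
	tmCfg.Table.Replication = 3
	tmCfg.Stream.Replication = 3
	tmCfg.Stream.Retention = 7 * 24 * time.Hour
	tmCfg.Table.CleanupPolicy = "compact,delete"
	tmCfg.CreateTopicTimeout = 30 * time.Second
	return tmCfg
}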