-
Notifications
You must be signed in to change notification settings - Fork 3
/
replication.go
133 lines (118 loc) · 3.6 KB
/
replication.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright (c) R.I. Pienaar and the Choria Project contributors
//
// SPDX-License-Identifier: Apache-2.0
package machineroom
import (
"context"
"fmt"
"sync"
srcfg "github.com/choria-io/stream-replicator/config"
"github.com/choria-io/stream-replicator/replicator"
)
// StartReplication starts to replicate our standard streams and buckets.
//
// The REGISTRATION, SUBMIT, CHORIA_EVENTS and CHORIA_MACHINE streams are copied
// from the local broker to the backend named by the configKeySourceHost option,
// and the KV_CONFIG stream is copied from that backend into the local broker.
//
// Every stream is run in its own goroutine that is tracked by wg and handed
// ctx. A failure returned by a running stream is logged, not returned.
//
// An error is returned when the replication source or the site is not
// configured, when the assembled configuration does not validate, or when a
// stream cannot be created. In the last case streams created in earlier loop
// iterations have already been started.
func (b *broker) StartReplication(ctx context.Context, wg *sync.WaitGroup) error {
	// localBrokerURL is the address used for the local side of every stream.
	const localBrokerURL = "nats://localhost:9222"

	b.log.Infof("Starting data replication")

	backendURL := b.cfg.Option(configKeySourceHost, "")
	if backendURL == "" {
		return fmt.Errorf("replication source is not defined")
	}

	site := b.cfg.Option(configKeySite, "")
	if site == "" {
		return fmt.Errorf("site is not defined")
	}

	rcfg := &srcfg.Config{
		ReplicatorName: site,
		StateDirectory: defaultReplicationStateDirectory,
	}

	// Credentials shared by every connection made to the local broker.
	cc := &srcfg.ChoriaConnection{
		SeedFileName:   b.cfg.Choria.ChoriaSecuritySeedFile,
		JWTFileName:    b.cfg.Choria.ChoriaSecurityTokenFile,
		CollectiveName: "choria",
	}

	// Local broker -> backend.
	rcfg.Streams = []*srcfg.Stream{
		{
			Name:             "REGISTRATION",
			Stream:           "REGISTRATION",
			TargetStream:     "MACHINE_ROOM_NODES",
			TargetURL:        backendURL,
			NoTargetCreate:   true,
			SourceURL:        localBrokerURL,
			SourceProcess:    b.broker,
			SourceChoriaConn: cc,
		},
		{
			Name:               "SUBMIT",
			Stream:             "SUBMIT",
			TargetStream:       "MACHINE_ROOM_EVENTS",
			TargetURL:          backendURL,
			NoTargetCreate:     true,
			SourceURL:          localBrokerURL,
			SourceProcess:      b.broker,
			SourceChoriaConn:   cc,
			TargetRemoveString: "choria.submission.in.",
			TargetPrefix:       "machine_room.submit.",
		},
		{
			Name:               "CHORIA_EVENTS",
			Stream:             "CHORIA_EVENTS",
			TargetStream:       "MACHINE_ROOM_EVENTS",
			TargetURL:          backendURL,
			NoTargetCreate:     true,
			SourceURL:          localBrokerURL,
			SourceProcess:      b.broker,
			SourceChoriaConn:   cc,
			TargetRemoveString: "choria.lifecycle.",
			TargetPrefix:       "machine_room.events.lifecycle.",
		},
		{
			Name:               "CHORIA_MACHINE",
			Stream:             "CHORIA_MACHINE",
			TargetStream:       "MACHINE_ROOM_EVENTS",
			TargetURL:          backendURL,
			NoTargetCreate:     true,
			SourceURL:          localBrokerURL,
			SourceProcess:      b.broker,
			SourceChoriaConn:   cc,
			TargetRemoveString: "choria.machine.",
			TargetPrefix:       "machine_room.events.machine.",
		},
	}

	// Backend -> local broker.
	cfgRepl := &srcfg.Stream{
		Name:             "KV_CONFIG",
		Stream:           "KV_CONFIG",
		TargetStream:     "KV_CONFIG",
		TargetURL:        localBrokerURL,
		TargetProcess:    b.broker,
		TargetChoriaConn: cc,
		NoTargetCreate:   true,
		Ephemeral:        true, // copy the entire thing each time we start to be sure we have the latest config
		SourceURL:        backendURL,
	}

	// When a bucket prefix is configured only keys below it are requested and
	// the prefix is set as the string to remove on the target side.
	if b.opts.ConfigBucketPrefix != "" {
		cfgRepl.TargetRemoveString = b.opts.ConfigBucketPrefix
		cfgRepl.FilterSubject = fmt.Sprintf("$KV.CONFIG.%s.>", b.opts.ConfigBucketPrefix)
	}

	rcfg.Streams = append(rcfg.Streams, cfgRepl)

	if err := rcfg.Validate(); err != nil {
		return fmt.Errorf("invalid replication configuration: %w", err)
	}

	for _, s := range rcfg.Streams {
		b.log.Debugf("Configuring replication for stream %s", s.Name)

		stream, err := replicator.NewStream(s, rcfg, b.log.WithField("stream", s.Name))
		if err != nil {
			return fmt.Errorf("could not create replicator for %s: %w", s.Name, err)
		}

		wg.Add(1)
		go func(s *srcfg.Stream) {
			defer wg.Done()

			// NOTE(review): Run receives the wait group too; this extra Add
			// presumably balances a Done performed inside Run - confirm
			// against the replicator package.
			wg.Add(1)

			// The error is kept local to the goroutine rather than written
			// to a variable shared with the enclosing loop.
			if err := stream.Run(ctx, wg); err != nil {
				b.log.Errorf("Could not start replicator for %s: %v", s.Name, err)
			}
		}(s)
	}

	return nil
}