This repository has been archived by the owner on Nov 17, 2020. It is now read-only.
This file is intended to tell you How It All Works, concentrating on
the things you might not expect.
The theory
==========
The 'x-federation' exchange is defined in
rabbit_federation_exchange. This starts up a bunch of link processes
(one for each upstream) which:
* Connect to the upstream broker
* Create a queue and bind it to the upstream exchange
* Keep bindings in sync with the downstream exchange
* Consume messages from the upstream queue and republish them to the
  downstream exchange (matching confirms with acks)
Each link process monitors the connections / channels it opens, and
dies if they do. We use a supervisor2 to ensure that we get some
backoff when restarting.
We use process groups to identify all link processes for a certain
exchange, as well as all link processes together.
However, there are a bunch of wrinkles:
Wrinkle: The exchange will be recovered when the Erlang client is not available
===============================================================================
Exchange recovery happens within the rabbit application - therefore at
the time that the exchange is recovered, we can't make any connections
since the amqp_client application has not yet started. Each link
therefore has a 'not_started' state. When a link is created it checks
whether the rabbitmq_federation application is running. If so, it
starts fully. If not, it stays in the 'not_started' state. When
rabbitmq_federation starts, it sends a 'go' message to all links,
prodding each one to bring itself up.
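
The deferred start described above can be sketched as a tiny state
machine. This is an illustrative Python model, not the plugin's Erlang
code: the class and function names are hypothetical, and it assumes
that a link which has already started simply ignores 'go'.

```python
class Link:
    """Toy model of a federation link's startup states (names are illustrative)."""

    def __init__(self, federation_running):
        # Every link begins in 'not_started'; it only starts fully at
        # creation time if the federation application is already running.
        self.state = "not_started"
        if federation_running:
            self._start()

    def _start(self):
        # The real link would connect to the upstream broker here.
        self.state = "started"

    def go(self):
        # 'go' prods a waiting link. Assumption: a started link ignores it.
        if self.state == "not_started":
            self._start()


def federation_app_started(links):
    """Send 'go' to every known link, as happens when rabbitmq_federation starts."""
    for link in links:
        link.go()
```

A link created during exchange recovery stays in 'not_started' until
federation_app_started() is called; a link created later starts
straight away.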
Wrinkle: On reconnect we want to assert bindings atomically
===========================================================
If the link goes down for whatever reason, then by the time it comes
up again the bindings downstream may no longer be in sync with those
upstream. Therefore on link establishment we want to ensure that a
certain set of bindings exists. (Of course bringing up a link for the
first time is a simple case of this.) And we want to do this with AMQP
methods. But if we were to tear down all bindings and recreate them,
we would have a time period when messages would not be forwarded for
bindings that *do* still exist before and after.
We use exchange-to-exchange bindings to work around this:
We bind the upstream exchange (X) to the upstream queue (Q) via an
internal fanout exchange (IXA) like so (with routing keys R1 and R2):

     X----R1,R2--->IXA---->Q

This has the same effect as binding the queue to the exchange directly.
Now imagine the link has gone down, and is about to be
reestablished. In the meantime, routing has changed downstream so
that we now want routing keys R1 and R3. On link reconnection we can
create and bind another internal fanout exchange IXB:

     X----R1,R2--->IXA---->Q
     |                     ^
     |                     |
     \----R1,R3--->IXB-----/

and then delete the original exchange IXA:

     X                     Q
     |                     ^
     |                     |
     \----R1,R3--->IXB-----/

This means that messages matching R1 are always routed during the
switchover. Messages for R3 will start being routed as soon as we bind
the second exchange, and messages for R2 will be stopped in a timely
way. Of course this could lag the downstream situation somewhat, in
which case some R2 messages will get thrown away downstream since they
are unroutable. However this lag is inevitable when the link goes
down.
This means that the downstream only needs to keep track of whether the
upstream is currently going via internal exchange A or B. This is
held in the exchange scratch space in Mnesia.
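
To make the switchover concrete, here is a minimal Python sketch of
the two steps (bind the new internal exchange, then delete the old
one). It only models which routing keys reach Q; the dictionary
layout, the "active" field standing in for the scratch space, and the
function names are assumptions made for illustration.

```python
def route(bindings, key):
    """True if some internal exchange bound to X with this key forwards to Q."""
    return any(key in keys for keys in bindings.values())


def switch_over(bindings, scratch, new_keys):
    """Replace the active internal exchange without unbinding keys that are kept.

    bindings: internal exchange name -> routing keys bound from X.
    scratch:  stands in for the exchange scratch space; records "A" or "B".
    Returns the set of keys routed while both internal exchanges existed.
    """
    old = scratch["active"]
    new = "B" if old == "A" else "A"
    # Step 1: create and bind the new internal exchange alongside the old one.
    bindings["IX" + new] = set(new_keys)
    during = {k for keys in bindings.values() for k in keys}
    # Step 2: delete the original internal exchange and record the new one.
    del bindings["IX" + old]
    scratch["active"] = new
    return during
```

Starting from {"IXA": {"R1", "R2"}} with new keys R1 and R3, R1 is
routed before, during and after the call, R3 is routed from step 1
onwards, and R2 stops being routed after step 2.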
Wrinkle: We need to amalgamate bindings
=======================================
Since we only bind to one exchange upstream, but the downstream
exchange can be bound to many queues, we can have duplicated bindings
downstream (same source, routing key and args but different
destination) that cannot be duplicated upstream (since the destination
is the same). The link therefore maintains a mapping of (Key, Args) to
set(Dest). Duplicated bindings do not get repeated upstream, and are
only unbound upstream when the last one goes away downstream.
Furthermore, this works as an optimisation since this will tend to
reduce upstream binding count and churn.
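
The (Key, Args) to set(Dest) mapping can be sketched as follows. This
is a Python illustration rather than the link's actual data
structure: the class name and the upstream_ops log are hypothetical,
and Args is assumed to be a hashable value such as a tuple.

```python
class BindingMap:
    """Tracks downstream destinations per (key, args); logs upstream operations."""

    def __init__(self):
        self.dests = {}          # (key, args) -> set of downstream destinations
        self.upstream_ops = []   # what would be sent upstream, in order

    def add(self, key, args, dest):
        dests = self.dests.setdefault((key, args), set())
        if not dests:
            # First downstream binding for this (key, args): bind upstream once.
            self.upstream_ops.append(("bind", key, args))
        dests.add(dest)

    def remove(self, key, args, dest):
        dests = self.dests.get((key, args))
        if dests is None or dest not in dests:
            return  # unknown binding: nothing to do in this sketch
        dests.remove(dest)
        if not dests:
            # Last downstream binding went away: unbind upstream.
            del self.dests[(key, args)]
            self.upstream_ops.append(("unbind", key, args))
```

Binding two downstream queues with the same key and arguments
produces a single upstream bind, and the upstream unbind is only
issued once both have been removed.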
Wrinkle: We may receive binding events out of order
===================================================
The rabbit_federation_exchange callbacks are invoked by channel
processes within rabbit. Therefore they can be executed concurrently,
and can arrive at the link processes in an order that does not
correspond to the wall clock.
We need to keep the state of the link in sync with Mnesia. Therefore
not only do we need to impose an ordering on these events, we need to
impose Mnesia's ordering on them. We therefore added a function to the
callback interface, serialise_events. When this returns true, the
callback mechanism inside rabbit increments a per-exchange counter
within an Mnesia transaction, and returns the value as part of the
add_binding and remove_binding callbacks. The link process then queues
up these events, and replays them in order. The link process's state
thus always follows Mnesia (it may be delayed, but the effects happen
in the same order).
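
The queue-and-replay step can be sketched like this. It is a minimal
Python model in which each event carries the serial number taken from
the per-exchange counter; the class name is hypothetical, and the
assumption that serials start at 1 and have no gaps is made for the
sketch only.

```python
class EventReplayer:
    """Buffers binding events and applies them strictly in serial order."""

    def __init__(self, first_serial=1):
        self.next_serial = first_serial  # serial we are waiting for
        self.pending = {}                # serial -> event that arrived early
        self.applied = []                # events in the order they took effect

    def receive(self, serial, event):
        self.pending[serial] = event
        # Apply every event that is now contiguous with what we have seen.
        while self.next_serial in self.pending:
            self.applied.append(self.pending.pop(self.next_serial))
            self.next_serial += 1
```

If the event with serial 2 arrives before serial 1, it is held back
until serial 1 arrives, and both then take effect in counter order.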
Other issues
============
Since links are implemented in terms of AMQP, link failure may cause
messages to be redelivered. If you're unlucky this could lead to
duplication.
Message duplication can also happen with some topologies. In some
cases it may not be possible to set max_hops such that messages arrive
once at every node.
While we correctly order bind / unbind events, we don't do the same
thing for exchange creation / deletion. (This is harder - if you
delete and recreate an exchange with the same name, is it the same
exchange? What about if its type changes?) This would only be an issue
if exchanges churn rapidly; however we could get into a state where
Mnesia sees CDCD but we see CDDC and leave a process running when we
shouldn't.
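
The CDCD versus CDDC case can be shown with a deliberately naive
model, where C starts the link process and D stops it. The function
below is purely illustrative and is not how the plugin tracks
exchanges.

```python
def link_running_after(events):
    """Replay create ('C') and delete ('D') events; report if a link is left running."""
    running = False
    for event in events:
        if event == "C":
            running = True    # exchange created: start the link process
        elif event == "D":
            running = False   # exchange deleted: stop the link process
    return running
```

Replaying "CDCD" leaves nothing running, which matches Mnesia, while
replaying "CDDC" leaves a process running for an exchange that no
longer exists.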