-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathRapidStateMachine.swift
694 lines (616 loc) · 30.9 KB
/
RapidStateMachine.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
import Foundation
import NIO
import Dispatch
import Logging
/// TODO
/// - graceful leaving of this node
/// - implement missing cut detection recovery if fast path fails (timeout on unstable reports / implicit detection)
final class RapidStateMachine: Actor {
private let logger = Logger(label: "rapid.RapidStateMachine")
typealias MessageType = RapidCommand
typealias ResponseType = RapidResult
internal let el: EventLoop
private var state: State
/// Lifecycle states of the state machine; the current state decides how `receive` treats a message.
enum State {
// created by `init` but not started yet; `receive` rejects every message in this state
case initial(CommonState)
// regular operation; messages are handled by `ActiveState.handleMessage`
case active(ActiveState)
// a view change proposal is being decided; messages are handled by `ViewChangingState.handleMessage`
case viewChanging(ViewChangingState)
// `receive` currently rejects every message in the two states below
case leaving
case left
}
/// Errors reported by the state machine.
enum RapidStateMachineError: Error {
// thrown by `receive` when a message arrives in a state that cannot handle it; carries that state
case messageInInvalidState(State)
// NOTE(review): the two cases below are not thrown by any code visible in this file
case viewChangeInProgress
case noStateAvailable
}
/// Dispatches a message to the handler of the current state and stores the state that handler returns.
///
/// Only the `active` and `viewChanging` states handle messages. In every other state, and whenever a handler
/// throws, the error is reported through `callback` and the current state is left untouched.
/// - Parameters:
///   - msg: the command to process
///   - callback: optional completion that receives either the result or the error
func receive(_ msg: MessageType, _ callback: ((Result<ResponseType, Error>) -> ())? = nil) {
    do {
        switch state {
        case .active(var active):
            // the handler mutates its own copy and hands back the follow-up state
            state = try active.handleMessage(msg, callback)
        case .viewChanging(var viewChanging):
            state = try viewChanging.handleMessage(msg, callback)
        case .initial, .leaving, .left:
            // TODO we need to do graceful handling of messages in leaving and then transition to left
            throw RapidStateMachineError.messageInInvalidState(state)
        }
    } catch {
        callback?(Result.failure(error))
    }
}
/// Creates the Rapid state machine in its `initial` state.
///
/// All collaborators are bundled into a `CommonState`; this initializer only stores them, the transition to
/// the active state happens in `start(ref:)`.
init(selfEndpoint: Endpoint, settings: Settings, view: MembershipView, failureDetectorProvider: EdgeFailureDetectorProvider,
     broadcaster: Broadcaster, messagingClient: MessagingClient, allMetadata: [Endpoint: Metadata],
     subscriptions: [(RapidCluster.ClusterEvent) -> ()],
     el: EventLoop) throws {
    self.el = el
    self.state = .initial(
        CommonState(
            selfEndpoint: selfEndpoint,
            settings: settings,
            el: el,
            view: view,
            metadata: allMetadata,
            failureDetectorProvider: failureDetectorProvider,
            broadcaster: broadcaster,
            messagingClient: messagingClient,
            subscriptions: subscriptions
        )
    )
}
/// Starts the state machine by switching from the initial to the active state.
///
/// Also schedules the repeated task that sends `.batchedAlertTick` to `ref` once per batching window.
/// Calling this in any state other than `initial` is a programmer error and terminates the process.
/// - Parameter ref: reference to the actor wrapping this state machine
/// - Throws: any error raised while building the `ActiveState`
func start(ref: ActorRef<RapidStateMachine>) throws {
    guard case .initial(let commonState) = state else {
        fatalError("Can only start in initial state")
    }
    var activeState = try ActiveState(commonState, ref: ref)
    // batch alerts
    let batchingWindow = commonState.settings.batchingWindow
    activeState.common.alertBatchJob = el.scheduleRepeatedTask(initialDelay: batchingWindow, delay: batchingWindow) { _ in
        ref.tell(.batchedAlertTick)
    }
    state = .active(activeState)
}
/// Cancels the alert batching job and all failure detector tasks of the current state.
///
/// States other than `active` and `viewChanging` own no tasks, so they complete immediately.
/// - Parameter el: event loop used to create the cancellation promises and the returned future
/// - Returns: a future that completes once every failure detector cancellation has completed
@discardableResult
func stop(el: EventLoop) -> EventLoopFuture<Void> {
    let runningDetectors: [RepeatedTask]
    switch state {
    case .active(let activeState):
        activeState.common.alertBatchJob?.cancel()
        runningDetectors = activeState.failureDetectors
    case .viewChanging(let viewChangingState):
        viewChangingState.common.alertBatchJob?.cancel()
        runningDetectors = viewChangingState.failureDetectors
    default:
        return el.makeSucceededFuture(())
    }

    // one promise per detector so that we can wait for all cancellations
    var cancellations = [EventLoopFuture<Void>]()
    for detector in runningDetectors {
        let cancelled: EventLoopPromise<Void> = el.makePromise()
        detector.cancel(promise: cancelled)
        cancellations.append(cancelled.futureResult)
    }
    return EventLoopFuture.whenAllComplete(cancellations, on: el).map { _ in () }
}
/// ~~~ protocol
// Commands understood by the state machine (the `MessageType` of the actor).
enum RapidCommand {
// a protocol request (join, alerts, probe, consensus, leave) to be handled
case rapidRequest(RapidRequest)
// sent by a failure detector's `signalFailure` closure for the monitored subject
case subjectFailed(Endpoint)
// sent by the `FastPaxos` decision callback with the decided proposal
case viewChangeDecided([Endpoint])
// sent by the repeated task scheduled in `start`; triggers `sendAlertBatch`
case batchedAlertTick
// answered with `RapidResult.memberList`
case retrieveMemberList
// answered with `RapidResult.metadata`
case retrieveMetadata
}
/// Results handed to the callback of `receive` (the `ResponseType` of the actor).
enum RapidResult {
// reply to a `.rapidRequest` command
case rapidResponse(RapidResponse)
// reply to `.retrieveMemberList`
case memberList([Endpoint])
// reply to `.retrieveMetadata`
case metadata([Endpoint: Metadata])
}
/// ~~~ states
struct ActiveState: SubjectFailedHandler, ProbeMessageHandler, BatchedAlertMessageHandler, AlertBatcher, StateQueryHandler {
private let logger = Logger(label: "rapid.RapidStateMachine")
var common: CommonState
// TODO be less of a troll with the naming
var this: ActorRef<RapidStateMachine>
// ~~~ membership protocol
var cutDetector: MultiNodeCutDetector
var failureDetectors: [RepeatedTask]
// ~~~ postponed consensus messages
// we only consider those once we have transitioned to the viewChanging state
var postponedConsensusMessages = [RapidRequest]()
/// Builds the active state on top of the given common state.
///
/// Points the broadcaster at the contents of ring 0 of the current view, creates a cut detector from the
/// K/H/L settings and schedules one repeated failure detector task per distinct monitored subject.
/// - Parameters:
///   - common: the state shared between the active and the view changing state
///   - ref: reference to the owning actor; failure detectors report to it via `.subjectFailed`
/// - Throws: errors from the cut detector initializer, `getSubjectsOf` or `createInstance`
init(_ common: CommonState, ref: ActorRef<RapidStateMachine>) throws {
self.common = common
self.this = ref
common.broadcaster.setMembership(recipients: common.view.getRing(k: 0).contents)
try self.cutDetector = MultiNodeCutDetector(K: common.settings.K, H: common.settings.H, L: common.settings.L)
// failure detectors
let subjects = try common.view.getSubjectsOf(node: common.selfEndpoint)
// for smaller clusters, the expander graph repeats subjects. we only want one failure detector per host though
// also, don't monitor ourselves
let monitoredSubjects = Set(subjects).filter { $0 != common.selfEndpoint }
try self.failureDetectors = monitoredSubjects.map { subject in
// note: the closure reports the captured `subject`; its own `failedSubject` argument is not used
let fd = try common.failureDetectorProvider.createInstance(subject: subject, signalFailure: { failedSubject in
ref.tell(.subjectFailed(subject))
})
// each run invokes the detector and hops its future onto our event loop
let fdTask = { (task: RepeatedTask) in
fd().hop(to: common.el)
}
// first run after one second, subsequent runs every `failureDetectorInterval`
return common.el.scheduleRepeatedAsyncTask(initialDelay: TimeAmount.seconds(1), delay: common.settings.failureDetectorInterval, fdTask)
}
}
/// Builds a new active state from a finished view changing state, reusing its common state and actor reference.
/// The cut detector and the failure detector tasks are created anew by the designated initializer.
init(_ previousState: ViewChangingState) throws {
try self.init(previousState.common, ref: previousState.this)
}
/// Handles a command while the node is in the active state.
///
/// Every command keeps the machine in the active state, except a batched alert whose cut detection yields
/// a non-empty proposal: that one moves the machine into the view changing state.
/// - Parameters:
///   - msg: the command to process
///   - callback: optional completion; for join requests it may be stored and answered later
/// - Returns: the state the machine should be in after this command
/// - Throws: errors raised by the individual handlers or while creating the `ViewChangingState`
mutating func handleMessage(_ msg: MessageType, _ callback: ((Result<ResponseType, Error>) -> ())? = nil) throws -> State {
    func respond(_ rapidResponse: RapidResponse) {
        callback?(Result.success(RapidResult.rapidResponse(rapidResponse)))
    }

    switch msg {
    case .rapidRequest(let request):
        switch request.content {
        case .joinMessage(let join):
            // note: we may postpone the reply until the cluster has gone into a new configuration
            // which is why we pass in the callback here
            if let joinResponse = try handleJoin(msg: join, callback: callback) {
                respond(RapidResponse.with { $0.joinResponse = joinResponse })
            }
        case .batchedAlertMessage(let batchedAlertMessage):
            let proposal = handleBatchedAlert(msg: batchedAlertMessage)
            // the sender gets its acknowledgement regardless of the outcome of the cut detection
            respond(RapidResponse())
            if !proposal.isEmpty {
                // TODO debug logging
                // print("\(common.selfEndpoint.port) Switching to view changing state")
                return try .viewChanging(ViewChangingState(self, proposal: proposal))
            }
        case .probeMessage(let probe):
            respond(handleProbe(msg: probe))
        case .fastRoundPhase2BMessage, .phase1AMessage, .phase1BMessage, .phase2AMessage, .phase2BMessage:
            let acknowledgement = try handleConsensus(msg: request)
            respond(acknowledgement)
        case .leaveMessage(let leave):
            let acknowledgement = try handleLeave(msg: leave)
            respond(acknowledgement)
        case .none:
            // request without content: nothing to do and nothing to answer
            break
        }
    case .subjectFailed(let subject):
        try handleSubjectFailed(subject)
    case .batchedAlertTick:
        sendAlertBatch()
    case .viewChangeDecided:
        fatalError("How on earth are we here?")
    case .retrieveMemberList:
        callback?(Result.success(RapidResult.memberList(getMemberList())))
    case .retrieveMetadata:
        callback?(Result.success(RapidResult.metadata(getMetadata())))
    }
    return .active(self)
}
/// Feeds the given alerts into the cut detector and returns the resulting view change proposal.
///
/// The proposal consists of the endpoints produced by aggregating each alert, followed by those produced
/// by invalidating failing edges of the current view. An empty result means there is nothing to propose.
func applyCutDetection(alerts: [AlertMessage]) -> [Endpoint] {
    // apply all the valid messages to the cut detector
    let aggregated = alerts.flatMap { alert in
        cutDetector.aggregate(alert: alert)
    }
    // apply implicit detections
    let implicitDetections = cutDetector.invalidateFailingEdges(view: common.view)

    var proposal = aggregated
    proposal.append(contentsOf: implicitDetections)
    return proposal
}
// ~~~ event handling
/// Handles a join request received in the active state.
///
/// - Parameters:
///   - msg: the join request
///   - callback: completion of the request; stored when the reply has to wait for the view change
/// - Returns: the response to send right away, or `nil` when the join is safe and the reply is postponed
///   until the view change has been decided
private mutating func handleJoin(msg: JoinMessage, callback: ((Result<ResponseType, Error>) -> ())?) throws -> JoinResponse? {
    let configuration = common.view.getCurrentConfiguration()
    let statusCode = common.view.isSafeToJoin(node: msg.sender, uuid: msg.nodeID)

    switch statusCode {
    case .safeToJoin:
        // remember whom to answer once the new configuration is in place
        common.joiners[msg.sender] = callback
        // simulate K alerts, one for each of the expected observers
        let observers = common.view.getExpectedObserversOf(node: msg.sender)
        for (ring, observer) in observers.enumerated() {
            var joinAlert = AlertMessage()
            joinAlert.edgeSrc = observer
            joinAlert.edgeDst = msg.sender
            joinAlert.edgeStatus = EdgeStatus.up
            joinAlert.configurationID = configuration.configurationId
            joinAlert.nodeID = msg.nodeID
            joinAlert.ringNumber = [Int32(ring)]
            joinAlert.metadata = msg.metadata
            enqueueAlertMessage(joinAlert)
        }
        return nil

    case .sameNodeAlreadyInRing:
        // this can happen if a join attempt times out at the joining node
        // yet the response was about to be sent
        // simply reply that they're welcome to join so they can get the membership list
        var welcome = JoinResponse()
        welcome.sender = common.selfEndpoint
        welcome.configurationID = configuration.configurationId
        welcome.statusCode = JoinStatusCode.safeToJoin
        welcome.endpoints = configuration.endpoints
        welcome.identifiers = Array(configuration.nodeIds)
        welcome.metadataKeys = Array(common.metadata.keys)
        welcome.metadataValues = Array(common.metadata.values)
        return welcome

    default:
        // hostnameAlreadyInRing: do not let the node join. it will have to wait until failure detection
        // kicks in and a new membership view is agreed upon in order to be able to join again.
        // in all other cases, the client should try rejoining.
        // either way the reply simply carries the status code we determined above
        var rejection = JoinResponse()
        rejection.sender = common.selfEndpoint
        rejection.statusCode = statusCode
        return rejection
    }
}
/// Placeholder that always terminates the process.
/// NOTE(review): not called by any code visible in this file; batched alerts are handled by `handleBatchedAlert`.
private func handleAlert(msg: BatchedAlertMessage) throws -> RapidResponse {
fatalError("Not implemented")
}
/// Handles the announcement of a node that wants to leave.
/// - Parameter msg: the leave message; its sender is the node that is leaving
/// - Returns: an empty response acknowledging the message
/// - Throws: errors raised while creating the edge failure alert
private mutating func handleLeave(msg: LeaveMessage) throws -> RapidResponse {
// propagate the intent of a node to leave by proactively notifying of edge failure
try edgeFailureNotification(subject: msg.sender, configurationId: common.view.getCurrentConfigurationId())
return RapidResponse()
}
/// Postpones a consensus message received while in the active state.
/// The message is stored in `postponedConsensusMessages` and replayed when a `ViewChangingState` is created.
/// - Returns: an empty response acknowledging the message
private mutating func handleConsensus(msg: RapidRequest) throws -> RapidResponse {
postponedConsensusMessages.append(msg)
// reply now so that the sender doesn't retry propagating this (in case of an eager broadcasting mechanism)
return RapidResponse()
}
}
struct ViewChangingState: SubjectFailedHandler, ProbeMessageHandler, BatchedAlertMessageHandler, AlertBatcher, StateQueryHandler {
var common: CommonState
// TODO be less of a troll with the naming
var this: ActorRef<RapidStateMachine>
var failureDetectors: [RepeatedTask]
var fastPaxos: FastPaxos
// a stash of all messages that we didn't want to handle in this state
var stashedMessages: [RapidRequest] = []
// no more alert message queue, we have flushed them before
// no need for a cut detector, we already made the decision to reconfigure
/// Enters the view changing state for the given proposal.
///
/// Takes over the common state, actor reference and failure detector tasks of the active state, creates the
/// `FastPaxos` instance for the current configuration, notifies subscribers, replays the consensus messages
/// that were postponed in the active state and finally proposes the (sorted) proposal.
/// - Parameters:
///   - previousState: the active state we are coming from
///   - proposal: the view change proposal produced by cut detection
/// - Throws: errors raised while replaying the postponed consensus messages
init(_ previousState: ActiveState, proposal: [Endpoint]) throws {
self.common = previousState.common
self.this = previousState.this
self.failureDetectors = previousState.failureDetectors
self.fastPaxos = FastPaxos(
selfEndpoint: common.selfEndpoint,
configurationId: common.view.getCurrentConfigurationId(),
membershipSize: common.view.getMembershipSize(),
// the decision is delivered back to the actor as a regular command
decisionCallback: { (endpoints: [Endpoint]) in
previousState.this.tell(.viewChangeDecided(endpoints))
},
messagingClient: common.messagingClient,
broadcaster: self.common.broadcaster,
settings: self.common.settings,
el: self.common.el
)
// we just got a proposal, directly start the consensus process on it
// sort it first with the same seed so that we have a stable proposal on all nodes
let seed = 0
var sortedProposal = proposal
sortedProposal.sort { $0.ringHash(seed: seed) < $1.ringHash(seed: seed) }
// NOTE(review): subscribers receive the proposal in its original order, not the sorted one
for callback in previousState.common.subscriptions {
callback(.viewChangeProposal(proposal))
}
// now directly handle with all the postponed consensus messages we have received while in the active state
// NOTE(review): the active state postpones all five consensus message kinds, but `handleConsensus` below
// only implements `fastRoundPhase2BMessage`; any other postponed kind ends in its `fatalError`
for consensusMessage in previousState.postponedConsensusMessages {
let _ = try handleConsensus(msg: consensusMessage)
}
let _ = fastPaxos.propose(proposal: sortedProposal)
}
/// Handles a command while a view change is being decided.
///
/// Every command keeps the machine in the view changing state except `.viewChangeDecided`, which applies the
/// decision, re-sends postponed alerts and stashed requests to the actor and returns a new active state.
/// - Parameters:
///   - msg: the command to process
///   - callback: optional completion that receives the result
/// - Returns: the state the machine should be in after this command
/// - Throws: errors raised by the individual handlers or while creating the new `ActiveState`
mutating func handleMessage(_ msg: RapidCommand, _ callback: ((Result<ResponseType, Error>) -> ())? = nil) throws -> State {
func respond(_ rapidResponse: RapidResponse) {
callback?(Result.success(RapidResult.rapidResponse(rapidResponse)))
}
switch(msg) {
case .rapidRequest(let request):
switch request.content {
case .joinMessage:
// joins are turned away while the view is changing
let response = RapidResponse.with {
$0.joinResponse = JoinResponse.with {
$0.sender = common.selfEndpoint
$0.statusCode = JoinStatusCode.viewChangeInProgress
}
}
respond(response)
return .viewChanging(self)
case .batchedAlertMessage(let batchedAlertMessage):
// we don't care about any resulting proposal here as we do not perform cut detection anyway
// however, we need to handle the alerts for joining nodes for which we store UUIDs and metadata
let _ = handleBatchedAlert(msg: batchedAlertMessage)
respond(RapidResponse())
return .viewChanging(self)
case .probeMessage(let probe):
let response = handleProbe(msg: probe)
respond(response)
return .viewChanging(self)
case .fastRoundPhase2BMessage, .phase1AMessage, .phase1BMessage, .phase2AMessage, .phase2BMessage:
let response = try handleConsensus(msg: request)
respond(response)
return .viewChanging(self)
case .leaveMessage:
// handled later: stashed requests are sent to the actor again once the view change is decided
stashedMessages.append(request)
// reply immediately, leave is a best-effort operation anyway
respond(RapidResponse())
return .viewChanging(self)
case .none:
return .viewChanging(self)
}
case .subjectFailed(let subject):
// we have to handle this message in this state as the failure detectors can trigger right before
// we switch to this state or whilst we haven't reached consensus yet and the failure detectors are
// still active. there's a risk that we alert of failed edges with the old configuration ID, which means
// that the alerts will be ignored by nodes that have already moved to a new configuration or by nodes
// that have already moved to this state (since we don't apply cut detection while in this state)
// in this case the failure detection may take a bit longer - but that's just something we need to live
// with according to the virtual synchrony model
try handleSubjectFailed(subject)
return .viewChanging(self)
case .viewChangeDecided(let proposal):
try handleViewChangeDecided(proposal: proposal)
// create a synthetic batchedAlertMessage from all the postponed ones
// (it is sent even when there are no postponed alerts and carries no sender)
let batch = RapidRequest.with {
$0.batchedAlertMessage = BatchedAlertMessage.with {
$0.messages = common.postponedAlertMessages
}
}
self.this.tell(.rapidRequest(batch))
common.postponedAlertMessages = []
// unstash all and return to active state
for msg in stashedMessages {
self.this.tell(.rapidRequest(msg))
}
// TODO debug logging
// print("\(common.selfEndpoint.port) Switching to active state")
return .active(try ActiveState(self))
case .batchedAlertTick:
sendAlertBatch()
return .viewChanging(self)
case .retrieveMemberList:
callback?(Result.success(RapidResult.memberList(getMemberList())))
return .viewChanging(self)
case .retrieveMetadata:
callback?(Result.success(RapidResult.metadata(getMetadata())))
return .viewChanging(self)
}
}
/// Handles a consensus message while a view change is in progress.
///
/// Only fast round votes are implemented: a vote for a known configuration is passed to `fastPaxos`, a vote
/// for an unknown configuration is stashed. All other message kinds terminate the process.
/// - Parameter msg: the request carrying the consensus message
/// - Returns: an empty response acknowledging the message
private mutating func handleConsensus(msg: RapidRequest) throws -> RapidResponse {
switch(msg.content) {
case .fastRoundPhase2BMessage:
// we receive a consensus round vote for a configuration we haven't learned about yet
// this can happen when we have not yet received enough alert messages to trigger a view change
if (!common.view.getCurrentConfiguration().knownConfigurations.contains(msg.fastRoundPhase2BMessage.configurationID)) {
stashedMessages.append(msg)
} else {
fastPaxos.handleFastRoundProposal(proposalMessage: msg.fastRoundPhase2BMessage)
}
return RapidResponse()
case .phase1AMessage, .phase1BMessage, .phase2AMessage, .phase2BMessage:
// NOTE(review): these kinds are routed here by `handleMessage`, so receiving one crashes the node
fatalError("Not implemented")
default:
fatalError("Should not be here")
}
}
/// Applies a decided view change to the membership view.
///
/// Cancels the failure detector tasks, toggles the membership of every node in the proposal (present nodes
/// are removed, absent nodes are added), answers the pending joiners and notifies subscribers.
/// - Parameter proposal: the endpoints whose membership changes
/// - Throws: errors from `ringDelete` / `ringAdd`
private mutating func handleViewChangeDecided(proposal: [Endpoint]) throws {
// disable our failure detectors as they will be recreated when transitioning back into the active state
for fd in failureDetectors {
fd.cancel()
}
// add or remove nodes to / from the ring
var statusChanges = [RapidCluster.NodeStatusChange]()
for node in proposal {
let isPresent = common.view.isHostPresent(node)
// if the node is already in the ring, remove it. Else, add it.
if (isPresent) {
try common.view.ringDelete(node: node)
// a removed node without stored metadata is reported with empty metadata
let metadata = common.metadata.removeValue(forKey: node)
statusChanges.append(RapidCluster.NodeStatusChange(node: node, status: .down, metadata: metadata ?? Metadata()))
} else {
// node id and metadata of a joiner were recorded from its UP alerts
guard let joinerNodeId = common.joinerNodeIds.removeValue(forKey: node) else {
// FIXME this will fail if the broadcasted alert about the JOIN and the consensus votes are not
// FIXME delivered in order (see 10k node experiment)
fatalError("NodeId of joining node unknown")
}
try common.view.ringAdd(node: node, nodeId: joinerNodeId)
guard let joinerMetadata = common.joinerMetadata.removeValue(forKey: node) else {
// this shouldn't happen, we should've failed at the UUID missing
fatalError("Metadata of joining node unknown")
}
common.metadata[node] = joinerMetadata
statusChanges.append(RapidCluster.NodeStatusChange(node: node, status: .up, metadata: joinerMetadata))
}
}
// respond to the nodes that joined through us
respondToJoiners(proposal: proposal)
// subscribers learn about the configuration id after the change and the individual status changes
for callback in common.subscriptions {
callback(.viewChange(RapidCluster.ViewChange(configurationId: common.view.getCurrentConfigurationId(), statusChanges: statusChanges)))
}
}
/// Cut detection is switched off in this state: always returns an empty proposal, whatever the alerts are.
func applyCutDetection(alerts: [AlertMessage]) -> [Endpoint] {
// we already have a proposal in progress so we don't apply cut detection
[]
}
/// Answers every node that asked to join through this node and forgets the pending callbacks afterwards.
///
/// Joiners contained in `proposal` receive the current configuration together with all metadata; all other
/// joiners are told that a view change is in progress.
/// - Parameter proposal: the endpoints of the decided view change
mutating func respondToJoiners(proposal: [Endpoint]) {
    let configuration = common.view.getCurrentConfiguration()

    var welcome = JoinResponse()
    welcome.sender = common.selfEndpoint
    welcome.statusCode = JoinStatusCode.safeToJoin
    welcome.configurationID = configuration.configurationId
    welcome.endpoints = configuration.endpoints
    welcome.identifiers = Array(configuration.nodeIds)
    welcome.metadataKeys = Array(common.metadata.keys)
    welcome.metadataValues = Array(common.metadata.values)
    var accepted = RapidResponse()
    accepted.joinResponse = welcome

    var tryAgain = JoinResponse()
    tryAgain.sender = common.selfEndpoint
    tryAgain.statusCode = JoinStatusCode.viewChangeInProgress
    var rejected = RapidResponse()
    rejected.joinResponse = tryAgain

    for (node, joinerCallback) in common.joiners {
        let answer = proposal.contains(node) ? accepted : rejected
        joinerCallback?(Result.success(RapidResult.rapidResponse(answer)))
    }
    common.joiners = [:]
}
}
/// State shared by the active and the view changing state; it is handed over on every transition.
struct CommonState {
// the endpoint of this node
var selfEndpoint: Endpoint
var settings: Settings
// event loop on which repeated tasks are scheduled
var el: EventLoop
// ~~~ membership protocol
var view: MembershipView
// metadata of the current members, keyed by endpoint
var metadata: [Endpoint:Metadata]
var failureDetectorProvider: EdgeFailureDetectorProvider
// ~~~ joiner state
// callbacks of nodes joining through this node; answered in `respondToJoiners`
var joiners = [Endpoint:((Result<ResponseType, Error>) -> ())?]()
// node id and metadata of joiners, recorded from UP alerts and consumed when the view change is applied
var joinerNodeIds = [Endpoint:NodeId]()
var joinerMetadata = [Endpoint:Metadata]()
// ~~~ communication
var broadcaster: Broadcaster
var messagingClient: MessagingClient
// alerts waiting to be broadcast by `sendAlertBatch`
var alertMessageQueue = [AlertMessage]()
// updated whenever an alert is enqueued; `sendAlertBatch` only broadcasts once it is overdue
var alertSendingDeadline: NIODeadline = NIODeadline.now()
// repeated task created in `start` that sends `.batchedAlertTick`; cancelled in `stop`
var alertBatchJob: RepeatedTask?
// invoked with `.viewChangeProposal` and `.viewChange` events
var subscriptions: [(RapidCluster.ClusterEvent) -> ()]
// ~~~ postponed alert messages
// these are alert messages for a yet unknown configuration if the node is lagging behind the rest
var postponedAlertMessages = [AlertMessage]()
}
}
/// Capability of a state to react to failed subjects; the behaviour is provided by the protocol extension
/// for conforming types that are also an `AlertBatcher`.
protocol SubjectFailedHandler {
// read for the current view and the own endpoint
var common: RapidStateMachine.CommonState { get set }
}
extension SubjectFailedHandler where Self: AlertBatcher {
    /// Announces the failure of `subject` for the configuration that is current right now.
    mutating func handleSubjectFailed(_ subject: Endpoint) throws {
        let currentConfigurationId = common.view.getCurrentConfigurationId()
        try edgeFailureNotification(subject: subject, configurationId: currentConfigurationId)
    }

    /// Enqueues a DOWN alert for the edge from this node to `subject`.
    ///
    /// Notifications that refer to a configuration other than the current one are ignored.
    /// - Parameters:
    ///   - subject: the node that is considered failed (or leaving)
    ///   - configurationId: the configuration the notification belongs to
    /// - Throws: errors raised while looking up the ring numbers of the edge
    mutating func edgeFailureNotification(subject: Endpoint, configurationId: UInt64) throws {
        guard configurationId == common.view.getCurrentConfigurationId() else {
            // TODO INFO log
            print("Ignoring failure notification from old configuration \(configurationId)")
            return
        }

        var alert = AlertMessage()
        alert.edgeSrc = common.selfEndpoint
        alert.edgeDst = subject
        alert.edgeStatus = EdgeStatus.down
        alert.configurationID = configurationId
        alert.ringNumber = try common.view.getRingNumbers(observer: common.selfEndpoint, subject: subject)
        enqueueAlertMessage(alert)
    }
}
/// Marker protocol without requirements; conforming types get `handleProbe` from the protocol extension.
protocol ProbeMessageHandler { }
extension ProbeMessageHandler {
    /// Answers a probe with a response that carries a default `ProbeResponse`.
    /// The content of the probe itself is not inspected.
    func handleProbe(msg: ProbeMessage) -> RapidResponse {
        var response = RapidResponse()
        response.probeResponse = ProbeResponse()
        return response
    }
}
/// Capability of a state to process batched alert messages; `handleBatchedAlert` lives in the protocol extension.
protocol BatchedAlertMessageHandler {
// read for the current view; written to record joiner details and postponed alerts
var common: RapidStateMachine.CommonState { get set }
// turns the valid alerts of a batch into a view change proposal; an empty result means "no proposal"
func applyCutDetection(alerts: [AlertMessage]) -> [Endpoint]
}
extension BatchedAlertMessageHandler {
    /// Processes a batch of alerts and returns the view change proposal resulting from it.
    ///
    /// Alerts that pass `filterAlertMessages` are handed to `applyCutDetection`; for UP alerts among them the
    /// joiner's node id and metadata are recorded on the way. Alerts of the batch that refer to a
    /// configuration this node does not know are kept in `common.postponedAlertMessages`.
    /// - Parameter msg: the batch of alerts
    /// - Returns: whatever `applyCutDetection` makes of the valid alerts
    mutating func handleBatchedAlert(msg: BatchedAlertMessage) -> [Endpoint] {
        // First, we filter out invalid messages that violate membership invariants.
        // For valid UP alerts, extract the joiner details (UUID and metadata) which is going to be needed
        // when the node is added to the rings
        var validAlerts = [AlertMessage]()
        for alert in msg.messages where filterAlertMessages(alert: alert) {
            validAlerts.append(extractJoinerNodeIdAndMetadata(alert))
        }

        // stash those alerts that we shouldn't have seen yet
        // this happens when a part of the group is lagging behind the rest
        let alertsFromUnknownConfigurations = msg.messages.filter { candidate in
            !common.view.getCurrentConfiguration().knownConfigurations.contains(candidate.configurationID)
        }
        common.postponedAlertMessages.append(contentsOf: alertsFromUnknownConfigurations)

        return applyCutDetection(alerts: validAlerts)
    }

    /// Decides whether an alert may be applied to the current configuration.
    ///
    /// Rejected are alerts for a different configuration, UP alerts for a node that is already a member and
    /// DOWN alerts for a node that is not a member.
    private func filterAlertMessages(alert: AlertMessage) -> Bool {
        let destination = alert.edgeDst
        let currentConfigurationID = common.view.getCurrentConfigurationId()
        guard alert.configurationID == currentConfigurationID else {
            return false
        }

        // The invariant we want to maintain is that a node can only go into the
        // membership set once and leave it once.
        switch alert.edgeStatus {
        case .up where common.view.isHostPresent(destination):
            // TODO use logging (TRACE)
            print("AlertMessage with status UP received for node \(destination) already in configuration \(currentConfigurationID)")
            return false
        case .down where !common.view.isHostPresent(destination):
            // TODO use logging (TRACE)
            print("AlertMessage with status DOWN received for node \(destination) not in configuration \(currentConfigurationID)")
            return false
        default:
            return true
        }
    }

    /// Records node id and metadata of the joining node for UP alerts; the alert itself is returned unchanged.
    private mutating func extractJoinerNodeIdAndMetadata(_ alert: AlertMessage) -> AlertMessage {
        guard alert.edgeStatus == EdgeStatus.up else {
            return alert
        }
        common.joinerNodeIds[alert.edgeDst] = alert.nodeID
        common.joinerMetadata[alert.edgeDst] = alert.metadata
        return alert
    }
}
/// Capability of a state to collect alerts and broadcast them in batches; see the protocol extension.
protocol AlertBatcher {
// holds the alert queue, the sending deadline, the settings and the broadcaster used for batching
var common: RapidStateMachine.CommonState { get set }
}
extension AlertBatcher {
    /// Queues an alert for the next broadcast and re-arms the sending deadline.
    ///
    /// The deadline is placed one batching window after the current time, so that alerts raised in quick
    /// succession are collected and sent together by `sendAlertBatch`.
    /// - Parameter msg: the alert to enqueue
    mutating func enqueueAlertMessage(_ msg: AlertMessage) {
        common.alertMessageQueue.append(msg)
        // the deadline has to be relative to now: `NIODeadline.uptimeNanoseconds(_:)` denotes an absolute point
        // in time counted from system boot, so feeding it only the window length yields a deadline that lies
        // in the past and is always overdue - which would disable the batching window altogether
        common.alertSendingDeadline = NIODeadline.now() + common.settings.batchingWindow
    }

    /// Broadcasts all queued alerts as a single `BatchedAlertMessage` and empties the queue.
    ///
    /// Does nothing while the sending deadline has not passed yet or when there is nothing queued.
    mutating func sendAlertBatch() {
        guard common.alertSendingDeadline.isOverdue(), !common.alertMessageQueue.isEmpty else {
            return
        }
        let batchedAlertRequest = RapidRequest.with {
            $0.batchedAlertMessage = BatchedAlertMessage.with {
                $0.sender = common.selfEndpoint
                $0.messages = common.alertMessageQueue
            }
        }
        common.broadcaster.broadcast(request: batchedAlertRequest)
        common.alertMessageQueue = []
    }
}
/// Capability of a state to answer read-only queries about membership and metadata; see the protocol extension.
protocol StateQueryHandler {
// only read, never modified, by the query methods
var common: RapidStateMachine.CommonState { get }
}
extension StateQueryHandler {
    /// Returns the members of the current view, taken from the contents of ring 0.
    func getMemberList() -> [Endpoint] {
        let firstRing = common.view.getRing(k: 0)
        return firstRing.contents
    }

    /// Returns the metadata currently stored for the members, keyed by endpoint.
    func getMetadata() -> [Endpoint: Metadata] {
        return common.metadata
    }
}