Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a potential race condition in the Floodgate. #58

Merged
merged 1 commit into from
Apr 14, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions ReactiveFeedback/Floodgate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
struct QueueState {
var events: [(Event, Token)] = []
var isOuterLifetimeEnded = false

var hasEvents: Bool {
events.isEmpty == false && isOuterLifetimeEnded == false
}
}

let (stateDidChange, changeObserver) = Signal<State, Never>.pipe()
Expand All @@ -26,22 +30,41 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
defer { reducerLock.unlock() }

guard !hasStarted else { return }

hasStarted = true

changeObserver.send(value: state)
drainEvents()
}

override func process(_ event: Event, for token: Token) {
enqueue(event, for: token)

if reducerLock.try() {
// Fast path: No running effect.
defer { reducerLock.unlock() }

consume(event)
drainEvents()
} else {
// Slow path: Enqueue the event for the running effect to drain it on behalf of us.
enqueue(event, for: token)
repeat {
drainEvents()
reducerLock.unlock()
} while queue.withValue({ $0.hasEvents }) && reducerLock.try()
// ^^^
// Restart the event draining after we unlock the reducer lock, iff:
//
// 1. the queue still has unprocessed events; and
// 2. no concurrent actor has taken the reducer lock, which implies no event draining would be started
// unless we take active action.
//
// This eliminates a race condition in the following sequence of operations:
//
// | Thread A | Thread B |
// |------------------------------------|------------------------------------|
// | concurrent dequeue: no item | |
// | | concurrent enqueue |
// | | trylock lock: BUSY |
// | unlock lock | |
// | | |
// | <<< The enqueued event is left unprocessed. >>> |
//
// The trylock-unlock duo has a synchronize-with relationship, which ensures that Thread A must see any
// concurrent enqueue that *happens before* the trylock.
}
}

Expand All @@ -65,7 +88,7 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {

private func dequeue() -> Event? {
queue.modify {
guard !$0.isOuterLifetimeEnded, !$0.events.isEmpty else { return nil }
guard $0.hasEvents else { return nil }
return $0.events.removeFirst().0
}
}
Expand Down