Fix more strict serializable bugs.
liuliu committed Sep 21, 2021
1 parent 0e95bac commit 97392ed
Showing 4 changed files with 214 additions and 75 deletions.
18 changes: 18 additions & 0 deletions WORKLOG.md
@@ -1,3 +1,21 @@
2021-09-21
----------

Strict serializable bugs require more tests than we currently have. Right now, these bugs don't impose significant penalties because, in the real world, few people in simple apps care about whether the change block from `performChanges([A.self, B.self], ...)` or the one from `performChanges([B.self], ...)` is executed first. These updates can be interleaved and still yield a correct result. At the end of the day, if you really care about the ordering, you should use the completion handler. However, getting this right avoids subtle bugs that people weren't previously aware of.

Yesterday, while pondering what I can do to support transactions with dictionaries, I looked at our `performChanges` and found more strict serializable bugs. It comes down to how `DispatchQueue` and `DispatchGroup` may not be the best abstractions for expressing stream operations, and now I appreciate CUDA's event / stream model even more.

What I want to achieve is to schedule my work onto queues such that a dependency on previous items on a queue can be expressed. With DispatchQueue / DispatchGroup this is possible, but doing so requires tracking every item dispatched to the queue and using either `DispatchWorkItem.notify` or `DispatchGroup.notify`. These are not compatible with `DispatchQueue.async` in the sense that `DispatchGroup.notify(queue: queue)` will be dispatched after `queue.async` if the group is currently blocked. To put it simply, there is no API that lets us manipulate items already on the queue. This is in contrast with CUDA's stream / event API, where `EventSignal(stream)` / `StreamWait(event)` tap into exactly the set of work items queued up at that point in time, and any items dispatched afterwards will wait for that particular event. There is no such API on the Dispatch side. I currently ended up with what I believe is a *correct* but cumbersome way to do this. The good part is that if there are no cross-table transactions there is no penalty; otherwise, it looks like this:

1. Pick the primary queue (by sorting the transaction-table identifiers); for the other queues, dispatch async with a new group, and inside that block, suspend those queues;

2. `DispatchQueue.async` on the primary queue, and explicitly wait on that group inside the block. This makes sure any blocks dispatched on this queue, or on any of the other queues, won't be executed until the current block is done;

3. At the end of the current block, resume other queues.

It is pretty heavy-handed, but it guarantees correctness in the strict-serializable sense.
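
A minimal sketch of the three steps above, with two hypothetical queues (`queueA` as the primary, `queueB` as a secondary); the actual implementation is in the `SQLiteWorkspace.swift` diff below:

```swift
import Dispatch

// Hypothetical primary / secondary queues; in Dflat these would be the
// per-table-space queues, with the primary picked by sorting identifiers.
let queueA = DispatchQueue(label: "tablespace.A")  // primary
let queueB = DispatchQueue(label: "tablespace.B")  // secondary

func performCrossQueueWork(_ work: @escaping () -> Void) {
  let group = DispatchGroup()
  // Step 1: from B's own queue, suspend B. Everything already enqueued on B
  // runs first; anything enqueued afterwards is held until we resume.
  queueB.async(group: group) {
    queueB.suspend()
  }
  // Step 2: on the primary queue, wait until B has actually suspended itself,
  // then run the cross-queue work while both queues are effectively serialized.
  queueA.async {
    group.wait()
    work()
    // Step 3: resume the secondary queue once the current block is done.
    queueB.resume()
  }
}

// Usage: any block enqueued on either queue after this call will observe the
// cross-queue work as already completed.
let done = DispatchSemaphore(value: 0)
performCrossQueueWork {
  print("strictly serialized cross-queue block")
  done.signal()
}
done.wait()
```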
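
For reference, the `DispatchGroup.notify` / `DispatchQueue.async` ordering behavior that rules out the simpler approach can be reproduced in a few lines (again a standalone sketch of the Dispatch behavior, not code from this repo):

```swift
import Dispatch

let queue = DispatchQueue(label: "demo.notify-ordering")
let group = DispatchGroup()
let done = DispatchSemaphore(value: 0)

group.enter()  // keep the group "blocked" for a moment

// The notify block is only enqueued once the group becomes empty ...
group.notify(queue: queue) {
  print("notify block")  // runs second
  done.signal()
}
// ... so a plain async submitted later still lands on the queue first.
queue.async {
  print("async block")  // runs first, even though notify was requested earlier
}

group.leave()
done.wait()
```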


2021-09-20
----------

32 changes: 11 additions & 21 deletions src/sqlite/SQLiteTableSpace.swift
@@ -8,17 +8,14 @@ protocol SQLiteTableSpace: AnyObject {
func connect(_ closure: () -> SQLiteConnection?) -> SQLiteConnection?
func lock()
func unlock()
func enter()
func leave()
func wait(for: DispatchQueue)
func notify(work: DispatchWorkItem)
func resume()
func enterAndSuspend(_: DispatchGroup)
}

final class ConcurrentSQLiteTableSpace: SQLiteTableSpace {
let queue: DispatchQueue
let state = SQLiteTableState()
var resultPublisher: ResultPublisher? = nil
private let group = DispatchGroup()
private var connection: SQLiteConnection? = nil
private var _shutdown: Bool = false
private var _lock: os_unfair_lock_s = os_unfair_lock()
@@ -42,17 +39,14 @@ final class ConcurrentSQLiteTableSpace: SQLiteTableSpace {
func unlock() {
os_unfair_lock_unlock(&_lock)
}
func enter() {
group.enter()
func resume() {
queue.resume()
}
func leave() {
group.leave()
}
func wait(for: DispatchQueue) {
queue.async(group: group, execute: DispatchWorkItem(flags: .enforceQoS) {})
}
func notify(work: DispatchWorkItem) {
group.notify(queue: queue, work: work)
func enterAndSuspend(_ group: DispatchGroup) {
let queue = self.queue
queue.async(group: group) {
queue.suspend()
}
}
}

@@ -76,10 +70,6 @@ final class SerialSQLiteTableSpace: SQLiteTableSpace {
}
func lock() {}
func unlock() {}
func enter() {}
func leave() {}
func wait(for: DispatchQueue) {}
func notify(work: DispatchWorkItem) {
queue.async(execute: work)
}
func resume() {}
func enterAndSuspend(_: DispatchGroup) {}
}
123 changes: 71 additions & 52 deletions src/sqlite/SQLiteWorkspace.swift
@@ -176,64 +176,83 @@ public final class SQLiteWorkspace: Workspace {
tableSpaces.append(tableSpace)
}
}
// Sync on that particular queue; this way, we ensure that our operation is executed strictly serialized after that one.
// Without this, if we have, on thread 1:
// performChanges([A.self, B.self], ...)
// performChanges([B.self], ...)
// because the first line uses A's queue while the second line uses B's queue, there is no guarantee that the
// changesHandler in the second line will be executed after the first line. That would be surprising and would violate the
// strictly serializable guarantee. By using DispatchGroup to enter / leave, we make sure B's queue will wait for its signal
// before proceeding.
if tableSpaces.count > 1 {
for tableSpace in tableSpaces.suffix(from: 1) {
tableSpace.enter()
}
}
}
tableSpaces[0].notify(
work: DispatchWorkItem(flags: .enforceQoS) { [weak self] in
guard let self = self else {
completionHandler?(false)
return
}
guard let connection = tableSpaces[0].connect({ self.newConnection() }) else {
completionHandler?(false)
return
}
// It is OK to create the connection etc. before acquiring the lock as long as we don't do any mutation (because we are already on its queue, and we only create the connection on its own queue).
tableSpaces[0].lock()
if tableSpaces.count > 1 {
// Similar to above, this is explicitly to handle the case where we have:
// performChanges([B.self], ...)
// performChanges([A.self, B.self], ...)
// In this case, B will enter without waiting for any other groups and A will enter without waiting for any other groups.
// But at this point, the second line will wait until B finishes before proceeding.
for tableSpace in tableSpaces.suffix(from: 1) {
tableSpace.queue.sync {
tableSpace.lock()
// Sync on that particular queue; this way, we ensure that our operation is executed strictly serialized after that one.
// Without this, if we have, on thread 1:
// performChanges([A.self, B.self], ...)
// performChanges([B.self], ...)
// because the first line uses A's queue while the second line uses B's queue, there is no guarantee that the
// changesHandler in the second line will be executed after the first line. That would be surprising and would violate the
// strictly serializable guarantee. By using DispatchGroup to enter / leave, we make sure B's queue will wait for its signal
// before proceeding.
if tableSpaces.count > 1 {
let group = DispatchGroup()
for tableSpace in tableSpaces.suffix(from: 1) {
// This will suspend these queues upon entering the group.
tableSpace.enterAndSuspend(group)
}
tableSpaces[0].queue.async(
execute: DispatchWorkItem(flags: .enforceQoS) { [weak self] in
guard let self = self else {
completionHandler?(false)
return
}
// It is OK to create the connection etc. before acquiring the lock as long as we don't do any mutation (because we are already on its queue, and we only create the connection on its own queue).
guard let connection = tableSpaces[0].connect({ self.newConnection() }) else {
completionHandler?(false)
return
}
group.wait() // Force a sync with the other queues, then acquire the locks. The order doesn't matter because, at this point, all other queues are suspended.
for tableSpace in tableSpaces {
tableSpace.lock()
}
// We need to fetch the resultPublisher only after acquiring the lock.
var resultPublishers = [ObjectIdentifier: ResultPublisher]()
for (i, tableSpace) in tableSpaces.enumerated() {
resultPublishers[transactionalObjectIdentifiers[i]] = tableSpace.resultPublisher
}
let succeed = self.invokeChangesHandler(
transactionalObjectIdentifiers, connection: connection,
resultPublishers: resultPublishers, tableState: tableSpaces[0].state,
changesHandler: changesHandler)
for tableSpace in tableSpaces.reversed() {
tableSpace.unlock()
}
if tableSpaces.count > 1 {
// Resume all previously suspended queues.
for tableSpace in tableSpaces.suffix(from: 1) {
tableSpace.resume()
}
}
completionHandler?(succeed)
}
// We need to fetch the resultPublisher only after acquiring the lock.
var resultPublishers = [ObjectIdentifier: ResultPublisher]()
for (i, tableSpace) in tableSpaces.enumerated() {
resultPublishers[transactionalObjectIdentifiers[i]] = tableSpace.resultPublisher
}
let succeed = self.invokeChangesHandler(
transactionalObjectIdentifiers, connection: connection,
resultPublishers: resultPublishers, tableState: tableSpaces[0].state,
changesHandler: changesHandler)
for tableSpace in tableSpaces.reversed() {
tableSpace.unlock()
}
if tableSpaces.count > 1 {
for tableSpace in tableSpaces.suffix(from: 1) {
tableSpace.leave()
)
} else {
let tableSpace = tableSpaces[0]
tableSpace.queue.async(
execute: DispatchWorkItem(flags: .enforceQoS) { [weak self] in
guard let self = self else {
completionHandler?(false)
return
}
// It is OK to create the connection etc. before acquiring the lock as long as we don't do any mutation (because we are already on its queue, and we only create the connection on its own queue).
guard let connection = tableSpace.connect({ self.newConnection() }) else {
completionHandler?(false)
return
}
// We need to fetch the resultPublisher only after acquiring the lock.
tableSpace.lock()
var resultPublishers = [ObjectIdentifier: ResultPublisher]()
resultPublishers[transactionalObjectIdentifiers[0]] = tableSpace.resultPublisher
let succeed = self.invokeChangesHandler(
transactionalObjectIdentifiers, connection: connection,
resultPublishers: resultPublishers, tableState: tableSpace.state,
changesHandler: changesHandler)
tableSpace.unlock()
completionHandler?(succeed)
}
completionHandler?(succeed)
}
)
)
}
}

// MARK: - Fetching
116 changes: 114 additions & 2 deletions src/tests/ConcurrencyTests.swift
@@ -18,7 +18,7 @@ class ConcurrencyTests: XCTestCase {
dflat?.shutdown()
}

func testConcurrentUpdates() {
func testConcurrentUpdatesAAB() {
guard let dflat = dflat else { return }
let update1 = XCTestExpectation(description: "transaction 1")
dflat.performChanges(
@@ -198,6 +198,117 @@
XCTAssertEqual(fetchedResult2[2].name, "name4")
}

func testConcurrentUpdatesABBAB() {
guard let dflat = dflat else { return }
let update3 = XCTestExpectation(description: "transaction 3")
dflat.performChanges(
[MyGame.Sample.Monster.self, MyGame.SampleV2.Monster.self],
changesHandler: { txnContext in
let creationRequest1 = MyGame.Sample.MonsterChangeRequest.creationRequest()
creationRequest1.name = "name1"
creationRequest1.mana = 100
creationRequest1.color = .green
try! txnContext.submit(creationRequest1)
let creationRequest2 = MyGame.Sample.MonsterChangeRequest.creationRequest()
creationRequest2.name = "name2"
creationRequest2.mana = 50
creationRequest2.color = .green
try! txnContext.submit(creationRequest2)
let creationRequest3 = MyGame.Sample.MonsterChangeRequest.creationRequest()
creationRequest3.name = "name3"
creationRequest3.mana = 20
creationRequest3.color = .green
try! txnContext.submit(creationRequest3)
let creationRequest4 = MyGame.Sample.MonsterChangeRequest.creationRequest()
creationRequest4.name = "name4"
creationRequest4.mana = 120
creationRequest4.color = .green
try! txnContext.submit(creationRequest4)
let creationRequest1v2 = MyGame.SampleV2.MonsterChangeRequest.creationRequest()
creationRequest1v2.name = "name1"
creationRequest1v2.mana = 100
creationRequest1v2.color = .green
try! txnContext.submit(creationRequest1v2)
let creationRequest2v2 = MyGame.SampleV2.MonsterChangeRequest.creationRequest()
creationRequest2v2.name = "name2"
creationRequest2v2.mana = 50
creationRequest2v2.color = .green
try! txnContext.submit(creationRequest2v2)
let creationRequest3v2 = MyGame.SampleV2.MonsterChangeRequest.creationRequest()
creationRequest3v2.name = "name3"
creationRequest3v2.mana = 20
creationRequest3v2.color = .green
try! txnContext.submit(creationRequest3v2)
let creationRequest4v2 = MyGame.SampleV2.MonsterChangeRequest.creationRequest()
creationRequest4v2.name = "name4"
creationRequest4v2.mana = 120
creationRequest4v2.color = .green
try! txnContext.submit(creationRequest4v2)
}
) { success in
update3.fulfill()
}
let update1 = XCTestExpectation(description: "transaction 1")
dflat.performChanges(
[MyGame.Sample.Monster.self],
changesHandler: { txnContext in
// At this point, we should be able to see all four objects created above
let fetchedResult1 = dflat.fetch(for: MyGame.Sample.Monster.self).all()
XCTAssert(fetchedResult1.count == 4)
let updateObj2 = MyGame.Sample.Monster(name: "name2", color: .green)
let changeRequest2 = MyGame.Sample.MonsterChangeRequest.changeRequest(updateObj2)!
changeRequest2.mana = 25
try! txnContext.submit(changeRequest2)
}
) { success in
update1.fulfill()
}
let update2 = XCTestExpectation(description: "transaction 2")
dflat.performChanges(
[MyGame.SampleV2.Monster.self],
changesHandler: { txnContext in
let fetchedResult2 = dflat.fetch(for: MyGame.SampleV2.Monster.self).all()
XCTAssert(fetchedResult2.count == 4)
let updateObj1 = MyGame.SampleV2.Monster(name: "name1", color: .green)
let changeRequest1 = MyGame.SampleV2.MonsterChangeRequest.changeRequest(updateObj1)!
changeRequest1.mana = 75
try! txnContext.submit(changeRequest1)
}
) { success in
update2.fulfill()
}
let update4 = XCTestExpectation(description: "transaction 4")
dflat.performChanges(
[MyGame.Sample.Monster.self, MyGame.SampleV2.Monster.self],
changesHandler: { txnContext in
// At this point, we should be able to see all four objects created above
let fetchedResult1 = dflat.fetch(for: MyGame.Sample.Monster.self).all()
XCTAssert(fetchedResult1.count == 4)
let deleteObj2 = MyGame.Sample.Monster(name: "name2", color: .green)
let deletionRequest2 = MyGame.Sample.MonsterChangeRequest.deletionRequest(deleteObj2)
try! txnContext.submit(deletionRequest2!)
let fetchedResult2 = dflat.fetch(for: MyGame.SampleV2.Monster.self).all()
XCTAssert(fetchedResult2.count == 4)
let deleteObj1 = MyGame.SampleV2.Monster(name: "name1", color: .green)
let deletionRequest1 = MyGame.SampleV2.MonsterChangeRequest.deletionRequest(deleteObj1)
try! txnContext.submit(deletionRequest1!)
}
) { success in
update4.fulfill()
}
wait(for: [update1, update2, update3, update4], timeout: 10.0)
let fetchedResult1 = dflat.fetch(for: MyGame.Sample.Monster.self).all()
XCTAssert(fetchedResult1.count == 3)
XCTAssertEqual(fetchedResult1[0].name, "name1")
XCTAssertEqual(fetchedResult1[1].name, "name3")
XCTAssertEqual(fetchedResult1[2].name, "name4")
let fetchedResult2 = dflat.fetch(for: MyGame.SampleV2.Monster.self).all()
XCTAssert(fetchedResult2.count == 3)
XCTAssertEqual(fetchedResult2[0].name, "name2")
XCTAssertEqual(fetchedResult2[1].name, "name3")
XCTAssertEqual(fetchedResult2[2].name, "name4")
}

func testSQLiteWriteLock() {
guard let dflat = dflat else { return }
let update1 = XCTestExpectation(description: "transaction 1")
@@ -321,8 +432,9 @@ class ConcurrencyTests: XCTestCase {
}

static let allTests = [
("testConcurrentUpdates", testConcurrentUpdates),
("testConcurrentUpdatesAAB", testConcurrentUpdatesAAB),
("testConcurrentUpdatesABB", testConcurrentUpdatesABB),
("testConcurrentUpdatesABBAB", testConcurrentUpdatesABBAB),
("testSQLiteWriteLock", testSQLiteWriteLock),
]
}
