From 90a46ffc5f37983c94cf6b2bdef89dcb590cb237 Mon Sep 17 00:00:00 2001 From: Mikko Karjalainen Date: Thu, 21 Nov 2019 16:38:34 +0000 Subject: [PATCH 1/3] Fix object store race condition. --- .../styx/api/configuration/ObjectStore.java | 7 + .../styx/routing/db/NotificationQueue.kt | 107 ++++++++ .../hotels/styx/routing/db/StyxObjectStore.kt | 91 +++---- .../styx/routing/db/StyxObjectStoreTest.kt | 241 +++++++++++++----- 4 files changed, 335 insertions(+), 111 deletions(-) create mode 100644 components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt diff --git a/components/api/src/main/java/com/hotels/styx/api/configuration/ObjectStore.java b/components/api/src/main/java/com/hotels/styx/api/configuration/ObjectStore.java index 32e9c268ef..cafdbf74c4 100644 --- a/components/api/src/main/java/com/hotels/styx/api/configuration/ObjectStore.java +++ b/components/api/src/main/java/com/hotels/styx/api/configuration/ObjectStore.java @@ -41,4 +41,11 @@ public interface ObjectStore { * @return a collection of all entries. */ Collection> entrySet(); + + /** + * Returns this snapshot index. + * + * @return snapshot index. + */ + long index(); } diff --git a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt new file mode 100644 index 0000000000..a32643a592 --- /dev/null +++ b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt @@ -0,0 +1,107 @@ +package com.hotels.styx.routing.db + +import com.hotels.styx.api.configuration.ObjectStore +import org.pcollections.HashTreePMap +import org.pcollections.PMap +import reactor.core.publisher.FluxSink +import java.util.Optional +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.ExecutorService +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + + +internal class NotificationQueue(val watchers: CopyOnWriteArrayList>, val executor: ExecutorService) { + @Volatile + private var pendingSnapshot = IndexedSnapshot(0, HashTreePMap.empty()) + @Volatile + private var issuedSnapshot = IndexedSnapshot(0, HashTreePMap.empty()) + private val pendingChangeNotification = AtomicBoolean(false) + private val lock = ReentrantLock() + + private val listeners = ConcurrentHashMap>() + + fun publishChange(snapshot: IndexedSnapshot) { + val inQueue = lock.withLock { + if (snapshot.index <= pendingSnapshot.index) { + // Snapshot is older than one pending for publishing + return + } + + pendingSnapshot = snapshot + + pendingChangeNotification.getAndSet(true) + } + + if (!inQueue) { + executor.submit { + pendingChangeNotification.set(false) + + issuedSnapshot = pendingSnapshot + watchers.forEach { watcher -> + watcher.invoke(newSnapshot(pendingSnapshot)) + } + + listeners.forEach { + it.value.invoke(ChangeNotification( + newSnapshot(issuedSnapshot), + pendingChangeNotification.get() + )) + } + } + } + + } + + fun publishInitialWatch(sink: FluxSink>) { + executor.submit { + sink.next(newSnapshot(issuedSnapshot)) + listeners.forEach { + it.value.invoke(InitialWatchNotification( + newSnapshot(issuedSnapshot), + pendingChangeNotification.get() + )) + } + } + } + + internal fun addDispatchListener(key: String, listener: DispatchListener) { + listeners.put(key, listener) + } + + internal fun removeDispatchListener(key: String) { + listeners.remove(key) + } + + private fun newSnapshot(snapshot: IndexedSnapshot) = object : ObjectStore { + override fun get(key: String?): Optional { + return Optional.ofNullable(snapshot.snapshot[key]) + } + + override fun entrySet(): Collection> = entrySet(snapshot.snapshot) + + override fun index() = snapshot.index + } +} + +internal fun entrySet(snapshot: PMap): Collection> = snapshot.entries + +internal data class IndexedSnapshot(val index: Long, val snapshot: PMap) { + fun map(modification: (PMap) -> PMap) = IndexedSnapshot(this.index + 1, modification(this.snapshot)) +} + +internal typealias ChangeWatcher = (ObjectStore) -> Unit + +internal typealias DispatchListener = (DispatchListenerNotification) -> Unit + +internal sealed class DispatchListenerNotification + +internal data class ChangeNotification( + val snapshot: ObjectStore, + val pendingNotifications: Boolean) : DispatchListenerNotification() + +internal data class InitialWatchNotification( + val snapshot: ObjectStore, + val pendingNotifications: Boolean) : DispatchListenerNotification() diff --git a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/StyxObjectStore.kt b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/StyxObjectStore.kt index 9ff7c403ea..f00699bbed 100644 --- a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/StyxObjectStore.kt +++ b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/StyxObjectStore.kt @@ -17,26 +17,31 @@ package com.hotels.styx.routing.db; import com.hotels.styx.api.configuration.ObjectStore import org.pcollections.HashTreePMap -import org.pcollections.PMap import org.reactivestreams.Publisher import reactor.core.publisher.Flux -import reactor.core.publisher.FluxSink import java.util.Optional import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicReference /** * Styx Route Database. */ -class StyxObjectStore : ObjectStore { - private val objects: AtomicReference> = AtomicReference(HashTreePMap.empty()) + +class StyxObjectStore internal constructor(executor: ExecutorService): ObjectStore { + private val objects: AtomicReference> = AtomicReference( + IndexedSnapshot(0, HashTreePMap.empty())) + private val watchers = CopyOnWriteArrayList>() + private val notificationQueue = NotificationQueue(watchers, executor) companion object { - private val executor = Executors.newSingleThreadExecutor() + private val sharedExecutor = Executors.newSingleThreadExecutor() } + constructor(): this(sharedExecutor) + /** * Retrieves an object from this object store. * @@ -49,13 +54,13 @@ class StyxObjectStore : ObjectStore { */ override fun get(name: String): Optional { - return Optional.ofNullable(objects().get(name)) + return Optional.ofNullable(objects().snapshot.get(name)) } /** * Retrieves all entries. */ - override fun entrySet(): Collection> = entrySet(objects.get()) + override fun entrySet(): Collection> = entrySet(objects.get().snapshot) /** * Inserts a new object in object store. @@ -75,18 +80,16 @@ class StyxObjectStore : ObjectStore { require(key.isNotEmpty()) { "ObjectStore insert: empty keys are not allowed." } var current = objects.get() - var new = current.plus(key, payload) + var new = current.map { it.plus(key, payload) } while (!objects.compareAndSet(current, new)) { current = objects.get() - new = current.plus(key, payload) + new = current.map { it.plus(key, payload) } } - queue { - notifyWatchers(new) - } + notificationQueue.publishChange(new) - return Optional.ofNullable(current[key]) + return Optional.ofNullable(current.snapshot[key]) } /** @@ -121,17 +124,17 @@ class StyxObjectStore : ObjectStore { fun compute(key: String, computation: (T?) -> T): Optional { require(key.isNotEmpty()) { "ObjectStore compute: empty keys are not allowed." } - var current: PMap + var current: IndexedSnapshot var result: T - var new: PMap + var new: IndexedSnapshot do { current = objects.get() - result = computation(current.get(key)) + result = computation(current.snapshot.get(key)) - new = if (result != current.get(key)){ + new = if (result != current.snapshot.get(key)){ // Consumer REPLACES an existing value or ADDS a new value - current.plus(key, result) + current.map { it.plus(key, result) } } else { // Consumer KEEPS the existing value current @@ -140,12 +143,10 @@ class StyxObjectStore : ObjectStore { if (current != new) { // Notify only if content changed: - queue { - notifyWatchers(new) - } + notificationQueue.publishChange(new) } - return Optional.ofNullable(current[key]) + return Optional.ofNullable(current.snapshot[key]) } @@ -164,20 +165,20 @@ class StyxObjectStore : ObjectStore { */ fun remove(key: String): Optional { var current = objects.get() - var new = current.minus(key) + var new = current.map { it.minus(key) } + // Unnecessarily increments the index when "key" doesn't exist: + // We will live with this for now. while (!objects.compareAndSet(current, new)) { current = objects.get() - new = current.minus(key) + new = current.map { it.minus(key) } } - if (current != new) { - queue { - notifyWatchers(new) - } + if (current.snapshot != new.snapshot) { + notificationQueue.publishChange(new) } - return Optional.ofNullable(current[key]) + return Optional.ofNullable(current.snapshot[key]) } /** @@ -195,41 +196,23 @@ class StyxObjectStore : ObjectStore { } watchers.add(watcher) - queue { - emitInitialSnapshot(sink) - } - } - } - private fun emitInitialSnapshot(sink: FluxSink>) { - sink.next(snapshot(objects())) + notificationQueue.publishInitialWatch(sink) + } } internal fun watchers() = watchers.size private fun objects() = objects.get() - private fun queue(task: () -> Unit) { - executor.submit(task) - } + override fun index() = objects.get().index - private fun notifyWatchers(objectsV2: PMap) { - watchers.forEach { listener -> - listener.invoke(snapshot(objectsV2)) - } + internal fun addDispatchListener(key: String, listener: DispatchListener) { + notificationQueue.addDispatchListener(key, listener) } - private fun snapshot(snapshot: PMap) = object : ObjectStore { - override fun get(key: String?): Optional { - return Optional.ofNullable(snapshot[key]) - } - - override fun entrySet(): Collection> = entrySet(snapshot) + internal fun removeDispatchListener(key: String) { + notificationQueue.removeDispatchListener(key) } - - private fun entrySet(snapshot: PMap): Collection> = snapshot - .entries } - -private typealias ChangeWatcher = (ObjectStore) -> Unit diff --git a/components/proxy/src/test/kotlin/com/hotels/styx/routing/db/StyxObjectStoreTest.kt b/components/proxy/src/test/kotlin/com/hotels/styx/routing/db/StyxObjectStoreTest.kt index 69a487578a..923341d76f 100644 --- a/components/proxy/src/test/kotlin/com/hotels/styx/routing/db/StyxObjectStoreTest.kt +++ b/components/proxy/src/test/kotlin/com/hotels/styx/routing/db/StyxObjectStoreTest.kt @@ -21,16 +21,24 @@ import io.kotlintest.matchers.boolean.shouldBeTrue import io.kotlintest.matchers.collections.shouldBeEmpty import io.kotlintest.matchers.collections.shouldNotBeEmpty import io.kotlintest.matchers.numerics.shouldBeGreaterThanOrEqual +import io.kotlintest.matchers.numerics.shouldBeLessThanOrEqual import io.kotlintest.milliseconds import io.kotlintest.seconds import io.kotlintest.shouldBe +import io.kotlintest.shouldNotBe import io.kotlintest.specs.FeatureSpec import reactor.core.publisher.Flux +import reactor.core.publisher.toFlux import reactor.test.StepVerifier +import java.time.Duration import java.util.Optional import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors import java.util.concurrent.Executors.newFixedThreadPool +import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit.SECONDS +import java.util.concurrent.atomic.AtomicReference // We can remove AssertionError::class.java argument from the // calls to `eventually`, after this bug fix is released: @@ -84,6 +92,7 @@ class StyxObjectStoreTest : FeatureSpec() { it.get("y") shouldBe Optional.of("y") } .thenCancel() + .log() .verify(4.seconds) db.watchers() shouldBe 0 @@ -105,6 +114,43 @@ class StyxObjectStoreTest : FeatureSpec() { } } + scenario("Maintains relative ordering between change and initial watch notifications") { + val executor = Executors.newSingleThreadExecutor() + + val db = StyxObjectStore(executor) + val events = mutableListOf() + + val watchConsumer = db.watch().toFlux().subscribe { + // Keeps the event notification thread busy to build up a backlog of events. + Thread.sleep(10) + } + + db.insert("key", 1) + db.insert("key", 2) + + val watcher = db.watch() + .toFlux() + .subscribe { + events.add(it.get("key").orElse(0xBAD_BEEF)) + } + + db.insert("key", 3) + db.insert("key", 4) + + watchConsumer.dispose() + watcher.dispose() + + executor.shutdown() + executor.awaitTermination(250, TimeUnit.MILLISECONDS) + + // Ensure the events were delivered in order + events.fold(0) { acc, value -> + value.shouldNotBe(0xBAD_BEEF) + value.shouldBeGreaterThanOrEqual(acc) + value + } + } + scenario("Replaces already existing object") { val db = StyxObjectStore() @@ -165,8 +211,21 @@ class StyxObjectStoreTest : FeatureSpec() { scenario("Replace an existing value") { val db = StyxObjectStore() + val latch = CountDownLatch(1) + + db.addDispatchListener("_") { + when (it) { + is ChangeNotification -> + if (it.snapshot.index() == 1L) { + latch.countDown() + } + + else -> { } + } + } db.insert("key", "old value") + latch.await() StepVerifier.create(db.watch()) .expectNextCount(1) @@ -175,6 +234,43 @@ class StyxObjectStoreTest : FeatureSpec() { .thenCancel() .verify() } + + scenario("Maintains relative ordering between change and initial watch notifications") { + val executor = Executors.newSingleThreadExecutor() + + val db = StyxObjectStore(executor) + val events = mutableListOf() + + val watchConsumer = db.watch().toFlux().subscribe { + // Keeps the event notification thread busy to build up a backlog of events. + Thread.sleep(10) + } + + db.compute("key") { 1 } + db.compute("key") { 2 } + + val watcher = db.watch() + .toFlux() + .subscribe { + events.add(it.get("key").orElse(0xBAD_BEEF)) + } + + db.compute("key") { 3 } + db.compute("key") { 4 } + + watchConsumer.dispose() + watcher.dispose() + + executor.shutdown() + executor.awaitTermination(250, TimeUnit.MILLISECONDS) + + // Ensure the events were delivered in order + events.fold(0) { acc, value -> + value.shouldNotBe(0xBAD_BEEF) + value.shouldBeGreaterThanOrEqual(acc) + value + } + } } feature("Remove") { @@ -256,76 +352,70 @@ class StyxObjectStoreTest : FeatureSpec() { db.entrySet().shouldBeEmpty() } - scenario("Returns Optional.empty, when previous value doesn't exist") { - val db = StyxObjectStore() - - db.remove("key") shouldBe Optional.empty() - } - - scenario("Returns previous, replaced value") { - val db = StyxObjectStore() + scenario("Maintains relative ordering between change and initial watch notifications") { + val executor = Executors.newSingleThreadExecutor() - db.insert("key", "a-value") shouldBe Optional.empty() + val db = StyxObjectStore(executor) + db.insert("key-01", 1) + db.insert("key-02", 2) + db.insert("key-03", 3) + db.insert("key-04", 4) - db.remove("key") shouldBe Optional.of("a-value") - } - } + val events = mutableListOf() - feature("Watch") { - scenario("Supports multiple watchers") { - val db = StyxObjectStore() - val watchEvents1 = CopyOnWriteArrayList>() - val watchEvents2 = CopyOnWriteArrayList>() + val watchConsumer = db.watch().toFlux().subscribe { + // Keeps the event notification thread busy to build up a backlog of events. + Thread.sleep(10) + } - val watcher1 = Flux.from(db.watch()).subscribe { watchEvents1.add(it) } - val watcher2 = Flux.from(db.watch()).subscribe { watchEvents2.add(it) } + db.remove("key-01") + db.remove("key-02") - // Wait for the initial watch event ... - eventually(1.seconds, java.lang.AssertionError::class.java) { - watchEvents1.size shouldBe 1 - watchEvents1[0].get("x") shouldBe Optional.empty() + val watcher = db.watch() + .toFlux() + .subscribe { + events.add(it.index()) + } - watchEvents2.size shouldBe 1 - watchEvents2[0].get("x") shouldBe Optional.empty() - } + db.remove("key-03") + db.remove("key-04") - db.insert("x", "x") - db.insert("y", "y") + watchConsumer.dispose() + watcher.dispose() - // ... otherwise we aren't guaranteed what events are going show up. - // - // The ordering between initial watch event in relation to objectStore.inserts are - // non-deterministic. - eventually(1.seconds, AssertionError::class.java) { - watchEvents1.size shouldBe 3 - watchEvents2.size shouldBe 3 + executor.shutdown() + executor.awaitTermination(250, TimeUnit.MILLISECONDS) - watchEvents1[1].get("x") shouldBe Optional.of("x") - watchEvents2[1].get("x") shouldBe Optional.of("x") + // Ensure the events were delivered in order + events.fold(0L) { previous, index -> + index.shouldBeGreaterThanOrEqual(previous) + index + } + } - watchEvents1[1].get("y") shouldBe Optional.empty() - watchEvents2[1].get("y") shouldBe Optional.empty() + scenario("Returns Optional.empty, when previous value doesn't exist") { + val db = StyxObjectStore() - watchEvents1[2].get("x") shouldBe Optional.of("x") - watchEvents2[2].get("x") shouldBe Optional.of("x") + db.remove("key") shouldBe Optional.empty() + } - watchEvents1[2].get("y") shouldBe Optional.of("y") - watchEvents2[2].get("y") shouldBe Optional.of("y") - } + scenario("Returns previous, replaced value") { + val db = StyxObjectStore() - watcher1.dispose() - db.watchers() shouldBe 1 + db.insert("key", "a-value") shouldBe Optional.empty() - watcher2.dispose() - db.watchers() shouldBe 0 + db.remove("key") shouldBe Optional.of("a-value") } - scenario("Provides immutable snapshot") { + } + + feature("Watch") { + scenario("Publishes an immutable final state snapshot") { val db = StyxObjectStore() val watchEvents = CopyOnWriteArrayList>() - val watcher = Flux.from(db.watch()).subscribe { watchEvents.add(it) } + val watcher = db.watch().toFlux().subscribe { watchEvents.add(it) } eventually(1.seconds, AssertionError::class.java) { watchEvents.isNotEmpty().shouldBeTrue() @@ -337,18 +427,55 @@ class StyxObjectStoreTest : FeatureSpec() { db.insert("y", "y") eventually(1.seconds, AssertionError::class.java) { - watchEvents.size shouldBe 3 - watchEvents[1].get("x") shouldBe Optional.of("x") - watchEvents[1].get("y") shouldBe Optional.empty() - - watchEvents[2].get("x") shouldBe Optional.of("x") - watchEvents[2].get("y") shouldBe Optional.of("y") + watchEvents.last()["x"].isPresent.shouldBeTrue() + watchEvents.last()["y"].isPresent.shouldBeTrue() } watcher.dispose() db.watchers() shouldBe 0 } + scenario("Supports multiple watchers") { + for (x in 0..100) { + val db = StyxObjectStore() + val watchEvents1 = CopyOnWriteArrayList>() + val watchEvents2 = CopyOnWriteArrayList>() + + val watcher1 = Flux.from(db.watch()).subscribe { watchEvents1.add(it) } + val watcher2 = Flux.from(db.watch()).subscribe { watchEvents2.add(it) } + + // Wait for the initial watch event ... + eventually(1.seconds, java.lang.AssertionError::class.java) { + watchEvents1.size shouldBe 1 + watchEvents1[0].get("x") shouldBe Optional.empty() + + watchEvents2.size shouldBe 1 + watchEvents2[0].get("x") shouldBe Optional.empty() + } + + db.insert("x", "x") + db.insert("y", "y") + + // ... otherwise we aren't guaranteed what events are going show up. + // + // The ordering between initial watch event in relation to objectStore.inserts are + // non-deterministic. + eventually(1.seconds, AssertionError::class.java) { + watchEvents1.last()["x"].isPresent.shouldBeTrue() + watchEvents1.last()["y"].isPresent.shouldBeTrue() + + watchEvents1.last()["x"].shouldBe(Optional.of("x")) + watchEvents1.last()["y"].shouldBe(Optional.of("y")) + } + + watcher1.dispose() + db.watchers() shouldBe 1 + + watcher2.dispose() + db.watchers() shouldBe 0 + } + } + scenario("Provides current snapshot at subscription") { val db = StyxObjectStore() val watchEvents = CopyOnWriteArrayList>() @@ -416,4 +543,4 @@ class StyxObjectStoreTest : FeatureSpec() { } } -} \ No newline at end of file +} From aea2a9978c3fa0a41c6ff916571e7b40d948ddbf Mon Sep 17 00:00:00 2001 From: Mikko Karjalainen Date: Fri, 22 Nov 2019 08:28:09 +0000 Subject: [PATCH 2/3] Add copyright message. --- .../hotels/styx/routing/db/NotificationQueue.kt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt index a32643a592..325b17e5b3 100644 --- a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt +++ b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt @@ -1,3 +1,18 @@ +/* + Copyright (C) 2013-2019 Expedia Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ package com.hotels.styx.routing.db import com.hotels.styx.api.configuration.ObjectStore From 8c5ae697cb0a26151286e295e2df9b36c3379ea6 Mon Sep 17 00:00:00 2001 From: Mikko Karjalainen Date: Wed, 27 Nov 2019 09:20:04 +0000 Subject: [PATCH 3/3] Preserve invariants. --- .../styx/routing/db/NotificationQueue.kt | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt index 325b17e5b3..d3fbc41e7d 100644 --- a/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt +++ b/components/proxy/src/main/kotlin/com/hotels/styx/routing/db/NotificationQueue.kt @@ -36,27 +36,35 @@ internal class NotificationQueue(val watchers: CopyOnWriteArrayList>() fun publishChange(snapshot: IndexedSnapshot) { + val inQueue = lock.withLock { - if (snapshot.index <= pendingSnapshot.index) { - // Snapshot is older than one pending for publishing + // Preserve invariant: + // - pendingSnapshot is only ever increasing + // - (pendingChangeNotification == True) only if (pendingSnapshot > issuedSnapshot) + if (snapshot.index > pendingSnapshot.index) { + pendingSnapshot = snapshot + pendingChangeNotification.getAndSet(true) + } else { + // Ignore this event. Pending snapshot is more recent. return } - - pendingSnapshot = snapshot - - pendingChangeNotification.getAndSet(true) } if (!inQueue) { executor.submit { - pendingChangeNotification.set(false) + lock.withLock { + // Preserve invariant: + // - (pendingChangeNotification == False) only if (pendingSnapshot <= issuedSnapshot) + pendingChangeNotification.set(false) + issuedSnapshot = pendingSnapshot + } - issuedSnapshot = pendingSnapshot - watchers.forEach { watcher -> - watcher.invoke(newSnapshot(pendingSnapshot)) + watchers.forEach { + it.invoke(newSnapshot(issuedSnapshot)) } listeners.forEach { @@ -67,7 +75,6 @@ internal class NotificationQueue(val watchers: CopyOnWriteArrayList>) {