Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue: #525: A race condition in styx object store. #537

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ public interface ObjectStore<T> {
* @return a collection of all entries.
*/
Collection<Map.Entry<String, T>> entrySet();

/**
* Returns this snapshot index.
*
* @return snapshot index.
*/
long index();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Copyright (C) 2013-2019 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.routing.db

import com.hotels.styx.api.configuration.ObjectStore
import org.pcollections.HashTreePMap
import org.pcollections.PMap
import reactor.core.publisher.FluxSink
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock


internal class NotificationQueue<T>(val watchers: CopyOnWriteArrayList<ChangeWatcher<T>>, val executor: ExecutorService) {
@Volatile
private var pendingSnapshot = IndexedSnapshot<T>(0, HashTreePMap.empty())
@Volatile
private var issuedSnapshot = IndexedSnapshot<T>(0, HashTreePMap.empty())
private val pendingChangeNotification = AtomicBoolean(false)
private val lock = ReentrantLock()

// Listeners are just for testing purposes.
private val listeners = ConcurrentHashMap<String, DispatchListener<T>>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the listeners only for use in testing, or is there another purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. They are only for testing purposes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code does not seem clear enough to me. For instance, can't we use more meaningful names than watchers and listeners?

We might need to review the segmentation in functions to see if the sequence of function names help making the intent of the code clearer...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi David. Watcher is the documented term in the object database API.

Listener is just an event hook for the debugging purposes. Perhaps I'll rename it to eventHook? But listener is more common in Java world. Or perhaps I'll clarify this in comments?


fun publishChange(snapshot: IndexedSnapshot<T>) {

val inQueue = lock.withLock {
// Preserve invariant:
// - pendingSnapshot is only ever increasing
// - (pendingChangeNotification == True) only if (pendingSnapshot > issuedSnapshot)
if (snapshot.index > pendingSnapshot.index) {
pendingSnapshot = snapshot
pendingChangeNotification.getAndSet(true)
} else {
// Ignore this event. Pending snapshot is more recent.
return
}
}

if (!inQueue) {
executor.submit {
lock.withLock {
// Preserve invariant:
// - (pendingChangeNotification == False) only if (pendingSnapshot <= issuedSnapshot)
pendingChangeNotification.set(false)
issuedSnapshot = pendingSnapshot
}

watchers.forEach {
it.invoke(newSnapshot(issuedSnapshot))
}

listeners.forEach {
it.value.invoke(ChangeNotification(
newSnapshot(issuedSnapshot),
pendingChangeNotification.get()
))
}
}
}
}

fun publishInitialWatch(sink: FluxSink<ObjectStore<T>>) {
executor.submit {
sink.next(newSnapshot(issuedSnapshot))
listeners.forEach {
it.value.invoke(InitialWatchNotification(
newSnapshot(issuedSnapshot),
pendingChangeNotification.get()
))
}
}
}

internal fun addDispatchListener(key: String, listener: DispatchListener<T>) {
listeners.put(key, listener)
}

internal fun removeDispatchListener(key: String) {
listeners.remove(key)
}

private fun newSnapshot(snapshot: IndexedSnapshot<T>) = object : ObjectStore<T> {
override fun get(key: String?): Optional<T> {
return Optional.ofNullable(snapshot.snapshot[key])
}

override fun entrySet(): Collection<Map.Entry<String, T>> = entrySet(snapshot.snapshot)

override fun index() = snapshot.index
}
}

internal fun <T> entrySet(snapshot: PMap<String, T>): Collection<Map.Entry<String, T>> = snapshot.entries

internal data class IndexedSnapshot<T>(val index: Long, val snapshot: PMap<String, T>) {
fun map(modification: (PMap<String, T>) -> PMap<String, T>) = IndexedSnapshot(this.index + 1, modification(this.snapshot))
}

internal typealias ChangeWatcher<T> = (ObjectStore<T>) -> Unit

internal typealias DispatchListener<T> = (DispatchListenerNotification<T>) -> Unit

internal sealed class DispatchListenerNotification<T>

internal data class ChangeNotification<T>(
val snapshot: ObjectStore<T>,
val pendingNotifications: Boolean) : DispatchListenerNotification<T>()

internal data class InitialWatchNotification<T>(
val snapshot: ObjectStore<T>,
val pendingNotifications: Boolean) : DispatchListenerNotification<T>()
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,31 @@ package com.hotels.styx.routing.db;

import com.hotels.styx.api.configuration.ObjectStore
import org.pcollections.HashTreePMap
import org.pcollections.PMap
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.util.Optional
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference

/**
* Styx Route Database.
*/
class StyxObjectStore<T> : ObjectStore<T> {
private val objects: AtomicReference<PMap<String, T>> = AtomicReference(HashTreePMap.empty())

class StyxObjectStore<T> internal constructor(executor: ExecutorService): ObjectStore<T> {
private val objects: AtomicReference<IndexedSnapshot<T>> = AtomicReference(
IndexedSnapshot(0, HashTreePMap.empty()))

private val watchers = CopyOnWriteArrayList<ChangeWatcher<T>>()
private val notificationQueue = NotificationQueue(watchers, executor)

companion object {
private val executor = Executors.newSingleThreadExecutor()
private val sharedExecutor = Executors.newSingleThreadExecutor()
}

constructor(): this(sharedExecutor)

/**
* Retrieves an object from this object store.
*
Expand All @@ -49,13 +54,13 @@ class StyxObjectStore<T> : ObjectStore<T> {
*/

override fun get(name: String): Optional<T> {
return Optional.ofNullable(objects().get(name))
return Optional.ofNullable(objects().snapshot.get(name))
}

/**
* Retrieves all entries.
*/
override fun entrySet(): Collection<Map.Entry<String, T>> = entrySet(objects.get())
override fun entrySet(): Collection<Map.Entry<String, T>> = entrySet(objects.get().snapshot)

/**
* Inserts a new object in object store.
Expand All @@ -75,18 +80,16 @@ class StyxObjectStore<T> : ObjectStore<T> {
require(key.isNotEmpty()) { "ObjectStore insert: empty keys are not allowed." }

var current = objects.get()
var new = current.plus(key, payload)
var new = current.map { it.plus(key, payload) }

while (!objects.compareAndSet(current, new)) {
current = objects.get()
new = current.plus(key, payload)
new = current.map { it.plus(key, payload) }
}

queue {
notifyWatchers(new)
}
notificationQueue.publishChange(new)

return Optional.ofNullable(current[key])
return Optional.ofNullable(current.snapshot[key])
}

/**
Expand Down Expand Up @@ -121,17 +124,17 @@ class StyxObjectStore<T> : ObjectStore<T> {
fun compute(key: String, computation: (T?) -> T): Optional<T> {
require(key.isNotEmpty()) { "ObjectStore compute: empty keys are not allowed." }

var current: PMap<String, T>
var current: IndexedSnapshot<T>
var result: T
var new: PMap<String, T>
var new: IndexedSnapshot<T>

do {
current = objects.get()
result = computation(current.get(key))
result = computation(current.snapshot.get(key))

new = if (result != current.get(key)){
new = if (result != current.snapshot.get(key)){
// Consumer REPLACES an existing value or ADDS a new value
current.plus(key, result)
current.map { it.plus(key, result) }
} else {
// Consumer KEEPS the existing value
current
Expand All @@ -140,12 +143,10 @@ class StyxObjectStore<T> : ObjectStore<T> {

if (current != new) {
// Notify only if content changed:
queue {
notifyWatchers(new)
}
notificationQueue.publishChange(new)
}

return Optional.ofNullable(current[key])
return Optional.ofNullable(current.snapshot[key])
}


Expand All @@ -164,20 +165,20 @@ class StyxObjectStore<T> : ObjectStore<T> {
*/
fun remove(key: String): Optional<T> {
var current = objects.get()
var new = current.minus(key)
var new = current.map { it.minus(key) }

// Unnecessarily increments the index when "key" doesn't exist:
// We will live with this for now.
while (!objects.compareAndSet(current, new)) {
current = objects.get()
new = current.minus(key)
new = current.map { it.minus(key) }
}

if (current != new) {
queue {
notifyWatchers(new)
}
if (current.snapshot != new.snapshot) {
notificationQueue.publishChange(new)
}

return Optional.ofNullable(current[key])
return Optional.ofNullable(current.snapshot[key])
}

/**
Expand All @@ -195,41 +196,23 @@ class StyxObjectStore<T> : ObjectStore<T> {
}

watchers.add(watcher)
queue {
emitInitialSnapshot(sink)
}
}
}

private fun emitInitialSnapshot(sink: FluxSink<ObjectStore<T>>) {
sink.next(snapshot(objects()))
notificationQueue.publishInitialWatch(sink)
}
}

internal fun watchers() = watchers.size

private fun objects() = objects.get()

private fun queue(task: () -> Unit) {
executor.submit(task)
}
override fun index() = objects.get().index

private fun notifyWatchers(objectsV2: PMap<String, T>) {
watchers.forEach { listener ->
listener.invoke(snapshot(objectsV2))
}
internal fun addDispatchListener(key: String, listener: DispatchListener<T>) {
notificationQueue.addDispatchListener(key, listener)
}

private fun snapshot(snapshot: PMap<String, T>) = object : ObjectStore<T> {
override fun get(key: String?): Optional<T> {
return Optional.ofNullable(snapshot[key])
}

override fun entrySet(): Collection<Map.Entry<String, T>> = entrySet(snapshot)
internal fun removeDispatchListener(key: String) {
notificationQueue.removeDispatchListener(key)
}


private fun entrySet(snapshot: PMap<String, T>): Collection<Map.Entry<String, T>> = snapshot
.entries
}

private typealias ChangeWatcher<T> = (ObjectStore<T>) -> Unit
Loading