Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for null computations inside StyxObjectStore.compute() #629

Merged
merged 5 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public String toString() {
buf.append(", HTTPOnly");
}
if (sameSite != null) {
buf.append(", SameSite=").append(sameSite.toString());
buf.append(", SameSite=").append(sameSite);
dvlato marked this conversation as resolved.
Show resolved Hide resolved
}
return buf.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2019 Expedia Inc.
Copyright (C) 2013-2020 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -121,32 +121,36 @@ class StyxObjectStore<T> internal constructor(executor: ExecutorService): Object
* @property computation a function that produces the new value
* @return the previous value
*/
fun compute(key: String, computation: (T?) -> T): Optional<T> {
fun compute(key: String, computation: (T?) -> T?): Optional<T> {
require(key.isNotEmpty()) { "ObjectStore compute: empty keys are not allowed." }

var current: IndexedSnapshot<T>
var result: T
var new: IndexedSnapshot<T>

do {
current = objects.get()
result = computation(current.snapshot.get(key))

new = if (result != current.snapshot.get(key)){
// Consumer REPLACES an existing value or ADDS a new value
current.map { it.plus(key, result) }
} else {
// Consumer KEEPS the existing value
current
}
} while(!objects.compareAndSet(current, new))

if (current != new) {
// Notify only if content changed:
notificationQueue.publishChange(new)
var current: IndexedSnapshot<T>
var result: T?
var new: IndexedSnapshot<T>

do {
current = objects.get()
val existingValue = current.snapshot.get(key)
result = computation(existingValue)

new = if (existingValue !== null && result === null) {
//New value is null, removing key
current.map { it.minus(key) }
} else if (result != existingValue) {
// Consumer REPLACES an existing value or ADDS a new value
current.map { it.plus(key, result) }
} else {
// Consumer KEEPS the existing value
current
}
} while (!objects.compareAndSet(current, new))

if (current != new) {
// Notify only if content changed:
notificationQueue.publishChange(new)
}

return Optional.ofNullable(current.snapshot[key])
return Optional.ofNullable(current.snapshot[key])
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ package com.hotels.styx.services

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.JsonNode
import com.hotels.styx.*
import com.hotels.styx.HEALTHCHECK_FAILING
import com.hotels.styx.HEALTHCHECK_PASSING
import com.hotels.styx.NettyExecutor
import com.hotels.styx.ProviderObjectRecord
import com.hotels.styx.STATE_ACTIVE
import com.hotels.styx.STATE_UNREACHABLE
import com.hotels.styx.api.HttpRequest
import com.hotels.styx.api.extension.service.spi.AbstractStyxService
import com.hotels.styx.api.extension.service.spi.StyxService
Expand All @@ -26,19 +31,20 @@ import com.hotels.styx.config.schema.SchemaDsl.field
import com.hotels.styx.config.schema.SchemaDsl.integer
import com.hotels.styx.config.schema.SchemaDsl.optional
import com.hotels.styx.config.schema.SchemaDsl.string
import com.hotels.styx.healthCheckTag
import com.hotels.styx.infrastructure.configuration.yaml.JsonNodeConfig
import com.hotels.styx.lbGroupTag
import com.hotels.styx.routing.RoutingObject
import com.hotels.styx.routing.RoutingObjectRecord
import com.hotels.styx.routing.config.RoutingObjectFactory
import com.hotels.styx.routing.db.StyxObjectStore
import com.hotels.styx.ProviderObjectRecord
import com.hotels.styx.server.HttpInterceptorContext
import com.hotels.styx.serviceproviders.ServiceProviderFactory
import com.hotels.styx.services.HealthCheckMonitoringService.Companion.EXECUTOR
import com.hotels.styx.stateTag
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.toMono
import java.lang.RuntimeException
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ScheduledExecutorService
Expand Down Expand Up @@ -96,29 +102,20 @@ internal class HealthCheckMonitoringService(
objectStore.entrySet()
.filter(::containsRelevantStateTag)
.forEach { (name, record) ->
objectStore.get(name).ifPresent {
try {
objectStore.compute(name) { previous ->
if (previous == null) throw ObjectDisappearedException()

val newTags = previous.tags
.let { healthCheckTag.remove(it) }
.let { stateTag.remove(it) }
.plus(stateTag(STATE_ACTIVE))

if (previous.tags != newTags)
it.copy(tags = newTags)
else
previous
}
} catch (e: ObjectDisappearedException) {
// Object disappeared between the ifPresent check and the compute, but we don't really mind.
// We just want to exit the compute, to avoid re-creating it.
// (The ifPresent is not strictly required, but a pre-emptive check is preferred to an exception)
}
objectStore.compute(name) { previous ->
if (previous === null) return@compute null

val newTags = previous.tags
.let { healthCheckTag.remove(it) }
.let { stateTag.remove(it) }
.plus(stateTag(STATE_ACTIVE))

if (previous.tags != newTags)
previous.copy(tags = newTags)
else
previous
}
}

futureRef.get().cancel(false)
}

Expand Down Expand Up @@ -206,29 +203,18 @@ internal fun objectHealthFrom(state: String?, health: Pair<String, Int>?) =
else -> ObjectOther(state)
}

internal class ObjectDisappearedException : RuntimeException("Object disappeared")


private fun markObject(db: StyxObjectStore<RoutingObjectRecord>, name: String, newStatus: ObjectHealth) {
// The ifPresent is not ideal, but compute() does not allow the computation to return null. So we can't preserve
// a state where the object does not exist using compute alone. But even with ifPresent, as we are open to
// the object disappearing between the ifPresent and the compute, which would again lead to the compute creating
// a new object when we don't want it to. But at least this will happen much less frequently.
db.get(name).ifPresent {
try {
db.compute(name) { previous ->
if (previous == null) throw ObjectDisappearedException()
val prevTags = previous.tags
val newTags = reTag(prevTags, newStatus)
if (prevTags != newTags)
it.copy(tags = newTags)
else
previous
}
} catch (e: ObjectDisappearedException) {
// Object disappeared between the ifPresent check and the compute, but we don't really mind.
// We just want to exit the compute, to avoid re-creating it.
// (The ifPresent is not strictly required, but a pre-emptive check is preferred to an exception)
db.compute(name) { previous ->
dvlato marked this conversation as resolved.
Show resolved Hide resolved
if (previous == null) {
null
dvlato marked this conversation as resolved.
Show resolved Hide resolved
} else {
val prevTags = previous.tags;
val newTags = reTag(prevTags, newStatus)
if (prevTags != newTags)
previous.copy(tags = newTags)
else
previous
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2019 Expedia Inc.
Copyright (C) 2013-2020 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,6 @@ import io.kotlintest.matchers.boolean.shouldBeTrue
import io.kotlintest.matchers.collections.shouldBeEmpty
import io.kotlintest.matchers.collections.shouldNotBeEmpty
import io.kotlintest.matchers.numerics.shouldBeGreaterThanOrEqual
import io.kotlintest.matchers.numerics.shouldBeLessThanOrEqual
import io.kotlintest.milliseconds
import io.kotlintest.seconds
import io.kotlintest.shouldBe
Expand All @@ -30,15 +29,13 @@ import io.kotlintest.specs.FeatureSpec
import reactor.core.publisher.Flux
import reactor.core.publisher.toFlux
import reactor.test.StepVerifier
import java.time.Duration
import java.util.Optional
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.Executors.newFixedThreadPool
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicReference

// We can remove AssertionError::class.java argument from the
// calls to `eventually`, after this bug fix is released:
Expand Down Expand Up @@ -235,6 +232,19 @@ class StyxObjectStoreTest : FeatureSpec() {
.verify()
}

scenario("Deletes value if computation returns null") {
val db = StyxObjectStore<String>()

db.insert("key", "old value")

StepVerifier.create(db.watch())
.expectNextCount(1)
.then { db.compute("key") { currentEntry -> null } }
.assertNext { it.entrySet().filter { it.key === "key" }.size shouldBe 0 }
.thenCancel()
.verify()
}

scenario("Maintains relative ordering between change and initial watch notifications") {
val executor = Executors.newSingleThreadExecutor()

Expand Down