Skip to content

Commit

Permalink
Administrative state for routing objects (#523)
Browse files Browse the repository at this point in the history
Fixes issue: #518.
  • Loading branch information
chrisgresty authored and mikkokar committed Nov 29, 2019
1 parent 2b7075c commit 2d0d16a
Show file tree
Hide file tree
Showing 19 changed files with 818 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ static String adminEndpointPath(String root, String name, String relativePath) {
}

static String dropFirstForwardSlash(String key) {
return key.charAt(0) == '/' ? key.substring(1) : key;
return key.length() > 0 && key.charAt(0) == '/' ? key.substring(1) : key;
}

String linkLabel() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright (C) 2013-2019 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.infrastructure.configuration.json.mixins;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.hotels.styx.ErrorResponse;

/**
* Jackson annotations for {@link ErrorResponse}.
*/
public abstract class ErrorResponseMixin {

@JsonCreator
ErrorResponseMixin(@JsonProperty("errorMessage") String errorMessage) {
}

@JsonProperty("errorMessage")
public abstract String errorMessage();
}
21 changes: 21 additions & 0 deletions components/proxy/src/main/kotlin/com/hotels/styx/ErrorResponse.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright (C) 2013-2019 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx

/**
* Represents the body of an HTTP error response in a standard format.
*/
data class ErrorResponse(val errorMessage:String)
53 changes: 48 additions & 5 deletions components/proxy/src/main/kotlin/com/hotels/styx/ObjectTags.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,58 @@
*/
package com.hotels.styx

private const val LBGROUP = "lbGroup"
private val LBGROUP_REGEX = "$LBGROUP=(.+)".toRegex()
fun lbGroupTag(name: String) = "lbGroup=$name"

fun lbGroupTagValue(tag: String): String? = "lbGroup=(.+)".toRegex()
.matchEntire(tag)
fun lbGroupTag(tags: Set<String>) = tags.firstOrNull(::isLbGroupTag)
fun isLbGroupTag(tag: String) = LBGROUP_REGEX.matches(tag)
fun lbGroupTagValue(tags: Set<String>) = lbGroupTagValue(lbGroupTag(tags)?:"")
fun lbGroupTagValue(tag: String): String? = LBGROUP_REGEX.matchEntire(tag)
?.groupValues
?.get(1)

fun sourceTag(creator: String) = "source=$creator"

fun sourceTag(tags: Set<String>) = tags.firstOrNull { it.startsWith("source=") }

fun sourceTagValue(tags: Set<String>) = sourceTag(tags)?.substring("source".length + 1)

private const val STATE = "state"
const val STATE_ACTIVE = "active"
const val STATE_UNREACHABLE = "unreachable"
const val STATE_INACTIVE = "inactive"
private val STATE_REGEX = "$STATE=(.+)".toRegex()
fun stateTag(value: String) = "$STATE=$value"
fun stateTag(tags: Set<String>) = tags.firstOrNull(::isStateTag)
fun isStateTag(tag: String) = STATE_REGEX.matches(tag)
fun stateTagValue(tags: Set<String>) = stateTagValue(stateTag(tags)?:"")
fun stateTagValue(tag: String) = STATE_REGEX.matchEntire(tag)
?.groupValues
?.get(1)

private const val HEALTHCHECK = "healthCheck"
const val HEALTHCHECK_PASSING = "probes-OK"
const val HEALTHCHECK_FAILING = "probes-FAIL"
const val HEALTHCHECK_ON = "on"

// healthCheck=on
// healthCheck=on;probes-OK:2
// healthCheck=on;probes-FAIL:1
private val HEALTHCHECK_REGEX = "$HEALTHCHECK=$HEALTHCHECK_ON(?:;(.+):([0-9]+))?".toRegex()
fun healthCheckTag(value: Pair<String, Int>?) =
if (value != null && value.first.isNotBlank() && value.second > 0) {
"$HEALTHCHECK=$HEALTHCHECK_ON;${value.first}:${value.second}"
} else if (value != null && value.first.isNotBlank() && value.second == 0) {
"$HEALTHCHECK=$HEALTHCHECK_ON"
} else {
null
}
fun healthCheckTag(tags: Set<String>) = tags.firstOrNull(::isHealthCheckTag)
fun isHealthCheckTag(tag: String) = HEALTHCHECK_REGEX.matches(tag)
fun healthCheckTagValue(tags: Set<String>) = healthCheckTagValue(healthCheckTag(tags)?:"")
fun healthCheckTagValue(tag: String) = HEALTHCHECK_REGEX.matchEntire(tag)
?.groupValues
?.let {
if (it[1].isNotEmpty()) {
Pair(it[1], it[2].toInt())
} else {
Pair(HEALTHCHECK_ON, 0)
}}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.hotels.styx.routing.handlers

import com.fasterxml.jackson.annotation.JsonProperty
import com.hotels.styx.*
import com.hotels.styx.api.Eventual
import com.hotels.styx.api.HttpInterceptor
import com.hotels.styx.api.Id
Expand All @@ -39,12 +40,10 @@ import com.hotels.styx.config.schema.SchemaDsl.integer
import com.hotels.styx.config.schema.SchemaDsl.optional
import com.hotels.styx.config.schema.SchemaDsl.string
import com.hotels.styx.infrastructure.configuration.yaml.JsonNodeConfig
import com.hotels.styx.lbGroupTag
import com.hotels.styx.routing.RoutingObject
import com.hotels.styx.routing.RoutingObjectRecord
import com.hotels.styx.routing.config.RoutingObjectFactory
import com.hotels.styx.routing.config.StyxObjectDefinition
import com.hotels.styx.services.HealthCheckMonitoringService.Companion.INACTIVE_TAG
import org.slf4j.LoggerFactory
import reactor.core.Disposable
import reactor.core.publisher.toFlux
Expand Down Expand Up @@ -122,21 +121,16 @@ internal class LoadBalancingGroup(val client: StyxBackendServiceClient, val chan

private fun routeDatabaseChanged(appId: String, snapshot: ObjectStore<RoutingObjectRecord>, remoteHosts: AtomicReference<Set<RemoteHost>>) {
val newSet = snapshot.entrySet()
.filter { isTaggedWith(it, lbGroupTag(appId)) }
.filterNot { isTaggedWith(it, "$INACTIVE_TAG.*".toRegex()) }
.filter { taggedWith(it, ::lbGroupTagValue, appId) }
.filter { taggedWith(it, ::stateTagValue, STATE_ACTIVE, null) }
.map { toRemoteHost(appId, it) }
.toSet()

remoteHosts.set(newSet)
}

private fun isTaggedWith(recordEntry: Map.Entry<String, RoutingObjectRecord>, tag: String): Boolean {
return recordEntry.value.tags.contains(tag)
}

private fun isTaggedWith(recordEntry: Map.Entry<String, RoutingObjectRecord>, tag: Regex): Boolean {
return recordEntry.value.tags.firstOrNull { it.matches(tag) } != null
}
private fun taggedWith(recordEntry: Map.Entry<String, RoutingObjectRecord>, tagValue: (Set<String>) -> String?, vararg values: String?) =
values.contains(tagValue(recordEntry.value.tags))

private fun toRemoteHost(appId: String, record: Map.Entry<String, RoutingObjectRecord>): RemoteHost {
val routingObject = record.value.routingObject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.hotels.styx.services

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.JsonNode
import com.hotels.styx.*
import com.hotels.styx.api.HttpRequest
import com.hotels.styx.api.extension.service.spi.AbstractStyxService
import com.hotels.styx.api.extension.service.spi.StyxService
Expand All @@ -26,21 +27,18 @@ import com.hotels.styx.config.schema.SchemaDsl.integer
import com.hotels.styx.config.schema.SchemaDsl.optional
import com.hotels.styx.config.schema.SchemaDsl.string
import com.hotels.styx.infrastructure.configuration.yaml.JsonNodeConfig
import com.hotels.styx.lbGroupTag
import com.hotels.styx.routing.RoutingObject
import com.hotels.styx.routing.RoutingObjectRecord
import com.hotels.styx.routing.config.RoutingObjectFactory
import com.hotels.styx.routing.db.StyxObjectStore
import com.hotels.styx.routing.handlers.ProviderObjectRecord
import com.hotels.styx.serviceproviders.ServiceProviderFactory
import com.hotels.styx.services.HealthCheckMonitoringService.Companion.ACTIVE_TAG
import com.hotels.styx.services.HealthCheckMonitoringService.Companion.EXECUTOR
import com.hotels.styx.services.HealthCheckMonitoringService.Companion.INACTIVE_TAG
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.toMono
import java.lang.RuntimeException
import java.time.Duration
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
Expand Down Expand Up @@ -68,9 +66,6 @@ internal class HealthCheckMonitoringService(
optional("unhealthyThreshold", integer())
)

val ACTIVE_TAG = "state:active"
val INACTIVE_TAG = "state:inactive"

internal val EXECUTOR = ScheduledThreadPoolExecutor(2)

private val LOGGER = LoggerFactory.getLogger(HealthCheckMonitoringService::class.java)
Expand All @@ -93,29 +88,30 @@ internal class HealthCheckMonitoringService(
LOGGER.info("stopped")

objectStore.entrySet()
.filter(::containsInactiveTag)
.forEach {
(name, _) -> markObject(objectStore, name, ObjectActive(0))
.filter(::containsRelevantStateTag)
.forEach { (name, _) ->
markObject(objectStore, name, ObjectActive(0, false))
}

futureRef.get().cancel(false)
}

internal fun runChecks(application: String, objectStore: StyxObjectStore<RoutingObjectRecord>) {
val monitoredObjects = discoverMonitoredObjects(application, objectStore)
.map {
val tag = healthStatusTag(it.second.tags).orElse(INACTIVE_TAG)
val health = objectHealthFrom(tag).orElse(null)
Triple(it.first, it.second, health)
val monitoredObjects = objectStore.entrySet()
.map { Pair(it.key, it.value) }
.filter { (_, record) -> record.tags.contains(lbGroupTag(application)) }
.map { (name, record) ->
val tags = record.tags
val objectHealth = objectHealthFrom(stateTagValue(tags), healthCheckTagValue(tags))
Triple(name, record, objectHealth)
}
.filter { (_, _, objectHealth) -> objectHealth != null }

val pendingHealthChecks = monitoredObjects
.map { (name, record, objectHealth) ->
healthCheck(probe, record.routingObject, objectHealth)
.map { newHealth -> Triple(name, objectHealth, newHealth) }
.doOnNext { (name, currentHealth, newHealth) ->
if (currentHealth != newHealth || tagIsIncomplete(record.tags)) {
if (currentHealth != newHealth) {
markObject(objectStore, name, newHealth)
}
}
Expand Down Expand Up @@ -162,49 +158,58 @@ internal class HealthCheckMonitoringServiceFactory : ServiceProviderFactory {
}
}

internal fun objectHealthFrom(string: String) = Optional.ofNullable(
internal fun objectHealthFrom(state: String?, health: Pair<String, Int>?) =
when {
string.equals(ACTIVE_TAG) -> ObjectActive(0)
string.equals(INACTIVE_TAG) -> ObjectInactive(0)
string.matches("$ACTIVE_TAG:[0-9]+".toRegex()) -> {
val count = string.removePrefix("$ACTIVE_TAG:").toInt()
ObjectActive(count)
state == STATE_ACTIVE && (health?.first == HEALTHCHECK_FAILING && health.second >= 0) -> {
ObjectActive(health.second)
}
string.matches("$INACTIVE_TAG:[0-9]+".toRegex()) -> {
val count = string.removePrefix("$INACTIVE_TAG:").toInt()
ObjectInactive(count)

state == STATE_UNREACHABLE && (health?.first == HEALTHCHECK_PASSING && health.second >= 0) -> {
ObjectUnreachable(health.second)
}
else -> null
})

private fun healthStatusTag(tags: Set<String>) = Optional.ofNullable(
tags.firstOrNull {
it.startsWith(ACTIVE_TAG) || it.startsWith(INACTIVE_TAG)
state == STATE_ACTIVE -> ObjectActive(0, healthTagPresent = (health != null))
state == STATE_UNREACHABLE -> ObjectUnreachable(0, healthTagPresent = (health != null))
state == null -> ObjectUnreachable(0, healthTagPresent = (health != null))

else -> ObjectOther(state)
}
)

internal fun tagIsIncomplete(tag: Set<String>) = !healthStatusTag(tag)
.filter { it.matches(".+:([0-9]+)$".toRegex()) }
.isPresent

internal fun discoverMonitoredObjects(application: String, objectStore: StyxObjectStore<RoutingObjectRecord>) =
objectStore.entrySet()
.filter { it.value.tags.contains(lbGroupTag(application)) }
.map { Pair(it.key, it.value) }
internal class ObjectDisappearedException : RuntimeException("Object disappeared")

private fun markObject(db: StyxObjectStore<RoutingObjectRecord>, name: String, newStatus: ObjectHealth) {
db.get(name).ifPresent { db.insert(name, it.copy(tags = reTag(it.tags, newStatus))) }
// The ifPresent is not ideal, but compute() does not allow the computation to return null. So we can't preserve
// a state where the object does not exist using compute alone. But even with ifPresent, as we are open to
// the object disappearing between the ifPresent and the compute, which would again lead to the compute creating
// a new object when we don't want it to. But at least this will happen much less frequently.
db.get(name).ifPresent {
try {
db.compute(name) { previous ->
if (previous == null) throw ObjectDisappearedException()
val prevTags = previous.tags
val newTags = reTag(prevTags, newStatus)
if (prevTags != newTags)
it.copy(tags = newTags)
else
previous
}
} catch (e: ObjectDisappearedException) {
// Object disappeared between the ifPresent check and the compute, but we don't really mind.
// We just want to exit the compute, to avoid re-creating it.
// (The ifPresent is not strictly required, but a pre-emptive check is preferred to an exception)
}
}
}

internal fun reTag(tags: Set<String>, newStatus: ObjectHealth) = tags
.filterNot { it.matches("($ACTIVE_TAG|$INACTIVE_TAG).*".toRegex()) }
.plus(statusTag(newStatus))
.toSet()
internal fun reTag(tags: Set<String>, newStatus: ObjectHealth) =
tags.asSequence()
.filterNot { isStateTag(it) || isHealthCheckTag(it) }
.plus(stateTag(newStatus.state()))
.plus(healthCheckTag(newStatus.health()))
.filterNotNull()
.toSet()

private fun statusTag(status: ObjectHealth) = when (status) {
is ObjectActive -> "$ACTIVE_TAG:${status.failedProbes}"
is ObjectInactive -> "$INACTIVE_TAG:${status.successfulProbes}"
}
private val RELEVANT_STATES = setOf(STATE_ACTIVE, STATE_UNREACHABLE)
private fun containsRelevantStateTag(entry: Map.Entry<String, RoutingObjectRecord>) =
stateTagValue(entry.value.tags) in RELEVANT_STATES

private fun containsInactiveTag(entry: Map.Entry<String, RoutingObjectRecord>) =
entry.value.tags.any { it.matches("($INACTIVE_TAG).*".toRegex()) }
Loading

0 comments on commit 2d0d16a

Please sign in to comment.