Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Common tags #549

Merged
merged 5 commits into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import static com.hotels.styx.api.HttpResponse.response;
import static com.hotels.styx.api.HttpResponseStatus.OK;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.CREATED;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.FAILED;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.RUNNING;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.STARTING;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.STOPPED;
Expand Down Expand Up @@ -91,7 +92,7 @@ public CompletableFuture<Void> stop() {

private Function<Throwable, Void> failWithMessage(String message) {
return cause -> {
status.set(StyxServiceStatus.FAILED);
status.set(FAILED);
throw new ServiceFailureException(message, cause);
};
}
Expand Down
122 changes: 122 additions & 0 deletions components/proxy/src/main/kotlin/com/hotels/styx/CommonTags.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright (C) 2013-2019 Expedia Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx

/**
* A common tag representation.
*
* Enforces a common tag format and provides a set of higher order functionality
* that operate on provided `name`, `encode` and `decode` properties.
*
* A tag string is a name-value pair separated by equal ('=') sign:
*
* tag-string = name "=" value-part
*
* `encode` and `decode` are functions to encode a T to a value-part string,
* and to decode the value-part string back to T.
*
* @property name a tag name
* @property encode a function that encodes a value of T as a tag value string
* @property decode a function that decodes a tag value string as a T
*/
sealed class CommonValueTag<T>(
val name: String,
val encode: (T) -> String?,
val decode: (String) -> T?) {
Comment on lines +37 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth providing defaults for encode and decode as { it } ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this possible without knowing the type T?


/**
* Extracts value part (the right hand side) from a tag string.
*
* @param tag a tag string
* @return the tag value
*/
fun valuePart(tag: String) = if (tag.startsWith("$name=")) {
tag.removePrefix("$name=")
} else {
null
}

/**
* Tests if a given string matches this tag. A match is positive when the string
* starts with `name` followed by `=`.
*
* @param a tag string
* @return True if this string is possibly a matching tag. Otherwise return false.
*/
fun match(tag: String) = tag.startsWith("$name=")

/**
* Decodes given tag string to its typed value.
*
* @param tag a tag string
* @return a decoded tag value, or null if decoding failed
*/
fun valueOf(tag: String) = valuePart(tag)
?.let { decode(it) }

/**
* Find this tag from a set of tag strings. If found, decode the tag value. Return null if
* tag was not found, or if decoding failed.
*
* @param tags a set of tag strings
* @return A decoded value if tag was found. Otherwise return null.
*/
fun find(tags: Set<String>) = tags.firstOrNull { this.match(it) }
?.let { valuePart(it) }
?.let { this.decode(it) }
mikkokar marked this conversation as resolved.
Show resolved Hide resolved

/**
* Removes all instances of this tag from a set of tag strings.
* Return a new Set<String> with all instances of this tag removed.
*
* @param tags a set of tag strings
* @return A new set of strings without this tag.
*/
fun remove(tags: Set<String>) = tags
.filterNot { this.match(it) }
.toSet()
}

/**
* A NullableValueTag invoke method returns null when value cannot be encoded to string.
* The API consumer must handle this situation.
*/
class NullableValueTag<T>(
name: String,
encode: (T) -> String?,
decode: (String) -> T?) : CommonValueTag<T>(name, encode, decode) {

operator fun invoke(value: T): String? = encode(value)
?.let {
"$name=$it"
}
}

/**
* A SafeValueTag invoke method throws a KotlinNullPointerException when the
* tag value cannot be encoded to string.
*/
class SafeValueTag<T>(
name: String,
encode: (T) -> String?,
decode: (String) -> T?) : CommonValueTag<T>(name, encode, decode) {

operator fun invoke(value: T): String = encode(value)
.let {
it!!
"$name=$it"
}
}
92 changes: 49 additions & 43 deletions components/proxy/src/main/kotlin/com/hotels/styx/ObjectTags.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,58 +15,64 @@
*/
package com.hotels.styx

private const val LBGROUP = "lbGroup"
private val LBGROUP_REGEX = "$LBGROUP=(.+)".toRegex()
fun lbGroupTag(name: String) = "lbGroup=$name"
fun lbGroupTag(tags: Set<String>) = tags.firstOrNull(::isLbGroupTag)
fun isLbGroupTag(tag: String) = LBGROUP_REGEX.matches(tag)
fun lbGroupTagValue(tags: Set<String>) = lbGroupTagValue(lbGroupTag(tags)?:"")
fun lbGroupTagValue(tag: String): String? = LBGROUP_REGEX.matchEntire(tag)
?.groupValues
?.get(1)
/*
* TAG: lbGroup
*/
val lbGroupTag = SafeValueTag(
"lbGroup",
{ it },
{ it })

/*
* TAG: source
*/
val sourceTag = SafeValueTag(
"source",
{ it },
{ it })

fun sourceTag(creator: String) = "source=$creator"
fun sourceTag(tags: Set<String>) = tags.firstOrNull { it.startsWith("source=") }
fun sourceTagValue(tags: Set<String>) = sourceTag(tags)?.substring("source".length + 1)

private const val STATE = "state"
/*
* TAG: state
*/
const val STATE_ACTIVE = "active"
const val STATE_UNREACHABLE = "unreachable"
const val STATE_INACTIVE = "inactive"
private val STATE_REGEX = "$STATE=(.+)".toRegex()
fun stateTag(value: String) = "$STATE=$value"
fun stateTag(tags: Set<String>) = tags.firstOrNull(::isStateTag)
fun isStateTag(tag: String) = STATE_REGEX.matches(tag)
fun stateTagValue(tags: Set<String>) = stateTagValue(stateTag(tags)?:"")
fun stateTagValue(tag: String) = STATE_REGEX.matchEntire(tag)
?.groupValues
?.get(1)

val stateTag = SafeValueTag(
"state",
{ it },
{ it })

/*
* TAG: healthCheck
* healthCheck=on
* healthCheck=on;probes-OK:2
* healthCheck=on;probes-FAIL:1
*/
private const val HEALTHCHECK = "healthCheck"
mikkokar marked this conversation as resolved.
Show resolved Hide resolved
const val HEALTHCHECK_PASSING = "probes-OK"
const val HEALTHCHECK_FAILING = "probes-FAIL"
const val HEALTHCHECK_ON = "on"
private val HEALTHCHECK_REGEX = "$HEALTHCHECK_ON(?:;(.+):([0-9]+))?".toRegex()


// healthCheck=on
// healthCheck=on;probes-OK:2
// healthCheck=on;probes-FAIL:1
private val HEALTHCHECK_REGEX = "$HEALTHCHECK=$HEALTHCHECK_ON(?:;(.+):([0-9]+))?".toRegex()
fun healthCheckTag(value: Pair<String, Int>?) =
if (value != null && value.first.isNotBlank() && value.second > 0) {
"$HEALTHCHECK=$HEALTHCHECK_ON;${value.first}:${value.second}"
} else if (value != null && value.first.isNotBlank() && value.second == 0) {
"$HEALTHCHECK=$HEALTHCHECK_ON"
} else {
null
}
fun healthCheckTag(tags: Set<String>) = tags.firstOrNull(::isHealthCheckTag)
fun isHealthCheckTag(tag: String) = HEALTHCHECK_REGEX.matches(tag)
fun healthCheckTagValue(tags: Set<String>) = healthCheckTagValue(healthCheckTag(tags)?:"")
fun healthCheckTagValue(tag: String) = HEALTHCHECK_REGEX.matchEntire(tag)
?.groupValues
?.let {
if (it[1].isNotEmpty()) {
Pair(it[1], it[2].toInt())
val healthCheckTag = NullableValueTag(
"healthCheck",
{ value -> if (value.first.isNotBlank() && value.second > 0) {
"$HEALTHCHECK_ON;${value.first}:${value.second}"
} else if (value.first.isNotBlank() && value.second == 0) {
HEALTHCHECK_ON
} else {
Pair(HEALTHCHECK_ON, 0)
}}
null
}
},
{ tagValue -> HEALTHCHECK_REGEX.matchEntire(tagValue)
?.groupValues
?.let {
if (it[1].isNotEmpty()) {
Pair(it[1], it[2].toInt())
} else {
Pair(HEALTHCHECK_ON, 0)
}}
})
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.hotels.styx.routing.handlers

import com.fasterxml.jackson.annotation.JsonProperty
import com.hotels.styx.*
import com.hotels.styx.api.Eventual
import com.hotels.styx.api.HttpInterceptor
import com.hotels.styx.api.Id
Expand All @@ -40,10 +39,12 @@ import com.hotels.styx.config.schema.SchemaDsl.integer
import com.hotels.styx.config.schema.SchemaDsl.optional
import com.hotels.styx.config.schema.SchemaDsl.string
import com.hotels.styx.infrastructure.configuration.yaml.JsonNodeConfig
import com.hotels.styx.lbGroupTag
import com.hotels.styx.routing.RoutingObject
import com.hotels.styx.routing.RoutingObjectRecord
import com.hotels.styx.routing.config.RoutingObjectFactory
import com.hotels.styx.routing.config.StyxObjectDefinition
import com.hotels.styx.stateTag
import org.slf4j.LoggerFactory
import reactor.core.Disposable
import reactor.core.publisher.toFlux
Expand Down Expand Up @@ -121,17 +122,16 @@ internal class LoadBalancingGroup(val client: StyxBackendServiceClient, val chan

private fun routeDatabaseChanged(appId: String, snapshot: ObjectStore<RoutingObjectRecord>, remoteHosts: AtomicReference<Set<RemoteHost>>) {
val newSet = snapshot.entrySet()
.filter { taggedWith(it, ::lbGroupTagValue, appId) }
.filter { taggedWith(it, ::stateTagValue, STATE_ACTIVE, null) }
.filter { it.value.tags.contains(lbGroupTag(appId)) }
.filter { stateTag.find(it.value.tags)
.let { it == null || it == "active" }
}
.map { toRemoteHost(appId, it) }
.toSet()

remoteHosts.set(newSet)
}

private fun taggedWith(recordEntry: Map.Entry<String, RoutingObjectRecord>, tagValue: (Set<String>) -> String?, vararg values: String?) =
values.contains(tagValue(recordEntry.value.tags))

private fun toRemoteHost(appId: String, record: Map.Entry<String, RoutingObjectRecord>): RemoteHost {
val routingObject = record.value.routingObject
val originName = record.key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ internal class HealthCheckMonitoringService(

internal val EXECUTOR = ScheduledThreadPoolExecutor(2)

private val LOGGER = LoggerFactory.getLogger(HealthCheckMonitoringService::class.java)
internal val LOGGER = LoggerFactory.getLogger(HealthCheckMonitoringService::class.java)
}

private val probe = urlProbe(HttpRequest.get(urlPath).build(), Duration.ofMillis(1000))
private val determineObjectState = healthCheckFunction(activeThreshold, inactiveThreshold)
private val futureRef: AtomicReference<ScheduledFuture<*>> = AtomicReference()

override fun startService() = CompletableFuture.runAsync {
LOGGER.info("started - {} - {}", period.toMillis(), period.toMillis())
LOGGER.info("started service for {} - {} - {}", arrayOf(application, period.toMillis(), period.toMillis()))
futureRef.set(executor.scheduleAtFixedRate(
{ runChecks(application, objectStore) },
period.toMillis(),
Expand All @@ -85,12 +85,32 @@ internal class HealthCheckMonitoringService(
}

override fun stopService() = CompletableFuture.runAsync {
LOGGER.info("stopped")
LOGGER.info("stopped service for {}", application)

objectStore.entrySet()
.filter(::containsRelevantStateTag)
.forEach { (name, _) ->
markObject(objectStore, name, ObjectActive(0, false))
.forEach { (name, record) ->
objectStore.get(name).ifPresent {
try {
objectStore.compute(name) { previous ->
if (previous == null) throw ObjectDisappearedException()

val newTags = record.tags
mikkokar marked this conversation as resolved.
Show resolved Hide resolved
.let { healthCheckTag.remove(it) }
.let { stateTag.remove(it) }
.plus(stateTag(STATE_ACTIVE))

if (previous.tags != newTags)
it.copy(tags = newTags)
else
previous
}
} catch (e: ObjectDisappearedException) {
// Object disappeared between the ifPresent check and the compute, but we don't really mind.
// We just want to exit the compute, to avoid re-creating it.
// (The ifPresent is not strictly required, but a pre-emptive check is preferred to an exception)
}
}
}

futureRef.get().cancel(false)
Expand All @@ -102,7 +122,7 @@ internal class HealthCheckMonitoringService(
.filter { (_, record) -> record.tags.contains(lbGroupTag(application)) }
.map { (name, record) ->
val tags = record.tags
val objectHealth = objectHealthFrom(stateTagValue(tags), healthCheckTagValue(tags))
val objectHealth = objectHealthFrom(stateTag.find(tags), healthCheckTag.find(tags))
Triple(name, record, objectHealth)
}

Expand Down Expand Up @@ -177,6 +197,7 @@ internal fun objectHealthFrom(state: String?, health: Pair<String, Int>?) =

internal class ObjectDisappearedException : RuntimeException("Object disappeared")


private fun markObject(db: StyxObjectStore<RoutingObjectRecord>, name: String, newStatus: ObjectHealth) {
// The ifPresent is not ideal, but compute() does not allow the computation to return null. So we can't preserve
// a state where the object does not exist using compute alone. But even with ifPresent, as we are open to
Expand All @@ -202,14 +223,14 @@ private fun markObject(db: StyxObjectStore<RoutingObjectRecord>, name: String, n
}

internal fun reTag(tags: Set<String>, newStatus: ObjectHealth) =
tags.asSequence()
.filterNot { isStateTag(it) || isHealthCheckTag(it) }
.plus(stateTag(newStatus.state()))
.plus(healthCheckTag(newStatus.health()))
.filterNotNull()
.toSet()
tags.asSequence()
.filterNot { stateTag.match(it) || healthCheckTag.match(it) }
.plus(stateTag(newStatus.state()))
.plus(healthCheckTag(newStatus.health()!!))
.filterNotNull()
.toSet()

private val RELEVANT_STATES = setOf(STATE_ACTIVE, STATE_UNREACHABLE)
private fun containsRelevantStateTag(entry: Map.Entry<String, RoutingObjectRecord>) =
stateTagValue(entry.value.tags) in RELEVANT_STATES
stateTag.find(entry.value.tags) in RELEVANT_STATES

Loading