Skip to content

Commit

Permalink
Update execute API to keep thread context. (opendistro-for-elasticsea…
Browse files Browse the repository at this point in the history
…rch#90)

* Use the ElasticThreadContextElement when executing a monitor to preserve the context variables needed.
  • Loading branch information
lucaswin-amzn committed Aug 13, 2019
1 parent f8c0aca commit dc2e1a2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.ElasticThreadContextElement
import org.apache.logging.log4j.LogManager
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -64,7 +65,9 @@ class RestExecuteMonitorAction(
val requestEnd = request.paramAsTime("period_end", TimeValue(Instant.now().toEpochMilli()))

val executeMonitor = fun(monitor: Monitor) {
runner.launch {
// Launch the coroutine with the clients threadContext. This is needed to preserve authentication information
// stored on the threadContext set by the security plugin when using the Alerting plugin with the Security plugin.
runner.launch(ElasticThreadContextElement(client.threadPool().threadContext)) {
val (periodStart, periodEnd) =
monitor.schedule.getPeriodEndingAt(Instant.ofEpochMilli(requestEnd.millis))
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import java.time.Instant
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.ThreadContextElement
import org.elasticsearch.common.util.concurrent.ThreadContext
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext
import kotlin.coroutines.CoroutineContext

/** Convert an object to maps and lists representation */
fun ToXContent.convertToMap(): Map<String, Any> {
Expand Down Expand Up @@ -145,3 +149,32 @@ suspend fun <C : ElasticsearchClient, T> C.suspendUntil(block: C.(ActionListener
override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

/**
* Store a [ThreadContext] and restore a [ThreadContext] when the coroutine resumes on a different thread.
*
* @param threadContext - The context to store when switching to a coroutine.
* @param initialContext - The old context to restore upon completion of the coroutine.
*/
class ElasticThreadContextElement(
private val threadContext: ThreadContext
) : ThreadContextElement<StoredContext> {

companion object Key : CoroutineContext.Key<ElasticThreadContextElement>
private var initialContext: StoredContext? = threadContext.newStoredContext(true)

override val key: CoroutineContext.Key<*>
get() = Key

override fun restoreThreadContext(context: CoroutineContext, oldState: StoredContext) {
initialContext = threadContext.stashContext()
}

override fun updateThreadContext(context: CoroutineContext): StoredContext {
if (initialContext != null) {
initialContext!!.close()
initialContext = null
}
return threadContext.newRestorableContext(true).get()
}
}

0 comments on commit dc2e1a2

Please sign in to comment.