-
Notifications
You must be signed in to change notification settings - Fork 79
Update execute API to use Unconfined dispatcher to keep thread context. #90
Update execute API to use Unconfined dispatcher to keep thread context. #90
Conversation
I don't think we should restrict to a specific dispatcher to avoid losing the Elasticsearch ThreadContext. Instead you can use Kotlin's ThreadContextElement to restore the ES ThreadContext immediately before and after suspension (maybe restoring just the security plugin's context). |
You want to capture and restore the threadContext on whatever thread the coroutine resumes on instead of forcing the coroutine to run in the import kotlinx.coroutines.ThreadContextElement
import org.elasticsearch.common.util.concurrent.ThreadContext
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext
import kotlin.coroutines.CoroutineContext
class ElasticThreadContextElement(private val threadContext: ThreadContext) : ThreadContextElement<StoredContext> {
companion object Key : CoroutineContext.Key<ElasticThreadContextElement>
override val key: CoroutineContext.Key<*>
get() = Key
override fun restoreThreadContext(context: CoroutineContext, oldState: StoredContext) = oldState.restore()
override fun updateThreadContext(context: CoroutineContext): StoredContext = threadContext.newStoredContext(true)
}
//... When invoking
launch (ElasticThreadContextElement(threadPool.getThreadContext()) {
...
} |
Implemented but in a slightly different manner as to preserve the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also validate that the security headers are propagated correctly when calling withContext(Dispatchers.IO)
in runActions
and getDestinationInfo
? If not we need to manually add an ElasticThreadContextElement
in those invocations as well:
withContext(Dispatchers.IO + ElasticThreadContextElement(threadContext)) {
...
}
(I'm hoping this is unnnecessary and that the elements are automatically propagated from the parent to any child contexts)
...c/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt
Outdated
Show resolved
Hide resolved
} | ||
|
||
override fun updateThreadContext(context: CoroutineContext): StoredContext { | ||
oldContext.restore() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only want to restore oldContext
on the first call otherwise we risk resetting the context back to what it was when we launched the coroutine every time we change thread (losing further updates to ThreadContext).
You probably want to null out initialContext
(aka oldContext
) after it's been restored the first time:
private var StoredContext? initialContext = threadContext.newStoredContext()
override fun updateThreadContext(context: CoroutineContext) : StoredContext {
if (initialContext != null) {
initialContext.restore()
initialContext = null
}
return threadContext.newStoredContext(true)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With either of these solution we have a new issue.
Once calling the execute API already existing jobs fail to run. Log output as example:
Execute call using credentials that do not have credentials to look at the index:
[2019-08-01T15:56:50,668][INFO ][c.a.o.a.MonitorRunner ] [smoketestnode] Error collecting inputs for monitor:
org.elasticsearch.ElasticsearchSecurityException: no permissions for [indices:data/read/search] and User [name=lucas, roles=[], requestedTenant=null]
at com.amazon.opendistroforelasticsearch.security.filter.OpenDistroSecurityFilter.apply0(OpenDistroSecurityFilter.java:269) ~[?:?]
at com.amazon.opendistroforelasticsearch.security.filter.OpenDistroSecurityFilter.apply(OpenDistroSecurityFilter.java:119) ~[?:?]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:143) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:121) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:64) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:83) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:72) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:393) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.client.support.AbstractClient.search(AbstractClient.java:526) ~[elasticsearch-7.1.1.jar:7.1.1]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner$collectInputResults$$inlined$forEach$lambda$1.invoke(MonitorRunner.kt:292) ~[opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner$collectInputResults$$inlined$forEach$lambda$1.invoke(MonitorRunner.kt:97) ~[opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at com.amazon.opendistroforelasticsearch.alerting.elasticapi.ElasticExtensionsKt.suspendUntil(ElasticExtensions.kt:146) ~[alerting-core-1.1.0.0.jar:?]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner.collectInputResults(MonitorRunner.kt:292) [opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner.runMonitor(MonitorRunner.kt:194) [opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner$runMonitor$1.invokeSuspend(MonitorRunner.kt) [opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32) [kotlin-stdlib-1.3.21.jar:1.3.21-release-158 (1.3.21)]
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:233) [kotlinx-coroutines-core-1.1.1.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594) [kotlinx-coroutines-core-1.1.1.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60) [kotlinx-coroutines-core-1.1.1.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:742) [kotlinx-coroutines-core-1.1.1.jar:?]
[2019-08-01T15:56:56,730][INFO ][c.a.o.a.r.RestIndexMonitorAction] [smoketestnode] Updated .opendistro-alerting-config with mappings.
A preexisting monitor that was scheduled earlier:
[2019-08-01T15:56:37,723][INFO ][c.a.o.a.c.s.JobScheduler ] [smoketestnode] Scheduling jobId : P6VZT2wBDfxzsNgx-l8G, name: Integration test monitor workflow
. . . . . . .
[2019-08-01T15:58:12,469][INFO ][c.a.o.a.MonitorRunner ] [smoketestnode] Error collecting inputs for monitor: P6VZT2wBDfxzsNgx-l8G
org.elasticsearch.ElasticsearchSecurityException: no permissions for [indices:data/read/search] and User [name=lucas, roles=[], requestedTenant=null]
at com.amazon.opendistroforelasticsearch.security.filter.OpenDistroSecurityFilter.apply0(OpenDistroSecurityFilter.java:269) ~[?:?]
at com.amazon.opendistroforelasticsearch.security.filter.OpenDistroSecurityFilter.apply(OpenDistroSecurityFilter.java:119) ~[?:?]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:143) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:121) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:64) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:83) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:72) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:393) ~[elasticsearch-7.1.1.jar:7.1.1]
at org.elasticsearch.client.support.AbstractClient.search(AbstractClient.java:526) ~[elasticsearch-7.1.1.jar:7.1.1]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner$collectInputResults$$inlined$forEach$lambda$1.invoke(MonitorRunner.kt:292) ~[opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner$collectInputResults$$inlined$forEach$lambda$1.invoke(MonitorRunner.kt:97) ~[opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at com.amazon.opendistroforelasticsearch.alerting.elasticapi.ElasticExtensionsKt.suspendUntil(ElasticExtensions.kt:146) ~[alerting-core-1.1.0.0.jar:?]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner.collectInputResults(MonitorRunner.kt:292) [opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner.runMonitor(MonitorRunner.kt:194) [opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at com.amazon.opendistroforelasticsearch.alerting.MonitorRunner$runMonitor$1.invokeSuspend(MonitorRunner.kt) [opendistro_alerting-1.1.0.0-SNAPSHOT.jar:1.1.0.0-SNAPSHOT]
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32) [kotlin-stdlib-1.3.21.jar:1.3.21-release-158 (1.3.21)]
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:233) [kotlinx-coroutines-core-1.1.1.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594) [kotlinx-coroutines-core-1.1.1.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60) [kotlinx-coroutines-core-1.1.1.jar:?]
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:742) [kotlinx-coroutines-core-1.1.1.jar:?]
This only happens some times and not to all monitors which makes me think the issue is only when a monitor is executed on the thread that schedules the monitor.
Also your above solution requires initialContext!!.restore()
due to initialContext is a mutable property that could have been changed by this time
when to my understanding it really shouldn't be since its a single object just created for this particular coroutine.
...c/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt
Outdated
Show resolved
Hide resolved
The variables are persisted throughout these calls, so no need to add them with the |
…rch#90) * Use the ElasticThreadContextElement when executing a monitor to preserve the context variables needed.
* Use the ElasticThreadContextElement when executing a monitor to preserve the context variables needed.
…opendistro-for-elasticsearch#93) * Update execute API to keep thread context. (opendistro-for-elasticsearch#90) * Use the ElasticThreadContextElement when executing a monitor to preserve the context variables needed.
* Use the ElasticThreadContextElement when executing a monitor to preserve the context variables needed.
Description of changes:
Update execute API to use Unconfined dispatcher. This will allow context to remain when combing alerting with security plugin.
Prior to this change the execute API would run on a background co-routine thread which removes the security plugin
user
context.From the Kotlin documentation:
Build output:
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.