Skip to content

Commit

Permalink
ThreadPool and ThreadContext are not closeable (#43249)
Browse files Browse the repository at this point in the history
This commit changes the ThreadContext to just use a regular ThreadLocal
over the lucene CloseableThreadLocal. The CloseableThreadLocal solves
issues with ThreadLocals that are no longer needed during runtime but
in the case of the ThreadContext, we need it for the runtime of the
node and it is typically not closed until the node closes, so we miss
out on the benefits that this class provides.

Additionally by removing the close logic, we simplify code in other
places that deal with exceptions and tracking to see if it happens when
the node is closing.

Closes #42577
  • Loading branch information
jaymode authored Nov 18, 2019
1 parent e0aa910 commit 0260c6f
Show file tree
Hide file tree
Showing 21 changed files with 640 additions and 838 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,80 +387,77 @@ public void testJsonInStacktraceMessageIsSplitted() throws IOException {
public void testDuplicateLogMessages() throws IOException {
final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger("test"));


// For the same key and X-Opaque-ID deprecation should be once
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
try{
threadContext.putHeader(Task.X_OPAQUE_ID, "ID1");
DeprecationLogger.setThreadContext(threadContext);
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
deprecationLogger.deprecatedAndMaybeLog("key", "message2");
assertWarnings("message1", "message2");

final Path path = PathUtils.get(System.getProperty("es.logs.base_path"),
System.getProperty("es.logs.cluster_name") + "_deprecated.json");
try (Stream<Map<String, String>> stream = JsonLogsStream.mapStreamFrom(path)) {
List<Map<String, String>> jsonLogs = stream
.collect(Collectors.toList());

assertThat(jsonLogs, contains(
allOf(
hasEntry("type", "deprecation"),
hasEntry("level", "WARN"),
hasEntry("component", "d.test"),
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "message1"),
hasEntry("x-opaque-id", "ID1"))
)
);
}
}finally{
DeprecationLogger.removeThreadContext(threadContext);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader(Task.X_OPAQUE_ID, "ID1");
DeprecationLogger.setThreadContext(threadContext);
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
deprecationLogger.deprecatedAndMaybeLog("key", "message2");
assertWarnings("message1", "message2");

final Path path = PathUtils.get(System.getProperty("es.logs.base_path"),
System.getProperty("es.logs.cluster_name") + "_deprecated.json");
try (Stream<Map<String, String>> stream = JsonLogsStream.mapStreamFrom(path)) {
List<Map<String, String>> jsonLogs = stream
.collect(Collectors.toList());

assertThat(jsonLogs, contains(
allOf(
hasEntry("type", "deprecation"),
hasEntry("level", "WARN"),
hasEntry("component", "d.test"),
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "message1"),
hasEntry("x-opaque-id", "ID1"))
)
);
}
} finally {
DeprecationLogger.removeThreadContext(threadContext);
}


// For the same key and different X-Opaque-ID should be multiple times per key/x-opaque-id
//continuing with message1-ID1 in logs already, adding a new deprecation log line with message2-ID2
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
try{
threadContext.putHeader(Task.X_OPAQUE_ID, "ID2");
DeprecationLogger.setThreadContext(threadContext);
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
deprecationLogger.deprecatedAndMaybeLog("key", "message2");
assertWarnings("message1", "message2");

final Path path = PathUtils.get(System.getProperty("es.logs.base_path"),
System.getProperty("es.logs.cluster_name") + "_deprecated.json");
try (Stream<Map<String, String>> stream = JsonLogsStream.mapStreamFrom(path)) {
List<Map<String, String>> jsonLogs = stream
.collect(Collectors.toList());

assertThat(jsonLogs, contains(
allOf(
hasEntry("type", "deprecation"),
hasEntry("level", "WARN"),
hasEntry("component", "d.test"),
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "message1"),
hasEntry("x-opaque-id", "ID1")
),
allOf(
hasEntry("type", "deprecation"),
hasEntry("level", "WARN"),
hasEntry("component", "d.test"),
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "message1"),
hasEntry("x-opaque-id", "ID2")
)
)
);
}
}finally{
DeprecationLogger.removeThreadContext(threadContext);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader(Task.X_OPAQUE_ID, "ID2");
DeprecationLogger.setThreadContext(threadContext);
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
deprecationLogger.deprecatedAndMaybeLog("key", "message2");
assertWarnings("message1", "message2");

final Path path = PathUtils.get(System.getProperty("es.logs.base_path"),
System.getProperty("es.logs.cluster_name") + "_deprecated.json");
try (Stream<Map<String, String>> stream = JsonLogsStream.mapStreamFrom(path)) {
List<Map<String, String>> jsonLogs = stream
.collect(Collectors.toList());

assertThat(jsonLogs, contains(
allOf(
hasEntry("type", "deprecation"),
hasEntry("level", "WARN"),
hasEntry("component", "d.test"),
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "message1"),
hasEntry("x-opaque-id", "ID1")
),
allOf(
hasEntry("type", "deprecation"),
hasEntry("level", "WARN"),
hasEntry("component", "d.test"),
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "message1"),
hasEntry("x-opaque-id", "ID2")
)
)
);
}
} finally {
DeprecationLogger.removeThreadContext(threadContext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ public Void run() {

public String getXOpaqueId(Set<ThreadContext> threadContexts) {
return threadContexts.stream()
.filter(t -> t.isClosed() == false)
.filter(t -> t.getHeader(Task.X_OPAQUE_ID) != null)
.findFirst()
.map(t -> t.getHeader(Task.X_OPAQUE_ID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,8 @@ protected void afterExecute(Runnable r, Throwable t) {
}

private boolean assertDefaultContext(Runnable r) {
try {
assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
} catch (IllegalStateException ex) {
// sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
// this must not trigger an exception here since we only assert if the default is restored and
// we don't really care if we are closed
if (contextHolder.isClosed() == false) {
throw ex;
}
}
assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
return true;
}

Expand Down
Loading

0 comments on commit 0260c6f

Please sign in to comment.