Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds minimal traceparent header support to Elasticsearch backport(#74210) #75331

Merged
merged 5 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distribution/src/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ appender.deprecation_rolling.name = deprecation_rolling
appender.deprecation_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecation.json
appender.deprecation_rolling.layout.type = ESJsonLayout
appender.deprecation_rolling.layout.type_name = deprecation.elasticsearch
appender.deprecation_rolling.layout.esmessagefields=x-opaque-id
appender.deprecation_rolling.layout.esmessagefields=x-opaque-id,key
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

appender.deprecation_rolling.filter.rate_limit.type = RateLimitingFilter

appender.deprecation_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecation-%i.json.gz
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void testLayout() {
"\"node.name\": \"%node_name\", " +
"\"message\": \"%notEmpty{%enc{%marker}{JSON} }%enc{%.-10000m}{JSON}\"" +
"%notEmpty{, %node_and_cluster_id }" +
"%notEmpty{, %trace_id }" +
"%exceptionAsJson }" + System.lineSeparator()));
}

Expand All @@ -62,6 +63,7 @@ public void testLayoutWithAdditionalFields() {
"%notEmpty{, \"x-opaque-id\": \"%ESMessageField{x-opaque-id}\"}" +
"%notEmpty{, \"someOtherField\": \"%ESMessageField{someOtherField}\"}" +
"%notEmpty{, %node_and_cluster_id }" +
"%notEmpty{, %trace_id }" +
"%exceptionAsJson }" + System.lineSeparator()));
}

Expand All @@ -82,6 +84,7 @@ public void testLayoutWithAdditionalFieldOverride() {
"\"node.name\": \"%node_name\"" +
"%notEmpty{, \"message\": \"%ESMessageField{message}\"}" +
"%notEmpty{, %node_and_cluster_id }" +
"%notEmpty{, %trace_id }" +
"%exceptionAsJson }" + System.lineSeparator()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,12 @@ public void tearDown() throws Exception {
super.tearDown();
}

public void testParseFieldEmittingLogs() throws Exception {


public void testParseFieldEmittingDeprecatedLogs() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putHeader(Task.TRACE_ID, "someTraceId");

ParseField deprecatedField = new ParseField("new_name", "deprecated_name");
assertTrue(deprecatedField.match("deprecated_name", LoggingDeprecationHandler.INSTANCE));
Expand All @@ -102,7 +105,9 @@ public void testParseFieldEmittingLogs() throws Exception {
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "Deprecated field [deprecated_name] used, expected [new_name] instead"),
hasEntry("x-opaque-id", "someId")
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId")
),
allOf(
hasEntry("type", "deprecation.elasticsearch"),
Expand All @@ -111,7 +116,9 @@ public void testParseFieldEmittingLogs() throws Exception {
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "Deprecated field [deprecated_name2] used, expected [new_name] instead"),
hasEntry("x-opaque-id", "someId")
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name2"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId")
)

)
Expand All @@ -126,6 +133,7 @@ public void testParseFieldEmittingLogs() throws Exception {
public void testDeprecatedMessage() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putHeader(Task.TRACE_ID, "someTraceId");
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
testLogger.deprecate(DeprecationCategory.OTHER, "someKey", "deprecated message1");

Expand All @@ -147,6 +155,7 @@ public void testDeprecatedMessage() throws Exception {
hasEntry("cluster.name", "elasticsearch"),
hasEntry("node.name", "sample-name"),
hasEntry("message", "deprecated message1"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "someKey"),
hasEntry("x-opaque-id", "someId")
)
)
Expand All @@ -157,7 +166,6 @@ public void testDeprecatedMessage() throws Exception {
});
}


public void testDeprecatedMessageWithoutXOpaqueId() throws IOException {
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
testLogger.deprecate(DeprecationCategory.OTHER, "a key", "deprecated message1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ appender.deprecated.name = deprecated
appender.deprecated.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecated.json
appender.deprecated.layout.type = ESJsonLayout
appender.deprecated.layout.type_name = deprecation.elasticsearch
appender.deprecated.layout.esmessagefields = x-opaque-id
appender.deprecated.layout.esmessagefields = x-opaque-id,key
appender.deprecated.filter.rate_limit.type = RateLimitingFilter

appender.deprecation_rolling_old.type = File
Expand All @@ -28,7 +28,7 @@ appender.deprecatedconsole.type = Console
appender.deprecatedconsole.name = deprecatedconsole
appender.deprecatedconsole.layout.type = ESJsonLayout
appender.deprecatedconsole.layout.type_name = deprecation.elasticsearch
appender.deprecatedconsole.layout.esmessagefields = x-opaque-id
appender.deprecatedconsole.layout.esmessagefields = x-opaque-id,key
appender.deprecatedconsole.filter.rate_limit.type = RateLimitingFilter

appender.index_search_slowlog_rolling.type = File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
new RestHeaderDefinition(Task.TRACE_PARENT, false)
)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
public class DeprecatedMessage extends ESLogMessage {
public static final String X_OPAQUE_ID_FIELD_NAME = "x-opaque-id";
public static final String KEY_FIELD_NAME = "key";

public DeprecatedMessage(DeprecationCategory category, String key, String xOpaqueId, String messagePattern, Object... args) {
super(fieldMap(category, key, xOpaqueId), messagePattern, args);
Expand All @@ -35,7 +36,7 @@ private static Map<String, Object> fieldMap(DeprecationCategory category, String
builder.put("category", category.name().toLowerCase(Locale.ROOT));

if (Strings.isNullOrEmpty(key) == false) {
builder.put("key", key);
builder.put(KEY_FIELD_NAME, key);
}
if (Strings.isNullOrEmpty(xOpaqueId) == false) {
builder.put(X_OPAQUE_ID_FIELD_NAME, xOpaqueId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ private String createPattern(Map<String, Object> map, Set<String> esMessageField
separator = ", ";
}
sb.append(notEmpty(", %node_and_cluster_id "));
sb.append(notEmpty(", %trace_id "));
sb.append("%exceptionAsJson ");
sb.append("}");
sb.append(System.lineSeparator());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.logging;

import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.pattern.ConverterKeys;
import org.apache.logging.log4j.core.pattern.LogEventPatternConverter;
import org.apache.logging.log4j.core.pattern.PatternConverter;
import org.elasticsearch.tasks.Task;

import java.util.Objects;

/**
* Pattern converter to format the trace id provided in the traceparent header into JSON fields <code>trace.id</code>.
*/
@Plugin(category = PatternConverter.CATEGORY, name = "TraceIdConverter")
@ConverterKeys({"trace_id"})
public final class TraceIdConverter extends LogEventPatternConverter {
/**
* Called by log4j2 to initialize this converter.
*/
public static TraceIdConverter newInstance(@SuppressWarnings("unused") final String[] options) {
return new TraceIdConverter();
}

public TraceIdConverter() {
super("trace_id", "trace_id");
}

public static String getTraceId() {
return HeaderWarning.THREAD_CONTEXT.stream()
.map(t -> t.<String>getHeader(Task.TRACE_ID))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}

/**
* Formats the trace.id into json fields.
*
* @param event - a log event is ignored in this method as it uses the clusterId value
* from <code>NodeAndClusterIdStateListener</code> to format
*/
@Override
public void format(LogEvent event, StringBuilder toAppendTo) {
String traceId = getTraceId();
if (traceId != null) {
toAppendTo.append("\"trace.id\": \"" + traceId + "\"");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -109,15 +108,21 @@ public StoredContext stashContext() {
/**
* X-Opaque-ID should be preserved in a threadContext in order to propagate this across threads.
* This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user.
* The same is applied to Task.TRACE_ID.
* Otherwise when context is stash, it should be empty.
*/
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
ThreadContextStruct threadContextStruct =
DEFAULT_CONTEXT.putHeaders(MapBuilder.<String, String>newMapBuilder()
.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID))
.immutableMap());
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID) || context.requestHeaders.containsKey(Task.TRACE_ID)) {
Map<String, String> map = new HashMap<>(2, 1);
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
map.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID));
}
if (context.requestHeaders.containsKey(Task.TRACE_ID)) {
map.put(Task.TRACE_ID, context.requestHeaders.get(Task.TRACE_ID));
}
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(map);
threadLocal.set(threadContextStruct);
} else {
}
else {
threadLocal.set(DEFAULT_CONTEXT);
}
return () -> {
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ protected Node(final Environment initialEnvironment,
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
Stream.of(Task.X_OPAQUE_ID, Task.TRACE_ID)
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
Expand Down
55 changes: 35 additions & 20 deletions server/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.rest.RestHandler.Route;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.usage.UsageService;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -309,29 +310,15 @@ private void sendContentTypeErrorMessage(@Nullable List<String> contentTypeHeade
}

private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
for (final RestHeaderDefinition restHeader : headersToCopy) {
final String name = restHeader.getName();
final List<String> headerValues = request.getAllHeaderValues(name);
if (headerValues != null && headerValues.isEmpty() == false) {
final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
channel.sendResponse(
BytesRestResponse.
createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
return;
} else {
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
}
}
}
// error_trace cannot be used when we disable detailed errors
// we consume the error_trace parameter first to ensure that it is always consumed
if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
channel.sendResponse(
BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
try {
copyRestHeaders(request, threadContext);
validateErrorTrace(request, channel);
} catch (IllegalArgumentException e) {
channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, e.getMessage()));
return;
}


final String rawPath = request.rawPath();
final String uri = request.uri();
final RestRequest.Method requestMethod;
Expand Down Expand Up @@ -365,6 +352,34 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel
handleBadRequest(uri, requestMethod, channel);
}

private void validateErrorTrace(RestRequest request, RestChannel channel) {
// error_trace cannot be used when we disable detailed errors
// we consume the error_trace parameter first to ensure that it is always consumed
if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
throw new IllegalArgumentException("error traces in responses are disabled.");
}
}

private void copyRestHeaders(RestRequest request, ThreadContext threadContext) throws IOException {
for (final RestHeaderDefinition restHeader : headersToCopy) {
final String name = restHeader.getName();
final List<String> headerValues = request.getAllHeaderValues(name);
if (headerValues != null && headerValues.isEmpty() == false) {
final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
throw new IllegalArgumentException("multiple values for single-valued header [" + name + "].");
} else if (name.equals(Task.TRACE_PARENT)) {
String traceparent = distinctHeaderValues.get(0);
if (traceparent.length() >= 55) {
threadContext.putHeader(Task.TRACE_ID, traceparent.substring(3, 35));
}
} else {
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
}
}
}
}

Iterator<MethodHandlers> getAllHandlers(@Nullable Map<String, String> requestParamsRef, String rawPath) {
final Supplier<Map<String, String>> paramsSupplier;
if (requestParamsRef == null) {
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ public class Task {
*/
public static final String X_OPAQUE_ID = "X-Opaque-Id";

/**
* The request header which is contained in HTTP request. We parse trace.id from it and store it in thread context.
* TRACE_PARENT once parsed in RestController.tryAllHandler is not preserved
* has to be declared as a header copied over from http request.
*/
public static final String TRACE_PARENT = "traceparent";

/**
* Parsed part of traceparent. It is stored in thread context and emitted in logs.
* Has to be declared as a header copied over for tasks.
*/
public static final String TRACE_ID = "trace.id";

private final long id;

private final String type;
Expand Down
Loading