Skip to content

Commit

Permalink
x-pack/plugin/otel: introduce x-pack-otel plugin (elastic#111091)
Browse files Browse the repository at this point in the history
* Add YamlTemplateRegistry and OtelIndexTemplateRegistry with resource YAML files

* Fix traces-otel template

* Adding first yml tests

* Base APMIndexTemplateRegistry on YamlTemplateRegistry

* Update OTelPlugin.java

* Update APMIndexTemplateRegistry.java

* Update YamlIngestPipelineConfig.java

* Adding traces tests

* Update x-pack/plugin/otel-data/src/main/resources/component-templates/ecs-tsdb@mappings.yaml

Co-authored-by: Felix Barnsteiner <felixbarny@users.noreply.github.com>

* Add mapper-version

* Fix code-style

* Rename `status.status_code` to `status.code`

* Update otel@mappings.yaml

Revert back to date due to missing support in ES|QL for date_nanos

* Move dynamic_templates to metrics@mappings in core

* Run gradlew :x-pack:plugin:core:spotlessApply

* Update x-pack/plugin/otel-data/src/main/resources/component-templates/metrics-otel@mappings.yaml

Co-authored-by: Carson Ip <carsonip@users.noreply.github.com>

* Update 20_metic_tests.yml

Workaround for TSDB timestamp issue: we push a custom template with higher priority and set  time_series.start_time.

* Update CODEOWNERS

Adding obs-ds-intake-services as owner of the new otel-data plugin.

Since we had some changes, also updating the owner of apm-data to the same team.

* Change dynamic: strict to false

* Skip "Reject invalid top level field" test

* Update 20_metic_tests.yml

* Add boolean as dimension test (skipping it for now)

* Add booleans_to_keywords and enable corresponding test

* Remove processor.event top level mapping

Reason: for metrics and logs we can rely on the name of the datastream. For spans vs. transactions there are other fields we can use.

* Remove booleans_to_keywords

Because booleans are supported now as dimension on TSDB

* Add alias service.language.name -> telemetry.sdk.language

* cleanup

* Update README.md

* Update README.md

* Update docs/changelog/111091.yaml

* Move traces@settings and traces@mappings to core

* Update traces-otel@mappings.yaml

* Review feedback

* Adapt `match` style in tests

* Update docs/changelog/111091.yaml

* Apply suggestions from code review

Co-authored-by: Vishal Raj <vishal.raj@elastic.co>

* Update x-pack/plugin/otel-data/src/main/resources/component-templates/traces-otel@mappings.yaml

Co-authored-by: Carson Ip <carsonip@users.noreply.github.com>

* Changing trace_flags to long

Related discussion: elastic#111091 (comment)

* Remove trace_flags

see: elastic/opentelemetry-dev#368 (review)

* Apply suggestions from code review

Co-authored-by: Andrew Wilkins <axwalk@gmail.com>

* Review feedback

* Add store_array_source for span links

* Define constant `data_stream.type` in `template.yaml`s

* Create package-info.java

* Move ecs-tsdb@mappings to index template

Add test to verify that @Custom template can add dynamic templates with a higher precedence

* Update metrics@mappings.json

Remove summary_gauge and summary_counter since they are covered by summary_metrics

* Move clusterService.getClusterSettings().addSettingsUpdateConsumer to registry

* Fix code-style

* Update x-pack/plugin/otel-data/src/yamlRestTest/resources/rest-api-spec/test/20_logs.tests.yml

Co-authored-by: Felix Barnsteiner <felixbarny@users.noreply.github.com>

* Enable logsdb

* Update traces@settings.json

No lifecycle needed for OTel at this point

---------

Co-authored-by: Felix Barnsteiner <felixbarny@users.noreply.github.com>
Co-authored-by: Carson Ip <carsonip@users.noreply.github.com>
Co-authored-by: Vishal Raj <vishal.raj@elastic.co>
Co-authored-by: Andrew Wilkins <axwalk@gmail.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Co-authored-by: Felix Barnsteiner <felix.barnsteiner@elastic.co>
  • Loading branch information
7 people authored and davidkyle committed Sep 5, 2024
1 parent bbfc805 commit bf1e962
Show file tree
Hide file tree
Showing 39 changed files with 1,480 additions and 219 deletions.
8 changes: 6 additions & 2 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ libs/logstash-bridge @elastic/logstash
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java @elastic/kibana-security

# APM Data index templates, etc.
x-pack/plugin/apm-data/src/main/resources @elastic/apm-server
x-pack/plugin/apm-data/src/yamlRestTest/resources @elastic/apm-server
x-pack/plugin/apm-data/src/main/resources @elastic/obs-ds-intake-services
x-pack/plugin/apm-data/src/yamlRestTest/resources @elastic/obs-ds-intake-services

# OTel
x-pack/plugin/otel-data/src/main/resources @elastic/obs-ds-intake-services
x-pack/plugin/otel-data/src/yamlRestTest/resources @elastic/obs-ds-intake-services

# Delivery
gradle @elastic/es-delivery
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/111091.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 111091
summary: "X-pack/plugin/otel: introduce x-pack-otel plugin"
area: Data streams
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,24 @@

package org.elasticsearch.xpack.apmdata;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.yaml.YamlXContent;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.template.IndexTemplateRegistry;
import org.elasticsearch.xpack.core.template.IngestPipelineConfig;
import org.elasticsearch.xpack.core.template.YamlTemplateRegistry;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.apmdata.ResourceUtils.APM_TEMPLATE_VERSION_VARIABLE;
import static org.elasticsearch.xpack.apmdata.ResourceUtils.loadResource;
import static org.elasticsearch.xpack.apmdata.ResourceUtils.loadVersionedResourceUTF8;
import static org.elasticsearch.xpack.apmdata.APMPlugin.APM_DATA_REGISTRY_ENABLED;

/**
* Creates all index templates and ingest pipelines that are required for using Elastic APM.
*/
public class APMIndexTemplateRegistry extends IndexTemplateRegistry {
private static final Logger logger = LogManager.getLogger(APMIndexTemplateRegistry.class);
// this node feature is a redefinition of {@link DataStreamFeatures#DATA_STREAM_LIFECYCLE} and it's meant to avoid adding a
// dependency to the data-streams module just for this
public static final NodeFeature DATA_STREAM_LIFECYCLE = new NodeFeature("data_stream.lifecycle");
private final int version;
public class APMIndexTemplateRegistry extends YamlTemplateRegistry {

private final Map<String, ComponentTemplate> componentTemplates;
private final Map<String, ComposableIndexTemplate> composableIndexTemplates;
private final List<IngestPipelineConfig> ingestPipelines;
private final FeatureService featureService;
private volatile boolean enabled;
public static final String APM_TEMPLATE_VERSION_VARIABLE = "xpack.apmdata.template.version";

@SuppressWarnings("unchecked")
public APMIndexTemplateRegistry(
Settings nodeSettings,
ClusterService clusterService,
Expand All @@ -62,133 +33,29 @@ public APMIndexTemplateRegistry(
NamedXContentRegistry xContentRegistry,
FeatureService featureService
) {
super(nodeSettings, clusterService, threadPool, client, xContentRegistry);

try {
final Map<String, Object> apmResources = XContentHelper.convertToMap(
YamlXContent.yamlXContent,
loadResource("/resources.yaml"),
false
);
version = (((Number) apmResources.get("version")).intValue());
final List<Object> componentTemplateNames = (List<Object>) apmResources.get("component-templates");
final List<Object> indexTemplateNames = (List<Object>) apmResources.get("index-templates");
final List<Object> ingestPipelineConfigs = (List<Object>) apmResources.get("ingest-pipelines");

componentTemplates = componentTemplateNames.stream()
.map(o -> (String) o)
.collect(Collectors.toMap(name -> name, name -> loadComponentTemplate(name, version)));
composableIndexTemplates = indexTemplateNames.stream()
.map(o -> (String) o)
.collect(Collectors.toMap(name -> name, name -> loadIndexTemplate(name, version)));
ingestPipelines = ingestPipelineConfigs.stream().map(o -> (Map<String, Map<String, Object>>) o).map(map -> {
Map.Entry<String, Map<String, Object>> pipelineConfig = map.entrySet().iterator().next();
return loadIngestPipeline(pipelineConfig.getKey(), version, (List<String>) pipelineConfig.getValue().get("dependencies"));
}).collect(Collectors.toList());
this.featureService = featureService;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public int getVersion() {
return version;
}

void setEnabled(boolean enabled) {
logger.info("APM index template registry is {}", enabled ? "enabled" : "disabled");
this.enabled = enabled;
}

public boolean isEnabled() {
return enabled;
}

public void close() {
clusterService.removeListener(this);
}

@Override
protected String getOrigin() {
return ClientHelper.APM_ORIGIN;
}

@Override
protected boolean isClusterReady(ClusterChangedEvent event) {
// Ensure current version of the components are installed only after versions that support data stream lifecycle
// due to the use of the feature in all the `@lifecycle` component templates
return featureService.clusterHasFeature(event.state(), DATA_STREAM_LIFECYCLE);
super(nodeSettings, clusterService, threadPool, client, xContentRegistry, featureService);
}

@Override
protected boolean requiresMasterNode() {
return true;
public String getName() {
return "apm";
}

@Override
protected Map<String, ComponentTemplate> getComponentTemplateConfigs() {
if (enabled) {
return componentTemplates;
} else {
return Map.of();
public void initialize() {
super.initialize();
if (isEnabled()) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(APM_DATA_REGISTRY_ENABLED, this::setEnabled);
}
}

@Override
protected Map<String, ComposableIndexTemplate> getComposableTemplateConfigs() {
if (enabled) {
return composableIndexTemplates;
} else {
return Map.of();
}
}

@Override
protected List<IngestPipelineConfig> getIngestPipelines() {
if (enabled) {
return ingestPipelines;
} else {
return Collections.emptyList();
}
}

private static ComponentTemplate loadComponentTemplate(String name, int version) {
try {
final byte[] content = loadVersionedResourceUTF8("/component-templates/" + name + ".yaml", version);
try (var parser = YamlXContent.yamlXContent.createParser(XContentParserConfiguration.EMPTY, content)) {
return ComponentTemplate.parse(parser);
}
} catch (Exception e) {
throw new RuntimeException("failed to load APM Ingest plugin's component template: " + name, e);
}
}

private static ComposableIndexTemplate loadIndexTemplate(String name, int version) {
try {
final byte[] content = loadVersionedResourceUTF8("/index-templates/" + name + ".yaml", version);
try (var parser = YamlXContent.yamlXContent.createParser(XContentParserConfiguration.EMPTY, content)) {
return ComposableIndexTemplate.parse(parser);
}
} catch (Exception e) {
throw new RuntimeException("failed to load APM Ingest plugin's index template: " + name, e);
}
}

private static IngestPipelineConfig loadIngestPipeline(String name, int version, @Nullable List<String> dependencies) {
if (dependencies == null) {
dependencies = Collections.emptyList();
}
return new YamlIngestPipelineConfig(
name,
"/ingest-pipelines/" + name + ".yaml",
version,
APM_TEMPLATE_VERSION_VARIABLE,
dependencies
);
protected String getVersionProperty() {
return APM_TEMPLATE_VERSION_VARIABLE;
}

@Override
protected boolean applyRolloverAfterTemplateV2Upgrade() {
return true;
protected String getOrigin() {
return ClientHelper.APM_ORIGIN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public Collection<?> createComponents(PluginServices services) {
if (enabled) {
APMIndexTemplateRegistry registryInstance = registry.get();
registryInstance.setEnabled(APM_DATA_REGISTRY_ENABLED.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(APM_DATA_REGISTRY_ENABLED, registryInstance::setEnabled);
registryInstance.initialize();
}
return Collections.emptyList();
Expand Down

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion x-pack/plugin/apm-data/src/main/resources/resources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ component-templates:
- metrics-apm.service_summary@mappings
- metrics-apm.service_transaction@mappings
- metrics-apm.transaction@mappings
- traces@mappings
- traces-apm@mappings
- traces-apm.rum@mappings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
public class APMIndexTemplateRegistryTests extends ESTestCase {
private APMIndexTemplateRegistry apmIndexTemplateRegistry;
private StackTemplateRegistryAccessor stackTemplateRegistryAccessor;
private ClusterService clusterService;
private ThreadPool threadPool;
private VerifyingClient client;

Expand All @@ -89,7 +88,7 @@ public void createRegistryAndClient() {

threadPool = new TestThreadPool(this.getClass().getName());
client = new VerifyingClient(threadPool);
clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings);
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings);
FeatureService featureService = new FeatureService(List.of(new DataStreamFeatures()));
stackTemplateRegistryAccessor = new StackTemplateRegistryAccessor(
new StackTemplateRegistry(Settings.EMPTY, clusterService, threadPool, client, NamedXContentRegistry.EMPTY, featureService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ private static String maybeRewriteSingleAuthenticationHeaderForVersion(
public static final String CONNECTORS_ORIGIN = "connectors";
public static final String INFERENCE_ORIGIN = "inference";
public static final String APM_ORIGIN = "apm";
public static final String OTEL_ORIGIN = "otel";

private ClientHelper() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ public Iterator<Setting<?>> settings() {
/** Setting for enabling or disabling APM Data. Defaults to true. */
public static final Setting<Boolean> APM_DATA_ENABLED = Setting.boolSetting("xpack.apm_data.enabled", true, Setting.Property.NodeScope);

/** Setting for enabling or disabling OTel Data. Defaults to true. */
public static final Setting<Boolean> OTEL_DATA_ENABLED = Setting.boolSetting(
"xpack.otel_data.enabled",
true,
Setting.Property.NodeScope
);

/** Setting for enabling or disabling enterprise search. Defaults to true. */
public static final Setting<Boolean> ENTERPRISE_SEARCH_ENABLED = Setting.boolSetting(
"xpack.ent_search.enabled",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.template;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class ResourceUtils {
static byte[] loadVersionedResourceUTF8(Class<?> clazz, String name, int version, String versionProperty) {
return loadVersionedResourceUTF8(clazz, name, version, versionProperty, Map.of());
}

static byte[] loadVersionedResourceUTF8(
Class<?> clazz,
String name,
int version,
String versionProperty,
Map<String, String> variables
) {
try {
String content = loadResource(clazz, name);
content = TemplateUtils.replaceVariables(content, String.valueOf(version), versionProperty, variables);
return content.getBytes(StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public static String loadResource(Class<?> clazz, String name) throws IOException {
InputStream is = clazz.getResourceAsStream(name);
if (is == null) {
throw new IOException("Resource [" + name + "] not found in classpath.");
}
return new String(is.readAllBytes(), java.nio.charset.StandardCharsets.UTF_8);
}

}
Loading

0 comments on commit bf1e962

Please sign in to comment.