Skip to content

Commit

Permalink
Introduce a setting controlling the activation of the logs index mo…
Browse files Browse the repository at this point in the history
…de in `logs@settings` (#109025)

Here we introduce a `cluster.logsdb.enabled` setting that controls activation of the new `logs` index mode in `logs@settings`.  The setting default value is `false` and prevents usage of the new index mode by default in `logs@settings`. We also change `hostname` to `host.name` as the default field used for sorting (other than `@timestamp`) and include it in `logs@mappings`.
  • Loading branch information
salvatore-campagna authored Jun 17, 2024
1 parent fa364bf commit 0ebe811
Show file tree
Hide file tree
Showing 12 changed files with 565 additions and 35 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/109025.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 109025
summary: Introduce a setting controlling the activation of the `logs` index mode in logs@settings
area: Logs
type: feature
issues:
- 108762
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class LogsDataStreamIT extends ESSingleNodeTestCase {
"@timestamp" : {
"type": "date"
},
"hostname": {
"host.name": {
"type": "keyword"
},
"pid": {
Expand All @@ -86,7 +86,7 @@ public class LogsDataStreamIT extends ESSingleNodeTestCase {
"@timestamp" : {
"type": "date"
},
"hostname": {
"host.name": {
"type": "keyword",
"time_series_dimension": "true"
},
Expand All @@ -110,7 +110,7 @@ public class LogsDataStreamIT extends ESSingleNodeTestCase {
private static final String LOG_DOC_TEMPLATE = """
{
"@timestamp": "%s",
"hostname": "%s",
"host.name": "%s",
"pid": "%d",
"method": "%s",
"message": "%s",
Expand All @@ -121,7 +121,7 @@ public class LogsDataStreamIT extends ESSingleNodeTestCase {
private static final String TIME_SERIES_DOC_TEMPLATE = """
{
"@timestamp": "%s",
"hostname": "%s",
"host.name": "%s",
"pid": "%d",
"method": "%s",
"ip_address": "%s",
Expand Down Expand Up @@ -207,7 +207,7 @@ public void testIndexModeLogsAndTimeSeriesSwitching() throws IOException, Execut
final String dataStreamName = generateDataStreamName("custom");
final List<String> indexPatterns = List.of("custom-*-*");
final Map<String, String> logsSettings = Map.of("index.mode", "logs");
final Map<String, String> timeSeriesSettings = Map.of("index.mode", "time_series", "index.routing_path", "hostname");
final Map<String, String> timeSeriesSettings = Map.of("index.mode", "time_series", "index.routing_path", "host.name");

putComposableIndexTemplate(client(), "custom-composable-template", LOGS_OR_STANDARD_MAPPING, logsSettings, indexPatterns);
createDataStream(client(), dataStreamName);
Expand All @@ -224,7 +224,7 @@ public void testIndexModeLogsAndTimeSeriesSwitching() throws IOException, Execut
assertDataStreamBackingIndicesModes(dataStreamName, List.of(IndexMode.LOGS, IndexMode.TIME_SERIES, IndexMode.LOGS));
}

public void testInvalidIndexModeTimeSeriesSwitchWithoutROutingPath() throws IOException, ExecutionException, InterruptedException {
public void testInvalidIndexModeTimeSeriesSwitchWithoutRoutingPath() throws IOException, ExecutionException, InterruptedException {
final String dataStreamName = generateDataStreamName("custom");
final List<String> indexPatterns = List.of("custom-*-*");
final Map<String, String> logsSettings = Map.of("index.mode", "logs");
Expand All @@ -250,7 +250,7 @@ public void testInvalidIndexModeTimeSeriesSwitchWithoutDimensions() throws IOExc
final String dataStreamName = generateDataStreamName("custom");
final List<String> indexPatterns = List.of("custom-*-*");
final Map<String, String> logsSettings = Map.of("index.mode", "logs");
final Map<String, String> timeSeriesSettings = Map.of("index.mode", "time_series", "index.routing_path", "hostname");
final Map<String, String> timeSeriesSettings = Map.of("index.mode", "time_series", "index.routing_path", "host.name");

putComposableIndexTemplate(client(), "custom-composable-template", LOGS_OR_STANDARD_MAPPING, logsSettings, indexPatterns);
createDataStream(client(), dataStreamName);
Expand All @@ -269,8 +269,9 @@ public void testInvalidIndexModeTimeSeriesSwitchWithoutDimensions() throws IOExc
assertThat(
exception.getCause().getCause().getMessage(),
Matchers.equalTo(
"All fields that match routing_path must be configured with [time_series_dimension: true] or flattened fields with "
+ "a list of dimensions in [time_series_dimensions] and without the [script] parameter. [hostname] was not a dimension."
"All fields that match routing_path must be configured with [time_series_dimension: true] or flattened fields "
+ "with a list of dimensions in [time_series_dimensions] and without the [script] parameter. [host.name] was not a "
+ "dimension."
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static void waitForLogs(RestClient client) throws Exception {
"@timestamp" : {
"type": "date"
},
"hostname": {
"host.name": {
"type": "keyword"
},
"pid": {
Expand Down Expand Up @@ -116,7 +116,7 @@ private static void waitForLogs(RestClient client) throws Exception {
"@timestamp" : {
"type": "date"
},
"hostname": {
"host.name": {
"type": "keyword",
"time_series_dimension": "true"
},
Expand All @@ -138,7 +138,7 @@ private static void waitForLogs(RestClient client) throws Exception {
private static final String DOC_TEMPLATE = """
{
"@timestamp": "%s",
"hostname": "%s",
"host.name": "%s",
"pid": "%d",
"method": "%s",
"message": "%s",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams.logsdb;

import org.elasticsearch.client.RestClient;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;

public class LogsIndexModeDisabledRestTestIT extends LogsIndexModeRestTestIT {

@ClassRule()
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.module("constant-keyword")
.module("data-streams")
.module("mapper-extras")
.module("x-pack-aggregate-metric")
.module("x-pack-stack")
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.build();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Before
public void setup() throws Exception {
client = client();
waitForLogs(client);
}

private RestClient client;

public void testLogsSettingsIndexModeDisabled() throws IOException {
assertOK(createDataStream(client, "logs-custom-dev"));
final String indexMode = (String) getSetting(client, getDataStreamBackingIndex(client, "logs-custom-dev", 0), "index.mode");
assertThat(indexMode, Matchers.not(equalTo(IndexMode.LOGS.getName())));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams.logsdb;

import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;

public class LogsIndexModeEnabledRestTestIT extends LogsIndexModeRestTestIT {

@ClassRule()
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.module("constant-keyword")
.module("data-streams")
.module("mapper-extras")
.module("x-pack-aggregate-metric")
.module("x-pack-stack")
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.setting("cluster.logsdb.enabled", "true")
.build();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Before
public void setup() throws Exception {
client = client();
waitForLogs(client);
}

private RestClient client;

private static final String MAPPINGS = """
{
"template": {
"mappings": {
"properties": {
"method": {
"type": "keyword"
},
"message": {
"type": "text"
}
}
}
}
}""";

private static final String ALTERNATE_HOST_MAPPING = """
{
"template": {
"mappings": {
"properties": {
"method": {
"type": "keyword"
},
"message": {
"type": "text"
},
"host.cloud_region": {
"type": "keyword"
},
"host.availability_zone": {
"type": "keyword"
}
}
}
}
}""";

private static final String HOST_MAPPING_AS_OBJECT_DEFAULT_SUBOBJECTS = """
{
"template": {
"mappings": {
"properties": {
"method": {
"type": "keyword"
},
"message": {
"type": "text"
},
"host": {
"type": "object",
"properties": {
"cloud_region": {
"type": "keyword"
},
"availability_zone": {
"type": "keyword"
},
"name": {
"type": "keyword"
}
}
}
}
}
}
}""";

private static final String HOST_MAPPING_AS_OBJECT_NON_DEFAULT_SUBOBJECTS = """
{
"template": {
"mappings": {
"dynamic": "strict",
"properties": {
"method": {
"type": "keyword"
},
"message": {
"type": "text"
},
"host": {
"type": "object",
"subobjects": false,
"properties": {
"cloud_region": {
"type": "keyword"
},
"availability_zone": {
"type": "keyword"
},
"name": {
"type": "keyword"
}
}
}
}
}
}
}""";

private static String BULK_INDEX_REQUEST = """
{ "create": {}}
{ "@timestamp": "2023-01-01T05:11:00Z", "host.name": "foo", "method" : "PUT", "message": "foo put message" }
{ "create": {}}
{ "@timestamp": "2023-01-01T05:12:00Z", "host.name": "bar", "method" : "POST", "message": "bar post message" }
{ "create": {}}
{ "@timestamp": "2023-01-01T05:12:00Z", "host.name": "baz", "method" : "PUT", "message": "baz put message" }
{ "create": {}}
{ "@timestamp": "2023-01-01T05:13:00Z", "host.name": "baz", "method" : "PUT", "message": "baz put message" }
""";

private static String BULK_INDEX_REQUEST_WITH_HOST = """
{ "create": {}}
{ "@timestamp": "2023-01-01T05:11:00Z", "method" : "PUT", "message": "foo put message", \
"host": { "cloud_region" : "us-west", "availability_zone" : "us-west-4a", "name" : "ahdta-876584" } }
{ "create": {}}
{ "@timestamp": "2023-01-01T05:12:00Z", "method" : "POST", "message": "bar post message", \
"host": { "cloud_region" : "us-west", "availability_zone" : "us-west-4b", "name" : "tyrou-447898" } }
{ "create": {}}
{ "@timestamp": "2023-01-01T05:12:00Z", "method" : "PUT", "message": "baz put message", \
"host": { "cloud_region" : "us-west", "availability_zone" : "us-west-4a", "name" : "uuopl-162899" } }
{ "create": {}}
{ "@timestamp": "2023-01-01T05:13:00Z", "method" : "PUT", "message": "baz put message", \
"host": { "cloud_region" : "us-west", "availability_zone" : "us-west-4b", "name" : "fdfgf-881197" } }
""";

public void testCreateDataStream() throws IOException {
assertOK(putComponentTemplate(client, "logs@custom", MAPPINGS));
assertOK(createDataStream(client, "logs-custom-dev"));
final String indexMode = (String) getSetting(client, getDataStreamBackingIndex(client, "logs-custom-dev", 0), "index.mode");
assertThat(indexMode, equalTo(IndexMode.LOGS.getName()));
}

public void testBulkIndexing() throws IOException {
assertOK(putComponentTemplate(client, "logs@custom", MAPPINGS));
assertOK(createDataStream(client, "logs-custom-dev"));
final Response response = bulkIndex(client, "logs-custom-dev", () -> BULK_INDEX_REQUEST);
assertOK(response);
assertThat(entityAsMap(response).get("errors"), Matchers.equalTo(false));
}

public void testBulkIndexingWithFlatHostProperties() throws IOException {
assertOK(putComponentTemplate(client, "logs@custom", ALTERNATE_HOST_MAPPING));
assertOK(createDataStream(client, "logs-custom-dev"));
final Response response = bulkIndex(client, "logs-custom-dev", () -> BULK_INDEX_REQUEST_WITH_HOST);
assertOK(response);
assertThat(entityAsMap(response).get("errors"), Matchers.equalTo(false));
}

public void testBulkIndexingWithObjectHostDefaultSubobjectsProperties() throws IOException {
assertOK(putComponentTemplate(client, "logs@custom", HOST_MAPPING_AS_OBJECT_DEFAULT_SUBOBJECTS));
assertOK(createDataStream(client, "logs-custom-dev"));
final Response response = bulkIndex(client, "logs-custom-dev", () -> BULK_INDEX_REQUEST_WITH_HOST);
assertOK(response);
assertThat(entityAsMap(response).get("errors"), Matchers.equalTo(false));
}

public void testBulkIndexingWithObjectHostSubobjectsFalseProperties() throws IOException {
assertOK(putComponentTemplate(client, "logs@custom", HOST_MAPPING_AS_OBJECT_NON_DEFAULT_SUBOBJECTS));
assertOK(createDataStream(client, "logs-custom-dev"));
final Response response = bulkIndex(client, "logs-custom-dev", () -> BULK_INDEX_REQUEST_WITH_HOST);
assertOK(response);
assertThat(entityAsMap(response).get("errors"), Matchers.equalTo(false));
}

public void testRolloverDataStream() throws IOException {
assertOK(putComponentTemplate(client, "logs@custom", MAPPINGS));
assertOK(createDataStream(client, "logs-custom-dev"));
final String firstBackingIndex = getDataStreamBackingIndex(client, "logs-custom-dev", 0);
assertOK(rolloverDataStream(client, "logs-custom-dev"));
final String secondBackingIndex = getDataStreamBackingIndex(client, "logs-custom-dev", 1);
assertThat(firstBackingIndex, Matchers.not(equalTo(secondBackingIndex)));
assertThat(getDataStreamBackingIndices(client, "logs-custom-dev").size(), equalTo(2));
}
}
Loading

0 comments on commit 0ebe811

Please sign in to comment.