From e1e1974828acd4e5b1110bae57ea3280f77455f4 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 4 Dec 2020 16:11:42 +0000 Subject: [PATCH] Expose timestamp field type on coordinator node (#65873) Today a coordinating node does not have (easy) access to the mappings for the indices for the searches it wishes to coordinate. This means it can't properly interpret a timestamp range filter in a query and must involve a copy of every shard in at least the `can_match` phase. It therefore cannot cope with cases when shards are temporarily not started even if those shards are irrelevant to the search. This commit captures the mapping of the `@timestamp` field for indices which expose a timestamp range in their index metadata. --- .../elasticsearch/indices/IndicesService.java | 22 +++ .../indices/TimestampFieldMapperService.java | 174 ++++++++++++++++++ .../index/engine/FrozenIndexIT.java | 73 ++++++++ 3 files changed, 269 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index a7959e07801ae..d438b2a96959b 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -99,6 +99,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.MergeStats; @@ -232,11 +233,15 @@ public class IndicesService extends AbstractLifecycleComponent private final Set danglingIndicesToWrite = Sets.newConcurrentHashSet(); private final boolean nodeWriteDanglingIndicesInfo; private final ValuesSourceRegistry valuesSourceRegistry; + private final TimestampFieldMapperService timestampFieldMapperService; @Override protected void doStart() { // Start thread that will manage cleaning the field data cache periodically threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME); + + // Start watching for timestamp fields + clusterService.addStateApplier(timestampFieldMapperService); } public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, NamedXContentRegistry xContentRegistry, @@ -328,6 +333,8 @@ protected void closeInternal() { this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings()); clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); + + this.timestampFieldMapperService = new TimestampFieldMapperService(settings, threadPool, this); } private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask"; @@ -338,6 +345,9 @@ public ClusterService clusterService() { @Override protected void doStop() { + clusterService.removeApplier(timestampFieldMapperService); + timestampFieldMapperService.doStop(); + ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS); ExecutorService indicesStopExecutor = @@ -1603,4 +1613,16 @@ public boolean allPendingDanglingIndicesWritten() { return nodeWriteDanglingIndicesInfo == false || (danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0); } + + /** + * @return the field type of the {@code @timestamp} field of the given index, or {@code null} if: + * - the index is not found, + * - the field is not found, or + * - the field is not a timestamp field. + */ + @Nullable + public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) { + return timestampFieldMapperService.getTimestampFieldType(index); + } + } diff --git a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java new file mode 100644 index 0000000000000..bc530db01f98a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java @@ -0,0 +1,174 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.node.Node; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; + +/** + * Tracks the mapping of the {@code @timestamp} field of immutable indices that expose their timestamp range in their index metadata. + * Coordinating nodes do not have (easy) access to mappings for all indices, so we extract the type of this one field from the mapping here. + */ +public class TimestampFieldMapperService extends AbstractLifecycleComponent implements ClusterStateApplier { + + private static final Logger logger = LogManager.getLogger(TimestampFieldMapperService.class); + + private final IndicesService indicesService; + private final ExecutorService executor; // single thread to construct mapper services async as needed + + /** + * The type of the {@code @timestamp} field keyed by index. Futures may be completed with {@code null} to indicate that there is + * no usable {@code @timestamp} field. + */ + private final Map> fieldTypesByIndex = ConcurrentCollections.newConcurrentMap(); + + public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, IndicesService indicesService) { + this.indicesService = indicesService; + + final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); + final String threadName = "TimestampFieldMapperService#updateTask"; + executor = EsExecutors.newScaling(nodeName + "/" + threadName, 0, 1, 0, TimeUnit.MILLISECONDS, + daemonThreadFactory(nodeName, threadName), threadPool.getThreadContext()); + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); + } + + @Override + protected void doClose() { + } + + @Override + public void applyClusterState(ClusterChangedEvent event) { + final Metadata metadata = event.state().metadata(); + + // clear out mappers for indices that no longer exist or whose timestamp range is no longer known + fieldTypesByIndex.keySet().removeIf(index -> hasUsefulTimestampField(metadata.index(index)) == false); + + // capture mappers for indices that do exist + for (ObjectCursor cursor : metadata.indices().values()) { + final IndexMetadata indexMetadata = cursor.value; + final Index index = indexMetadata.getIndex(); + + if (hasUsefulTimestampField(indexMetadata) && fieldTypesByIndex.containsKey(index) == false) { + logger.trace("computing timestamp mapping for {}", index); + final PlainActionFuture future = new PlainActionFuture<>(); + fieldTypesByIndex.put(index, future); + + final IndexService indexService = indicesService.indexService(index); + if (indexService == null) { + logger.trace("computing timestamp mapping for {} async", index); + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.debug(new ParameterizedMessage("failed to compute mapping for {}", index), e); + future.onResponse(null); // no need to propagate a failure to create the mapper service to searches + } + + @Override + protected void doRun() throws Exception { + try (MapperService mapperService = indicesService.createIndexMapperService(indexMetadata)) { + mapperService.merge(indexMetadata, MapperService.MergeReason.MAPPING_RECOVERY); + future.onResponse(fromMapperService(mapperService)); + } + } + }); + } else { + logger.trace("computing timestamp mapping for {} using existing index service", index); + try { + future.onResponse(fromMapperService(indexService.mapperService())); + } catch (Exception e) { + assert false : e; + future.onResponse(null); + } + } + } + } + } + + private static boolean hasUsefulTimestampField(IndexMetadata indexMetadata) { + if (indexMetadata == null) { + return false; + } + final IndexLongFieldRange timestampMillisRange = indexMetadata.getTimestampMillisRange(); + return timestampMillisRange.isComplete() && timestampMillisRange != IndexLongFieldRange.UNKNOWN; + } + + private static DateFieldMapper.DateFieldType fromMapperService(MapperService mapperService) { + final MappedFieldType mappedFieldType = mapperService.fieldType(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + if (mappedFieldType instanceof DateFieldMapper.DateFieldType) { + return (DateFieldMapper.DateFieldType) mappedFieldType; + } else { + return null; + } + } + + /** + * @return the field type of the {@code @timestamp} field of the given index, or {@code null} if: + * - the index is not found, + * - the field is not found, + * - the mapping is not known yet, or + * - the field is not a timestamp field. + */ + @Nullable + public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) { + final PlainActionFuture future = fieldTypesByIndex.get(index); + if (future == null || future.isDone() == false) { + return null; + } + return future.actionGet(); + } + +} diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java index 1e3192f64b747..7b7313fbc282c 100644 --- a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java @@ -8,12 +8,16 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.protocol.xpack.frozen.FreezeRequest; import org.elasticsearch.rest.RestStatus; @@ -29,6 +33,7 @@ import java.util.List; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -97,4 +102,72 @@ public void testTimestampRangeRecalculatedOnStalePrimaryAllocation() throws IOEx assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis())); } + public void testTimestampFieldTypeExposedByAllIndicesServices() throws Exception { + internalCluster().startNodes(between(2, 4)); + + final String locale; + final String date; + + switch (between(1, 3)) { + case 1: + locale = ""; + date = "04 Feb 2020 12:01:23Z"; + break; + case 2: + locale = "en_GB"; + date = "04 Feb 2020 12:01:23Z"; + break; + case 3: + locale = "fr_FR"; + date = "04 févr. 2020 12:01:23Z"; + break; + default: + throw new AssertionError("impossible"); + } + + assertAcked(prepareCreate("index") + .setSettings(Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + .setMapping(jsonBuilder().startObject().startObject("_doc").startObject("properties") + .startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD) + .field("type", "date") + .field("format", "dd LLL yyyy HH:mm:ssX") + .field("locale", locale) + .endObject() + .endObject().endObject().endObject())); + + final Index index = client().admin().cluster().prepareState().clear().setIndices("index").setMetadata(true) + .get().getState().metadata().index("index").getIndex(); + + ensureGreen("index"); + if (randomBoolean()) { + client().prepareIndex("index").setSource(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, date).get(); + } + + for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) { + assertNull(indicesService.getTimestampFieldType(index)); + } + + assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet()); + ensureGreen("index"); + for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) { + final PlainActionFuture timestampFieldTypeFuture = new PlainActionFuture<>(); + assertBusy(() -> { + final DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(index); + assertNotNull(timestampFieldType); + timestampFieldTypeFuture.onResponse(timestampFieldType); + }); + assertTrue(timestampFieldTypeFuture.isDone()); + assertThat(timestampFieldTypeFuture.get().dateTimeFormatter().locale().toString(), equalTo(locale)); + assertThat(timestampFieldTypeFuture.get().dateTimeFormatter().parseMillis(date), equalTo(1580817683000L)); + } + + assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index").setFreeze(false)).actionGet()); + ensureGreen("index"); + for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) { + assertNull(indicesService.getTimestampFieldType(index)); + } + } + }