Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose timestamp field type on coordinator node #65873

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
Expand Down Expand Up @@ -232,11 +233,15 @@ public class IndicesService extends AbstractLifecycleComponent
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
private final boolean nodeWriteDanglingIndicesInfo;
private final ValuesSourceRegistry valuesSourceRegistry;
private final TimestampFieldMapperService timestampFieldMapperService;

@Override
protected void doStart() {
// Start thread that will manage cleaning the field data cache periodically
threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME);

// Start watching for timestamp fields
clusterService.addStateApplier(timestampFieldMapperService);
}

public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, NamedXContentRegistry xContentRegistry,
Expand Down Expand Up @@ -328,6 +333,8 @@ protected void closeInternal() {

this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);

this.timestampFieldMapperService = new TimestampFieldMapperService(settings, threadPool, this);
}

private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
Expand All @@ -338,6 +345,9 @@ public ClusterService clusterService() {

@Override
protected void doStop() {
clusterService.removeApplier(timestampFieldMapperService);
timestampFieldMapperService.doStop();

ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);

ExecutorService indicesStopExecutor =
Expand Down Expand Up @@ -1603,4 +1613,16 @@ public boolean allPendingDanglingIndicesWritten() {
return nodeWriteDanglingIndicesInfo == false ||
(danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0);
}

/**
* @return the field type of the {@code @timestamp} field of the given index, or {@code null} if:
* - the index is not found,
* - the field is not found, or
* - the field is not a timestamp field.
*/
@Nullable
public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
return timestampFieldMapperService.getTimestampFieldType(index);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

/**
* Tracks the mapping of the {@code @timestamp} field of immutable indices that expose their timestamp range in their index metadata.
* Coordinating nodes do not have (easy) access to mappings for all indices, so we extract the type of this one field from the mapping here.
*/
public class TimestampFieldMapperService extends AbstractLifecycleComponent implements ClusterStateApplier {

private static final Logger logger = LogManager.getLogger(TimestampFieldMapperService.class);

private final IndicesService indicesService;
private final ExecutorService executor; // single thread to construct mapper services async as needed

/**
* The type of the {@code @timestamp} field keyed by index. Futures may be completed with {@code null} to indicate that there is
* no usable {@code @timestamp} field.
*/
private final Map<Index, PlainActionFuture<DateFieldMapper.DateFieldType>> fieldTypesByIndex = ConcurrentCollections.newConcurrentMap();

public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
this.indicesService = indicesService;

final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
final String threadName = "TimestampFieldMapperService#updateTask";
executor = EsExecutors.newScaling(nodeName + "/" + threadName, 0, 1, 0, TimeUnit.MILLISECONDS,
daemonThreadFactory(nodeName, threadName), threadPool.getThreadContext());
}

@Override
protected void doStart() {
}

@Override
protected void doStop() {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}

@Override
protected void doClose() {
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
final Metadata metadata = event.state().metadata();

// clear out mappers for indices that no longer exist or whose timestamp range is no longer known
fieldTypesByIndex.keySet().removeIf(index -> hasUsefulTimestampField(metadata.index(index)) == false);

// capture mappers for indices that do exist
for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
final IndexMetadata indexMetadata = cursor.value;
final Index index = indexMetadata.getIndex();

if (hasUsefulTimestampField(indexMetadata) && fieldTypesByIndex.containsKey(index) == false) {
logger.trace("computing timestamp mapping for {}", index);
final PlainActionFuture<DateFieldMapper.DateFieldType> future = new PlainActionFuture<>();
fieldTypesByIndex.put(index, future);

final IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
logger.trace("computing timestamp mapping for {} async", index);
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failed to compute mapping for {}", index), e);
future.onResponse(null); // no need to propagate a failure to create the mapper service to searches
}

@Override
protected void doRun() throws Exception {
try (MapperService mapperService = indicesService.createIndexMapperService(indexMetadata)) {
mapperService.merge(indexMetadata, MapperService.MergeReason.MAPPING_RECOVERY);
future.onResponse(fromMapperService(mapperService));
}
}
});
} else {
logger.trace("computing timestamp mapping for {} using existing index service", index);
try {
future.onResponse(fromMapperService(indexService.mapperService()));
} catch (Exception e) {
assert false : e;
future.onResponse(null);
}
}
}
}
}

private static boolean hasUsefulTimestampField(IndexMetadata indexMetadata) {
if (indexMetadata == null) {
return false;
}
final IndexLongFieldRange timestampMillisRange = indexMetadata.getTimestampMillisRange();
return timestampMillisRange.isComplete() && timestampMillisRange != IndexLongFieldRange.UNKNOWN;
}

private static DateFieldMapper.DateFieldType fromMapperService(MapperService mapperService) {
final MappedFieldType mappedFieldType = mapperService.fieldType(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD);
if (mappedFieldType instanceof DateFieldMapper.DateFieldType) {
return (DateFieldMapper.DateFieldType) mappedFieldType;
} else {
return null;
}
}

/**
* @return the field type of the {@code @timestamp} field of the given index, or {@code null} if:
* - the index is not found,
* - the field is not found,
* - the mapping is not known yet, or
* - the field is not a timestamp field.
*/
@Nullable
public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
final PlainActionFuture<DateFieldMapper.DateFieldType> future = fieldTypesByIndex.get(index);
if (future == null || future.isDone() == false) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There remains a question of whether we should block here or not (and if so, for how long).

On reflection I think we shouldn't block. Returning null sooner will allow the search coordination to proceed normally, ignoring any timestamp filter and deferring any skipping to the individual shards. This means we'll see shard failures if the coordinating node falls behind on extracting these mappings AND some of the shards are unassigned, which is hopefully rare.

As a follow-up we could in theory add another more patient getter to support a workflow that goes:

  1. we call getTimestampFieldType which returns null
  2. some shards are unavailable for the can_match phase
  3. we call getTimestampFieldTypePatiently to see for whether those shard failures can be ignored or not

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to return null and proceed regularly if the mapping isn't available yet. This should be rare enough to cause too much trouble.
I guess the most problematic scenario is when a node joins and has to parse a lot of mappings, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, although in that case there's no particular reason to expect shards to be unavailable.

return null;
}
return future.actionGet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -29,6 +33,7 @@
import java.util.List;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -97,4 +102,72 @@ public void testTimestampRangeRecalculatedOnStalePrimaryAllocation() throws IOEx
assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis()));
}

public void testTimestampFieldTypeExposedByAllIndicesServices() throws Exception {
internalCluster().startNodes(between(2, 4));

final String locale;
final String date;

switch (between(1, 3)) {
case 1:
locale = "";
date = "04 Feb 2020 12:01:23Z";
break;
case 2:
locale = "en_GB";
date = "04 Feb 2020 12:01:23Z";
break;
case 3:
locale = "fr_FR";
date = "04 févr. 2020 12:01:23Z";
break;
default:
throw new AssertionError("impossible");
}

assertAcked(prepareCreate("index")
.setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
.setMapping(jsonBuilder().startObject().startObject("_doc").startObject("properties")
.startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD)
.field("type", "date")
.field("format", "dd LLL yyyy HH:mm:ssX")
.field("locale", locale)
.endObject()
.endObject().endObject().endObject()));

final Index index = client().admin().cluster().prepareState().clear().setIndices("index").setMetadata(true)
.get().getState().metadata().index("index").getIndex();

ensureGreen("index");
if (randomBoolean()) {
client().prepareIndex("index").setSource(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, date).get();
}

for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
assertNull(indicesService.getTimestampFieldType(index));
}

assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet());
ensureGreen("index");
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
final PlainActionFuture<DateFieldMapper.DateFieldType> timestampFieldTypeFuture = new PlainActionFuture<>();
fcofdez marked this conversation as resolved.
Show resolved Hide resolved
assertBusy(() -> {
final DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(index);
assertNotNull(timestampFieldType);
timestampFieldTypeFuture.onResponse(timestampFieldType);
});
assertTrue(timestampFieldTypeFuture.isDone());
assertThat(timestampFieldTypeFuture.get().dateTimeFormatter().locale().toString(), equalTo(locale));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add a an assertion that checks that DateFieldMapper.DateFieldType#parse works with the original timestamp string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok let me try and remember the month names in French to give this assertion some teeth 🇫🇷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 79ef7b0. Remembering the names wasn't the hard bit, it was working out that in French we write month names lower-case, with a trailing ., and sometimes use more than 3 letters.

assertThat(timestampFieldTypeFuture.get().dateTimeFormatter().parseMillis(date), equalTo(1580817683000L));
}

assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index").setFreeze(false)).actionGet());
ensureGreen("index");
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
assertNull(indicesService.getTimestampFieldType(index));
}
}

}