Skip to content

Commit

Permalink
TSDB: Initial reindex fix (#86647)
Browse files Browse the repository at this point in the history
It turns out that there is a fairly simple recipe for reindexing to a
`time_series` index:
1. If you are reindexing from a time series index to a time series index
   and are *not* changing the `@timestamp` or dimensions, it "just
   works"(TM).
2. If you are reindexing from a standard index with a standard random
   `_id` you should clear it on reindex.
3. If you are reindexing from a tsdb index to a tsdb index and modifying a
   dimension or `@timestamp`, then you should clear the `_id`.

This is not pleasant to have to remember. But it doesn't crash!

* TSDB: Initial reindex fix

This teaches reindex the smallest thing that it needs to know about
tsdb: `_id` is automatically generated. Armed with that knowledge
reindex now doesn't attempt to copy the `_id` when writing to a tsdb
index.

Important: If the index doesn't yet exist it will *assume* that the
index will be created in `standard` mode. We can detect what mode it
*should* be created with in a follow up change.
  • Loading branch information
nik9000 authored May 11, 2022
1 parent 22aeebc commit 48a601b
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
Expand Down Expand Up @@ -112,6 +117,7 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
assigningBulkClient,
threadPool,
scriptService,
clusterService.state(),
reindexSslConfig,
request,
listener
Expand Down Expand Up @@ -177,6 +183,11 @@ static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslCon
* possible.
*/
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest, TransportReindexAction> {
/**
* Mapper for the {@code _id} of the destination index used to
* normalize {@code _id}s landing in the index.
*/
private final IdFieldMapper destinationIndexIdMapper;

/**
* List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
Expand All @@ -193,6 +204,7 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
ParentTaskAssigningClient bulkClient,
ThreadPool threadPool,
ScriptService scriptService,
ClusterState state,
ReindexSslConfig sslConfig,
ReindexRequest request,
ActionListener<BulkByScrollResponse> listener
Expand All @@ -214,6 +226,9 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
scriptService,
sslConfig
);
IndexMetadata destMeta = state.metadata().index(mainRequest.getDestination().index());
IndexMode destMode = destMeta == null ? IndexMode.STANDARD : IndexSettings.MODE.get(destMeta.getSettings());
this.destinationIndexIdMapper = destMode.buildNoFieldDataIdFieldMapper();
}

@Override
Expand Down Expand Up @@ -285,7 +300,7 @@ protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc)
}

// id and source always come from the found doc. Scripts can change them but they operate on the index request.
index.id(doc.getId());
index.id(destinationIndexIdMapper.reindexId(doc.getId()));

// the source xcontent type and destination could be different
final XContentType sourceXContentType = doc.getXContentType();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.reindex;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.reindex.AbstractAsyncBulkByScrollActionTestCase;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.xcontent.XContentType;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

/**
 * Tests that reindex either copies or clears the {@code _id} of each
 * document depending on the mode of the destination index: {@code standard}
 * destinations keep the source {@code _id}, {@code time_series} destinations
 * drop it so the tsdb index can generate its own.
 */
public class ReindexIdTests extends AbstractAsyncBulkByScrollActionTestCase<ReindexRequest, BulkByScrollResponse> {
    public void testEmptyStateCopiesId() throws Exception {
        // No metadata for the destination at all: fall back to copying the _id verbatim.
        ScrollableHitSource.BasicHit hit = doc();
        assertThat(action(ClusterState.EMPTY_STATE).buildRequest(hit).getId(), equalTo(doc().getId()));
    }

    public void testStandardIndexCopiesId() throws Exception {
        // A standard-mode destination keeps whatever _id the source document had.
        Settings.Builder dest = Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.STANDARD);
        assertThat(action(stateWithIndex(dest)).buildRequest(doc()).getId(), equalTo(doc().getId()));
    }

    public void testTsdbIndexClearsId() throws Exception {
        // A time_series destination generates _ids itself, so reindex must clear the old one.
        Settings.Builder dest = Settings.builder()
            .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
            .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "foo");
        assertThat(action(stateWithIndex(dest)).buildRequest(doc()).getId(), nullValue());
    }

    /** Cluster state containing the destination index built from {@code settings}. */
    private ClusterState stateWithIndex(Settings.Builder settings) {
        Settings.Builder withVersion = settings.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT);
        IndexMetadata.Builder destMeta = IndexMetadata.builder(request().getDestination().index())
            .settings(withVersion)
            .numberOfShards(1)
            .numberOfReplicas(0);
        return ClusterState.builder(ClusterState.EMPTY_STATE)
            .metadata(Metadata.builder(Metadata.EMPTY_METADATA).put(destMeta))
            .build();
    }

    /** Minimal source hit with a fixed {@code _id} for the action to copy or clear. */
    private ScrollableHitSource.BasicHit doc() {
        return new ScrollableHitSource.BasicHit("index", "id", -1).setSource(new BytesArray("{}"), XContentType.JSON);
    }

    @Override
    protected ReindexRequest request() {
        return new ReindexRequest().setDestIndex("dest_index");
    }

    private Reindexer.AsyncIndexBySearchAction action(ClusterState state) {
        return new Reindexer.AsyncIndexBySearchAction(task, logger, null, null, threadPool, null, state, null, request(), listener());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
package org.elasticsearch.reindex;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;

/**
* Index-by-search test for ttl, timestamp, and routing.
* Reindex test for routing.
*/
public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadataTestCase<ReindexRequest, BulkByScrollResponse> {
public void testRoutingCopiedByDefault() throws Exception {
Expand Down Expand Up @@ -74,6 +75,7 @@ private class TestAction extends Reindexer.AsyncIndexBySearchAction {
null,
ReindexMetadataTests.this.threadPool,
null,
ClusterState.EMPTY_STATE,
null,
request(),
listener()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.reindex;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
Expand All @@ -20,7 +21,7 @@
import static org.hamcrest.Matchers.containsString;

/**
* Tests index-by-search with a script modifying the documents.
* Tests reindex with a script modifying the documents.
*/
public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTestCase<ReindexRequest, BulkByScrollResponse> {

Expand Down Expand Up @@ -82,6 +83,17 @@ protected ReindexRequest request() {
@Override
protected Reindexer.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) {
ReindexSslConfig sslConfig = Mockito.mock(ReindexSslConfig.class);
return new Reindexer.AsyncIndexBySearchAction(task, logger, null, null, threadPool, scriptService, sslConfig, request, listener());
return new Reindexer.AsyncIndexBySearchAction(
task,
logger,
null,
null,
threadPool,
scriptService,
ClusterState.EMPTY_STATE,
sslConfig,
request,
listener()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ from tsdb to tsdb:
- match: {hits.hits.1._source.@timestamp: 2021-04-28T18:50:23.142Z}
- match: {hits.hits.2._source.@timestamp: 2021-04-28T18:50:53.142Z}
- match: {hits.hits.3._source.@timestamp: 2021-04-28T18:51:03.142Z}

---
from standard to tsdb:
from standard with tsdb id to tsdb:
- skip:
version: " - 8.1.99"
reason: introduced in 8.2.0
Expand All @@ -193,12 +194,79 @@ from standard to tsdb:
- is_false: deleted

# Now test reindexing from it to tsdb
- do:
reindex:
refresh: true
body:
source:
index: standard
dest:
index: tsdb_new
- match: {created: 4}
- match: {updated: 0}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted

- do:
indices.refresh: {}

- do:
search:
index: tsdb_new
body:
sort: '@timestamp'
aggs:
tsids:
terms:
field: _tsid
order:
_key: asc

- match: {hits.total.value: 4}
- match: {aggregations.tsids.buckets.0.key: {k8s.pod.uid: 1c4fc7b8-93b7-4ba8-b609-2a48af2f8e39, metricset: pod}}
- match: {aggregations.tsids.buckets.0.doc_count: 4}
- match: {hits.hits.0._source.@timestamp: 2021-04-28T18:50:03.142Z}
- match: {hits.hits.1._source.@timestamp: 2021-04-28T18:50:23.142Z}
- match: {hits.hits.2._source.@timestamp: 2021-04-28T18:50:53.142Z}
- match: {hits.hits.3._source.@timestamp: 2021-04-28T18:51:03.142Z}

---
from standard with random _id to tsdb:
- skip:
version: " - 8.1.99"
reason: introduced in 8.2.0

# Populate the standard index
- do:
reindex:
refresh: true
body:
source:
index: tsdb
dest:
index: standard
- match: {created: 4}
- match: {updated: 0}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted

# Now test reindexing from it to tsdb
- do:
reindex:
refresh: true
body:
source:
index: standard
dest:
index: tsdb_new
- match: {created: 4}
Expand Down Expand Up @@ -241,7 +309,6 @@ from tsdb to tsdb modifying timestamp:
reason: introduced in 8.2.0

- do:
catch: bad_request # TODO make this work
reindex:
refresh: true
body:
Expand All @@ -251,6 +318,38 @@ from tsdb to tsdb modifying timestamp:
index: tsdb_new
script:
source: ctx._source["@timestamp"] = ctx._source["@timestamp"].replace("-04-", "-05-")
- match: {created: 4}
- match: {updated: 0}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted

- do:
indices.refresh: {}

- do:
search:
index: tsdb_new
body:
sort: '@timestamp'
aggs:
tsids:
terms:
field: _tsid
order:
_key: asc

- match: {hits.total.value: 4}
- match: {aggregations.tsids.buckets.0.key: {k8s.pod.uid: 1c4fc7b8-93b7-4ba8-b609-2a48af2f8e39, metricset: pod}}
- match: {aggregations.tsids.buckets.0.doc_count: 4}
- match: {hits.hits.0._source.@timestamp: 2021-05-28T18:50:03.142Z}
- match: {hits.hits.1._source.@timestamp: 2021-05-28T18:50:23.142Z}
- match: {hits.hits.2._source.@timestamp: 2021-05-28T18:50:53.142Z}
- match: {hits.hits.3._source.@timestamp: 2021-05-28T18:51:03.142Z}

---
from tsdb to tsdb modifying dimension:
Expand All @@ -259,7 +358,6 @@ from tsdb to tsdb modifying dimension:
reason: introduced in 8.2.0

- do:
catch: bad_request # TODO make this work
reindex:
refresh: true
body:
Expand All @@ -269,3 +367,35 @@ from tsdb to tsdb modifying dimension:
index: tsdb_new
script:
source: ctx._source["metricset"] = "bubbles"
- match: {created: 4}
- match: {updated: 0}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted

- do:
indices.refresh: {}

- do:
search:
index: tsdb_new
body:
sort: '@timestamp'
aggs:
tsids:
terms:
field: _tsid
order:
_key: asc

- match: {hits.total.value: 4}
- match: {aggregations.tsids.buckets.0.key: {k8s.pod.uid: 1c4fc7b8-93b7-4ba8-b609-2a48af2f8e39, metricset: bubbles}}
- match: {aggregations.tsids.buckets.0.doc_count: 4}
- match: {hits.hits.0._source.@timestamp: 2021-04-28T18:50:03.142Z}
- match: {hits.hits.1._source.@timestamp: 2021-04-28T18:50:23.142Z}
- match: {hits.hits.2._source.@timestamp: 2021-04-28T18:50:53.142Z}
- match: {hits.hits.3._source.@timestamp: 2021-04-28T18:51:03.142Z}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ protected final String contentType() {
*/
public abstract String documentDescription(ParsedDocument parsedDocument);

/**
 * Build the {@code _id} to use on index requests created by reindex when
 * the destination index uses this mapper for its {@code _id} field.
 * Implementations may return the source {@code _id} unchanged or
 * {@code null} to let the destination generate one.
 */
public abstract String reindexId(String id);

/**
* Create a {@link Field} to store the provided {@code _id} that "stores"
* the {@code _id} so it can be fetched easily from the index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,9 @@ public String documentDescription(DocumentParserContext context) {
public String documentDescription(ParsedDocument parsedDocument) {
    // Describe a parsed document by its _id, e.g. "[some-id]".
    String id = parsedDocument.id();
    return "[" + id + "]";
}

@Override
public String reindexId(String sourceId) {
    // In standard mode the _id is client-controlled, so reindex copies it through unchanged.
    return sourceId;
}
}
Loading

0 comments on commit 48a601b

Please sign in to comment.