Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Route documents to the correct shards in tsdb #77731

Merged
merged 43 commits into from
Oct 15, 2021

Conversation

nik9000
Copy link
Member

@nik9000 nik9000 commented Sep 14, 2021

This causes elasticsearch to land documents from the same time series on
the same shard. It does so by adding a new index setting routing_path
which must be set when an index is in mode: time_series and may not be
set outside of that mode. That setting contains a list of patterns to
extract from the _source document that are hashed into the routing
value.

Note: This doesn't guarantee that the routing_path only matches dimensions.
For now it is possible to configure the routing_path so it matches non-dimension
fields. This would the same time series to end up in multiple shards. We don't want
that and plan to add that constraint in a follow up change.

@nik9000
Copy link
Member Author

nik9000 commented Sep 14, 2021

In addition to routing documents to the correct shard this adds tsdb's restriction turning off delete and update and indexing with a specific routing or id.

This causes elasticsearch to land documents from the same time series on
the same shard. It does so by adding a new index setting `routing_path`
which must be set when an index is in `mode: time_series` and may not be
set outside of that mode. That setting contains a list of patterns to
extract from the `_source` document that are hashed into the routing
value.
@nik9000 nik9000 added :StorageEngine/TSDB You know, for Metrics v7.16.0 labels Sep 14, 2021
Copy link
Contributor

@imotov imotov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me from TSDB perspective. It doesn't have everything we need, but it is a good start.

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Nik, I left a few comments. I did not find a restriction on GET in the design doc, hence the question on that, we may need to sync up on that.

@nik9000 nik9000 marked this pull request as ready for review September 16, 2021 18:40
@elasticmachine elasticmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Sep 16, 2021
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-analytics-geo (Team:Analytics)

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Nik, I left a few comments. The main concern is about forking from the bulk action when the request contains tsdb items.

@@ -462,28 +468,32 @@ protected void doRun() {
Version indexCreated = indexMetadata.getCreationVersion();
indexRequest.resolveRouting(metadata);
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
shardId = indexRouting.indexShard(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I wonder if we can move this to the DocWriteRequest with sub-class implementations instead? Would simplify the code here a bit. Would need to accept the IndexRouting object as a parameter to enable the caching here.

Optional of course.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about doing that when I was out on a walk a while back. I didn't like it at the time. But I'll give it a shot and see how it looks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I'm going to try this in this PR. But I will keep it in mind. I like it and I don't. But I'll remember it for sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ok to keep it as is for now, but happy to hear your more elaborate thoughts on the pros and cons of this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Over the last month I've had a bit of a change of heart here. I'd love to give it a go, but I'm going to hold it for a follow up. That way if we look at it and say "oh no, that's icky" we can just not merge it.

@nik9000
Copy link
Member Author

nik9000 commented Sep 23, 2021

@henningandersen I pushed a change that adds the forking. I'd love it if you could see if it looks like it is along the right lines. I haven't had a chance to write new tests for it, but the old tests seem happy enough.

@nik9000
Copy link
Member Author

nik9000 commented Sep 23, 2021

old tests seem happy enough.

Not quite! DataStreams tests failed. I'll have a look on Monday.

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, though I think it makes sense to get a review from a tsdb team member too.

Comment on lines 626 to 629
threadPool.executor(executorName).submit(new ActionRunnable<>(listener) {
@Override
protected void doRun() throws Exception {
threadPool.executor(executorName).execute(new ActionRunnable<>(listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need just one level of dispatch here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Oh boy. That's a strange way to write that. I'm wondering how I got there....

@@ -394,37 +406,46 @@ private long buildTookInMillis(long startTimeNanos) {
* retries on retryable cluster blocks, resolves item requests,
* constructs shard bulk requests and delegates execution to shard bulk action
* */
private final class BulkOperation extends ActionRunnable<BulkResponse> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main difference is exception handling. I see us handling exceptions in some parts but not all. I worry about some validation or other exception inside the last part of the method. One specific example is the case where we set a parent task on the bulkShardRequest and then do client.executeLocally. I believe executeLocally can fail with a task cancelled exception.

I do think that is correctly handled everywhere though. It is mostly a matter of taste. I am inclined to prefer to let the object handle it's exceptions itself because it is passed a listener in constructor. I find it slightly confusing to have a method/object that accepts a listener but then can also throw exceptions.

That said, it is a minor point/taste matter and I am OK to keep this as is, though I would ask to add a comment to performBulkRequests that it might throw exceptions for the caller to handle and delegate to appropriate listener.

* This is called on the Transport tread so we can check the indexing
* memory pressure *quickly* but we don't want to keep the transport
* thread busy. So as son as we have the indexing pressure in we fork
* to one of the write thread pools.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps mention the two known reasons for this to be expensive too: tsdb parse routing from source and compression for outgoing requests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -462,28 +468,32 @@ protected void doRun() {
Version indexCreated = indexMetadata.getCreationVersion();
indexRequest.resolveRouting(metadata);
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
shardId = indexRouting.indexShard(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ok to keep it as is for now, but happy to hear your more elaborate thoughts on the pros and cons of this?


private void dispatchRetry() {
/*
* This is called on the cluster state update and timer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ClusterStateObserver uses generic thread pool for timeout. No need to change the code, but perhaps update the comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see it!

                        notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, ThreadPool.Names.GENERIC);

I'll update the comment.

Comment on lines 169 to 173
ShardIterator shardIterator = RoutingTable.shardRoutingTable(clusterState.routingTable().index(request.concreteIndex()), shardId)
.shardsIt();
ShardRouting shard;
while ((shard = shardIterator.nextOrNull()) != null) {
if (shard.primary()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICS, this can now be simplified to (also replacing the next 4 lines):

Suggested change
ShardIterator shardIterator = RoutingTable.shardRoutingTable(clusterState.routingTable().index(request.concreteIndex()), shardId)
.shardsIt();
ShardRouting shard;
while ((shard = shardIterator.nextOrNull()) != null) {
if (shard.primary()) {
return RoutingTable.shardRoutingTable(clusterState.routingTable().index(request.concreteIndex()), shardId)
.primaryShardIt();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

I see how it's using a utility method. I'm not used to these parts of the code so I hadn't realized it was there. But got it!

@@ -59,6 +59,7 @@
IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING,
IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING,
IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING,
IndexMetadata.INDEX_ROUTING_PATH,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this is now tsdb-only, I wonder if adding it here should be guarded by the feature flag, i.e., this should move to where we also add the tsdb-mode setting? Otherwise, I think setting it is possible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right I should move it under the feature flag.

@@ -329,6 +329,14 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
public static final Setting<Integer> INDEX_FORMAT_SETTING =
Setting.intSetting(INDEX_FORMAT, 0, Setting.Property.IndexScope, Setting.Property.Final);

public static final Setting<List<String>> INDEX_ROUTING_PATH = Setting.listSetting(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we need a validation here, much like we do for the mode setting? To ensure this can only be set when index-mode is tsdb. Perhaps our settings infra catches this though, if you have a test validating that, then disregard this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I have a test for it. The validation is in IndexMode. Instead of "routing_path must be empty in standard mode and have values in time_series mode" it is "standard mode forbids a routing_path and time series mode requires it". Just flipped.

I had a look at moving it here and didn't really like how it turned out. Most of the contents of IndexMode is validation at the moment so this validation feels right at home.

@henningandersen
Copy link
Contributor

One more detail, this touches code where it likely makes sense to get a few extra CI runs for safety, either locally or by provoking them on this PR.

Copy link
Contributor

@imotov imotov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@nik9000
Copy link
Member Author

nik9000 commented Oct 15, 2021

One more detail, this touches code where it likely makes sense to get a few extra CI runs for safety, either locally or by provoking them on this PR.

👍

I'll provoke them on this PR. I have a lovely local machine to run the tests but its not as fast as the half dozen machines CI throws at it.

@nik9000
Copy link
Member Author

nik9000 commented Oct 15, 2021

@elasticmachine test this please

1 similar comment
@nik9000
Copy link
Member Author

nik9000 commented Oct 15, 2021

@elasticmachine test this please

@nik9000
Copy link
Member Author

nik9000 commented Oct 15, 2021

Tests are passing, but I want to run them a few more times out of paranoia.

@nik9000
Copy link
Member Author

nik9000 commented Oct 15, 2021

@elasticmachine test this please

@nik9000 nik9000 added >non-issue auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) labels Oct 15, 2021
@elasticsearchmachine elasticsearchmachine merged commit b6c61f4 into elastic:master Oct 15, 2021
weizijun added a commit to weizijun/elasticsearch that referenced this pull request Oct 18, 2021
* upstream/master: (109 commits)
  Migrate custom role providers to licensed feature (elastic#79127)
  Remove stale AwaitsFix in InternalEngineTests (elastic#79323)
  Fix errors in RefreshListenersTests (elastic#79324)
  Reeable BwC Tests after elastic#79318 (elastic#79320)
  Mute BwC Tests for elastic#79318 (elastic#79319)
  Reenable BwC Tests after elastic#79308 (elastic#79313)
  Disable BwC Tests for elastic#79308 (elastic#79310)
  Adjust BWC for node-level field cap requests (elastic#79301)
  Allow total memory to be overridden (elastic#78750)
  Fix SnapshotBasedIndexRecoveryIT#testRecoveryIsCancelledAfterDeletingTheIndex (elastic#79269)
  Disable BWC tests
  Mute GeoIpDownloaderCliIT.testStartWithNoDatabases (elastic#79299)
  Add alias support to fleet search API (elastic#79285)
  Create a coordinating node level reader for tsdb (elastic#79197)
  Route documents to the correct shards in tsdb (elastic#77731)
  Inject migrate action regardless of allocate action (elastic#79090)
  Migrate to data tiers should always ensure a TIER_PREFERENCE is set (elastic#79100)
  Skip building of BWC distributions when building release artifacts (elastic#79180)
  Default ENFORCE_DEFAULT_TIER_PREFERENCE to true (elastic#79275)
  Deprecation of transient cluster settings (elastic#78794)
  ...

# Conflicts:
#	server/src/main/java/org/elasticsearch/index/IndexMode.java
#	server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java
weizijun added a commit to weizijun/elasticsearch that referenced this pull request Oct 18, 2021
* upstream/master: (521 commits)
  Migrate custom role providers to licensed feature (elastic#79127)
  Remove stale AwaitsFix in InternalEngineTests (elastic#79323)
  Fix errors in RefreshListenersTests (elastic#79324)
  Reeable BwC Tests after elastic#79318 (elastic#79320)
  Mute BwC Tests for elastic#79318 (elastic#79319)
  Reenable BwC Tests after elastic#79308 (elastic#79313)
  Disable BwC Tests for elastic#79308 (elastic#79310)
  Adjust BWC for node-level field cap requests (elastic#79301)
  Allow total memory to be overridden (elastic#78750)
  Fix SnapshotBasedIndexRecoveryIT#testRecoveryIsCancelledAfterDeletingTheIndex (elastic#79269)
  Disable BWC tests
  Mute GeoIpDownloaderCliIT.testStartWithNoDatabases (elastic#79299)
  Add alias support to fleet search API (elastic#79285)
  Create a coordinating node level reader for tsdb (elastic#79197)
  Route documents to the correct shards in tsdb (elastic#77731)
  Inject migrate action regardless of allocate action (elastic#79090)
  Migrate to data tiers should always ensure a TIER_PREFERENCE is set (elastic#79100)
  Skip building of BWC distributions when building release artifacts (elastic#79180)
  Default ENFORCE_DEFAULT_TIER_PREFERENCE to true (elastic#79275)
  Deprecation of transient cluster settings (elastic#78794)
  ...

# Conflicts:
#	rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/10_settings.yml
#	server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
#	server/src/main/java/org/elasticsearch/common/settings/Setting.java
#	server/src/main/java/org/elasticsearch/index/IndexMode.java
#	server/src/test/java/org/elasticsearch/index/TimeSeriesModeTests.java
@wchaparro wchaparro assigned nik9000 and unassigned nik9000 Dec 16, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) >non-issue :StorageEngine/TSDB You know, for Metrics Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants