Create a `mode: time_series` option for indices #75638
Enabling this setting will put the index into a mode optimized for time series data, grouping documents whose fields are annotated with `dimension`. Because time series data is usually write-once, we also disable your ability to update or delete documents. We have big plans for things we can do with data organized in this way, but for now the primary advantage of enabling `time_series_mode` is that it shrinks the index on disk. We did some local tests with k8s monitoring data in ECS format. Here are some figures for a single-sharded index without replicas, force merged to a single segment:

* uncompressed JSON: 150GB
* defaults: 60GB
* best_compression: 39GB
* time_series_mode: 33GB
* best_compression and time_series_mode: 22GB

((NOCOMMIT: get the size measurements after running the speed tests))

So the compression you get is similar to best_compression, and it can operate "on top of" best_compression to further reduce the index size. The price is, like we mentioned above, disabling update and delete of documents. In addition to that, there is a performance cost on ingest and on merge. Loading the data from above took:

* best_compression: 2.6 hours to load, 0.8 hours to force_merge
* best_compression, sort on all dimensions:
* time_series_mode: (measuring)
* best_compression and time_series_mode: (measuring)

In addition to the space savings, this creates an unsearchable `_tsid` field which functions as a shorthand for aggregating on all the dimensions. So you can run this:

```
POST localhost:9200/test/_search?pretty
{
  "size": 0,
  "aggs": {
    "tsid": {
      "terms": { "field": "_tsid" },
      "aggs": {
        "max_n": { "max": { "field": "n" } }
      }
    }
  }
}
```

and the `key` field in the JSON of the terms bucket will contain all of the dimensions.
Something like this:

```
"aggregations" : {
  "tsid" : {
    "buckets" : [
      {"key":{"dim1":"a","dim2":"a"},"doc_count":6,"max_n":{"value":6}},
      {"key":{"dim1":"a","dim2":"b"},"doc_count":6,"max_n":{"value":6}},
      {"key":{"dim1":"a","dim2":"c"},"doc_count":6,"max_n":{"value":6}},
      {"key":{"dim1":"a","dim2":"d"},"doc_count":6,"max_n":{"value":6}},
      {"key":{"dim1":"b","dim2":"a"},"doc_count":6,"max_n":{"value":6}},
      {"key":{"dim1":"b","dim2":"b"},"doc_count":6,"max_n":{"value":6}},
      {"key":{"dim1":"b","dim2":"c"},"doc_count":6,"max_n":{"value":6}},
      {"key":{"dim1":"b","dim2":"d"},"doc_count":6,"max_n":{"value":6}},
      {"key":{"dim1":"c","dim2":"a"},"doc_count":6,"max_n":{"value":6}},
      {"key":{"dim1":"c","dim2":"b"},"doc_count":6,"max_n":{"value":6}}
    ]
  }
}
```
```
"alias routing incompatible with the destination index [" + abstraction.getName() + "] because it is in time series mode"
);
}
routing(routingFromTimeSeries(abstraction, timeSeriesGeneratorLookup));
```
I'd like to put this into its own field and modify the routing behavior to pick it up. But I've not done that in this PR because I think it's a fairly mechanical change and I believe I'd have to disable a lot of BWC tests for it. I'd like to do that in a follow up. So instead I'm hiding the tsid in the `_routing` field, base 64 encoding it. Ewwwwww. But it works and we can zap it in a follow up.
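A minimal sketch of the stopgap described above, assuming the tsid is just a byte array at this point: the binary value is base64-encoded so it can ride in the string-typed `_routing` slot that the existing plumbing already carries. The class and method names here are hypothetical, not the PR's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class TsidRouting {
    // Encode the binary _tsid as base64 text so the existing string-typed
    // _routing machinery can carry it unchanged.
    static String routingFromTsid(byte[] tsid) {
        return Base64.getEncoder().encodeToString(tsid);
    }

    // Recover the original _tsid bytes from the _routing value.
    static byte[] tsidFromRouting(String routing) {
        return Base64.getDecoder().decode(routing);
    }

    public static void main(String[] args) {
        byte[] tsid = "dim1=a,dim2=b".getBytes(StandardCharsets.UTF_8);
        String routing = routingFromTsid(tsid);
        System.out.println(routing);
        System.out.println(new String(tsidFromRouting(routing), StandardCharsets.UTF_8));
    }
}
```

The round trip is lossless, which is what makes the trick workable until a dedicated field exists.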
Maybe you already looked into it, but this might require some changes for `_split` to keep working with time series indices if we did that. `_split` looks at the value of the `_routing` field to know how to split shards in a way that still honors the routing key (see `ShardSplittingQuery`).
OOOH! I hadn't thought of it. You've given me a wonderful thing to experiment with. I imagine I break this now. I'll look!
Shard splitting query is indeed busted here. We'll need to revive it, but I'd like to wait for a follow up for that.
```
@@ -558,6 +558,7 @@ public String resolveWriteIndexRouting(@Nullable String routing, String aliasOrI
    return routing;
}

// TODO this is the same sort of code we have in bulk action already. we should share
```
Over the years we've grown a lot of very similar code over in TransportBulkAction. We should unify these somehow.
```
@@ -48,7 +49,7 @@

public static final Predicate<String> INDEX_SETTINGS_KEY_PREDICATE = (s) -> s.startsWith(IndexMetadata.INDEX_SETTING_PREFIX);

public static final Set<Setting<?>> BUILT_IN_INDEX_SETTINGS = Set.of(
private static final Set<Setting<?>> ALWAYS_ENABLED_BUILT_IN_INDEX_SETTINGS = Set.of(
```
This makes sure that the index setting is only registered if the feature flag is enabled or we're on a snapshot. I tried to do this in the way that "looks small" in the diff so it's easier for us to reason about....
```
@@ -443,34 +447,28 @@ protected String contentType() {

@Override
protected void parseCreateField(DocumentParserContext context) throws IOException {
```
I wanted very much to reuse the parsing code but it was super twisty. I believe the new code doesn't change the behavior but lets us reuse it during tsid extraction.
```
@@ -58,6 +58,7 @@ boolean hasTimestampData() {

@Nullable
public MappedFieldType getFieldType(String fieldName) {
// TODO use time series id generation?
```
It'd be complex, but we already have the time series id generation code on the coordinating node. So - if you are querying all of the dimensions - we could generate the time series id that must match here and then use the routing it makes to limit the nodes we land on.
```
 * mapping's {@link CompressedXContent} but {@link CompressedXContent#equals(Object)}
 * will try to decompress the mapping if the crc matches but the compressed bytes
 * don't. That's wasteful for us - probably for everyone. If the crc and compressed
 * bytes match that's a match.
```
It's wasteful specifically because if we don't match we need to decompress the bytes anyway. Since that's slow we don't want to decompress a second time. I'm fairly sure it'd be safe to modify the `equals` implementation in `CompressedXContent` but I don't want to tempt fate in this PR.
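A hedged sketch of the equality shortcut being described, using a hypothetical wrapper class (not the PR's actual code): if both the crc and the compressed bytes match, the mappings are equal with no decompression at all; differing bytes are simply treated as not-equal, which is the cheap behavior the comment wants for this cache.

```java
import java.util.Arrays;

public class CompressedMappingKey {
    final int crc32;               // precomputed checksum of the mapping
    final byte[] compressedBytes;  // the compressed mapping itself

    CompressedMappingKey(int crc32, byte[] compressedBytes) {
        this.crc32 = crc32;
        this.compressedBytes = compressedBytes;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CompressedMappingKey)) {
            return false;
        }
        CompressedMappingKey other = (CompressedMappingKey) o;
        // Match on crc AND compressed bytes: never decompress just to compare.
        return crc32 == other.crc32 && Arrays.equals(compressedBytes, other.compressedBytes);
    }

    @Override
    public int hashCode() {
        return crc32; // delegate to the cheap, precomputed value
    }

    public static void main(String[] args) {
        CompressedMappingKey a = new CompressedMappingKey(42, new byte[] {1, 2, 3});
        CompressedMappingKey b = new CompressedMappingKey(42, new byte[] {1, 2, 3});
        System.out.println(a.equals(b));
    }
}
```

The trade-off: two logically identical mappings that happened to compress differently would compare as not-equal, which is acceptable for a cache key but is why changing `CompressedXContent#equals` itself would "tempt fate".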
```
@Override
public int hashCode() {
    return mapping.hashCode();
```
`CompressedXContent#hashCode` is fast so it's safe to just delegate to it.
```
@Override
public Object format(BytesRef value) {
    try {
        return TimeSeriesIdGenerator.parse(new BytesArray(value).streamInput());
```
I chose to make `parse` static here so any code can decode a `_tsid` without knowing what mapping it came from. We could go the other way - it'd save a few bytes of encoding. But I don't think it's worth it.
```
@@ -135,13 +141,8 @@ public void testInvalidMissing() {
    " must be one of [_last, _first]"));
}

public void testIndexSorting() {
public void testIndexSortingNoDocValues() {
```
I just renamed this method. I didn't change what it does. All of the removed lines are just relocated into `buildIndexSort`.
```
@@ -284,8 +295,59 @@ public void testEmptyDocumentMapper() {
assertNotNull(documentMapper.idFieldMapper());
assertNotNull(documentMapper.sourceMapper());
assertNotNull(documentMapper.IndexFieldMapper());
assertEquals(10, documentMapper.mappers().getMapping().getMetadataMappersMap().size());
assertEquals(10, documentMapper.mappers().getMatchingFieldNames("*").size());
```
These assertions failed in my PR but were hard to read. So I made them easier to read while fixing them.
@elasticsearchmachine run elasticsearch-ci/part-2
Wow, you are so efficient!
Not so much! We've been talking about this for a while. And I was working on this for a while too!
Great stuff! I have 2 questions:
That's a great question! One whose answer I'll want to stick in the javadocs somewhere. But I'll reply here too. We sort on tsid first because all the things we want to do work better that way. Well, two things I can think of:
@imotov I sorted by …
I like the simplicity of the binary choice for dimension fields, though I had wondered about this too for the case of one high-cardinality dimension and another low-cardinality dimension, since I think we'd generally prefer sorting by the low-cardinality dimension first and then by the high-cardinality dimension. Maybe one way to make this work (in a follow-up PR, this one is complex already) would be to allow configuring an index sort on timeseries indices, but only using dimension fields. And then the timeseries mode would add …
I like the simplicity too! Being able to configure index sorting with special rules could do the trick. I think it's safe to keep this in mind but experiment more with it later. @weizijun had a similar-feeling proposal: allow users to specify which dimensions are used in routing. If you don't use a dimension in routing then it is just used for index sorting. It'd be useful to force similar documents even closer together. Though I don't know how much space savings it'd buy. @weizijun, do you have some experimental results?

I'd like to throw this out there: traditionally Elasticsearch routes documents to a particular index based on the …

This PR will cause more unbalanced shards. For sure. But not as many as just going crazy with routing would, because it scatters different time series to different nodes randomly. We do expect some time series to get more documents than others. But we expect there to be many, many time series, so we expect that in most cases the more chatty time series will get scattered to the different shards evenly.

Now, looping around to the routing proposal: it feels like exactly the kind of thing that'd make it easy to accidentally route the high-traffic time series to the same shard. That doesn't mean it's not a good idea too. But, like giving folks more control over the sort order, it's something I want to do later. And carefully. Or not at all.
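The scattering argument above boils down to hash-based shard routing. A minimal sketch, assuming a generic string hash: the routing value (here, a stand-in for the tsid) is hashed and taken modulo the shard count, so distinct time series land on shards pseudo-randomly. `String.hashCode()` stands in for the Murmur3 hash Elasticsearch actually uses; the class and method are hypothetical.

```java
public class ShardRouting {
    // Deterministically map a routing value (e.g. a tsid) to one of
    // numberOfShards shards. floorMod keeps the result non-negative even
    // when the hash is negative.
    static int shardFor(String routingValue, int numberOfShards) {
        return Math.floorMod(routingValue.hashCode(), numberOfShards);
    }

    public static void main(String[] args) {
        int shards = 3;
        // Different time series hash to (usually) different shards,
        // while every document of one series always lands on the same shard.
        System.out.println(shardFor("dim1=a,dim2=a", shards));
        System.out.println(shardFor("dim1=b,dim2=c", shards));
    }
}
```

With many time series and a decent hash, the chatty series spread out evenly; with user-chosen routing keys, nothing prevents several hot series from colliding on one shard, which is the concern raised above.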
That's a good question. Since we will be returning data in ascending order, it might be easier for us to iterate over the data in ascending order as well; in this case we don't have to flip it later on. I don't think the ordering of …
I agree with @jpountz and @nik9000, things are already very complicated as they are, but I also agree with @hendrikmuhs that data does indeed have hierarchical structure. I just want to add a caveat that naturally occurring hierarchies do not always match optimizations. Let's take for example …
So a natural hierarchy would be job, instance, cpu, and then mode. However, a typical query would most likely be interested in the average non-idle cpu rate. I think this idea is definitely worth exploring, but I would like to spend more time working with this type of data before we start working on this sort of optimization. I just don't think we will get it right at the moment.
I'll drop that into a comment. I think it's a thing we could change our minds on in the future if we need to.
You can't write any documents into the index, but you can create one. This is useful because you might want to create an index and then use dynamic mappings and templates to fill in the dimensions later.
I've updated the timings in the PR. I don't have a rally track for this (yet) so I'm taking some fairly rough numbers. Indexing my data takes about 2.6 hours with or without time series mode. It takes about 10% longer if you sort on all dimensions (2.9 hours) instead of generating the time series id and sorting by that. We like generating the time series id anyway because we want to aggregate and route on it - but it's nice to know we get some time savings by sorting on it.

Similarly, we get time savings when you force merge down to a single segment. The force merge with just best_compression takes 0.8 hours. In time series mode it takes 110% longer (1.7 hours). This seems to be dominated by merging the sorted segments. If you just use best_compression the segments aren't sorted at all, so this isn't surprising. The force_merge time when you sort on every dimension is 2.2 hours. That's almost as long as it takes to index the data with best_compression! Still, these show areas for improvement - merging sorted segments is slower than we'd like.

Finally, I got some new size estimates. Remember, this data uncompressed is 150GB. I keep it compressed with zstd and it clocks in at 18GB. Gzipped it is 21GB. When you index it with best_compression a single shard is, like we said above, 39.3GB. That's with the search index, doc values, and the _source compressed with zlib. Doing it in time series mode it is 21.8GB, pretty close to gzip, even though we still have the search index and doc values! That's lovely. But we can do better. We will do better. In a follow up.
Pinging @elastic/es-analytics-geo (Team:Analytics)
I've converted this back to a draft because I've discovered some "fun" stuff around recoveries. The trick I do with putting the tsid into the …
Nice! I really like where it is going. Left a few minor comments.
I noticed that there are 11 TODOs in this PR. Are you planning to fulfill some of them in this PR, or are they all for follow-up PRs?
```
@@ -163,7 +163,7 @@ public Object objectBytes() throws IOException {

@Override
public boolean hasTextCharacters() {
    throw new UnsupportedOperationException("use text() instead");
    return false;
```
This is the most confusing method in XContentParser :) Good catch!
```
body:
  settings:
    index:
      time_series_mode: true
```
I wonder if this is the first of possibly other indices with specialized operation modes. These modes are most likely to be mutually exclusive. So, would it make more sense to have `mode: time_series` here?
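For illustration, the reviewer's alternative would make the same YAML test fixture look roughly like this (the `mode` key is the proposal, not a setting that exists at this point):

```
body:
  settings:
    index:
      mode: time_series
```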
```
this.documentParser = documentParser;
this.type = mapping.getRoot().name();
this.mappingLookup = MappingLookup.fromMapping(mapping);
this.mappingSource = mapping.toCompressedXContent();
timeSeriesIdGenerator = inTimeSeriesMode ? mapping.buildTimeSeriesIdGenerator() : null;
```
For a moment, the absence of `this.` here confused me.
Digging deeper, that statement gives me an uneasy feeling for some reason. I feel like if we make it the responsibility of the `Mapping` to `buildTimeSeriesIdGenerator`, shouldn't we supply `Mapping` with enough information for it to know if building it makes any sense in the first place? Should this logic go into `Mapping`? WDYT?
I'll play with moving this. @romseygeek suggested moving it to `MappingLookup`. I'll try both and see how I feel.
```
}

private final Function<Index, LocalIndex> lookupLocalIndex;
private final Function<IndexMetadata, TimeSeriesIdGenerator> buildTimeSeriedIdGenerator;
```
Typo in Seried
```
    }
}

private abstract static class Value {
```
It is an internal class, but I think `Value` still sounds a bit too generic. When I first saw AsyncValue I thought it was some generic class that calculates values asynchronously. In reality, `Value` here is basically an optional (in case of exception), versioned (to match the mapping version) TimeSeriesIdGenerator. So it contains a really specific thing. I think we could increase readability if we replaced `Value` with `IdGeneratorProvider` or `IdConfig` or something else that doesn't sound like a generic mechanism for lazily storing arbitrary values.
👍
```
}
    return new IpTsidGen(nullValue);
}
private static class IpTsidGen extends TimeSeriesIdGenerator.StringLeaf {
```
new line
```
}
    return new KeywordTsidGen(nullValue);
}
private static class KeywordTsidGen extends TimeSeriesIdGenerator.StringLeaf {
```
ditto
```
Collections.sort(dimensionNames);
throw new IllegalArgumentException("Document must contain one of the dimensions " + dimensionNames);
}
Collections.sort(values, Comparator.comparing(Map.Entry::getKey));
```
I think there is a dedicated method for that: `Map.Entry.comparingByKey()`.
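A tiny self-contained demonstration of the suggested swap: `Map.Entry.comparingByKey()` is the stock replacement for `Comparator.comparing(Map.Entry::getKey)` and sorts the same way. The dimension names here are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SortDimensions {
    // Sort dimension entries by their key using the dedicated comparator
    // instead of Comparator.comparing(Map.Entry::getKey).
    static List<Map.Entry<String, String>> sorted(List<Map.Entry<String, String>> values) {
        values.sort(Map.Entry.comparingByKey());
        return values;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> values = new ArrayList<>(
            List.of(Map.entry("dim2", "b"), Map.entry("dim1", "a")));
        System.out.println(sorted(values));
    }
}
```

Behavior is identical; `comparingByKey()` just reads more directly and avoids the method-reference boilerplate.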
```
root.extract(values, "", parser);
if (values.isEmpty()) {
    List<String> dimensionNames = new ArrayList<>();
    root.collectDimensionNames("", dimensionNames::add);
```
You can probably move this to the constructor; you have to run collectDimensionNames anyway.
I didn't want to keep the strings sitting around, though, in case there are zillions of these things.
I did another pass and left a couple of minor comments. Besides disabling specifying dimensions in dynamic mappings, which we discussed, and possibly dealing with `copy_to` into dimension fields, I think it looks good. It would be great if somebody from @elastic/es-distributed could take a look at it as well.
```
@@ -102,6 +103,11 @@ public boolean required() {
@Override
public void preParse(DocumentParserContext context) {
    String routing = context.sourceToParse().routing();
    if (context.indexSettings().mode() == IndexMode.TIME_SERIES) {
        // TODO when we stop storing the tsid in the routing fail any request with routing in time series mode
```
I have been thinking about that. Allowing routing in time_series mode might actually make sense. Yes, it allows users to shoot themselves in the foot, but routing is an advanced feature that already enables screwing things up on "standard" indices, and it also enables advanced users to build solutions they wouldn't be able to build otherwise. Having only a few elements of the TSID in the routing key doesn't break anything, but it can speed things up and help in the case of multi-tenant solutions.
I suppose so. I can revive support for routing once we have a field on `IndexRequest` specifically for the tsid. That was something I was hoping to do in a follow up.
```
}
if (INDEX_SORT_MISSING_SETTING.exists(settings)) {
    throw new IllegalArgumentException("Can't set [" + INDEX_SORT_MISSING_SETTING.getKey() + "] in time series mode");
}
```
Feels like it can be replaced with something like

```
for (Setting<?> setting : indexMode.unsupportedSettings()) {
    if (setting.exists(settings)) {
        throw new IllegalArgumentException("Can't set [" + setting.getKey() + "] in time series mode");
    }
}
```

or moved completely into `mode.checkSettings(settings)`.
👍
I am now sure what's up. It's... icky. The dynamic mapping runs on the data node and doesn't tell the coordinating node what to do. I think the plan for this PR is to accept that it's broken and then try to fix it in a follow up. Alternatively, we could disallow dynamic mappings adding dimensions. That might be OK in the short term, but I think folks need some way of dynamically adding dimensions, and dynamic mappings are the most sensible way to do it in the framework we have now. So we'd have to get it working sooner or later.
I've pushed a change to detect when we modify the list of dimensions with a dynamic mapping update and reject the document. That'll have to do in the short term. In the long term we'll have to make it work, but that's a bigger thing. Big enough that we talked about taking another approach to this change entirely. But we believe the majority of the change required to get dynamic mappings working lines up with the change required to get hard time bounds on indices working - so we're going to do it anyway. Probably. My plan is to scrounge up more reviews for this ASAP and then get this in and start on the dynamic mapping updates.
For anyone following along from home - it took me a while to get good reviews on this, but we found enough "fun" corner cases involving races with mappings that we're rethinking this. We're unlikely to merge this code as it stands, but fairly likely to use it as inspiration for something that will be pretty similar in practice. Just without the races.
This adds more tests copied from our original TSDB prototype in PR #75638 that are still applicable to time series mode indices. There are a bunch of disabled assertions because we are not yet generating `_tsid`, but the non-disabled assertions are useful. And we will soon be generating the `_tsid`, so we can re-enable those assertions.