
metricdefinition refactor #199

Closed
Dieterbe opened this issue Apr 21, 2016 · 17 comments

Comments

@Dieterbe
Contributor

Dieterbe commented Apr 21, 2016

Below is a list of problems I'm seeing with our metric data across the entire stack.
It covers problems in architectural design, performance, and metrics2.0 compatibility.
I may update this in the future as I think of more; in the meantime, happy to discuss.
I'd like to start addressing these in the medium term (e.g. after high-prio stuff such as Kafka).

  • Apps sending metric data to the raintank platform (such as WP probes) do so repetitively: most of the traffic consists of redundant metric metadata.
  • Similarly for the platform pipeline: metric data is always accompanied by metric metadata (the metric definition), which amplifies the load on our queues, network IO, and the CPU spent encoding/decoding messages.
  • The metricdefinitions are currently also not very GC friendly due to their heavy use of strings. (One optimisation we could do is intern the strings, i.e. keep one long string with all the data, plus string indexes that tell us how to "select" each property from the big string.) Some properties can also be represented more efficiently as enums (like target_type, which only has a handful of options).
  • Our current metric ID is conceptually a uint + [16]byte but is needlessly implemented as a more costly string.
  • We generate our metric IDs from orgid+name+tags, which is a pretty elegant way to deterministically derive metric IDs without needing distributed consensus, database locks/transactions, etc. (see the sketch after this list). This is the reason for this choice, right @woodsaj? Hash collisions will probably never be a concern.
    These IDs don't compress as well as sequential numbers, but I think that's a reasonable tradeoff.
  • Our current metric IDs make metric lookups mostly content addressable: if you know the orgid, name, and tags, you can generate the ID to look up the rest of the metadata. In current practice (and foreseeable future practice) we don't and generally can't leverage this, but I think that's OK; it's just interesting to think about.
  • interval, unit and target_type currently don't contribute to the metric ID, meaning they can change without creating a new series. The latter two should be included in the ID, because they are strongly tied to the meaning of the series. The former is debatable, but probably a good idea as well, as we've discussed previously, because it keeps things simple as we try to solve complicated cases where we have several periods of the same metric at different intervals. (We should, however, start enforcing that a given metric can only be sent at one interval at a time.)
  • Also, for any series (incl. interval) we should know when its data range starts, and ideally when it ends; that way MT will be able to make better decisions about which series to use when responding to queries.
  • We currently don't have an explicit provision for metadata in the metrics2.0 sense, i.e. tags that can change without affecting the metric identity. We can easily add this as soon as we want to support it; not an issue for now.
  • The orgId is mandatory in a multi-tenant setup like ours but is currently undefined in the metrics2.0 spec. I think it should be made optional in the metrics2.0 spec (because many people have single tenancy) but mandatory in our validation.
  • target-type and unit should be mandatory again (see target-type and unit #173), but we should rename target_type to metrictype or something (both here and in the spec).
  • At some point we have to rename litmus.* to worldping.* as well.
  • As I've brought up elsewhere, WP is currently also sending data in suboptimal form (using decimal numbers where they could have been avoided), resulting in subpar compression.
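To make the deterministic ID idea above concrete, here is a minimal sketch in Go of deriving an ID purely from orgid+name+tags by hashing. The field set, separator and hash function are assumptions for illustration, not necessarily the exact scheme our schema code uses:

```go
package main

import (
	"crypto/md5"
	"fmt"
	"sort"
	"strings"
)

// deterministicID sketches deriving a metric id from its identity fields
// alone: the same orgid+name+tags always yields the same id, so no central
// sequence, locks or distributed consensus is needed.
func deterministicID(orgID int, name string, tags []string) string {
	sorted := append([]string(nil), tags...)
	sort.Strings(sorted) // tag order must not change the id
	h := md5.Sum([]byte(fmt.Sprintf("%d.%s.%s", orgID, name, strings.Join(sorted, ";"))))
	return fmt.Sprintf("%d.%x", orgID, h) // e.g. "1.af0c91..."
}

func main() {
	fmt.Println(deterministicID(1, "litmus.dns.ok_state", []string{"probe=ams1", "endpoint=foo"}))
}
```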

implementation ideas/proposals:

  • Senders can use sessions: for any new metric they can send the metricdefinition first, get an ID back, and from then on just send id+ts+val.
  • Ditto for the rest of the pipeline: almost everywhere we can just use the ID instead of the full metricdef.
  • We may want to run a separate index service that is the single source of truth (still backed by ES) and take this concern out of MT, but that's not a concern yet.
  • When IDs become entirely based on all tags (except metadata tags), caching becomes a lot easier, as the def will never update: once you know the metricdef for an ID, it will never change (except metadata), so we could use groupcache, for example.
@woodsaj
Member

woodsaj commented Apr 22, 2016

We generate our metric IDs from orgid+name+tags, which is a pretty elegant way to deterministically derive metric IDs without needing distributed consensus, database locks/transactions, etc. This is the reason for this choice, right @woodsaj? Hash collisions will probably never be a concern. These IDs don't compress as well as sequential numbers, but I think that's a reasonable tradeoff.

Correct. Using an auto-increment ID or UUID for each metric would require MT to keep an index and co-ordinate this index with all other nodes.

@woodsaj
Member

woodsaj commented Apr 22, 2016

Senders can use sessions: for any new metric they can send the metricdefinition first, get an ID back, and from then on just send id+ts+val.

I really like this idea. However, sending a metric and getting back an ID is not really possible, as metric ingestion is asynchronous. But since the metric ID is deterministic, we don't need to ask for it: the sender can just compute it.

So I propose that we use two metric ingestion "commands": STORE and INDEX.

STORE: simply send the ID+ts+val.
INDEX: send the full metricDef payload. (MetricDef has no ts or value field.)

The two commands should be independent of each other, so that data can still be stored even if an INDEX has not been sent; but without the INDEX it will not be possible to query the data.
We would need to provide best practices for sending the INDEX at a regular interval.
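A rough sketch of what these two commands could look like as Go types; the names and fields here are illustrative, not an existing schema:

```go
package main

import "fmt"

// Command discriminates the two ingestion message kinds described above.
type Command uint8

const (
	CmdStore Command = iota // point data only: id + ts + value
	CmdIndex                // full metric definition, no ts/value
)

// StoreMsg carries only what is needed to append a point to a known series.
type StoreMsg struct {
	ID    [16]byte
	Time  uint32
	Value float64
}

// IndexMsg carries the full definition so the series becomes queryable.
// Fields are illustrative; a real MetricDefinition has more.
type IndexMsg struct {
	ID       [16]byte
	OrgID    uint32
	Name     string
	Interval int
	Unit     string
	Mtype    string
	Tags     []string
}

func main() {
	p := StoreMsg{Time: 1461222000, Value: 42.5}
	fmt.Println(CmdStore, p.Value, CmdIndex)
}
```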

@Dieterbe
Contributor Author

Dieterbe commented Feb 2, 2018

We most urgently need an improvement in our Kafka data format, for the following reasons:

  • mdm is very verbose and redundant: AFAIK easily 100B per message, or up to a multiple of that, whereas the corresponding points in our chunks are only a hundredth of that. Optimizing this will mean much less Kafka disk space, disk IO, and bandwidth usage.
  • This in turn would enable us to increase Kafka retention, which we need to do to be safer.
  • Decoding the current message structures is slow, raises our overall memory profile, and triggers frequent GC due to all the allocations (mostly the strings, IIRC).
  • So fixing this means nodes can replay data much faster.

Later we may be able to use the same format for carbon-relay-ng -> tsdb-gw.

@Dieterbe Dieterbe added this to the 0.8.2 milestone Feb 2, 2018
@shanson7
Collaborator

shanson7 commented Feb 2, 2018

I am very eager to discuss and help with this endeavor. We see a significant amount of memory being used when replaying data on startup, requiring us to set our resource limits 3x higher than they would otherwise need to be. Of course, going faster and using less CPU/disk/network bandwidth is excellent as well.

Could having a "nullable" def field in the msg be a possibility? I'm not sure how much this would save us, but maybe worth benchmarking.

@Dieterbe
Contributor Author

Dieterbe commented Feb 2, 2018

Without thinking about it too much, I think it all boils down to cramming the metric ID, ts and value into as few bytes as possible. I think a good starting point may be a new message format like:

[16]byte for the id (instead of the hash string), or 2 x uint64
8 bytes for the float64 value
8 bytes for the unix timestamp
optional: 8 bytes for the orgId as uint32 (the orgId can either be configured in the input plugin for single-tenant setups, or added for multi-tenant setups)

This would give us messages of 32B or 40B.
Note that if we receive such a message but haven't seen the metricdef yet, we can't add it to the index, but also not to the tank (i.e. the chunk data), because we need to know which aggregations and schemas are associated with it.
If we then want to enforce that the schemas must have been received before a message like this one, we must either mix them in the same topic (meaning we need some kind of extra format identifier in the message), or synchronize the topic with the mdm one (which would still receive definitions).

Potentially we can also batch points together into single messages if they have the same timestamp.
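A minimal sketch of encoding such a message with encoding/binary; the field order, byte order and the encodePoint helper are assumptions just to illustrate the sizes, not an agreed-upon wire format:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// encodePoint packs a point into the proposed fixed-size layout:
// 16B id + 8B float64 value + 8B unix timestamp, plus an optional 8B orgId
// (which the discussion below trims to 4B).
func encodePoint(id [16]byte, val float64, ts uint64, orgID uint32, withOrg bool) []byte {
	size := 32
	if withOrg {
		size = 40
	}
	buf := make([]byte, size)
	copy(buf[0:16], id[:])
	binary.BigEndian.PutUint64(buf[16:24], math.Float64bits(val))
	binary.BigEndian.PutUint64(buf[24:32], ts)
	if withOrg {
		binary.BigEndian.PutUint64(buf[32:40], uint64(orgID)) // 8B as proposed here
	}
	return buf
}

func main() {
	var id [16]byte // would be the deterministic metric id
	msg := encodePoint(id, 1.5, 1518048000, 1, true)
	fmt.Println(len(msg)) // 40
}
```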

@shanson7
Collaborator

shanson7 commented Feb 2, 2018

That's fair. Considering the rate at which we are publishing data, any savings would be good. A quick sample of our (tag-heavy) data stream shows that our average message size is 258.8B, so going down to 40B would be huge for us.

I, for one, would prefer they lived in the same topic: we currently have to match up the data topic and the clustering topic, and adding a third becomes costly when you consider how many partitions we have for large clusters (and how little traffic the def topic would realistically get during steady state).

@DanCech
Contributor

DanCech commented Feb 2, 2018

It seems like a good idea to allocate the first byte as a header that can identify the packet format (similar to @woodsaj's proposal of STORE and INDEX above); then you can mix and match in the same topic without any issues, and you have the ability to iterate on the format later.

That said, you can likely get away with using only one of the available packet types if you define the packet format as <header><metric id><value><timestamp><optional metricdef>. If the metricdef is sent in every n-th message, then you'll miss at most the first n-1 points for a new series before you see the packet that does have the metricdef and it gets added to the index. One possibility would be to introduce a buffer, similar to the reorder buffer, that can be used to store up to n entries for any metric id that doesn't have an entry in the index; then, when a packet with the definition comes in, it can be added to the index and the buffer drained to backfill the data.
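A rough sketch of that buffer idea, assuming a simple map keyed by metric id; names like pendingBuffer are hypothetical, not existing metrictank code:

```go
package main

import "fmt"

type point struct {
	Time  uint32
	Value float64
}

// pendingBuffer holds up to maxPoints per metric id for ids we have not yet
// seen a definition for.
type pendingBuffer struct {
	maxPoints int
	pending   map[[16]byte][]point
}

func newPendingBuffer(maxPoints int) *pendingBuffer {
	return &pendingBuffer{maxPoints: maxPoints, pending: make(map[[16]byte][]point)}
}

// Add buffers a point for an unindexed id, dropping the oldest if full.
func (b *pendingBuffer) Add(id [16]byte, p point) {
	buf := b.pending[id]
	if len(buf) >= b.maxPoints {
		buf = buf[1:]
	}
	b.pending[id] = append(buf, p)
}

// Drain returns and removes the buffered points once the definition for id
// has arrived and been added to the index, so the data can be backfilled.
func (b *pendingBuffer) Drain(id [16]byte) []point {
	pts := b.pending[id]
	delete(b.pending, id)
	return pts
}

func main() {
	b := newPendingBuffer(3)
	var id [16]byte
	for i := 0; i < 5; i++ {
		b.Add(id, point{Time: uint32(i * 10), Value: float64(i)})
	}
	fmt.Println(b.Drain(id)) // [{20 2} {30 3} {40 4}]
}
```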

@woodsaj
Member

woodsaj commented Feb 2, 2018

The timestamp and orgId only need 4 bytes each (uint32). But we also use 1 byte for the msg version, so that brings the message size down to 29B or 33B.

I also agree with @shanson7 that the messages need to be in the same topic. Using the first byte as a message version makes this pretty straightforward. If the initial version is 0x0, then it should be trivial to handle versioned and non-versioned messages during a transition to newer code.
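A minimal sketch of dispatching on the first byte so both formats can share one topic; the format constants and their values are assumptions, not necessarily the identifiers metrictank actually uses:

```go
package main

import (
	"errors"
	"fmt"
)

// Assumed format identifiers for illustration only.
const (
	formatFullMetricData byte = 0x0 // existing full MetricData payloads
	formatCompactPoint   byte = 0x1 // new compact id+ts+value points
)

// dispatch peeks at the first byte to decide how to decode the rest of the
// message, allowing both formats (and future ones) to coexist in one topic.
func dispatch(b []byte) (string, error) {
	if len(b) == 0 {
		return "", errors.New("empty message")
	}
	switch b[0] {
	case formatFullMetricData:
		return "decode full MetricData", nil
	case formatCompactPoint:
		return "decode compact point", nil
	default:
		return "", fmt.Errorf("unknown format byte %x", b[0])
	}
}

func main() {
	fmt.Println(dispatch([]byte{0x1, 0xde, 0xad}))
}
```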

To better support these streamlined messages I think we are going to need to move the TSDB ingestion into Metrictank. So users send directly to MT, then MT sends to Kafka. With this approach we only need to send the full payload once, then track the orgId+metricId in a large concurrentMap. The benefit of this approach is that when a metricDef is deleted from the index, we can broadcast the delete to the other MT nodes to have them delete it from their map as well.

If we have data sent to tsdb-gw, and then sent into Kafka, there is no real way for us to notify the tsdb-gw's that a metricDef has been deleted. So the tsdb-gw's would need to track the metricIds seen and the last time a full payload was sent, so they can periodically send full payloads. If a metricDef is deleted while data is still being sent, then metrics will be lost for up to the full-payload send frequency.
The tsdb-gw's would also need to periodically scan their map to remove entries that have not been seen for a long time. In MT, we already do that on the index.

@woodsaj
Member

woodsaj commented Feb 2, 2018

Having MT ingest then forward to Kafka also works great for users that want to send carbon directly to MT but also have clustering.

@shanson7
Collaborator

shanson7 commented Feb 2, 2018

The timestamp and orgId only need 4bytes

Not worried about the year 2038 problem? :)

EDIT: I guess unsigned int makes this a year 2106 problem

@woodsaj
Member

woodsaj commented Feb 16, 2018

To better support these streamlined messages i think we are going to need to move the TSDB ingestion into Metrictank. So users send directly to MT, then MT sends to kafka.

I have come to realize that this isn't true. Ideally, we want to push the optimized format up the stack to the metric sources, i.e. carbon-relay-ng should send the optimized format.
So full MetricData payloads are going to need to be sent periodically.

So let's just keep this simple for now by adding support to MT for processing two message formats.

As the work here requires major refactoring of the MT ingestion code, I would also like to address #741 at the same time.

@woodsaj
Member

woodsaj commented Feb 16, 2018

https://drive.google.com/file/d/15qMNQcLD7fwaA58yZP87REkT380TrO9W/view?usp=sharing


Here is what I am thinking. The basic implementation would be to make "AggMetric" an interface with two variants: a FullAggMetric, which is what we have today, and a TempAggMetric, which is basically just a buffer.
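A rough sketch of what that interface split could look like; the method set and the Promote helper are illustrative assumptions, not a concrete design:

```go
package main

import "fmt"

// AggMetric is a sketch of the proposed interface; the method set is illustrative.
type AggMetric interface {
	Add(ts uint32, val float64)
}

// FullAggMetric stands in for today's AggMetric: it knows its schemas and
// aggregations and persists chunks (all elided here).
type FullAggMetric struct {
	points int
}

func (m *FullAggMetric) Add(ts uint32, val float64) { m.points++ }

// TempAggMetric just buffers raw points until the metric definition arrives.
type TempAggMetric struct {
	ts   []uint32
	vals []float64
}

func (m *TempAggMetric) Add(ts uint32, val float64) {
	m.ts = append(m.ts, ts)
	m.vals = append(m.vals, val)
}

// Promote replays the buffered points into a FullAggMetric once the
// definition (and thus schemas/aggregations) is known.
func (m *TempAggMetric) Promote(full *FullAggMetric) {
	for i := range m.ts {
		full.Add(m.ts[i], m.vals[i])
	}
}

func main() {
	var am AggMetric = &TempAggMetric{}
	am.Add(10, 1.0)
	am.Add(20, 2.0)
	full := &FullAggMetric{}
	am.(*TempAggMetric).Promote(full)
	fmt.Println(full.points) // 2
}
```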

Thoughts?

@replay
Contributor

replay commented Feb 16, 2018

@woodsaj that's an interesting idea. How would that work with aggregations? I guess the aggregations would have to be generated when a TempAggMetric is converted to a FullAggMetric, because before that the correct schema cannot be determined?

I'm trying to imagine the startup procedure in a setup like ours. Currently, when MT has replayed the kafka backlog, it announces that it is ready to handle queries. But in a case like the one you illustrate, even if it has replayed the backlog, there might be certain queries it can't handle yet, because it might not have the necessary MetricDefinitions for all the data it has replayed. Does that mean MT can't announce to the rest of the cluster that it is ready before it has all the MDs, in order to ensure that we never serve incomplete data due to a restart?
We should probably require the user/data generator to send the full MD not only every X datapoints but also every Y seconds; otherwise a metric with very low resolution could slow down the rolling-restart procedure a lot, because the MTs would have to wait before announcing that they are ready.

@woodsaj
Member

woodsaj commented Feb 16, 2018

How would that work with aggregations

When the TempAggMetrics are converted to FullAggMetrics, all of the buffered datapoints will be fed in, so the aggregations will be created as normal.

We should probably require the user/data-generator to not only send the full MD every X datapoints but also every Y seconds,

Requiring full messages every 3 hours or so seems reasonable and avoids all of these problems.
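A minimal sketch of the sender-side bookkeeping this implies, assuming a periodic resend interval of ~3 hours; the sender type and needsFull helper are hypothetical, not existing tsdb-gw or relay code:

```go
package main

import (
	"fmt"
	"time"
)

// fullResendInterval is the assumed period after which a sender re-sends the
// full definition even if it has already sent it before.
const fullResendInterval = 3 * time.Hour

// sender tracks when the full payload was last sent per metric id, so it can
// choose between the compact point format and the full MetricData payload.
type sender struct {
	lastFull map[string]time.Time
}

func newSender() *sender {
	return &sender{lastFull: make(map[string]time.Time)}
}

// needsFull reports whether the full definition should accompany this point,
// and records the send time when it does.
func (s *sender) needsFull(id string, now time.Time) bool {
	last, ok := s.lastFull[id]
	if !ok || now.Sub(last) >= fullResendInterval {
		s.lastFull[id] = now
		return true
	}
	return false
}

func main() {
	s := newSender()
	now := time.Now()
	fmt.Println(s.needsFull("1.abc", now))                  // true: never sent
	fmt.Println(s.needsFull("1.abc", now.Add(time.Minute))) // false: sent recently
	fmt.Println(s.needsFull("1.abc", now.Add(4*time.Hour))) // true: past the resend interval
}
```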

@Dieterbe
Contributor Author

Dieterbe commented Feb 20, 2018

If we have data sent to tsdb-gw, and then sent into kafka, there is no real way for us to notify the tsdb-gw's that a metricDef has been deleted. So the tsdb-gw's will need to track the metricIds seen and the last time a full payload was sent so they can periodically send full payloads.

I was thinking about how we can avoid tsdb-gw needing to keep a list. E.g. for every incoming metric, it could instead do something like if hash(name+tags+time) % 3600 == 0 (where time could be the point timestamp, or wall time), but I can't think of a way this would work properly for arbitrary intervals and unknown data arrival times, so I think you're right: we'll need an MT/index -> tsdb-gw sync mechanism.

Does that mean that MT can't announce to the rest of the cluster that it is ready before it has all the MDs, in order to ensure that we never serve incomplete data due to a restart?

I think MT should announce readiness as usual (based on priority, aka ingestion lag).
It's up to the user/us to store high-quality data in the Kafka ingest stream; if we don't, the data simply won't be available for querying, but it should not affect the functioning of the cluster IMHO.

I need to think a bit more about the schematic, will get back to it soon.

dependency analysis

FYI, I looked through our code and paraphrased all the dependencies between the components/properties.
It helps me reason through things and might help you too.

ReorderBuffer

needs interval so that it can bucketize, which allows it to keep its size static and to discard points early if they would overwrite another point anyway (meaning we could potentially create another ROB that does not need the interval, but is less efficient)

intervaldetector:

  • needs ordered data (either it orders internally, or it runs after the ROB has done its job)

Aggmetric

  • needs schemas (to set aggregators and the reorder window) / needs the aggregations config
  • sort of needs id ... which it assigns to Key, but only uses it for debug logging, cache pushing and store persisting.
    note: even if we temporarily disable cache pushing and persistence, and later update the Key property, we still need interval because of the schemas

schema

needs name and interval

aggregations config

only needs name

id

needs metric, unit, mtype, interval, tags

index

needs basically all properties of a metric before it can add it
(technically, it could add a metric with all properties except interval known, but that wouldn't be useful, as you need the interval to be able to read the data)
data can only be queried for series that are in the index

@Dieterbe
Contributor Author

I think the diagram makes sense for the most part. Two comments:

  1. I think the interval detection is much too eager to conclude an interval; it may be incorrect, in two ways:
    A) Data may be out of order. If the ROB is enabled, interval detection should look at a time range >= the reorder window. The reorder-window setting effectively signals "points may be out of order by at most this amount", which is useful information for the interval-figure-outer.
    B) If for whatever reason some points don't make it (e.g. issues at the client), we may not draw the right conclusion by only looking at 3 points. I think we should instead compute some confidence score: e.g. with a confidence threshold of 90%, if we have N points and look at the intervals between all of them, only draw a conclusion once we have the ~same interval 0.9 * N times (where ~same allows for a 10% error, to accommodate points with timestamps like 10, 20, 31, 40, 49, etc. which should be 10s). See the sketch after this list.
    We should probably set an upper bound though: if N >= 100 we should probably conclude an interval irrespective of confidence.
    I would also let confidence override the reorder-window constraint above (IOW, even if we've allowed for a reorder window of up to 100 points, if we're >95% confident after 10 points then that's good enough).

  2. The blue box talks about a user-provided ID. To be clear, for grafanacloud this ID is always generated by tsdb-gw (possibly by computing it with interval=0).
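A rough sketch of the confidence-based interval detection described in point 1; the thresholds and the detectInterval helper are illustrative assumptions, not a concrete proposal for the implementation:

```go
package main

import (
	"fmt"
	"sort"
)

// detectInterval looks at the deltas between ordered timestamps and only
// concludes an interval once a large enough fraction of them agree (within
// ~10%), or once maxPoints have been seen.
func detectInterval(ts []uint32, minConfidence float64, maxPoints int) (interval uint32, ok bool) {
	if len(ts) < 3 {
		return 0, false
	}
	sorted := append([]uint32(nil), ts...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

	// count how often each delta occurs and pick the most common as candidate
	counts := make(map[uint32]int)
	for i := 1; i < len(sorted); i++ {
		counts[sorted[i]-sorted[i-1]]++
	}
	var best uint32
	for d, c := range counts {
		if c > counts[best] {
			best = d
		}
	}
	// count deltas within 10% of the candidate as agreeing
	agree := 0
	for i := 1; i < len(sorted); i++ {
		diff := float64(sorted[i]-sorted[i-1]) - float64(best)
		if diff < 0 {
			diff = -diff
		}
		if diff <= 0.1*float64(best) {
			agree++
		}
	}
	confidence := float64(agree) / float64(len(sorted)-1)
	if confidence >= minConfidence || len(sorted) >= maxPoints {
		return best, true
	}
	return 0, false
}

func main() {
	// points at ~10s interval with some jitter, as in the example above
	fmt.Println(detectInterval([]uint32{10, 20, 31, 40, 49, 60, 70, 80, 90, 100}, 0.9, 100))
}
```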

@Dieterbe Dieterbe modified the milestones: 0.9.1, 0.9.0 Mar 19, 2018
@Dieterbe Dieterbe removed this from the 0.9.0 milestone Apr 18, 2018
@stale

stale bot commented Apr 4, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Apr 4, 2020
@stale stale bot closed this as completed Apr 11, 2020