This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Optimize kafka format #876

Merged
merged 70 commits into master from optimize-kafka-format
Apr 11, 2018

Conversation

Dieterbe
Contributor

see #199

adds support for the following payloads incoming from kafka:
(codenames, will be changed later if we take this further)

// useful for singletenant setups where we can just configure orgid
type MetricPointId1 struct {
        Id    [16]byte
        Value float64
        Time  uint32
}

// for multi-tenant setups, need to pass orgid in the payloads
type MetricPointId2 struct {
        MetricPointId1
        Org uint32
}

(in the actual messages they are preceded by a 1byte format header)
because we now encode the id's in a more optimized form, i will also attempt to update the codebase itself to work with these id's instead of strings; in particular:

// when the orgid is implied already
type Key [16]byte

// multitenant
type MKey struct {
        Key [16]byte
        Org uint32
}

@Dieterbe Dieterbe force-pushed the optimize-kafka-format branch 6 times, most recently from 20484f9 to 98f590b on March 18, 2018 17:40
@Dieterbe
Contributor Author

Dieterbe commented Mar 18, 2018

goals and consequences -> extra sub-goals

  1. smaller kafka mdm messages, less allocation/cpu/memory heavy serializing and deserializing of the messages
  2. because of the more optimized format, it now also makes sense to update all our metric identifiers internally to be based on byte arrays rather than strings, in the cache, AggMetrics, index, etc. (less GC load, takes less space, should be a bit cpu friendlier also)
  3. because our internal index format uses the MetricDefinition type, we update the id of the type itself (otherwise we pay the cost of frequent conversions as shown in Initial tag query performance improvements #848)
    (should be harmless as explained in the commit message).

it would be prettier to do these steps one at a time, but because they're so tangled let's just bite the bullet and do them at once. it also helps that we won't have to test again later.

status

Making good progress on this. main things left to do:

  • make chunk cache work again
  • update all Req related things (incoming http requests, intra-cluster requests, api stuff, etc) to use the new id/key types.
  • real-world benchmarking / load testing

what is changing in this PR:

  • new data payload formats and Key types in schema
  • kafka plugin to support the new format
  • input and idx now need to support old and new types when handling incoming data. tried a composite type, but looks better to just use separate methods.
  • anywhere we use metric ids (aggmetric/aggmetrics/aggregator/notifier/CWR/store/...) now works with the new keys. nowhere in the code shall metric ids be strings, other than when converting to cassandra rows or columns or notifier messages, or when reading the old format.
  • internal request types for the http api, intra-cluster traffic

what is NOT changing:

  • the MetricData and MetricDataArray types: we don't need to change them. we can keep using them (sparingly) for now; a new method to submit "full" payloads (with definition) can be done later, but it's probably low priority.
  • other input plugins
  • persist notifier message format (minimal conversion code)
  • chunk data/schema in cassandra (minimal conversions to get the string id's)
  • index data in cassandra (minimal conversions to get the string id's)

note to self. to test:

  • data saved to correct place, both raw and rollup
  • reads look good, both raw and rollup
  • clustered index calls
  • index saving/loading

deployment

  • writers can be upgraded in place.
  • clustered readers cannot: we will need to create a new reader cluster and then destroy the old one (e.g. via the color mechanism in hosted-metrics-api); too much has changed. in particular:
  • intra-cluster traffic: cluster nodes talking to each other (Req types and MetricDefinition type)

to reviewers

  • commit order in github is messed up due to frequent history rewriting. to see the history, check the branch out locally and use https://github.com/jonas/tig or similar. the commits try to tell a story: you'll see the schema changes and then the metrictank changes they enable.
    looking at the schema stuff first is probably best to then understand how it's used in metrictank.
  • one of my favorite side effects of this PR is that uses of the key are now much more explicit.
    e.g. where do we use regular keys like 123.deadbeef vs aggregate keys like 123.deadbeef_avg_600? in the past this was often hard to follow; with the new code, the new key types make it explicit.
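Roughly, that explicitness could look like the following (a hypothetical illustration, not the PR's actual types; the archive-suffix representation is an assumption):

```go
package main

import "fmt"

// MKey identifies a raw metric: 16-byte id plus org.
type MKey struct {
	Key [16]byte
	Org uint32
}

// AMKey is an MKey plus an optional aggregate archive suffix
// (a simplified stand-in for an archive-aware key type).
type AMKey struct {
	MKey
	Archive string // "" for raw data, e.g. "avg_600" for a rollup
}

// String renders the key in the familiar "org.hexid" form,
// with the archive suffix appended for aggregates.
func (a AMKey) String() string {
	s := fmt.Sprintf("%d.%x", a.Org, a.Key)
	if a.Archive != "" {
		s += "_" + a.Archive
	}
	return s
}

func main() {
	k := MKey{Org: 123}
	copy(k.Key[:], []byte{0xde, 0xad, 0xbe, 0xef})
	fmt.Println(AMKey{MKey: k}.String())                     // raw series key
	fmt.Println(AMKey{MKey: k, Archive: "avg_600"}.String()) // aggregate key
}
```

With types like these, whether a call site deals with raw or aggregate data is visible in the signature rather than buried in string contents.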

@Dieterbe Dieterbe added this to the 0.9.0 milestone Mar 19, 2018
@@ -365,8 +365,15 @@ func (c *CasIdx) load(defs []schema.MetricDefinition, iter cqlIterator, cutoff u
var tags []string
cutoff64 := int64(cutoff)
for iter.Scan(&id, &orgId, &partition, &name, &metric, &interval, &unit, &mtype, &tags, &lastupdate) {
mkey, err := schema.MKeyFromString(id)
if er != nil {
Member

should be err not er

msg/msg.go Outdated
// Point represents an incoming datapoint
// represented either as MetricData or MetricPointId2
type Point struct {
Val uint8 // 0 means use md, 1 means use point
Member

Val seems like the wrong term to use here. Perhaps we use DataType and then use

var MetricData = uint8(0)
var MetricPoint = uint8(1)

Which will make the code much more readable

p := msg.Point{
   DataType: msg.MetricPoint,
   Point: point,
}

Contributor Author

yep. or const even not var. but i decided to do away with this composite msg.Point type and instead use different codepaths in the input handler and index. will push soon.

@Dieterbe Dieterbe force-pushed the optimize-kafka-format branch from 98f590b to 2878d78 on March 19, 2018 12:41
@@ -125,20 +126,20 @@ func (c *CCache) CacheIfHot(metric string, prev uint32, itergen chunk.IterGen) {
met.Add(prev, itergen)
}

func (c *CCache) Add(metric, rawMetric string, prev uint32, itergen chunk.IterGen) {
func (c *CCache) Add(metric, rawMetric schema.AMKey, prev uint32, itergen chunk.IterGen) {
Contributor

Does this still require both params metric and rawMetric? I think just one schema.AMKey contains all the information that's needed, because the raw key can be extracted from it.

Contributor Author

you're right


// sets of metric keys, indexed by their raw metric keys
metricRawKeys map[string]map[string]struct{}
metricRawKeys map[schema.AMKey]map[schema.AMKey]struct{}
Contributor
@replay replay Mar 19, 2018

could this be optimized into something like map[schema.MKey]map[schema.Archive]?

@Dieterbe Dieterbe force-pushed the optimize-kafka-format branch from 2878d78 to 1eec530 on March 19, 2018 12:45
@@ -20,7 +21,7 @@ type CCacheMetric struct {
// the list of chunk time stamps in ascending order
keys []uint32

SuffixLen uint8
rawKey schema.AMKey
Contributor

shouldn't schema.MKey be sufficient here?

md := schema.MetricData{}
_, err := md.UnmarshalMsg(data)
pointMsg := msg.Point{}
if len(data) == 29 && data[0] == 0 {
Member

I think that the unmarshaling of bytes to a msg.Point should be handled in the msg package.

Contributor Author

moved it

@@ -232,7 +234,13 @@ func (c *NotifierKafka) flush() {
payload := make([]*sarama.ProducerMessage, 0, len(c.buf))
var pMsg mdata.PersistMessageBatch
for i, msg := range c.buf {
def, ok := c.idx.Get(strings.SplitN(msg.Key, "_", 2)[0])
mkey, err := schema.MKeyFromString(strings.SplitN(msg.Key, "_", 2)[0])
Member

shouldn't msg.Key be an AMKey?

Contributor Author

msg.Key is SavedChunk.Key which is a string because I don't want to change the notifier format at this point. it does represent a schema.AMKey (see SavedChunk definition)

if your point is that it's a bit confusing that we're parsing out an MKey here, then i agree. I actually attempted to write an AMKeyFromString function but skipped it for now. maybe i can give it another shot.
but anyway we only need an MKey for the index lookup so doing the string.Split and then parsing that seemed ok to me.
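For illustration, a parse along these lines could work (a sketch assuming the "org.md5hex" id format; the real schema.MKeyFromString may differ):

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strconv"
	"strings"
)

// MKey identifies a metric: 16-byte id plus org (as in the PR).
type MKey struct {
	Key [16]byte
	Org uint32
}

// MKeyFromString parses "org.md5hex", e.g. "1.0cd2b2f0d4223d1aab6a4a755d4f3929".
func MKeyFromString(s string) (MKey, error) {
	var m MKey
	dot := strings.IndexByte(s, '.')
	if dot < 0 || len(s)-dot-1 != 32 {
		return m, fmt.Errorf("invalid metric id %q", s)
	}
	org, err := strconv.ParseUint(s[:dot], 10, 32)
	if err != nil {
		return m, err
	}
	m.Org = uint32(org)
	_, err = hex.Decode(m.Key[:], []byte(s[dot+1:]))
	return m, err
}

func main() {
	// strip an aggregate suffix like "_avg_600" before parsing,
	// as the notifier does with its SplitN call
	full := "1.0cd2b2f0d4223d1aab6a4a755d4f3929_avg_600"
	m, err := MKeyFromString(strings.SplitN(full, "_", 2)[0])
	fmt.Println(m.Org, err)
}
```

An AMKeyFromString variant would do the same and then keep the suffix as the archive part instead of discarding it.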

Contributor Author

I cleaned this up today.

@@ -32,6 +32,91 @@ type PartitionedMetric interface {

//go:generate msgp

type MetricPointId1 struct {
Member

I really dont think we need two MetricPointId types. If we do, they need meaningful names.

Looking over this PR, it looks like the metricPointId1 is only used for sending over the wire. Currently if the payload received by MT does not have an Org, it is being set to the runtime orgId config setting.

So instead of 2 struct types, we just need the 2 message formats for the 1 MetricPoint struct.
MarshalWithOrg()
UnmarshalWithOrg()
MarshalWithoutOrg()
UnmarshalWithoutOrg()
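Such a scheme could be sketched as follows (names and byte layout are assumptions here, not the actual implementation): one struct, where the without-org wire form simply drops the trailing 4 bytes.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// one MetricPoint type instead of MetricPointId1/MetricPointId2
type MetricPoint struct {
	Id    [16]byte
	Value float64
	Time  uint32
	Org   uint32
}

// MarshalWithoutOrg encodes 28 bytes: id, value, time.
func (p MetricPoint) MarshalWithoutOrg(b []byte) []byte {
	b = append(b, p.Id[:]...)
	b = binary.LittleEndian.AppendUint64(b, math.Float64bits(p.Value))
	b = binary.LittleEndian.AppendUint32(b, p.Time)
	return b
}

// MarshalWithOrg encodes 32 bytes: the 28-byte form plus the org id.
func (p MetricPoint) MarshalWithOrg(b []byte) []byte {
	b = p.MarshalWithoutOrg(b)
	return binary.LittleEndian.AppendUint32(b, p.Org)
}

// UnmarshalWithoutOrg decodes the 28-byte form; Org must then be filled
// in from configuration by the caller (the single-tenant case).
func (p *MetricPoint) UnmarshalWithoutOrg(b []byte) error {
	if len(b) < 28 {
		return fmt.Errorf("need 28 bytes, got %d", len(b))
	}
	copy(p.Id[:], b[:16])
	p.Value = math.Float64frombits(binary.LittleEndian.Uint64(b[16:24]))
	p.Time = binary.LittleEndian.Uint32(b[24:28])
	return nil
}

func main() {
	p := MetricPoint{Value: 2.5, Time: 1521400000, Org: 7}
	fmt.Println(len(p.MarshalWithoutOrg(nil)), len(p.MarshalWithOrg(nil))) // 28 32
}
```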

Contributor Author

I really dont think we need two MetricPointId types. If we do, they need meaningful names.

i agree. i wanted to use the name MetricPoint but you used that in your pr so i had to separate them at first so i could compare the different implementations.
as i said in OP they are codenames that will be renamed.

i like your idea of switching to a single type though, that will make the code easier.
also while we're at it, I want to use the MKey type in this struct.

@Dieterbe Dieterbe force-pushed the optimize-kafka-format branch 2 times, most recently from 72d66d1 to ec13cb9 on March 22, 2018 10:07
Collaborator
@shanson7 shanson7 left a comment

Overall these changes look promising. There is just the one concern I have and it seems centered around having deviated from the prescribed key hashing.

// abeit very unlikely,
// the idx entry could be pruned in between the two calls and so they could be different
existing, inMemory := c.MemoryIdx.Get(point.MKey)
archive, inMemory2 := c.MemoryIdx.UpdateMaybe(point, partition)
Collaborator

Is it possible that UpdateMaybe could return existing as well? That would prevent the unlikely disagreement scenario in the comment (since it would operate under a lock) plus it would prevent an additional read lock from being acquired (which likely increases the area of contention, since it blocks writers).

Contributor Author

Is it possible that UpdateMaybe could return existing as well?

it does, but after it updates the LastUpdate and Partition fields.
that said, the only thing we really need existing for is the existing.Partition attribute which determines if we have to clean up the old entry. UpdateMaybe could just return that value. Not very elegant but could be worthwhile.

Collaborator

Right, I meant existing as it is in this snippet (i.e. the value before update). If you don't foresee another need to pop up other than the partition value, then that would likely be the most efficient solution and could greatly reduce the contention on this mutex (conjecture).

Contributor Author

i'll give it a shot :)

Contributor Author

this is done now btw

@@ -91,3 +92,42 @@ func CreateMsg(metrics []*schema.MetricData, id int64, version Format) ([]byte,
}
return buf.Bytes(), nil
}

// WritePointMsg is like CreateMsg, except optimized for MetricPoint and buffer re-use.
// caller must be assure a cap-len diff of at least:
Collaborator

"must assure"

archive := c.MemoryIdx.AddOrUpdate(data, partition)

// note that both functions return an 'ok' bool.
// abeit very unlikely,
Collaborator

"albeit"

return bts[28:], nil
}

// MMarshal32 marshals the MetricPoint directly to b.
Collaborator

"Marshal32"

}

if inMemory2 {
archive = c.updateCassandraIfStale(inMemory, archive, partition)
Collaborator

Is there a possibility that updating via just the MetricPoint could starve out changes that would be seen by the MetricData?

One example we have is that we don't include the Interval in the id we generate (for reasons I can explain if you are curious).

This works as-is because we always send the new interval and after a few hours, it gets updated in cassandra and everything is fine (provided the interval was reduced, otherwise the history is a little funky). After this change, it seems that likely the interval change wouldn't be detected unless we send full MetricData payloads for hours after an interval change.

Contributor Author

One example we have is that we don't include the Interval in the id we generate (for reasons I can explain if you are curious).

always :)

I think I don't really understand the question/problem,
but there's 2 places where this logic is called (UpdateMaybe which takes the MetricPoint, and AddOrUpdate which takes a full MetricData). however, by the time we get to the updateCassandraIfStale call, it doesn't really matter which of the 2 paths we took to get to this logic, as the calls use the same parameters which it pulled out of the memory index (possibly after an update), and the partition value.
In that regard, whether you sent a MetricData (with the same interval) or a MetricPoint, the logic should be equivalent.
and the ordering should remain honored throughout the kafka->CassandraIndex->Cassandra pipeline.

does this help @shanson7 ? otherwise let's discuss further

Collaborator

My original comment can be ignored. As long as any data that is not encoded into the Id is in the MetricPoint, then this should be sufficient.

Contributor Author
@Dieterbe Dieterbe Mar 28, 2018

there is one field from MetricData that is not included in MetricPoint and doesn't contribute to its Id, which is Name. However, since we always say that field should equal the Metric field (which does contribute to the id), this should be OK. Note that because of this, we plan to remove one of these two fields, as they should always be the same.

Collaborator
@shanson7 shanson7 Mar 28, 2018

Yes, I noticed that. I use Name when encoding and set Metric to "." for everything (to reduce the size of the messages)

@Dieterbe
Contributor Author

Dieterbe commented Mar 25, 2018

note: since the idx structures are mostly already keyed by (or could be restructured to be keyed by) orgid, we could potentially use Keys instead of MKeys for e.g. IdSet, and shard defById by orgid first. that would save some memory. future optimization, not going to bother with it now I think. also updating orgid to uint32.
judging from a profile, looks like this would only save a couple of % though.

@Dieterbe Dieterbe force-pushed the optimize-kafka-format branch from 99fa6f7 to 20b3cd5 on March 28, 2018 20:34
@Dieterbe
Contributor Author

Dieterbe commented Mar 28, 2018

making progress. have been able to do some useful tests
here's a run that shows old format, new, old, new. all using snappy compression with kafka

https://snapshot.raintank.io/dashboard/snapshot/ZS91Vehxxsd3xp5RTtXeyPu5txOhdbaJ?orgId=2
see https://snapshot.raintank.io/dashboard/snapshot/P14i6y2q55sYjluu5q9p69yGszAOHp39?orgId=2

some preliminary notes:

  1. kafka
  • disk space saved: haven't found a good way to measure locally (we can just measure when we deploy to ops) but until then io (see below) should give an indication
  • io saved is not as much as I expected:
    fakemetrics messages were 203B before and are now 33B, a reduction to only 16% of the original size.
    However, IO on the kafka and MT containers only reduced by about half. maybe we underestimated the overhead kafka adds to each message
    (i believe it adds some metadata like a timestamp). We can probably get more significant savings by batching MetricPoint's per message. or maybe snappy compresses the old MetricData payloads better?
  • interestingly, kafka CPU also reduced by about 30%
  2. MT
  • cpu reduction in line with what was expected. 20~30%
  • memory usage -> need to do more experiments
  • MT allocation rate dropped by just over 50%. sweet. allocation profile is looking very clean.
    most allocations are now coming from our sarama library and the snappy decoding.
    (I wonder how that will change with the switch to confluent).
    also GC frequency reduced by over 50%
  • if anyone thought kafka backlog replay would speed up by a multiple, I don't think that's true, as index and AggMetrics operations are holding us back. will probably need some anti-lock contention work to speed that up significantly. will probably see ~30% improvement though.
  3. fakemetrics (representative of tsdb-gw) (Refactor and mdm v2 raintank/fakemetrics#7)
  • cpu is about the same
  • memory is reduced. i had expected memory usage to be higher due to the keycache we now need (to track which full defs have already been sent). I guess the more efficient MetricPoint encoding overcompensates for it. (yeay)

@Dieterbe
Contributor Author

Dieterbe commented Mar 29, 2018

to compare ram and GC effects, here's a fresh stack start and 10 minute long 25kHz ingest, comparing old MT with old payloads, vs new MT with new payloads
old: https://snapshot.raintank.io/dashboard/snapshot/eEZv5VleN95vz4ovh3bA3IiKGtmXFWT3?orgId=2
new: https://snapshot.raintank.io/dashboard/snapshot/HImyLItr6jTw9AMnb22AigDmczADmdmF?orgId=2

conclusion:

  • no significant change in heap objects. this is surprising to me. I think the main takeaway is that while we (I) thought live non-recyclable pointers were the big problem rather than recyclable pointers (garbage), it seems all the recyclable objects that get allocated (as part of sarama/snappy, etc) outweigh the live pointers. note that while v2 is able to reach lower minima (probably due to reduced live pointers), the avg and max are higher, perhaps because GC is now less frequent, allowing more garbage to pile up
  • the GC stats confirm this I think: while collections are only half as frequent, and some of them were shorter, the avg and max are higher. though we know GC stop-the-world is a very misleading metric that in our case barely scratches the surface of conveying true "GC impact" (the concurrent phases are not measured). GC CPU fraction (promille) went down, which is nice; this is probably more representative of GC impact.
  • MT RAM usage is reduced by about 1.7% (3.1% if you move the start time 2 minutes in to exclude startup); I expected better. looking at a profile, it turns out 83% of memory in use is held by sarama.(*consumer).ConsumePartition. curious to see how this will change with confluent.

@Dieterbe
Contributor Author

blocked on #260

@Dieterbe Dieterbe force-pushed the optimize-kafka-format branch 2 times, most recently from 158b3e5 to 8a321e5 on March 30, 2018 20:49
@Dieterbe
Contributor Author

Dieterbe commented Mar 30, 2018

status update:

  • benchmarks (see above) are satisfactory, though not entirely in line with expectations regarding kafka io/disk space (and probably backlog replay speed)
  • all tools build. had to remove mt-replicate-via-tsdb, will have to devise a new intra-cluster replication method since tsdb-gw doesn't support MetricPoint messages
  • all unit tests pretty much work. there's some breakage in idx which is probably quickly resolved once orgid -1 -> 0 #260 is fixed

TODO

  • code reviews
  • some more functionality tests
  • fix orgid -1 -> 0 #260 and deploy it, will need that before we can proceed with this

this helps clarify some things, like why table metric_16 is created
github.com/jpillora/backoff no longer needed due to
mt-replicator-via-tsdb removal.

others should have been cleaned up longer ago.
Not sure why scripts/qa/vendor.sh started complaining about this only
now, but anyway.
it's a better name, and frees up the name 'Update'
so we have latest functions available like groupByNodes
@Dieterbe Dieterbe force-pushed the optimize-kafka-format branch 6 times, most recently from 6e90517 to 3e02874 on April 10, 2018 13:43
* received and invalid on a per-messagetype basis
* track unknown metricpoint metrics
* remove unused MsgsAge metric

note: dashboard now requires recent graphite which has groupByNodes
@Dieterbe Dieterbe force-pushed the optimize-kafka-format branch from 751ae46 to d23d807 on April 10, 2018 13:55
@Dieterbe
Contributor Author

Dieterbe commented Apr 10, 2018

rebased on top of master, now we have all the org id stuff and related idx fixes
addressed all comments. in particular, the removal of the .Metric field is not something I was originally planning to include in this PR; it seems fairly safe, but it can definitely use a lookover by @woodsaj

===> time for another/last round of reviews ! and then let's merge and deploy ! :)
@woodsaj @replay @shanson7

Member
@woodsaj woodsaj left a comment

LGTM

@Dieterbe Dieterbe merged commit a673544 into master Apr 11, 2018
Dieterbe added a commit to raintank/schema that referenced this pull request Apr 11, 2018
@Dieterbe Dieterbe deleted the optimize-kafka-format branch April 20, 2018 08:36