This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Use atomic operations and a read lock instead of a write lock #945

Closed · wants to merge 2 commits
45 changes: 25 additions & 20 deletions idx/memory/memory.go
@@ -7,6 +7,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/grafana/metrictank/errors"
@@ -15,7 +16,7 @@ import (
 	"github.com/grafana/metrictank/stats"
 	"github.com/raintank/worldping-api/pkg/log"
 	"github.com/rakyll/globalconf"
-	"gopkg.in/raintank/schema.v1"
+	schema "gopkg.in/raintank/schema.v1"
 )

 var (
@@ -213,20 +214,19 @@ func (m *MemoryIdx) Stop() {
 func (m *MemoryIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
 	pre := time.Now()

-	m.Lock()
-	defer m.Unlock()
+	m.RLock()
+	defer m.RUnlock()

 	existing, ok := m.defById[point.MKey]
 	if ok {
-		oldPart := existing.Partition
 		if LogLevel < 2 {
 			log.Debug("metricDef with id %v already in index", point.MKey)
 		}

-		if existing.LastUpdate < int64(point.Time) {
-			existing.LastUpdate = int64(point.Time)
+		if atomic.LoadInt64(&existing.LastUpdate) < int64(point.Time) {
+			atomic.SwapInt64(&existing.LastUpdate, int64(point.Time))
Contributor

Note that this is racy.
Let's say existing.LastUpdate is very old (30 days ago). Then a point comes in for 29 days ago, and concurrently another one for a day ago via a different kafka partition, and then no more points. In that case we can have concurrent Update calls, resulting in the LastUpdate field being updated to 29 days ago, but never to a day ago.
Note that for any given kafka partition, carbon stream or prometheus POST we never have overlap in our Update calls.

So in practice this doesn't seem like an issue, but perhaps we should document it under "tradeoffs and extremely rare edge cases" or something.
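For illustration, a self-contained demo of the interleaving described above (hypothetical code, not from this PR): two goroutines each do a Load followed by a Swap, so the older timestamp can overwrite the newer one.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var lastUpdate int64 = 100 // "30 days ago"
	var wg sync.WaitGroup
	for _, ts := range []int64{200, 900} { // "29 days ago" and "1 day ago" arrive concurrently
		wg.Add(1)
		go func(ts int64) {
			defer wg.Done()
			// Load and Swap are each atomic, but not atomic together.
			if atomic.LoadInt64(&lastUpdate) < ts {
				atomic.SwapInt64(&lastUpdate, ts)
			}
		}(ts)
	}
	wg.Wait()
	// Can print 200: the goroutine swapping in 900 may run entirely
	// between the other goroutine's Load and its Swap.
	fmt.Println(atomic.LoadInt64(&lastUpdate))
}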

Contributor

Or we can solve it by either:

  1. doing CompareAndSwap in a loop until we're able to swap in the value we wanted (see the sketch below)
  2. confirming the swapped-out value (return value of SwapInt64) is smaller than what we swapped in; if not, put the old value back, check that we didn't swap out an even higher value (placed by a concurrent Update call), etc.
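A minimal sketch of option 1 (hypothetical helper, not part of this PR): retry CompareAndSwapInt64 until either the swap succeeds or a newer timestamp is already stored.

package sketch

import "sync/atomic"

// bumpLastUpdate raises *lastUpdate to ts unless a newer timestamp is
// already stored. CompareAndSwapInt64 only succeeds if the value is
// unchanged since our Load, so the newest timestamp always survives.
func bumpLastUpdate(lastUpdate *int64, ts int64) {
	for {
		old := atomic.LoadInt64(lastUpdate)
		if old >= ts {
			return // a concurrent caller stored an equal or newer value
		}
		if atomic.CompareAndSwapInt64(lastUpdate, old, ts) {
			return
		}
		// CAS lost a race with another writer: reload and retry.
	}
}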

Collaborator Author

This operation is racy at that point even with a write lock. It depends on the order in which individual threads hit that lock call, which (under the hypothetical assumption that data can come in for the same series from different threads) can be out of order relative to the kafka ingest step.

I'm hesitant to add anything overly complex to Update for no real-world benefit, but I'll defer to your preference.

Contributor

I don't understand this. If you use a write lock, you can lock, check the value, update it if ours is larger, and unlock. This works regardless of the order between two concurrent update operations (the most recent will always survive).

I think my proposal above will also solve it, at almost no additional cost.
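A sketch of the write-lock variant being described (hypothetical types, not the PR's code): the lock makes the comparison and the assignment a single critical section.

package sketch

import "sync"

type def struct {
	mu         sync.Mutex
	lastUpdate int64
}

// touch makes the check and the assignment one critical section, so a
// concurrent older update can never overwrite a newer timestamp.
func (d *def) touch(ts int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.lastUpdate < ts {
		d.lastUpdate = ts
	}
}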

Collaborator Author

I was extending it to the Partition update as well. Perhaps Partition should only be updated when we have a newer timestamp as well?

Contributor

The partition property is really only there to shard the index in cassandra, so nodes know on startup which metrics they're responsible for. I'm not sure we even properly support live partition changes of metrics (i.e. whether after the change we properly display both the old and new data). Under concurrent updates it's probably OK for the partition to be ambiguous ("under transition"), but once update operations are serialized, the later one should probably always win, even when data is sent out of order. I think MT's behavior in these scenarios is so undefined that probably either way works.

 		}
-		existing.Partition = partition
+		oldPart := atomic.SwapInt32(&existing.Partition, partition)
 		statUpdate.Inc()
 		statUpdateDuration.Value(time.Since(pre))
 		return *existing, oldPart, true
@@ -240,22 +240,27 @@ func (m *MemoryIdx) Update(point schema.MetricPoint, partition int32) (idx.Archi
 // if was new -> adds new MetricDefinition to index
 func (m *MemoryIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) (idx.Archive, int32, bool) {
 	pre := time.Now()
-	m.Lock()
-	defer m.Unlock()
+
+	// Optimistically read lock
+	m.RLock()

 	existing, ok := m.defById[mkey]
 	if ok {
-		oldPart := existing.Partition
 		log.Debug("metricDef with id %s already in index.", mkey)
-		if existing.LastUpdate < int64(data.Time) {
-			existing.LastUpdate = int64(data.Time)
+		if atomic.LoadInt64(&existing.LastUpdate) < int64(data.Time) {
+			atomic.SwapInt64(&existing.LastUpdate, int64(data.Time))
 		}
-		existing.Partition = partition
+		oldPart := atomic.SwapInt32(&existing.Partition, partition)
 		statUpdate.Inc()
 		statUpdateDuration.Value(time.Since(pre))
+		m.RUnlock()
 		return *existing, oldPart, ok
 	}

+	m.RUnlock()
+	m.Lock()
+	defer m.Unlock()
+
 	def := schema.MetricDefinitionFromMetricData(data)
 	def.Partition = partition
 	archive := m.add(def)
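For context, the AddOrUpdate hunk above uses an optimistic pattern: serve the common "already indexed" case under the read lock, and take the write lock only when the key is missing. A minimal sketch of that shape (hypothetical types, not the PR's code); note that a conventional variant re-checks the map after upgrading, since another writer may insert the key between RUnlock and Lock.

package sketch

import (
	"sync"
	"sync/atomic"
)

type entry struct{ lastUpdate int64 }

type index struct {
	mu   sync.RWMutex
	defs map[string]*entry
}

// addOrUpdate serves the common case under the shared read lock and
// takes the exclusive write lock only on a miss.
func (ix *index) addOrUpdate(key string, ts int64) *entry {
	ix.mu.RLock()
	if e, ok := ix.defs[key]; ok {
		atomic.StoreInt64(&e.lastUpdate, ts)
		ix.mu.RUnlock()
		return e
	}
	ix.mu.RUnlock()

	ix.mu.Lock()
	defer ix.mu.Unlock()
	// Re-check: another goroutine may have inserted the key while no
	// lock was held between RUnlock and Lock.
	if e, ok := ix.defs[key]; ok {
		atomic.StoreInt64(&e.lastUpdate, ts)
		return e
	}
	e := &entry{lastUpdate: ts}
	ix.defs[key] = e
	return e
}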
@@ -541,7 +546,7 @@ func (m *MemoryIdx) TagDetails(orgId uint32, key, filter string, from int64) (ma
 			continue
 		}

-		if def.LastUpdate < from {
+		if atomic.LoadInt64(&def.LastUpdate) < from {
 			continue
 		}

@@ -814,7 +819,7 @@ func (m *MemoryIdx) hasOneMetricFrom(tags TagIndex, tag string, from int64) bool

 		// as soon as we found one metric definition with LastUpdate >= from
 		// we can return true
-		if def.LastUpdate >= from {
+		if atomic.LoadInt64(&def.LastUpdate) >= from {
 			return true
 		}
 	}
@@ -910,9 +915,9 @@ func (m *MemoryIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node,
 			idxNode.Defs = make([]idx.Archive, 0, len(n.Defs))
 			for _, id := range n.Defs {
 				def := m.defById[id]
-				if from != 0 && def.LastUpdate < from {
+				if from != 0 && atomic.LoadInt64(&def.LastUpdate) < from {
 					statFiltered.Inc()
-					log.Debug("memory-idx: from is %d, so skipping %s which has LastUpdate %d", from, def.Id, def.LastUpdate)
+					log.Debug("memory-idx: from is %d, so skipping %s which has LastUpdate %d", from, def.Id, atomic.LoadInt64(&def.LastUpdate))
 					continue
 				}
 				log.Debug("memory-idx Find: adding to path %s archive id=%s name=%s int=%d schemaId=%d aggId=%d lastSave=%d", n.Path, def.Id, def.Name, def.Interval, def.SchemaId, def.AggId, def.LastSave)
@@ -1253,7 +1258,7 @@ func (m *MemoryIdx) Prune(oldest time.Time) ([]idx.Archive, error) {
 	m.RLock()
 DEFS:
 	for _, def := range m.defById {
-		if def.LastUpdate >= oldestUnix {
+		if atomic.LoadInt64(&def.LastUpdate) >= oldestUnix {
 			continue DEFS
 		}

@@ -1269,7 +1274,7 @@
 		}

 		for _, id := range n.Defs {
-			if m.defById[id].LastUpdate >= oldestUnix {
+			if atomic.LoadInt64(&m.defById[id].LastUpdate) >= oldestUnix {
 				continue DEFS
 			}
 		}
@@ -1280,7 +1285,7 @@
 		// if any other MetricDef with the same tag set is not expired yet,
 		// then we do not want to prune any of them
 		for def := range defs {
-			if def.LastUpdate >= oldestUnix {
+			if atomic.LoadInt64(&def.LastUpdate) >= oldestUnix {
 				continue DEFS
 			}
 		}
2 changes: 1 addition & 1 deletion idx/memory/tag_query.go
@@ -657,7 +657,7 @@ func (q *TagQuery) testByTagMatch(def *idx.Archive) bool {

 // testByFrom filters a given metric by its LastUpdate time
 func (q *TagQuery) testByFrom(def *idx.Archive) bool {
-	return q.from <= def.LastUpdate
+	return q.from <= atomic.LoadInt64(&def.LastUpdate)
 }

 // testByPrefix filters a given metric by matching prefixes against the values