Skip to content

Commit

Permalink
nsqlookupd: fix write lock starvation
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Nov 5, 2019
1 parent 53ee4e4 commit 383fba1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 70 deletions.
7 changes: 5 additions & 2 deletions nsqlookupd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro
defer s.ctx.nsqlookupd.DB.RUnlock()

data := make(map[string][]map[string]interface{})
for r, producers := range s.ctx.nsqlookupd.DB.registrationMap {
s.ctx.nsqlookupd.DB.registrationMap.Range(func(k, v interface{}) bool {
producers := v.(ProducerMap)
r := k.(Registration)
key := r.Category + ":" + r.Key + ":" + r.SubKey
for _, p := range producers {
m := map[string]interface{}{
Expand All @@ -383,7 +385,8 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro
}
data[key] = append(data[key], m)
}
}
return true
})

return data, nil
}
Expand Down
113 changes: 45 additions & 68 deletions nsqlookupd/registration_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,11 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/patrickmn/go-cache"
)

type RegistrationDB struct {
sync.RWMutex
registrationMap map[Registration]ProducerMap

cachedMutex sync.RWMutex
cachedFindProducersResults *cache.Cache
registrationMap *sync.Map
}

type MetaDB struct {
Expand Down Expand Up @@ -64,8 +59,7 @@ func (p *Producer) IsTombstoned(lifetime time.Duration) bool {

func NewRegistrationDB() *RegistrationDB {
return &RegistrationDB{
registrationMap: make(map[Registration]ProducerMap),
cachedFindProducersResults: cache.New(1*time.Minute, 5*time.Minute),
registrationMap: &sync.Map{},
}
}

Expand All @@ -77,12 +71,7 @@ func NewMetaDB() *MetaDB {

// add a registration key
func (r *RegistrationDB) AddRegistration(k Registration) {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if !ok {
r.registrationMap[k] = make(map[string]*Producer)
}
r.registrationMap.LoadOrStore(k, make(ProducerMap))
}

// add a registration key
Expand Down Expand Up @@ -123,123 +112,111 @@ func (m *MetaDB) FindRegistrations(category string, key string, subkey string) R
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if !ok {
r.registrationMap[k] = make(map[string]*Producer)
}
producers := r.registrationMap[k]
val, _ := r.registrationMap.LoadOrStore(k, make(ProducerMap))
producers := val.(ProducerMap)
_, found := producers[p.peerInfo.id]
if found == false {
producers[p.peerInfo.id] = p
}

return !found
}

// remove a producer from a registration
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
r.Lock()
defer r.Unlock()
producers, ok := r.registrationMap[k]
value, ok := r.registrationMap.Load(k)
if !ok {
return false, 0
}
producers := value.(ProducerMap)
removed := false
if _, exists := producers[id]; exists {
removed = true
}

// Note: this leaves keys in the DB even if they have empty lists
delete(producers, id)

return removed, len(producers)
}

// remove a Registration and all it's producers
func (r *RegistrationDB) RemoveRegistration(k Registration) {
r.Lock()
defer r.Unlock()
delete(r.registrationMap, k)
r.registrationMap.Delete(k)
}

func (r *RegistrationDB) needFilter(key string, subkey string) bool {
return key == "*" || subkey == "*"
}

func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
if _, ok := r.registrationMap[k]; ok {
if _, ok := r.registrationMap.Load(k); ok {
return Registrations{k}
}
return Registrations{}
}
results := Registrations{}
for k := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
r.registrationMap.Range(func(k, _ interface{}) bool {
if k.(Registration).IsMatch(category, key, subkey) {
results = append(results, k.(Registration))
}
results = append(results, k)
}
return true
})
return results
}

func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
r.cachedMutex.RLock()
cachedKey := fmt.Sprintf("%s:%s:%s", category, key, subkey)

if val, found := r.cachedFindProducersResults.Get(cachedKey); found {
r.cachedMutex.RUnlock()
return val.(Producers)
}

r.cachedMutex.RUnlock()

r.cachedMutex.Lock()
defer r.cachedMutex.Unlock()

if val, found := r.cachedFindProducersResults.Get(cachedKey); found {
return val.(Producers)
}

r.RLock()
defer r.RUnlock()

if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
r.cachedFindProducersResults.Set(cachedKey, ProducerMap2Slice(r.registrationMap[k]), cache.DefaultExpiration)
return ProducerMap2Slice(r.registrationMap[k])
val, _ := r.registrationMap.Load(k)

r.RLock()
defer r.RUnlock()
return ProducerMap2Slice(val.(ProducerMap))
}

r.RLock()
results := make(map[string]struct{})
var retProducers Producers
for k, producers := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
for _, producer := range producers {
_, found := results[producer.peerInfo.id]
if found == false {
results[producer.peerInfo.id] = struct{}{}
retProducers = append(retProducers, producer)
r.registrationMap.Range(func(k, v interface{}) bool {
if k.(Registration).IsMatch(category, key, subkey) {
producers := v.(ProducerMap)
for _, producer := range producers {
_, found := results[producer.peerInfo.id]
if found == false {
results[producer.peerInfo.id] = struct{}{}
retProducers = append(retProducers, producer)
}
}
}
}

r.cachedFindProducersResults.Set(cachedKey, retProducers, cache.DefaultExpiration)
return true
})

r.RUnlock()

return retProducers
}

func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
r.RLock()
defer r.RUnlock()

results := Registrations{}
for k, producers := range r.registrationMap {
r.registrationMap.Range(func(k, v interface{}) bool {
producers := v.(ProducerMap)
if _, exists := producers[id]; exists {
results = append(results, k)
results = append(results, k.(Registration))
}
}

return true
})

r.RUnlock()

return results
}

Expand Down

0 comments on commit 383fba1

Please sign in to comment.