Skip to content

Commit

Permalink
Merge pull request #214 from resgateio/bugfix/gh-213-throttle-negativ…
Browse files Browse the repository at this point in the history
…e-running-counter

Bugfix/gh 213 throttle negative running counter
  • Loading branch information
jirenius authored Oct 12, 2021
2 parents 02239f6 + d120005 commit fb58f1d
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 53 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ resgate [options]
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--deletemethod &lt;methodName&gt;</code> | Call method name mapped to HTTP DELETE requests |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--patchmethod &lt;methodName&gt;</code> | Call method name mapped to HTTP PATCH requests |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--wscompression</code> | Enable WebSocket per message compression |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--resetthrottle &lt;limit&gt;</coce> | Limit on parallel requests sent in response to a system reset |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--referencethrottle &lt;limit&gt;</coce> | Limit on parallel requests sent when following resource references |
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--resetthrottle &lt;limit&gt;</code> | Limit on parallel requests sent on a system reset | `0` (no limit)
| <code>&nbsp;&nbsp;&nbsp;&nbsp;--referencethrottle &lt;limit&gt;</code> | Limit on parallel requests sent following references | `0` (no limit)
| <code>-c, --config &lt;file&gt;</code> | Configuration file in JSON format |

### Security options
Expand Down
28 changes: 9 additions & 19 deletions server/rescache/rescache.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,27 +312,17 @@ func (c *Cache) handleSystemReset(payload []byte) {
c.mu.Lock()
defer c.mu.Unlock()

var t *Throttle
if c.resetThrottle > 0 {
t := NewThrottle(c.resetThrottle)

c.forEachMatch(r.Resources, func(e *EventSubscription) {
t.Add(func() {
e.handleResetResource(t)
})
})
c.forEachMatch(r.Access, func(e *EventSubscription) {
t.Add(func() {
e.handleResetAccess(t)
})
})
} else {
c.forEachMatch(r.Resources, func(e *EventSubscription) {
e.handleResetResource(nil)
})
c.forEachMatch(r.Access, func(e *EventSubscription) {
e.handleResetAccess(nil)
})
t = NewThrottle(c.resetThrottle)
}

c.forEachMatch(r.Resources, func(e *EventSubscription) {
e.handleResetResource(t)
})
c.forEachMatch(r.Access, func(e *EventSubscription) {
e.handleResetAccess(t)
})
}

func (c *Cache) forEachMatch(p []string, cb func(e *EventSubscription)) {
Expand Down
24 changes: 18 additions & 6 deletions server/rescache/resourceSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,25 @@ func (rs *ResourceSubscription) handleResetResource(t *Throttle) {
// Create request
subj := "get." + rs.e.ResourceName
payload := codec.CreateGetRequest(rs.query)
rs.e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.e.Enqueue(func() {
rs.resetting = false
rs.processResetGetResponse(data, err)

if t != nil {
t.Add(func() {
rs.e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.e.Enqueue(func() {
rs.resetting = false
rs.processResetGetResponse(data, err)
})
t.Done()
})
})
t.Done()
})
} else {
rs.e.cache.mq.SendRequest(subj, payload, func(_ string, data []byte, err error) {
rs.e.Enqueue(func() {
rs.resetting = false
rs.processResetGetResponse(data, err)
})
})
}
}

func (rs *ResourceSubscription) handleResetAccess(t *Throttle) {
Expand Down
70 changes: 45 additions & 25 deletions server/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,23 +739,20 @@ func (s *Subscription) handleReaccess(t *rescache.Throttle) {
s.flags &= ^flagReaccess

if s.direct == 0 {
t.Done()
return
}

// If we already have an access instance set, use that one to test without queueing
if s.access != nil {
t.Done()
s.validateAccess(s.access)
return
}

s.queueEvents(queueReasonReaccess)
s.loadAccess(func(a *rescache.Access) {
t.Done()
s.validateAccess(a)
s.unqueueEvents(queueReasonReaccess)
})
}, t)
}

// validateAccess checks if subscription has get access, or else unsubscribes.
Expand Down Expand Up @@ -811,13 +808,11 @@ func (s *Subscription) Reaccess(t *rescache.Throttle) {

func (s *Subscription) reaccess(t *rescache.Throttle) {
if s.state == stateDisposed {
t.Done()
return
}

if s.queueFlag != 0 {
s.flags |= flagReaccess
t.Done()
return
}

Expand All @@ -833,7 +828,7 @@ func parseRID(rid string) (name string, query string) {
return rid[:i], rid[i+1:]
}

func (s *Subscription) loadAccess(cb func(*rescache.Access)) {
func (s *Subscription) loadAccess(cb func(*rescache.Access), t *rescache.Throttle) {
if s.access != nil {
cb(s.access)
return
Expand All @@ -847,25 +842,50 @@ func (s *Subscription) loadAccess(cb func(*rescache.Access)) {

s.flags |= flagAccessCalled

s.c.Access(s, func(access *rescache.Access) {
s.c.Enqueue(func() {
if s.state == stateDisposed {
return
}
if t != nil {
t.Add(func() {
s.c.Access(s, func(access *rescache.Access) {
s.c.Enqueue(func() {
if s.state == stateDisposed {
return
}

cbs := s.accessCallbacks
s.flags &= ^flagAccessCalled
// Only store in case of an actual result or system.accessDenied error
if access.Error == nil || access.Error.Code == reserr.CodeAccessDenied {
s.access = access
}
s.accessCallbacks = nil
cbs := s.accessCallbacks
s.flags &= ^flagAccessCalled
// Only store in case of an actual result or system.accessDenied error
if access.Error == nil || access.Error.Code == reserr.CodeAccessDenied {
s.access = access
}
s.accessCallbacks = nil

for _, cb := range cbs {
cb(access)
}
for _, cb := range cbs {
cb(access)
}
})
t.Done()
})
})
})
} else {
s.c.Access(s, func(access *rescache.Access) {
s.c.Enqueue(func() {
if s.state == stateDisposed {
return
}

cbs := s.accessCallbacks
s.flags &= ^flagAccessCalled
// Only store in case of an actual result or system.accessDenied error
if access.Error == nil || access.Error.Code == reserr.CodeAccessDenied {
s.access = access
}
s.accessCallbacks = nil

for _, cb := range cbs {
cb(access)
}
})
})
}
}

// CanGet checks asynchronously if the client connection has access to get (read)
Expand All @@ -875,7 +895,7 @@ func (s *Subscription) loadAccess(cb func(*rescache.Access)) {
func (s *Subscription) CanGet(cb func(err error)) {
s.loadAccess(func(a *rescache.Access) {
cb(a.CanGet())
})
}, nil)
}

// CanCall checks asynchronously if the client connection has access to call
Expand All @@ -885,5 +905,5 @@ func (s *Subscription) CanGet(cb func(err error)) {
func (s *Subscription) CanCall(action string, cb func(err error)) {
s.loadAccess(func(a *rescache.Access) {
cb(a.CanCall(action))
})
}, nil)
}
Loading

0 comments on commit fb58f1d

Please sign in to comment.