Skip to content

Commit

Permalink
Limits
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Heckel committed Nov 1, 2021
1 parent fa7a459 commit b775e6d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 37 deletions.
47 changes: 26 additions & 21 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,41 @@ const (
DefaultManagerInterval = time.Minute
)

// Defines the max number of requests, here:
// 50 requests bucket, replenished at a rate of 1 per second
// Defines all the limits
// - request limit: max number of PUT/GET/.. requests (here: 50 requests bucket, replenished at a rate of 1 per second)
// - global topic limit: max number of topics overall
// - subscription limit: max number of subscriptions (active HTTP connections) per per-visitor/IP
var (
defaultRequestLimit = rate.Every(time.Second)
defaultRequestLimitBurst = 50
defaultSubscriptionLimit = 30 // per visitor
defaultGlobalTopicLimit = 5000
defaultVisitorRequestLimit = rate.Every(time.Second)
defaultVisitorRequestLimitBurst = 50
defaultVisitorSubscriptionLimit = 30
)

// Config is the main config struct for the application. Use New to instantiate a default config struct.
type Config struct {
ListenHTTP string
FirebaseKeyFile string
MessageBufferDuration time.Duration
KeepaliveInterval time.Duration
ManagerInterval time.Duration
RequestLimit rate.Limit
RequestLimitBurst int
SubscriptionLimit int
ListenHTTP string
FirebaseKeyFile string
MessageBufferDuration time.Duration
KeepaliveInterval time.Duration
ManagerInterval time.Duration
GlobalTopicLimit int
VisitorRequestLimit rate.Limit
VisitorRequestLimitBurst int
VisitorSubscriptionLimit int
}

// New instantiates a default new config
func New(listenHTTP string) *Config {
return &Config{
ListenHTTP: listenHTTP,
FirebaseKeyFile: "",
MessageBufferDuration: DefaultMessageBufferDuration,
KeepaliveInterval: DefaultKeepaliveInterval,
ManagerInterval: DefaultManagerInterval,
RequestLimit: defaultRequestLimit,
RequestLimitBurst: defaultRequestLimitBurst,
SubscriptionLimit: defaultSubscriptionLimit,
ListenHTTP: listenHTTP,
FirebaseKeyFile: "",
MessageBufferDuration: DefaultMessageBufferDuration,
KeepaliveInterval: DefaultKeepaliveInterval,
ManagerInterval: DefaultManagerInterval,
GlobalTopicLimit: defaultGlobalTopicLimit,
VisitorRequestLimit: defaultVisitorRequestLimit,
VisitorRequestLimitBurst: defaultVisitorRequestLimitBurst,
VisitorSubscriptionLimit: defaultVisitorSubscriptionLimit,
}
}
29 changes: 29 additions & 0 deletions server/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ <h3>Subscribe via web</h3>
<ul id="topicsList"></ul>
<audio id="notifySound" src="static/sound/mixkit-message-pop-alert-2354.mp3"></audio>

<h3>Subscribe via phone</h3>
<p>
Once it's approved, you can use the <b>Ntfy Android App</b> to receive notifications directly on your phone. Just like
the server, this app is also <a href="https://github.com/binwiederhier/ntfy-android">open source</a>.
</p>

<h3>Subscribe via your app, or via the CLI</h3>
<p class="smallMarginBottom">
Using <a href="https://developer.mozilla.org/en-US/docs/Web/API/EventSource">EventSource</a> in JS, you can consume
Expand Down Expand Up @@ -142,6 +148,7 @@ <h3>Message buffering and polling</h3>
$ curl -s "ntfy.sh/mytopic/json?poll=1&since=10m"<br/>
# Returns messages from up to 10 minutes ago and ends the connection
</code>

<h2>FAQ</h2>
<p>
<b>Isn't this like ...?</b><br/>
Expand All @@ -165,6 +172,28 @@ <h2>FAQ</h2>
That said, the logs do not contain any topic names or other details about you. Check the code if you don't believe me.
</p>

<p>
<b>Why is Firebase used?</b><br/>
In addition to caching messages locally and delivering them to long-polling subscribers, all messages are also
published to Firebase Cloud Messaging (FCM) (if <tt>FirebaseKeyFile</tt> is set, which it is on ntfy.sh). This
is to facilitate instant notifications on Android. I tried really, really hard to avoid using FCM, but newer
versions of Android made it impossible to implement <a href="https://developer.android.com/guide/background">background services</a>>.
I'm sorry.
</p>

<h2>Privacy policy</h2>
<p>
Neither the server nor the app record any personal information, or share any of the messages and topics with
any outside service. All data is exclusively used to make the service function properly. The notable exception
is the Firebase Cloud Messaging (FCM) service, which is required to provide instant Android notifications (see
FAQ for details).
</p>

<p>
The web server does not log or otherwise store request paths, remote IP addresses or even topics or messages,
aside from a short on-disk cache (up to a day) to support the <tt>since=</tt> feature and service restarts.
</p>

<center id="ironicCenterTagDontFreakOut"><i>Made with ❤️ by <a href="https://heckel.io">Philipp C. Heckel</a></i></center>
</div>
<script src="static/js/app.js"></script>
Expand Down
26 changes: 18 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

// TODO add "max messages in a topic" limit
// TODO implement persistence
// TODO implement "since=<ID>"

// Server is the main server
type Server struct {
Expand Down Expand Up @@ -146,7 +147,7 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error {
} else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) {
return s.handleStatic(w, r)
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicRegex.MatchString(r.URL.Path) {
return s.handlePublish(w, r)
return s.handlePublish(w, r, v)
} else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) {
return s.handleSubscribeJSON(w, r, v)
} else if r.Method == http.MethodGet && sseRegex.MatchString(r.URL.Path) {
Expand All @@ -169,8 +170,11 @@ func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request) error {
return nil
}

func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request) error {
t := s.createTopic(r.URL.Path[1:])
func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
t, err := s.topic(r.URL.Path[1:])
if err != nil {
return err
}
reader := io.LimitReader(r.Body, messageLimit)
b, err := io.ReadAll(reader)
if err != nil {
Expand Down Expand Up @@ -223,10 +227,13 @@ func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *v

func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visitor, format string, contentType string, encoder messageEncoder) error {
if err := v.AddSubscription(); err != nil {
return err
return errHTTPTooManyRequests
}
defer v.RemoveSubscription()
t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack
t, err := s.topic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack
if err != nil {
return err
}
since, err := parseSince(r)
if err != nil {
return err
Expand Down Expand Up @@ -304,16 +311,19 @@ func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error {
return nil
}

func (s *Server) createTopic(id string) *topic {
func (s *Server) topic(id string) (*topic, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.topics[id]; !ok {
if len(s.topics) >= s.config.GlobalTopicLimit {
return nil, errHTTPTooManyRequests
}
s.topics[id] = newTopic(id)
if s.firebase != nil {
s.topics[id].Subscribe(s.firebase)
}
}
return s.topics[id]
return s.topics[id], nil
}

func (s *Server) updateStatsAndExpire() {
Expand All @@ -331,7 +341,7 @@ func (s *Server) updateStatsAndExpire() {
for _, t := range s.topics {
t.Prune(s.config.MessageBufferDuration)
subs, msgs := t.Stats()
if msgs == 0 && (subs == 0 || (s.firebase != nil && subs == 1)) {
if msgs == 0 && (subs == 0 || (s.firebase != nil && subs == 1)) { // Firebase is a subscriber!
delete(s.topics, t.id)
}
}
Expand Down
16 changes: 8 additions & 8 deletions server/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"golang.org/x/time/rate"
"heckel.io/ntfy/config"
"heckel.io/ntfy/util"
"sync"
"time"
)
Expand All @@ -15,16 +16,17 @@ const (
type visitor struct {
config *config.Config
limiter *rate.Limiter
subscriptions int
subscriptions *util.Limiter
seen time.Time
mu sync.Mutex
}

func newVisitor(conf *config.Config) *visitor {
return &visitor{
config: conf,
limiter: rate.NewLimiter(conf.RequestLimit, conf.RequestLimitBurst),
seen: time.Now(),
config: conf,
limiter: rate.NewLimiter(conf.VisitorRequestLimit, conf.VisitorRequestLimitBurst),
subscriptions: util.NewLimiter(int64(conf.VisitorSubscriptionLimit)),
seen: time.Now(),
}
}

Expand All @@ -38,17 +40,16 @@ func (v *visitor) RequestAllowed() error {
func (v *visitor) AddSubscription() error {
v.mu.Lock()
defer v.mu.Unlock()
if v.subscriptions >= v.config.SubscriptionLimit {
if err := v.subscriptions.Add(1); err != nil {
return errHTTPTooManyRequests
}
v.subscriptions++
return nil
}

func (v *visitor) RemoveSubscription() {
v.mu.Lock()
defer v.mu.Unlock()
v.subscriptions--
v.subscriptions.Sub(1)
}

func (v *visitor) Keepalive() {
Expand All @@ -60,6 +61,5 @@ func (v *visitor) Keepalive() {
func (v *visitor) Stale() bool {
v.mu.Lock()
defer v.mu.Unlock()
v.seen = time.Now()
return time.Since(v.seen) > visitorExpungeAfter
}
65 changes: 65 additions & 0 deletions util/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package util

import (
"errors"
"sync"
)

// ErrLimitReached is the error returned by the Limiter and LimitWriter when the predefined limit has been reached
var ErrLimitReached = errors.New("limit reached")

// Limiter is a helper that allows adding values up to a well-defined limit. Once the limit is reached
// ErrLimitReached will be returned. Limiter may be used by multiple goroutines.
type Limiter struct {
value int64
limit int64
mu sync.Mutex
}

// NewLimiter creates a new Limiter
func NewLimiter(limit int64) *Limiter {
return &Limiter{
limit: limit,
}
}

// Add adds n to the limiters internal value, but only if the limit has not been reached. If the limit would be
// exceeded after adding n, ErrLimitReached is returned.
func (l *Limiter) Add(n int64) error {
l.mu.Lock()
defer l.mu.Unlock()
if l.limit == 0 {
l.value += n
return nil
} else if l.value+n <= l.limit {
l.value += n
return nil
} else {
return ErrLimitReached
}
}

// Sub subtracts a value from the limiters internal value
func (l *Limiter) Sub(n int64) {
l.Add(-n)
}

// Set sets the value of the limiter to n. This function ignores the limit. It is meant to set the value
// based on reality.
func (l *Limiter) Set(n int64) {
l.mu.Lock()
l.value = n
l.mu.Unlock()
}

// Value returns the internal value of the limiter
func (l *Limiter) Value() int64 {
l.mu.Lock()
defer l.mu.Unlock()
return l.value
}

// Limit returns the defined limit
func (l *Limiter) Limit() int64 {
return l.limit
}

0 comments on commit b775e6d

Please sign in to comment.