Merge pull request #502 from binwiederhier/async-message-cache
Batch message INSERTs
binwiederhier authored Nov 17, 2022
2 parents 7551554 + 978118a commit f4daa45
Showing 10 changed files with 256 additions and 20 deletions.
6 changes: 6 additions & 0 deletions cmd/serve.go
@@ -44,6 +44,8 @@ var flagsServe = append(
altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"firebase_key_file", "F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"cache_file", "C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-duration", Aliases: []string{"cache_duration", "b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: server.DefaultCacheDuration, Usage: "buffer messages for this time to allow `since` requests"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "cache-batch-size", Aliases: []string{"cache_batch_size"}, EnvVars: []string{"NTFY_CACHE_BATCH_SIZE"}, Usage: "max number of messages to batch together when writing to message cache (if zero, writes are synchronous)"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-batch-timeout", Aliases: []string{"cache_batch_timeout"}, EnvVars: []string{"NTFY_CACHE_BATCH_TIMEOUT"}, Usage: "timeout for batched async writes to the message cache (if zero, writes are synchronous)"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-startup-queries", Aliases: []string{"cache_startup_queries"}, EnvVars: []string{"NTFY_CACHE_STARTUP_QUERIES"}, Usage: "queries run when the cache database is initialized"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-file", Aliases: []string{"auth_file", "H"}, EnvVars: []string{"NTFY_AUTH_FILE"}, Usage: "auth database file used for access control"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-default-access", Aliases: []string{"auth_default_access", "p"}, EnvVars: []string{"NTFY_AUTH_DEFAULT_ACCESS"}, Value: "read-write", Usage: "default permissions if no matching entries in the auth database are found"}),
@@ -110,6 +112,8 @@ func execServe(c *cli.Context) error {
cacheFile := c.String("cache-file")
cacheDuration := c.Duration("cache-duration")
cacheStartupQueries := c.String("cache-startup-queries")
cacheBatchSize := c.Int("cache-batch-size")
cacheBatchTimeout := c.Duration("cache-batch-timeout")
authFile := c.String("auth-file")
authDefaultAccess := c.String("auth-default-access")
attachmentCacheDir := c.String("attachment-cache-dir")
@@ -233,6 +237,8 @@ func execServe(c *cli.Context) error {
conf.CacheFile = cacheFile
conf.CacheDuration = cacheDuration
conf.CacheStartupQueries = cacheStartupQueries
conf.CacheBatchSize = cacheBatchSize
conf.CacheBatchTimeout = cacheBatchTimeout
conf.AuthFile = authFile
conf.AuthDefaultRead = authDefaultRead
conf.AuthDefaultWrite = authDefaultWrite
14 changes: 13 additions & 1 deletion docs/config.md
@@ -825,19 +825,27 @@ out [this discussion on Reddit](https://www.reddit.com/r/golang/comments/r9u4ee/

Depending on *how you run it*, here are a few limits that are relevant:

### WAL for message cache
### Message cache
By default, the [message cache](#message-cache) (defined by `cache-file`) uses the SQLite default settings, which means it
syncs to disk on every write. For personal servers, this is perfectly adequate. For larger installations, such as ntfy.sh,
the [write-ahead log (WAL)](https://sqlite.org/wal.html) should be enabled, and the sync mode should be adjusted.
See [this article](https://phiresky.github.io/blog/2020/sqlite-performance-tuning/) for details.

In addition to that, for very high-load servers (such as ntfy.sh), it may be beneficial to write messages to the cache
in batches, and asynchronously. This can be enabled with the `cache-batch-size` and `cache-batch-timeout` options. If you
start seeing `database is locked` messages in the logs, you should probably enable batching.

Here's how ntfy.sh has been tuned in the `server.yml` file:

``` yaml
cache-batch-size: 25
cache-batch-timeout: "1s"
cache-startup-queries: |
pragma journal_mode = WAL;
pragma synchronous = normal;
pragma temp_store = memory;
pragma busy_timeout = 15000;
vacuum;
```

### For systemd services
@@ -990,6 +998,8 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`).
| `cache-file` | `NTFY_CACHE_FILE` | *filename* | - | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache). |
| `cache-duration` | `NTFY_CACHE_DURATION` | *duration* | 12h | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely. |
| `cache-startup-queries` | `NTFY_CACHE_STARTUP_QUERIES` | *string (SQL queries)* | - | SQL queries to run during database startup; this is useful for tuning and [enabling WAL mode](#wal-for-message-cache) |
| `cache-batch-size` | `NTFY_CACHE_BATCH_SIZE` | *int* | 0 | Max number of messages to batch together when writing to the message cache (if zero, writes are synchronous) |
| `cache-batch-timeout` | `NTFY_CACHE_BATCH_TIMEOUT` | *duration* | 0s | Timeout for batched async writes to the message cache (if zero, writes are synchronous) |
| `auth-file` | `NTFY_AUTH_FILE` | *filename* | - | Auth database file used for access control. If set, enables authentication and access control. See [access control](#access-control). |
| `auth-default-access` | `NTFY_AUTH_DEFAULT_ACCESS` | `read-write`, `read-only`, `write-only`, `deny-all` | `read-write` | Default permissions if no matching entries in the auth database are found. Default is `read-write`. |
| `behind-proxy` | `NTFY_BEHIND_PROXY` | *bool* | false | If set, the X-Forwarded-For header is used to determine the visitor IP address instead of the remote address of the connection. |
@@ -1054,6 +1064,8 @@ OPTIONS:
--behind-proxy, --behind_proxy, -P if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting) (default: false) [$NTFY_BEHIND_PROXY]
--cache-duration since, --cache_duration since, -b since buffer messages for this time to allow since requests (default: 12h0m0s) [$NTFY_CACHE_DURATION]
--cache-file value, --cache_file value, -C value cache file used for message caching [$NTFY_CACHE_FILE]
--cache-batch-size value, --cache_batch_size value         max number of messages to batch together when writing to message cache (if zero, writes are synchronous) (default: 0) [$NTFY_CACHE_BATCH_SIZE]
--cache-batch-timeout value, --cache_batch_timeout value timeout for batched async writes to the message cache (if zero, writes are synchronous) (default: 0s) [$NTFY_CACHE_BATCH_TIMEOUT]
--cache-startup-queries value, --cache_startup_queries value queries run when the cache database is initialized [$NTFY_CACHE_STARTUP_QUERIES]
--cert-file value, --cert_file value, -E value certificate file, if listen-https is set [$NTFY_CERT_FILE]
--config value, -c value config file (default: /etc/ntfy/server.yml) [$NTFY_CONFIG_FILE]
5 changes: 5 additions & 0 deletions docs/releases.md
@@ -14,6 +14,10 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release

## ntfy server v1.30.0 (UNRELEASED)

**Features:**

* High-load servers: Allow asynchronous batch-writing of messages to cache via `cache-batch-*` options ([#498](https://github.com/binwiederhier/ntfy/issues/498)/[#502](https://github.com/binwiederhier/ntfy/pull/502))

**Documentation:**

* GitHub Actions example ([#492](https://github.com/binwiederhier/ntfy/pull/492), thanks to [@ksurl](https://github.com/ksurl))
@@ -22,6 +26,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
**Other things:**

* Put ntfy.sh docs on GitHub pages to reduce AWS outbound traffic cost ([#491](https://github.com/binwiederhier/ntfy/issues/491))
* The ntfy.sh server hardware was upgraded to a bigger box. If you'd like to help carry the server cost, **[sponsorships and donations](https://github.com/sponsors/binwiederhier)** 💸 would be very much appreciated.

## ntfy server v1.29.0
Released November 12, 2022
4 changes: 4 additions & 0 deletions server/config.go
@@ -61,6 +61,8 @@ type Config struct {
CacheFile string
CacheDuration time.Duration
CacheStartupQueries string
CacheBatchSize int
CacheBatchTimeout time.Duration
AuthFile string
AuthDefaultRead bool
AuthDefaultWrite bool
@@ -114,6 +116,8 @@ func NewConfig() *Config {
FirebaseKeyFile: "",
CacheFile: "",
CacheDuration: DefaultCacheDuration,
CacheBatchSize: 0,
CacheBatchTimeout: 0,
AuthFile: "",
AuthDefaultRead: true,
AuthDefaultWrite: true,
82 changes: 69 additions & 13 deletions server/message_cache.go
@@ -44,6 +44,7 @@ const (
published INT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`
@@ -92,7 +93,7 @@ const (

// Schema management queries
const (
currentSchemaVersion = 8
currentSchemaVersion = 9
createSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
@@ -185,37 +186,50 @@ const (
migrate7To8AlterMessagesTableQuery = `
ALTER TABLE messages ADD COLUMN icon TEXT NOT NULL DEFAULT('');
`

// 8 -> 9
migrate8To9AlterMessagesTableQuery = `
CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
`
)

type messageCache struct {
db *sql.DB
nop bool
db *sql.DB
queue *util.BatchingQueue[*message]
nop bool
}

// newSqliteCache creates a SQLite file-backed cache
func newSqliteCache(filename, startupQueries string, nop bool) (*messageCache, error) {
func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout time.Duration, nop bool) (*messageCache, error) {
db, err := sql.Open("sqlite3", filename)
if err != nil {
return nil, err
}
if err := setupCacheDB(db, startupQueries); err != nil {
return nil, err
}
return &messageCache{
db: db,
nop: nop,
}, nil
var queue *util.BatchingQueue[*message]
if batchSize > 0 || batchTimeout > 0 {
queue = util.NewBatchingQueue[*message](batchSize, batchTimeout)
}
cache := &messageCache{
db: db,
queue: queue,
nop: nop,
}
go cache.processMessageBatches()
return cache, nil
}

// newMemCache creates an in-memory cache
func newMemCache() (*messageCache, error) {
return newSqliteCache(createMemoryFilename(), "", false)
return newSqliteCache(createMemoryFilename(), "", 0, 0, false)
}

// newNopCache creates an in-memory cache that discards all messages;
// it is always empty and can be used if caching is entirely disabled
func newNopCache() (*messageCache, error) {
return newSqliteCache(createMemoryFilename(), "", true)
return newSqliteCache(createMemoryFilename(), "", 0, 0, true)
}

// createMemoryFilename creates a unique memory filename to use for the SQLite backend.
@@ -228,14 +242,23 @@ func createMemoryFilename() string {
return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
}

// AddMessage stores a message to the message cache synchronously, or queues it to be stored asynchronously at a later time.
// The message is queued only if "batchSize" or "batchTimeout" are passed to the constructor.
func (c *messageCache) AddMessage(m *message) error {
if c.queue != nil {
c.queue.Enqueue(m)
return nil
}
return c.addMessages([]*message{m})
}

// addMessages synchronously stores a batch of messages. If the database is locked, the transaction waits until
// SQLite's busy_timeout is exceeded before erroring out.
func (c *messageCache) addMessages(ms []*message) error {
if c.nop {
return nil
}
start := time.Now()
tx, err := c.db.Begin()
if err != nil {
return err
@@ -289,7 +312,12 @@ func (c *messageCache) addMessages(ms []*message) error {
return err
}
}
return tx.Commit()
if err := tx.Commit(); err != nil {
log.Error("Cache: Writing %d message(s) failed (took %v)", len(ms), time.Since(start))
return err
}
log.Debug("Cache: Wrote %d message(s) in %v", len(ms), time.Since(start))
return nil
}

func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
@@ -395,8 +423,12 @@ func (c *messageCache) Topics() (map[string]*topic, error) {
}

func (c *messageCache) Prune(olderThan time.Time) error {
_, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
return err
start := time.Now()
if _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix()); err != nil {
log.Warn("Cache: Pruning failed (after %v): %s", time.Since(start), err.Error())
return err
}
log.Debug("Cache: Pruning successful (took %v)", time.Since(start))
return nil
}

func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) {
@@ -417,6 +449,17 @@ func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) {
return size, nil
}

func (c *messageCache) processMessageBatches() {
if c.queue == nil {
return
}
for messages := range c.queue.Dequeue() {
if err := c.addMessages(messages); err != nil {
log.Error("Cache: %s", err.Error())
}
}
}

func readMessages(rows *sql.Rows) ([]*message, error) {
defer rows.Close()
messages := make([]*message, 0)
@@ -542,6 +585,8 @@ func setupCacheDB(db *sql.DB, startupQueries string) error {
return migrateFrom6(db)
} else if schemaVersion == 7 {
return migrateFrom7(db)
} else if schemaVersion == 8 {
return migrateFrom8(db)
}
return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
}
@@ -647,5 +692,16 @@ func migrateFrom7(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 8); err != nil {
return err
}
return migrateFrom8(db)
}

func migrateFrom8(db *sql.DB) error {
log.Info("Migrating cache database schema: from 8 to 9")
if _, err := db.Exec(migrate8To9AlterMessagesTableQuery); err != nil {
return err
}
if _, err := db.Exec(updateSchemaVersion, 9); err != nil {
return err
}
return nil // Update this when a new version is added
}
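Note: the `util.BatchingQueue[*message]` type used above is defined in a part of this pull request that is not expanded in this view (presumably `util/batching_queue.go`). For orientation only, here is a minimal sketch of how such a size-or-timeout batching queue could look, consistent with the `NewBatchingQueue`, `Enqueue` and `Dequeue` calls in `message_cache.go`; the actual implementation in the PR may differ.

``` go
package util

import (
	"sync"
	"time"
)

// BatchingQueue collects enqueued items and emits them in batches, either when
// batchSize items have accumulated or when the timeout elapses, whichever comes first.
type BatchingQueue[T any] struct {
	batchSize int
	timeout   time.Duration
	mu        sync.Mutex
	in        []T
	out       chan []T
}

// NewBatchingQueue creates a BatchingQueue and starts its background flush ticker.
func NewBatchingQueue[T any](batchSize int, timeout time.Duration) *BatchingQueue[T] {
	q := &BatchingQueue[T]{
		batchSize: batchSize,
		timeout:   timeout,
		in:        make([]T, 0),
		out:       make(chan []T),
	}
	go q.flushPeriodically()
	return q
}

// Enqueue adds an item; if the pending batch has reached batchSize, it is flushed immediately.
func (q *BatchingQueue[T]) Enqueue(element T) {
	q.mu.Lock()
	q.in = append(q.in, element)
	var batch []T
	if q.batchSize > 0 && len(q.in) >= q.batchSize {
		batch = q.dequeueAll()
	}
	q.mu.Unlock()
	if len(batch) > 0 {
		q.out <- batch // blocks until the consumer picks up the batch
	}
}

// Dequeue returns the channel on which batches are emitted.
func (q *BatchingQueue[T]) Dequeue() <-chan []T {
	return q.out
}

// dequeueAll swaps out the pending items; the caller must hold q.mu.
func (q *BatchingQueue[T]) dequeueAll() []T {
	batch := q.in
	q.in = make([]T, 0)
	return batch
}

// flushPeriodically emits whatever has accumulated every "timeout" interval.
func (q *BatchingQueue[T]) flushPeriodically() {
	if q.timeout == 0 {
		return // size-based flushing only
	}
	ticker := time.NewTicker(q.timeout)
	defer ticker.Stop()
	for range ticker.C {
		q.mu.Lock()
		batch := q.dequeueAll()
		q.mu.Unlock()
		if len(batch) > 0 {
			q.out <- batch
		}
	}
}
```

The design point is that `Enqueue` never touches the database: batches are handed to whoever ranges over `Dequeue()`, which is what `processMessageBatches` does above, so a slow or locked SQLite database only delays the background writer, not the publishing HTTP handler.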
10 changes: 5 additions & 5 deletions server/message_cache_test.go
@@ -450,7 +450,7 @@ func TestSqliteCache_StartupQueries_WAL(t *testing.T) {
startupQueries := `pragma journal_mode = WAL;
pragma synchronous = normal;
pragma temp_store = memory;`
db, err := newSqliteCache(filename, startupQueries, false)
db, err := newSqliteCache(filename, startupQueries, 0, 0, false)
require.Nil(t, err)
require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
require.FileExists(t, filename)
@@ -461,7 +461,7 @@
func TestSqliteCache_StartupQueries_None(t *testing.T) {
filename := newSqliteTestCacheFile(t)
startupQueries := ""
db, err := newSqliteCache(filename, startupQueries, false)
db, err := newSqliteCache(filename, startupQueries, 0, 0, false)
require.Nil(t, err)
require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
require.FileExists(t, filename)
@@ -472,7 +472,7 @@
func TestSqliteCache_StartupQueries_Fail(t *testing.T) {
filename := newSqliteTestCacheFile(t)
startupQueries := `xx error`
_, err := newSqliteCache(filename, startupQueries, false)
_, err := newSqliteCache(filename, startupQueries, 0, 0, false)
require.Error(t, err)
}

@@ -501,7 +501,7 @@ func TestMemCache_NopCache(t *testing.T) {
}

func newSqliteTestCache(t *testing.T) *messageCache {
c, err := newSqliteCache(newSqliteTestCacheFile(t), "", false)
c, err := newSqliteCache(newSqliteTestCacheFile(t), "", 0, 0, false)
if err != nil {
t.Fatal(err)
}
@@ -513,7 +513,7 @@ func newSqliteTestCacheFile(t *testing.T) string {
}

func newSqliteTestCacheFromFile(t *testing.T, filename, startupQueries string) *messageCache {
c, err := newSqliteCache(filename, startupQueries, false)
c, err := newSqliteCache(filename, startupQueries, 0, 0, false)
if err != nil {
t.Fatal(err)
}
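Not part of the diff shown here, but for illustration: a test along the following lines (in the `server` package, reusing the helpers above and testify's `require`) could exercise the new asynchronous path. The batch size, timeout values and test name are hypothetical.

``` go
// Hypothetical sketch: force a small batch size and a short timeout, then wait
// for the background writer to flush the queued messages to the database.
func TestSqliteCache_AddMessage_Batched(t *testing.T) {
	filename := newSqliteTestCacheFile(t)
	c, err := newSqliteCache(filename, "", 2, 100*time.Millisecond, false)
	require.Nil(t, err)
	for i := 0; i < 5; i++ {
		// With a queue configured, AddMessage only enqueues and returns immediately.
		require.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "some message")))
	}
	// Messages are written asynchronously (on batch size or timeout), so poll the table.
	require.Eventually(t, func() bool {
		var count int
		if err := c.db.QueryRow("SELECT COUNT(*) FROM messages").Scan(&count); err != nil {
			return false
		}
		return count == 5
	}, 5*time.Second, 50*time.Millisecond)
}
```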
3 changes: 2 additions & 1 deletion server/server.go
@@ -159,7 +159,7 @@ func createMessageCache(conf *Config) (*messageCache, error) {
if conf.CacheDuration == 0 {
return newNopCache()
} else if conf.CacheFile != "" {
return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, false)
return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
}
return newMemCache()
}
@@ -491,6 +491,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
log.Debug("%s Message delayed, will process later", logMessagePrefix(v, m))
}
if cache {
log.Debug("%s Adding message to cache", logMessagePrefix(v, m))
if err := s.messageCache.AddMessage(m); err != nil {
return nil, err
}
8 changes: 8 additions & 0 deletions server/server.yml
@@ -53,6 +53,12 @@
# pragma journal_mode = WAL;
# pragma synchronous = normal;
# pragma temp_store = memory;
# pragma busy_timeout = 15000;
# vacuum;
#
# The "cache-batch-size" and "cache-batch-timeout" parameters allow enabling asynchronous batch writing
# of messages. If set, messages will be queued and written to the database in batches of the given
# size, or after the given timeout. This is only required for high volume servers.
#
# Debian/RPM package users:
# Use /var/cache/ntfy/cache.db as cache file to avoid permission issues. The package
Expand All @@ -65,6 +71,8 @@
# cache-file: <filename>
# cache-duration: "12h"
# cache-startup-queries:
# cache-batch-size: 0
# cache-batch-timeout: "0ms"

# If set, access to the ntfy server and API can be controlled on a granular level using
# the 'ntfy user' and 'ntfy access' commands. See the --help pages for details, or check the docs.
(Diffs for the remaining 2 changed files did not load in this view.)
