Skip to content

Commit

Permalink
Implement FileWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Feb 6, 2022
1 parent 8009bcf commit 899b342
Show file tree
Hide file tree
Showing 15 changed files with 1,129 additions and 53 deletions.
8 changes: 4 additions & 4 deletions cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
// DBConfig represents the configuration for a single database.
type DBConfig struct {
Path string `yaml:"path"`
MonitorInterval *time.Duration `yaml:"monitor-interval"`
CheckpointInterval *time.Duration `yaml:"checkpoint-interval"`
MinCheckpointPageN *int `yaml:"min-checkpoint-page-count"`
MaxCheckpointPageN *int `yaml:"max-checkpoint-page-count"`
Expand All @@ -281,14 +280,15 @@ func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) {
if err != nil {
return nil, err
}
return NewDBFromConfigWithPath(dbc, path)
}

// NewDBFromConfigWithPath instantiates a DB based on a configuration and using a given path.
func NewDBFromConfigWithPath(dbc *DBConfig, path string) (*litestream.DB, error) {
// Initialize database with given path.
db := litestream.NewDB(path)

// Override default database settings if specified in configuration.
if dbc.MonitorInterval != nil {
db.MonitorInterval = *dbc.MonitorInterval
}
if dbc.CheckpointInterval != nil {
db.CheckpointInterval = *dbc.CheckpointInterval
}
Expand Down
28 changes: 14 additions & 14 deletions cmd/litestream/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ type ReplicateCommand struct {

Config Config

// List of managed databases specified in the config.
DBs []*litestream.DB
server *litestream.Server
}

// NewReplicateCommand returns a new instance of ReplicateCommand.
Expand Down Expand Up @@ -104,21 +103,27 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
log.Println("no databases specified in configuration")
}

c.server = litestream.NewServer()
if err := c.server.Open(); err != nil {
return fmt.Errorf("open server: %w", err)
}

// Add databases to the server.
for _, dbConfig := range c.Config.DBs {
db, err := NewDBFromConfig(dbConfig)
path, err := expand(dbConfig.Path)
if err != nil {
return err
}

// Open database & attach to program.
if err := db.Open(); err != nil {
if err := c.server.Watch(path, func(path string) (*litestream.DB, error) {
return NewDBFromConfigWithPath(dbConfig, path)
}); err != nil {
return err
}
c.DBs = append(c.DBs, db)
}

// Notify user that initialization is done.
for _, db := range c.DBs {
for _, db := range c.server.DBs() {
log.Printf("initialized db: %s", db.Path())
for _, r := range db.Replicas {
switch client := r.Client().(type) {
Expand Down Expand Up @@ -180,13 +185,8 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {

// Close closes all open databases.
func (c *ReplicateCommand) Close() (err error) {
for _, db := range c.DBs {
if e := db.Close(); e != nil {
log.Printf("error closing db: path=%s err=%s", db.Path(), e)
if err == nil {
err = e
}
}
if e := c.server.Close(); e != nil && err == nil {
err = e
}
return err
}
Expand Down
71 changes: 42 additions & 29 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import (

// Default DB settings.
const (
DefaultMonitorInterval = 1 * time.Second
DefaultCheckpointInterval = 1 * time.Minute
DefaultMinCheckpointPageN = 1000
DefaultMaxCheckpointPageN = 10000
)

// MonitorDelayInterval is the time Litestream will wait after receiving a file
// change notification before processing the WAL file for changes.
const MonitorDelayInterval = 100 * time.Millisecond

// MaxIndex is the maximum possible WAL index.
// If this index is reached then a new generation will be started.
const MaxIndex = 0x7FFFFFFF
Expand All @@ -43,14 +46,15 @@ const BusyTimeout = 1 * time.Second

// DB represents a managed instance of a SQLite database in the file system.
type DB struct {
mu sync.RWMutex
path string // part to database
db *sql.DB // target database
f *os.File // long-running db file descriptor
rtx *sql.Tx // long running read transaction
pos Pos // cached position
pageSize int // page size, in bytes
notify chan struct{} // closes on WAL change
mu sync.RWMutex
path string // part to database
db *sql.DB // target database
f *os.File // long-running db file descriptor
rtx *sql.Tx // long running read transaction
pos Pos // cached position
pageSize int // page size, in bytes
notifyCh chan struct{} // notifies DB of changes
walNotify chan struct{} // closes on WAL change

// Cached salt & checksum from current shadow header.
hdr []byte
Expand Down Expand Up @@ -98,9 +102,6 @@ type DB struct {
// better precision.
CheckpointInterval time.Duration

// Frequency at which to perform db sync.
MonitorInterval time.Duration

// List of replicas for the database.
// Must be set before calling Open().
Replicas []*Replica
Expand All @@ -111,13 +112,13 @@ type DB struct {
// NewDB returns a new instance of DB for a given path.
func NewDB(path string) *DB {
db := &DB{
path: path,
notify: make(chan struct{}),
path: path,
notifyCh: make(chan struct{}, 1),
walNotify: make(chan struct{}),

MinCheckpointPageN: DefaultMinCheckpointPageN,
MaxCheckpointPageN: DefaultMaxCheckpointPageN,
CheckpointInterval: DefaultCheckpointInterval,
MonitorInterval: DefaultMonitorInterval,

Logger: log.New(LogWriter, fmt.Sprintf("%s: ", logPrefixPath(path)), LogFlags),
}
Expand Down Expand Up @@ -358,11 +359,16 @@ func (db *DB) walSegmentOffsetsByIndex(generation string, index int) ([]int64, e
return offsets, nil
}

// Notify returns a channel that closes when the shadow WAL changes.
func (db *DB) Notify() <-chan struct{} {
// NotifyCh returns a channel that can be used to signal changes in the DB.
func (db *DB) NotifyCh() chan<- struct{} {
return db.notifyCh
}

// WALNotify returns a channel that closes when the shadow WAL changes.
func (db *DB) WALNotify() <-chan struct{} {
db.mu.RLock()
defer db.mu.RUnlock()
return db.notify
return db.walNotify
}

// PageSize returns the page size of the underlying database.
Expand Down Expand Up @@ -395,10 +401,8 @@ func (db *DB) Open() (err error) {
}

// Start monitoring SQLite database in a separate goroutine.
if db.MonitorInterval > 0 {
db.wg.Add(1)
go func() { defer db.wg.Done(); db.monitor() }()
}
db.wg.Add(1)
go func() { defer db.wg.Done(); db.monitor() }()

return nil
}
Expand Down Expand Up @@ -903,8 +907,8 @@ func (db *DB) Sync(ctx context.Context) (err error) {

// Notify replicas of WAL changes.
if db.pos != origPos {
close(db.notify)
db.notify = make(chan struct{})
close(db.walNotify)
db.walNotify = make(chan struct{})
}

return nil
Expand Down Expand Up @@ -1367,18 +1371,27 @@ func (db *DB) execCheckpoint(mode string) (err error) {

// monitor runs in a separate goroutine and monitors the database & WAL.
func (db *DB) monitor() {
ticker := time.NewTicker(db.MonitorInterval)
defer ticker.Stop()
timer := time.NewTimer(MonitorDelayInterval)
defer timer.Stop()

for {
// Wait for ticker or context close.
// Wait for a file change notification from the file system.
select {
case <-db.ctx.Done():
return
case <-ticker.C:
case <-db.notifyCh:
}

// Wait for small delay before processing changes.
timer.Reset(MonitorDelayInterval)
<-timer.C

// Clear any additional change notifications that occurred during delay.
select {
case <-db.notifyCh:
default:
}

// Sync the database to the shadow WAL.
if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) {
db.Logger.Printf("sync error: %s", err)
}
Expand Down
1 change: 0 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,6 @@ func MustOpenDB(tb testing.TB) *litestream.DB {
func MustOpenDBAt(tb testing.TB, path string) *litestream.DB {
tb.Helper()
db := litestream.NewDB(path)
db.MonitorInterval = 0 // disable background goroutine
if err := db.Open(); err != nil {
tb.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/prometheus/client_golang v1.12.1
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a // indirect
google.golang.org/api v0.66.0
gopkg.in/yaml.v2 v2.4.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a h1:ppl5mZgokTT8uPkmYOyEUmPTr3ypaKkg5eFOGrAmxxE=
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
6 changes: 4 additions & 2 deletions integration/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func TestCmd_Replicate_OK(t *testing.T) {
db, err := sql.Open("sqlite3", filepath.Join(tempDir, "db"))
if err != nil {
t.Fatal(err)
} else if _, err := db.ExecContext(ctx, `PRAGMA journal_mode = wal`); err != nil {
t.Fatal(err)
} else if _, err := db.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -378,9 +380,9 @@ func waitForLogMessage(tb testing.TB, b *internal.LockingBuffer, msg string) {
// killLitestreamCmd interrupts the process and waits for a clean shutdown.
func killLitestreamCmd(tb testing.TB, cmd *exec.Cmd, stdout *internal.LockingBuffer) {
if err := cmd.Process.Signal(os.Interrupt); err != nil {
tb.Fatal(err)
tb.Fatal("kill litestream: signal:", err)
} else if err := cmd.Wait(); err != nil {
tb.Fatal(err)
tb.Fatal("kill litestream: cmd:", err)
}
}

Expand Down
36 changes: 36 additions & 0 deletions internal/file_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package internal

import (
"errors"
)

// File event mask constants.
const (
FileEventCreated = 1 << iota
FileEventModified
FileEventDeleted
)

// FileEvent represents an event on a watched file.
type FileEvent struct {
Name string
Mask int
}

// ErrFileEventQueueOverflow is returned when the file event queue has overflowed.
var ErrFileEventQueueOverflow = errors.New("file event queue overflow")

// FileWatcher represents a watcher of file events.
type FileWatcher interface {
Open() error
Close() error

// Returns a channel of events for watched files.
Events() <-chan FileEvent

// Adds a specific file to be watched.
Watch(filename string) error

// Removes a specific file from being watched.
Unwatch(filename string) error
}
Loading

0 comments on commit 899b342

Please sign in to comment.