Skip to content

Commit

Permalink
indexer: backups are now slower but deterministic
Browse files Browse the repository at this point in the history
they are a (zstd compressed) set of SQL statements

also, relevant methods now use io.Reader and io.Writer
  • Loading branch information
altergui committed Sep 11, 2024
1 parent 43b6518 commit f345045
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 73 deletions.
12 changes: 12 additions & 0 deletions data/compressor/compression.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
package compressor

import (
"io"
"time"

"github.com/klauspost/compress/zstd"
"go.vocdoni.io/dvote/log"
)

// NewWriter wraps w in a zstd compressor. The returned WriteCloser must be
// closed to flush all remaining compressed data to w.
func NewWriter(w io.Writer) (io.WriteCloser, error) {
	zw, err := zstd.NewWriter(w)
	if err != nil {
		return nil, err
	}
	return zw, nil
}

// NewReader wraps r in a zstd decompressor. The returned ReadCloser should
// be closed to release the decoder's resources.
func NewReader(r io.Reader) (io.ReadCloser, error) {
	zr, err := zstd.NewReader(r)
	if err != nil {
		// zr is nil on error; calling IOReadCloser() on it would panic.
		return nil, err
	}
	return zr.IOReadCloser(), nil
}

// Compressor is a data compressor that uses zstd.
type Compressor struct {
encoder *zstd.Encoder
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.11.0
github.com/libp2p/go-reuseport v0.4.0
github.com/manifoldco/promptui v0.9.0
github.com/mattn/go-sqlite3 v1.14.23
github.com/multiformats/go-multiaddr v0.12.4
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -970,8 +970,6 @@ github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.23 h1:gbShiuAP1W5j9UOksQ06aiiqPMxYecovVGwmTxWtuw0=
github.com/mattn/go-sqlite3 v1.14.23/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY=
Expand Down
44 changes: 2 additions & 42 deletions service/indexer.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package service

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"

"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/snapshot"
Expand All @@ -29,43 +24,8 @@ func (vs *VocdoniService) VochainIndexer() error {
// launch the indexer after sync routine (executed when the blockchain is ready)
go vs.Indexer.AfterSyncBootstrap(false)

snapshot.SetFnImportIndexer(func(r io.Reader) error {
log.Debugf("restoring indexer backup")

file, err := os.CreateTemp("", "indexer.sqlite3")
if err != nil {
return fmt.Errorf("creating tmpfile: %w", err)
}
defer func() {
if err := file.Close(); err != nil {
log.Warnw("error closing tmpfile", "path", file.Name(), "err", err)
}
if err := os.Remove(file.Name()); err != nil {
log.Warnw("error removing tmpfile", "path", file.Name(), "err", err)
}
}()

if _, err := io.Copy(file, r); err != nil {
return fmt.Errorf("writing tmpfile: %w", err)
}

return vs.Indexer.RestoreBackup(file.Name())
})

snapshot.SetFnExportIndexer(func(w io.Writer) error {
log.Debugf("saving indexer backup")

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
data, err := vs.Indexer.ExportBackupAsBytes(ctx)
if err != nil {
return fmt.Errorf("creating indexer backup: %w", err)
}
if _, err := w.Write(data); err != nil {
return fmt.Errorf("writing data: %w", err)
}
return nil
})
snapshot.SetFnImportIndexer(vs.Indexer.ImportBackup)
snapshot.SetFnExportIndexer(vs.Indexer.ExportBackup)

return nil
}
125 changes: 102 additions & 23 deletions vochain/indexer/indexer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package indexer

import (
"bufio"
"bytes"
"context"
"database/sql"
Expand All @@ -18,6 +19,7 @@ import (
"sync"
"time"

"go.vocdoni.io/dvote/data/compressor"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/statedb"
"go.vocdoni.io/dvote/types"
Expand All @@ -31,6 +33,7 @@ import (

"github.com/pressly/goose/v3"

"github.com/schollz/sqlite3dump"

Check failure on line 36 in vochain/indexer/indexer.go

View workflow job for this annotation

GitHub Actions / job_go_test

no required module provides package github.com/schollz/sqlite3dump; to add it:
// modernc is a pure-Go version, but its errors have less useful info.
// We use mattn while developing and testing, and we can swap them later.
// _ "modernc.org/sqlite"
Expand Down Expand Up @@ -240,7 +243,8 @@ func (idx *Indexer) Close() error {
return nil
}

// BackupPath restores the database from a backup created via SaveBackup.
// RestoreBackup restores the indexer by copying the file (raw binary format)
// from the passed path.
// Note that this must be called with ExpectBackupRestore set to true,
// and before any indexing or queries happen.
func (idx *Indexer) RestoreBackup(path string) error {
Expand Down Expand Up @@ -277,36 +281,111 @@ func gooseMigrationsPending(db *sql.DB, dir string) bool {
return len(migrations) > 0
}

// SaveBackup backs up the database to a file on disk.
// Note that writes to the database may be blocked until the backup finishes,
// and an error may occur if a file at path already exists.
//
// For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database.
func (idx *Indexer) SaveBackup(ctx context.Context, path string) error {
_, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, path)
return err
// ImportBackup restores the database from a backup created via ExportBackup.
// The reader must provide a zstd-compressed stream of SQL statements.
// Note that this must be called with ExpectBackupRestore set to true,
// and before any indexing or queries happen.
func (idx *Indexer) ImportBackup(r io.Reader) error {
	if idx.readWriteDB != nil {
		// Programmer error: restoring over a live database would corrupt state.
		panic("Indexer.ImportBackup called after the database was initialized")
	}
	log.Debugf("restoring indexer backup")

	zr, err := compressor.NewReader(r)
	if err != nil {
		return fmt.Errorf("could not init decompressor: %w", err)
	}
	defer zr.Close()

	if err := restoreDBFromSQLDump(idx.dbPath, zr); err != nil {
		return fmt.Errorf("could not restore indexer backup: %w", err)
	}
	return idx.startDB()
}

// restoreDBFromSQLDump creates (or opens) the sqlite database at dbPath and
// executes the SQL statements read line by line from r.
//
// A statement is considered complete when a line ends with ";", as produced
// by sqlite3dump. NOTE(review): a ";" at the end of a line inside a
// multi-line string literal would be mis-detected as a statement boundary —
// confirm the dump format never emits such lines.
func restoreDBFromSQLDump(dbPath string, r io.Reader) error {
	db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal&_foreign_keys=true", dbPath))
	if err != nil {
		return fmt.Errorf("could not open indexer db: %w", err)
	}
	defer db.Close()

	scanner := bufio.NewScanner(r)
	// Dump lines can be long (one INSERT per row): raise the limit well above
	// bufio.Scanner's default 64 KiB max token size to avoid bufio.ErrTooLong.
	scanner.Buffer(make([]byte, 0, 1024*1024), 64*1024*1024)
	var statement strings.Builder
	for scanner.Scan() {
		line := scanner.Text()
		statement.WriteString(line)
		statement.WriteString("\n")

		if strings.HasSuffix(line, ";") {
			if _, err := db.Exec(statement.String()); err != nil {
				return fmt.Errorf("failed to execute statement: %s (error: %w)", statement.String(), err)
			}
			statement.Reset()
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error during restore: %w", err)
	}

	return nil
}

// ExportBackup writes a (zstd compressed) deterministic set of SQL statements
// reflecting the current database contents to w.
// Note that writes to the database may be blocked until the method finishes.
func (idx *Indexer) ExportBackup(w io.Writer) error {
	log.Debugf("exporting indexer backup")
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// Reserve a unique filename for the snapshot, then close and remove the
	// file right away: leaving the handle open would leak a descriptor, and
	// VACUUM INTO requires that its target file does not already exist.
	tmpDB, err := os.CreateTemp("", "indexer*.sqlite3")
	if err != nil {
		return fmt.Errorf("could not create tmpdb file: %w", err)
	}
	tmpDBPath := tmpDB.Name()
	if err := tmpDB.Close(); err != nil {
		return fmt.Errorf("could not close tmpdb file: %w", err)
	}
	if err := os.Remove(tmpDBPath); err != nil {
		return fmt.Errorf("could not remove tmpdb file: %w", err)
	}
	defer os.Remove(tmpDBPath)

	if _, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, tmpDBPath); err != nil {
		return fmt.Errorf("could not vacuum into tmpdb: %w", err)
	}

	db, err := sql.Open("sqlite3", tmpDBPath)
	if err != nil {
		return fmt.Errorf("could not open tmpDB: %w", err)
	}
	defer db.Close()

	// first drop stats table
	if _, err := db.ExecContext(ctx, `DROP TABLE IF EXISTS sqlite_stat1;`); err != nil {
		return fmt.Errorf("could not drop table sqlite_stat1: %w", err)
	}

	// make goose_db_version table deterministic
	if _, err := db.ExecContext(ctx, `UPDATE goose_db_version SET tstamp = '1970-01-01 00:00:00';`); err != nil {
		return fmt.Errorf("could not update goose_db_version: %w", err)
	}

	zw, err := compressor.NewWriter(w)
	if err != nil {
		return fmt.Errorf("could not init compressor: %w", err)
	}
	if err := sqlite3dump.DumpDB(db, zw); err != nil {
		zw.Close()
		return fmt.Errorf("could not dump db: %w", err)
	}
	// Close flushes the remaining compressed data; its error must not be
	// dropped or a truncated backup could go unnoticed.
	return zw.Close()
}

// ExportBackupAsBytes backs up the database, and returns the contents as []byte.
//
// Note that writes to the database may be blocked until the backup finishes.
//
// For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database.
func (idx *Indexer) ExportBackupAsBytes(ctx context.Context) ([]byte, error) {
tmpDir, err := os.MkdirTemp("", "indexer")
if err != nil {
return nil, fmt.Errorf("error creating tmpDir: %w", err)
}
tmpFilePath := filepath.Join(tmpDir, "indexer.sqlite3")
if err := idx.SaveBackup(ctx, tmpFilePath); err != nil {
var buf bytes.Buffer
if err := idx.ExportBackup(&buf); err != nil {
return nil, fmt.Errorf("error saving indexer backup: %w", err)
}
defer func() {
if err := os.Remove(tmpFilePath); err != nil {
log.Warnw("error removing indexer backup file", "path", tmpFilePath, "err", err)
}
}()
return os.ReadFile(tmpFilePath)
return buf.Bytes(), nil
}

// blockTxQueries assumes that lockPool is locked.
Expand Down
8 changes: 3 additions & 5 deletions vochain/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package indexer

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
stdlog "log"
"math/big"
"path/filepath"
"strings"
"testing"

Expand Down Expand Up @@ -89,8 +87,8 @@ func TestBackup(t *testing.T) {
wantTotalVotes(10)

// Back up the database.
backupPath := filepath.Join(t.TempDir(), "backup")
err = idx.SaveBackup(context.TODO(), backupPath)
var bkp bytes.Buffer
err = idx.ExportBackup(&bkp)
qt.Assert(t, err, qt.IsNil)

// Add another 5 votes which aren't in the backup.
Expand All @@ -111,7 +109,7 @@ func TestBackup(t *testing.T) {
idx.Close()
idx, err = New(app, Options{DataDir: t.TempDir(), ExpectBackupRestore: true})
qt.Assert(t, err, qt.IsNil)
err = idx.RestoreBackup(backupPath)
err = idx.ImportBackup(&bkp)
qt.Assert(t, err, qt.IsNil)
wantTotalVotes(10)

Expand Down

0 comments on commit f345045

Please sign in to comment.