Skip to content

Commit

Permalink
#13 Added incremental migration support to postgres indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronc committed Mar 25, 2019
1 parent 33b84ae commit a87f722
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 9 deletions.
2 changes: 1 addition & 1 deletion app.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewXrnApp(logger log.Logger, db dbm.DB, postgresUrl string) *xrnApp {
if len(postgresUrl) != 0 {
pgIndexer, err := postgresql.NewIndexer(postgresUrl, txDecoder)
if err == nil {
pgIndexer.AddMigration(geo.PostgresSchema)
pgIndexer.AddMigrations("geo", geo.PostgresMigrations)
app.pgIndexer = pgIndexer
logger.Info("Started PostgreSQL Indexer")
} else {
Expand Down
52 changes: 45 additions & 7 deletions index/postgresql/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type indexer struct {
conn *sql.DB
txDecoder sdk.TxDecoder
migrations []string
migrations map[string][]string
curTx *sql.Tx
blockHeader abci.Header
}
Expand All @@ -26,11 +26,11 @@ func NewIndexer(connString string, txDecoder sdk.TxDecoder) (Indexer, error) {
if err != nil {
return nil, err
}
return &indexer{conn: conn, txDecoder: txDecoder, migrations: []string{}}, nil
return &indexer{conn: conn, txDecoder: txDecoder, migrations: make(map[string][]string)}, nil
}

func (indexer *indexer) AddMigration(ddl string) {
indexer.migrations = append(indexer.migrations, ddl)
func (indexer *indexer) AddMigrations(module string, ddlStatements []string) {
indexer.migrations[module] = ddlStatements
}

func (indexer *indexer) Exec(query string, args ...interface{}) {
Expand All @@ -45,12 +45,50 @@ func (indexer *indexer) Exec(query string, args ...interface{}) {
}

func (indexer *indexer) OnInitChain(abci.RequestInitChain, abci.ResponseInitChain) {
_, err := indexer.conn.Exec(InitialSchema)
indexer.ApplyMigrations()
}

var baseMigrations = []string{InitialSchema}

func (indexer *indexer) ApplyMigrations() {
_, err := indexer.conn.Exec(`CREATE TABLE IF NOT EXISTS __migration_state (
module text PRIMARY KEY,
version int,
other jsonb
);`)
if err != nil {
panic(err)
}
for _, ddl := range indexer.migrations {
_, err := indexer.conn.Exec(ddl)
indexer.applyModuleMigrations("", baseMigrations)
for key, value := range indexer.migrations {
indexer.applyModuleMigrations(key, value)
}
}

func (indexer *indexer) applyModuleMigrations(module string, ddl []string) {
row := indexer.conn.QueryRow("SELECT version FROM __migration_state WHERE module = $1", module)
version := 0
haveRow := false
if row != nil {
err := row.Scan(&version)
if err == nil {
haveRow = true
}
}
n := len(ddl)
for i := version; i < n; i++ {
_, err := indexer.conn.Exec(ddl[i])
if err != nil {
panic(err)
}
}
if !haveRow {
_, err := indexer.conn.Exec("INSERT INTO __migration_state (module, version) VALUES ($1, $2)", module, n)
if err != nil {
panic(err)
}
} else {
_, err := indexer.conn.Exec("UPDATE __migration_state SET version = $1 WHERE module = $2", n, module)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion index/postgresql/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Indexer interface {
// AddMigration adds a migration to the list of migrations to be run during
// InitChain or an upgrade. Migrations will be run in the order
// they are added
AddMigration(ddl string)
AddMigrations(module string, ddlStatements []string)

// Exec executes a PostgreSQL statement against the database.
// Can be used in keepers for custom indexing
Expand Down
2 changes: 2 additions & 0 deletions index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
type Indexer interface {
// OnInitChain intercepts the InitChain ABCI method
OnInitChain(abci.RequestInitChain, abci.ResponseInitChain)
// ApplyMigrations applies any pending migrations
ApplyMigrations()
// OnBeginBlock intercepts the BeginBlock ABCI method
OnBeginBlock(abci.RequestBeginBlock, abci.ResponseBeginBlock)
// BeforeDeliverTx should be called on the ABCI DeliverTx method
Expand Down
4 changes: 4 additions & 0 deletions x/geo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type Geometry struct {

type GeoAddress []byte

var PostgresMigrations = []string{
PostgresSchema,
}

const PostgresSchema = `
CREATE TABLE geo (
url text NOT NULL PRIMARY KEY,
Expand Down

0 comments on commit a87f722

Please sign in to comment.