Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ feat(indexer): Execute sql migrations file on startup #7092

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion indexer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ RUN make indexer
FROM alpine:3.18

COPY --from=builder /app/indexer/indexer /usr/local/bin
COPY --from=builder /app/indexer/indexer.toml /configs/indexer.toml

CMD ["indexer", "all", "--config", "/app/indexer/indexer.toml"]
CMD ["indexer", "all", "--config", "/configs/indexer.toml"]
89 changes: 68 additions & 21 deletions indexer/database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
package database

import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/ethereum-optimism/optimism/indexer/config"
_ "github.com/ethereum-optimism/optimism/indexer/database/serializers"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/pkg/errors"

"gorm.io/driver/postgres"
"gorm.io/gorm"
Expand All @@ -31,33 +36,45 @@ type DB struct {
}

func NewDB(dbConfig config.DBConfig) (*DB, error) {
dsn := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Port, dbConfig.Name)
if dbConfig.User != "" {
dsn += fmt.Sprintf(" user=%s", dbConfig.User)
}
if dbConfig.Password != "" {
dsn += fmt.Sprintf(" password=%s", dbConfig.Password)
}
gorm, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
// The indexer will explicitly manage the transactions
SkipDefaultTransaction: true,
var db *DB

retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}

_, err := retry.Do[interface{}](context.Background(), 10, retryStrategy, func() (interface{}, error) {
dsn := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Port, dbConfig.Name)
if dbConfig.User != "" {
dsn += fmt.Sprintf(" user=%s", dbConfig.User)
}
if dbConfig.Password != "" {
dsn += fmt.Sprintf(" password=%s", dbConfig.Password)
}
gorm, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
// The indexer will explicitly manage the transactions
SkipDefaultTransaction: true,
Logger: logger.Default.LogMode(logger.Silent),
})

// We may choose to create an adapter such that the
// logger emits to the geth logger when on DEBUG mode
Logger: logger.Default.LogMode(logger.Silent),
if err != nil {
return nil, errors.Wrap(err, "failed to connect to database")
}

db = &DB{
gorm: gorm,
Blocks: newBlocksDB(gorm),
ContractEvents: newContractEventsDB(gorm),
BridgeTransfers: newBridgeTransfersDB(gorm),
BridgeMessages: newBridgeMessagesDB(gorm),
BridgeTransactions: newBridgeTransactionsDB(gorm),
}
return nil, nil
})

if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to connect to database after multiple retries")
}

db := &DB{
gorm: gorm,
Blocks: newBlocksDB(gorm),
ContractEvents: newContractEventsDB(gorm),
BridgeTransfers: newBridgeTransfersDB(gorm),
BridgeMessages: newBridgeMessagesDB(gorm),
BridgeTransactions: newBridgeTransactionsDB(gorm),
if err := db.executeSQLMigration(); err != nil {
return nil, errors.Wrap(err, "failed to execute SQL migration")
}

return db, nil
Expand Down Expand Up @@ -90,3 +107,33 @@ func dbFromGormTx(tx *gorm.DB) *DB {
BridgeTransactions: newBridgeTransactionsDB(tx),
}
}

func (db *DB) executeSQLMigration() error {
roninjin10 marked this conversation as resolved.
Show resolved Hide resolved
err := filepath.Walk("migrations", func(path string, info os.FileInfo, err error) error {
// Check for any walking error
if err != nil {
return errors.Wrap(err, fmt.Sprintf("Failed to process migration file: %s", path))
}

// Skip directories
if info.IsDir() {
return nil
}

// Read the migration file content
fileContent, readErr := os.ReadFile(path)
if readErr != nil {
return errors.Wrap(readErr, fmt.Sprintf("Error reading SQL file: %s", path))
}

// Execute the migration
execErr := db.gorm.Exec(string(fileContent)).Error
if execErr != nil {
return errors.Wrap(execErr, fmt.Sprintf("Error executing SQL script: %s", path))
}

return nil
})

return err
}
4 changes: 2 additions & 2 deletions indexer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ services:
environment:
- INDEXER_RPC_URL_L1=$INDEXER_RPC_URL_L1
- INDEXER_RPC_URL_L2=$INDEXER_RPC_URL_L2
- INDEXER_CONFIG=/indexer/indexer.toml
- INDEXER_CONFIG=/configs/indexer.toml
volumes:
- ./indexer.toml:/indexer/indexer.toml
depends_on:
Expand All @@ -44,7 +44,7 @@ services:
# reindex with INDEXER_BEDROCK=true or seed the database
- INDEXER_RPC_URL_L1=$INDEXER_RPC_URL_L1
- INDEXER_RPC_URL_L2=$INDEXER_RPC_URL_L2
- INDEXER_CONFIG=/indexer/indexer.toml
- INDEXER_CONFIG=/configs/indexer.toml
volumes:
- ./indexer.toml:/indexer/indexer.toml
ports:
Expand Down
40 changes: 13 additions & 27 deletions indexer/e2e_tests/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"io/fs"
"os"
"path/filepath"
"testing"
"time"

Expand Down Expand Up @@ -132,31 +130,19 @@ func setupTestDatabase(t *testing.T) string {
pg.Close()
})

// setup schema, migration files ware walked in lexical order
t.Logf("created database %s", dbName)
db, err := sql.Open("pgx", fmt.Sprintf("postgres://%s@localhost:5432/%s?sslmode=disable", user, dbName))
dbConfig := config.DBConfig{
Host: "127.0.0.1",
Port: 5432,
Name: dbName,
User: user,
Password: "",
}
// NewDB will create the database schema
db, err := database.NewDB(dbConfig)
require.NoError(t, err)
err = db.Close()
require.NoError(t, err)
require.NoError(t, db.Ping())
defer db.Close()

t.Logf("running schema migrations...")
require.NoError(t, filepath.Walk("../migrations", func(path string, info fs.FileInfo, err error) error {
if err != nil {
return err
} else if info.IsDir() {
return nil
}

t.Logf("running schema migration: %s", path)
data, err := os.ReadFile(path)
if err != nil {
return err
}

_, err = db.Exec(string(data))
return err
}))

t.Logf("schema loaded")

t.Logf("database %s setup and migrations executed", dbName)
return dbName
}