Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DB migration status in the dev dashboard. #1737

Merged
merged 2 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/cmd/encore/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (d *Daemon) serveObjects() {

func (d *Daemon) serveDash() {
log.Info().Stringer("addr", d.Dash.Addr()).Msg("serving dash")
srv := dash.NewServer(d.Apps, d.RunMgr, d.Trace, d.Dash.Port())
srv := dash.NewServer(d.Apps, d.RunMgr, d.NS, d.Trace, d.Dash.Port())
d.exit <- http.Serve(d.Dash, srv)
}

Expand Down
115 changes: 115 additions & 0 deletions cli/daemon/dash/dash.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"encr.dev/cli/daemon/apps"
"encr.dev/cli/daemon/dash/ai"
"encr.dev/cli/daemon/engine/trace2"
"encr.dev/cli/daemon/namespace"
"encr.dev/cli/daemon/run"
"encr.dev/cli/daemon/sqldb"
"encr.dev/cli/internal/browser"
"encr.dev/cli/internal/jsonrpc2"
"encr.dev/cli/internal/onboarding"
Expand All @@ -40,6 +42,7 @@ type handler struct {
rpc jsonrpc2.Conn
apps *apps.Manager
run *run.Manager
ns *namespace.Manager
ai *ai.Manager
tr trace2.Store
}
Expand All @@ -64,6 +67,23 @@ func (h *handler) GetMeta(appID string) (*meta.Data, error) {
return md, nil
}

func (h *handler) GetNamespace(ctx context.Context, appID string) (*namespace.Namespace, error) {
runInstance := h.run.FindRunByAppID(appID)
if runInstance != nil && runInstance.ProcGroup() != nil {
return runInstance.NS, nil
} else {
app, err := h.apps.FindLatestByPlatformOrLocalID(appID)
if err != nil {
return nil, err
}
ns, err := h.ns.GetActive(ctx, app)
if err != nil {
return nil, err
}
return ns, nil
}
}

func (h *handler) Handle(ctx context.Context, reply jsonrpc2.Replier, r jsonrpc2.Request) error {
reply = makeProtoReplier(reply)

Expand Down Expand Up @@ -264,7 +284,39 @@ func (h *handler) Handle(ctx context.Context, reply jsonrpc2.Replier, r jsonrpc2
}

return reply(ctx, status, nil)
case "db-migration-status":
var params struct {
AppID string
}
if err := unmarshal(&params); err != nil {
return reply(ctx, nil, err)
}

// Find the latest app by platform ID or local ID.
app, err := h.apps.FindLatestByPlatformOrLocalID(params.AppID)
if err != nil {
return reply(ctx, nil, err)
}

appMeta, err := h.GetMeta(params.AppID)
if err != nil {
return reply(ctx, nil, err)
}

namespace, err := h.GetNamespace(ctx, params.AppID)
if err != nil {
return reply(ctx, nil, err)
}

clusterType := sqldb.Run
cluster, ok := h.run.ClusterMgr.Get(sqldb.GetClusterID(app, clusterType, namespace))
if !ok {
return reply(ctx, nil, fmt.Errorf("failed to get database cluster of type %s", clusterType))
}

status := buildDbMigrationStatus(ctx, appMeta, cluster)

return reply(ctx, status, nil)
case "api-call":
telemetry.Send("api.call")
var params apiCallParams
Expand Down Expand Up @@ -1015,6 +1067,18 @@ type appStatus struct {
CompileError string `json:"compileError,omitempty"`
}

type dbMigrationHistory struct {
DatabaseName string `json:"databaseName"`
Migrations []dbMigration `json:"migrations"`
}

type dbMigration struct {
Filename string `json:"filename"`
Number uint64 `json:"number"`
Description string `json:"description"`
Applied bool `json:"applied"`
}

func buildAppStatus(app *apps.Instance, runInstance *run.Run) (s appStatus, err error) {
// Now try and grab latest metadata for the app
var md *meta.Data
Expand Down Expand Up @@ -1064,3 +1128,54 @@ func buildAppStatus(app *apps.Instance, runInstance *run.Run) (s appStatus, err

return resp, nil
}

func buildDbMigrationStatus(ctx context.Context, appMeta *meta.Data, cluster *sqldb.Cluster) []dbMigrationHistory {
var statuses []dbMigrationHistory
for _, dbMeta := range appMeta.SqlDatabases {
db, ok := cluster.GetDB(dbMeta.Name)
if !ok {
log.Error().Msgf("failed to get database %s", dbMeta.Name)
continue
}
appliedVersions, err := db.ListAppliedMigrations(ctx)
if err != nil {
log.Error().Msgf("failed to list applied migrations for database %s: %v", dbMeta.Name, err)
continue
}
statuses = append(statuses, buildMigrationHistory(dbMeta, appliedVersions))
}
return statuses
}

func buildMigrationHistory(dbMeta *meta.SQLDatabase, appliedVersions map[uint64]bool) dbMigrationHistory {
history := dbMigrationHistory{
DatabaseName: dbMeta.Name,
Migrations: []dbMigration{},
}
// Go over migrations from latest to earliest
sortedMigrations := make([]*meta.DBMigration, len(dbMeta.Migrations))
copy(sortedMigrations, dbMeta.Migrations)
slices.SortStableFunc(sortedMigrations, func(a, b *meta.DBMigration) int {
return int(b.Number - a.Number)
})
implicitlyApplied := false
for _, migration := range sortedMigrations {
dirty, attempted := appliedVersions[migration.Number]
applied := attempted && !dirty
// If the database doesn't allow non-sequential migrations,
// then any migrations before the last applied will also have
// been applied even if we don't see them in the database.
if !dbMeta.AllowNonSequentialMigrations && applied {
implicitlyApplied = true
}

status := dbMigration{
Filename: migration.Filename,
Number: migration.Number,
Description: migration.Description,
Applied: applied || implicitlyApplied,
}
history.Migrations = append(history.Migrations, status)
}
return history
}
138 changes: 138 additions & 0 deletions cli/daemon/dash/dash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package dash

import (
"reflect"
"testing"

meta "encr.dev/proto/encore/parser/meta/v1"
)

func TestBuildMigrationHistory(t *testing.T) {
tests := []struct {
name string
dbMeta *meta.SQLDatabase
appliedVersions map[uint64]bool
want dbMigrationHistory
}{
{
name: "sequential migrations all applied cleanly",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{
{Number: 1, Filename: "001.sql", Description: "first"},
{Number: 2, Filename: "002.sql", Description: "second"},
{Number: 3, Filename: "003.sql", Description: "third"},
},
AllowNonSequentialMigrations: false,
},
appliedVersions: map[uint64]bool{
1: false, // clean
2: false, // clean
3: false, // clean
},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{
{Number: 3, Filename: "003.sql", Description: "third", Applied: true},
{Number: 2, Filename: "002.sql", Description: "second", Applied: true},
{Number: 1, Filename: "001.sql", Description: "first", Applied: true},
},
},
},
{
name: "sequential migrations with dirty migration",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{
{Number: 1, Filename: "001.sql", Description: "first"},
{Number: 2, Filename: "002.sql", Description: "second"},
{Number: 3, Filename: "003.sql", Description: "third"},
},
AllowNonSequentialMigrations: false,
},
appliedVersions: map[uint64]bool{
1: false, // clean
2: true, // dirty
},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{
{Number: 3, Filename: "003.sql", Description: "third", Applied: false},
{Number: 2, Filename: "002.sql", Description: "second", Applied: false},
{Number: 1, Filename: "001.sql", Description: "first", Applied: true},
},
},
},
{
name: "sequential migrations partially applied",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{
{Number: 1, Filename: "001.sql", Description: "first"},
{Number: 2, Filename: "002.sql", Description: "second"},
{Number: 3, Filename: "003.sql", Description: "third"},
},
AllowNonSequentialMigrations: false,
},
appliedVersions: map[uint64]bool{
1: false, // clean
2: false, // clean
},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{
{Number: 3, Filename: "003.sql", Description: "third", Applied: false},
{Number: 2, Filename: "002.sql", Description: "second", Applied: true},
{Number: 1, Filename: "001.sql", Description: "first", Applied: true},
},
},
},
{
name: "non-sequential migrations with mix of clean and dirty",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{
{Number: 1, Filename: "001.sql", Description: "first"},
{Number: 2, Filename: "002.sql", Description: "second"},
{Number: 3, Filename: "003.sql", Description: "third"},
},
AllowNonSequentialMigrations: true,
},
appliedVersions: map[uint64]bool{
1: false, // clean
2: true, // dirty
3: false, // clean
},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{
{Number: 3, Filename: "003.sql", Description: "third", Applied: true},
{Number: 2, Filename: "002.sql", Description: "second", Applied: false},
{Number: 1, Filename: "001.sql", Description: "first", Applied: true},
},
},
},
{
name: "empty migrations list",
dbMeta: &meta.SQLDatabase{
Name: "test-db",
Migrations: []*meta.DBMigration{},
AllowNonSequentialMigrations: false,
},
appliedVersions: map[uint64]bool{},
want: dbMigrationHistory{
DatabaseName: "test-db",
Migrations: []dbMigration{},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := buildMigrationHistory(tt.dbMeta, tt.appliedVersions)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("buildMigrationHistory() = %v, want %v", got, tt.want)
}
})
}
}
7 changes: 5 additions & 2 deletions cli/daemon/dash/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"encr.dev/cli/daemon/dash/apiproxy"
"encr.dev/cli/daemon/dash/dashproxy"
"encr.dev/cli/daemon/engine/trace2"
"encr.dev/cli/daemon/namespace"
"encr.dev/cli/daemon/run"
"encr.dev/cli/internal/jsonrpc2"
"encr.dev/internal/conf"
Expand All @@ -27,7 +28,7 @@ var upgrader = websocket.Upgrader{
}

// NewServer starts a new server and returns it.
func NewServer(appsMgr *apps.Manager, runMgr *run.Manager, tr trace2.Store, dashPort int) *Server {
func NewServer(appsMgr *apps.Manager, runMgr *run.Manager, nsMgr *namespace.Manager, tr trace2.Store, dashPort int) *Server {
proxy, err := dashproxy.New(conf.DevDashURL)
if err != nil {
log.Fatal().Err(err).Msg("could not create dash proxy")
Expand All @@ -45,6 +46,7 @@ func NewServer(appsMgr *apps.Manager, runMgr *run.Manager, tr trace2.Store, dash
apiProxy: apiProxy,
apps: appsMgr,
run: runMgr,
ns: nsMgr,
tr: tr,
dashPort: dashPort,
traceCh: make(chan trace2.NewSpanEvent, 10),
Expand All @@ -64,6 +66,7 @@ type Server struct {
apiProxy *httputil.ReverseProxy
apps *apps.Manager
run *run.Manager
ns *namespace.Manager
tr trace2.Store
dashPort int
traceCh chan trace2.NewSpanEvent
Expand Down Expand Up @@ -96,7 +99,7 @@ func (s *Server) WebSocket(w http.ResponseWriter, req *http.Request) {

stream := &wsStream{c: c}
conn := jsonrpc2.NewConn(stream)
handler := &handler{rpc: conn, apps: s.apps, run: s.run, tr: s.tr, ai: s.ai}
handler := &handler{rpc: conn, apps: s.apps, run: s.run, ns: s.ns, tr: s.tr, ai: s.ai}
conn.Go(req.Context(), handler.Handle)

ch := make(chan *notification, 20)
Expand Down
31 changes: 31 additions & 0 deletions cli/daemon/sqldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,15 @@ func (db *DB) doMigrate(ctx context.Context, cloudName, appRoot string, dbMeta *
return nil
}

func (db *DB) ListAppliedMigrations(ctx context.Context) (map[uint64]bool, error) {
conn, err := db.connectToDB(ctx)
if err != nil {
return nil, err
}
defer fns.CloseIgnore(conn)
return LoadAppliedVersions(ctx, conn, "public", "schema_migrations")
}

func RunMigration(ctx context.Context, dbName string, allowNonSeq bool, conn *sql.Conn, mdSrc *MetadataSource) (err error) {
var (
dbDriver database.Driver
Expand Down Expand Up @@ -477,3 +486,25 @@ func (db *DB) connectSuperuser(ctx context.Context) (*pgx.Conn, error) {
db.log.Debug().Err(err).Msgf("failed to connect to admin db")
return nil, fmt.Errorf("failed to connect to superuser database: %v", err)
}

// Connects as a superuser or admin to the database. Fails fast if the cluster
// is not running yet.
// On success the returned conn must be closed by the caller.
func (db *DB) connectToDB(ctx context.Context) (*sql.Conn, error) {
info, err := db.Cluster.Info(ctx)
if err != nil {
return nil, err
}
uri := info.ConnURI(db.EncoreName, info.Config.Superuser)
pool, err := sql.Open("pgx", uri)
erikcarlsson marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
defer fns.CloseIgnore(pool)

conn, err := pool.Conn(ctx)
if err != nil {
return nil, err
}
return conn, nil
}
Loading
Loading