Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Refactor operations to not import state. #14124

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions lxd-agent/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func execPost(d *Daemon, r *http.Request) response.Response {

resources := map[string][]api.URL{}

op, err := operations.OperationCreate(nil, "", operations.OperationClassWebsocket, operationtype.CommandExec, resources, ws.Metadata(), ws.Do, nil, ws.Connect, r)
operationOpts := operations.Options().WithResources(resources).WithMetadata(ws.Metadata()).WithOnConnect(ws.Connect).WithRequest(r)
op, err := operations.OperationCreate(context.Background(), operations.OperationClassWebsocket, operationtype.CommandExec, "lxd-agent", d.events, ws.Do, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand All @@ -163,6 +164,7 @@ type execWs struct {
cwd string
}

// Metadata returns information required for the client to interact with the exec websocket.
func (s *execWs) Metadata() any {
fds := shared.Jmap{}
for fd, secret := range s.fds {
Expand All @@ -181,6 +183,7 @@ func (s *execWs) Metadata() any {
}
}

// Connect is the websocket connect hook for the exec operation.
func (s *execWs) Connect(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
Expand Down Expand Up @@ -211,9 +214,9 @@ func (s *execWs) Connect(op *operations.Operation, r *http.Request, w http.Respo
return nil
} else if !found {
return fmt.Errorf("Unknown websocket number")
} else {
return fmt.Errorf("Websocket number already connected")
}

return fmt.Errorf("Websocket number already connected")
}
}

Expand All @@ -222,6 +225,7 @@ func (s *execWs) Connect(op *operations.Operation, r *http.Request, w http.Respo
return os.ErrPermission
}

// Do runs waits for the websocket to connect, then proxies the VM ttys through the websocket.
func (s *execWs) Do(op *operations.Operation) error {
// Once this function ends ensure that any connected websockets are closed.
defer func() {
Expand All @@ -239,7 +243,6 @@ func (s *execWs) Do(op *operations.Operation) error {
logger.Debug("Waiting for exec websockets to connect")
select {
case <-s.requiredConnectedCtx.Done():
break
case <-time.After(time.Second * 5):
return fmt.Errorf("Timed out waiting for websockets to connect")
}
Expand Down
2 changes: 1 addition & 1 deletion lxd/acme.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func autoRenewCertificate(ctx context.Context, d *Daemon, force bool) error {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.RenewServerCertificate, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.RenewServerCertificate, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating renew server certificate operation", logger.Ctx{"err": err})
return err
Expand Down
17 changes: 11 additions & 6 deletions lxd/api_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ func clusterPutBootstrap(d *Daemon, r *http.Request, req api.ClusterPut) respons
d.localConfig = config
d.globalConfigMu.Unlock()

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterBootstrap, resources, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithResources(resources).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterBootstrap, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -869,7 +870,8 @@ func clusterPutJoin(d *Daemon, r *http.Request, req api.ClusterPut) response.Res
resources := map[string][]api.URL{}
resources["cluster"] = []api.URL{}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterJoin, resources, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithResources(resources).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterJoin, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -1433,7 +1435,8 @@ func clusterNodesPost(d *Daemon, r *http.Request) response.Response {
resources := map[string][]api.URL{}
resources["cluster"] = []api.URL{}

op, err := operations.OperationCreate(s, api.ProjectDefaultName, operations.OperationClassToken, operationtype.ClusterJoinToken, resources, meta, nil, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(api.ProjectDefaultName).WithResources(resources).WithMetadata(meta).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassToken, operationtype.ClusterJoinToken, s.ServerName, s.Events, nil, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -3276,7 +3279,8 @@ func evacuateClusterMember(s *state.State, gateway *cluster.Gateway, r *http.Req
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterMemberEvacuate, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterMemberEvacuate, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -3601,7 +3605,8 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterMemberRestore, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterMemberRestore, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -4492,7 +4497,7 @@ func autoHealClusterTask(d *Daemon) (task.Func, task.Schedule) {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterHeal, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterHeal, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating cluster instances heal operation", logger.Ctx{"err": err})
return
Expand Down
3 changes: 2 additions & 1 deletion lxd/api_project.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,8 @@ func projectPost(d *Daemon, r *http.Request) response.Response {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ProjectRename, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ProjectRename, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lxd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func pruneExpiredBackupsTask(d *Daemon) (task.Func, task.Schedule) {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.BackupsExpire, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.BackupsExpire, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating expired backups operation", logger.Ctx{"err": err})
return
Expand Down
3 changes: 2 additions & 1 deletion lxd/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,8 @@ func certificatesPost(d *Daemon, r *http.Request) response.Response {
meta["expiresAt"] = expiresAt
}

op, err := operations.OperationCreate(s, api.ProjectDefaultName, operations.OperationClassToken, operationtype.CertificateAddToken, nil, meta, nil, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithRequest(r).WithMetadata(meta).WithProjectName(api.ProjectDefaultName)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassToken, operationtype.CertificateAddToken, s.ServerName, s.Events, nil, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down
24 changes: 23 additions & 1 deletion lxd/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,13 @@ func (c *Cluster) Transaction(ctx context.Context, f func(context.Context, *Clus
return c.transaction(ctx, f)
}

// TransactionSQL is identical to Transaction but the hook has a sql.Tx instead (for import cycle reasons).
func (c *Cluster) TransactionSQL(ctx context.Context, f func(context.Context, *sql.Tx) error) error {
c.mu.RLock()
defer c.mu.RUnlock()
return c.transactionSQL(ctx, f)
}

// EnterExclusive acquires a lock on the cluster db, so any successive call to
// Transaction will block until ExitExclusive has been called.
func (c *Cluster) EnterExclusive() error {
Expand Down Expand Up @@ -384,6 +391,21 @@ func (c *Cluster) transaction(ctx context.Context, f func(context.Context, *Clus
})
}

func (c *Cluster) transactionSQL(ctx context.Context, f func(context.Context, *sql.Tx) error) error {
return query.Retry(ctx, func(ctx context.Context) error {
err := query.Transaction(ctx, c.db, f)
if errors.Is(err, context.DeadlineExceeded) {
// If the query timed out it likely means that the leader has abruptly become unreachable.
// Now that this query has been cancelled, a leader election should have taken place by now.
// So let's retry the transaction once more in case the global database is now available again.
logger.Warn("Transaction timed out. Retrying once", logger.Ctx{"member": c.nodeID, "err": err})
return query.Transaction(ctx, c.db, f)
}

return err
})
}

// NodeID sets the node NodeID associated with this cluster instance. It's used for
// backward-compatibility of all db-related APIs that were written before
// clustering and don't accept a node NodeID, so in those cases we automatically
Expand Down Expand Up @@ -432,7 +454,7 @@ func begin(db *sql.DB) (*sql.Tx, error) {
}

logger.Debugf("DbBegin: DB still locked")
logger.Debugf(logger.GetStack())
logger.Debug(logger.GetStack())
return nil, fmt.Errorf("DB is locked")
}

Expand Down
27 changes: 18 additions & 9 deletions lxd/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,8 @@ func imagesPost(d *Daemon, r *http.Request) response.Response {
}
}

imageOp, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, operationtype.ImageDownload, nil, metadata, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithMetadata(metadata).WithRequest(r)
imageOp, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImageDownload, s.ServerName, s.Events, run, operationOpts)
if err != nil {
cleanup(builddir, post)
return response.InternalError(err)
Expand Down Expand Up @@ -1763,7 +1764,8 @@ func autoUpdateImagesTask(d *Daemon) (task.Func, task.Schedule) {
return autoUpdateImages(ctx, s)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ImagesUpdate, nil, nil, opRun, nil, nil, nil)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImagesUpdate, s.ServerName, s.Events, opRun, operationOpts)
if err != nil {
logger.Error("Failed creating image update operation", logger.Ctx{"err": err})
return
Expand Down Expand Up @@ -2412,7 +2414,8 @@ func pruneExpiredImagesTask(d *Daemon) (task.Func, task.Schedule) {
return pruneExpiredImages(ctx, s, op)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ImagesExpire, nil, nil, opRun, nil, nil, nil)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImagesExpire, s.ServerName, s.Events, opRun, operationOpts)
if err != nil {
logger.Error("Failed creating expired image prune operation", logger.Ctx{"err": err})
return
Expand Down Expand Up @@ -2530,7 +2533,8 @@ func pruneLeftoverImages(s *state.State) {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ImagesPruneLeftover, nil, nil, opRun, nil, nil, nil)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImagesPruneLeftover, s.ServerName, s.Events, opRun, operationOpts)
if err != nil {
logger.Error("Failed creating leftover image clean up operation", logger.Ctx{"err": err})
return
Expand Down Expand Up @@ -2889,7 +2893,8 @@ func imageDelete(d *Daemon, r *http.Request) response.Response {
resources := map[string][]api.URL{}
resources["images"] = []api.URL{*api.NewURL().Path(version.APIVersion, "images", details.image.Fingerprint)}

op, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, operationtype.ImageDelete, resources, nil, do, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithResources(resources).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImageDelete, s.ServerName, s.Events, do, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -4400,7 +4405,8 @@ func imageExportPost(d *Daemon, r *http.Request) response.Response {
return nil
}

op, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, operationtype.ImageDownload, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImageDownload, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -4587,7 +4593,8 @@ func imageRefresh(d *Daemon, r *http.Request) response.Response {
return err
}

op, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, operationtype.ImageRefresh, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImageRefresh, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -4622,7 +4629,8 @@ func autoSyncImagesTask(d *Daemon) (task.Func, task.Schedule) {
return autoSyncImages(ctx, s)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ImagesSynchronize, nil, nil, opRun, nil, nil, nil)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImagesSynchronize, s.ServerName, s.Events, opRun, operationOpts)
if err != nil {
logger.Error("Failed creating image synchronization operation", logger.Ctx{"err": err})
return
Expand Down Expand Up @@ -4825,7 +4833,8 @@ func createTokenResponse(s *state.State, r *http.Request, projectName string, fi
resources := map[string][]api.URL{}
resources["images"] = []api.URL{*api.NewURL().Path(version.APIVersion, "images", fingerprint)}

op, err := operations.OperationCreate(s, projectName, operations.OperationClassToken, operationtype.ImageToken, resources, meta, nil, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithResources(resources).WithMetadata(meta).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassToken, operationtype.ImageToken, s.ServerName, s.Events, nil, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down
4 changes: 2 additions & 2 deletions lxd/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func pruneExpiredAndAutoCreateInstanceSnapshotsTask(d *Daemon) (task.Func, task.
return pruneExpiredInstanceSnapshots(ctx, expiredSnapshotInstances)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.SnapshotsExpire, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.SnapshotsExpire, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating instance snapshots expiry operation", logger.Ctx{"err": err})
} else {
Expand All @@ -690,7 +690,7 @@ func pruneExpiredAndAutoCreateInstanceSnapshotsTask(d *Daemon) (task.Func, task.
return autoCreateInstanceSnapshots(ctx, s, instances)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.SnapshotCreate, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.SnapshotCreate, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating scheduled instance snapshot operation", logger.Ctx{"err": err})
} else {
Expand Down
43 changes: 21 additions & 22 deletions lxd/instance/drivers/driver_lxc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5467,12 +5467,11 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
}

actionScriptOp, err := operations.OperationCreate(
d.state,
d.Project().Name,
d.state.ShutdownCtx,
operations.OperationClassWebsocket,
operationtype.InstanceLiveMigrate,
nil,
nil,
d.state.ServerName,
d.state.Events,
func(op *operations.Operation) error {
result := <-restoreSuccess
if !result {
Expand All @@ -5481,28 +5480,28 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {

return nil
},
nil,
func(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
return fmt.Errorf("Missing action script secret")
}
operations.ClusterOptions(d.state.DB.Cluster.TransactionSQL).
WithProjectName(d.Project().Name).
WithOnConnect(func(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
return fmt.Errorf("Missing action script secret")
}

if secret != actionScriptOpSecret {
return os.ErrPermission
}
if secret != actionScriptOpSecret {
return os.ErrPermission
}

c, err := ws.Upgrader.Upgrade(w, r, nil)
if err != nil {
return err
}
c, err := ws.Upgrader.Upgrade(w, r, nil)
if err != nil {
return err
}

dumpDone <- true
dumpDone <- true

closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
return c.WriteMessage(websocket.CloseMessage, closeMsg)
},
nil,
closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
return c.WriteMessage(websocket.CloseMessage, closeMsg)
}),
)
if err != nil {
_ = os.RemoveAll(checkpointDir)
Expand Down
Loading
Loading