Skip to content

Commit

Permalink
WIP: Remove state from operations.
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Laing <mark.laing@canonical.com>
  • Loading branch information
markylaing committed Sep 19, 2024
1 parent a6b80c9 commit 9653f79
Show file tree
Hide file tree
Showing 33 changed files with 315 additions and 164 deletions.
11 changes: 7 additions & 4 deletions lxd-agent/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func execPost(d *Daemon, r *http.Request) response.Response {

resources := map[string][]api.URL{}

op, err := operations.OperationCreate(nil, "", operations.OperationClassWebsocket, operationtype.CommandExec, resources, ws.Metadata(), ws.Do, nil, ws.Connect, r)
operationOpts := operations.Options().WithResources(resources).WithMetadata(ws.Metadata()).WithOnConnect(ws.Connect).WithRequest(r)
op, err := operations.OperationCreate(context.Background(), operations.OperationClassWebsocket, operationtype.CommandExec, "lxd-agent", d.events, ws.Do, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand All @@ -163,6 +164,7 @@ type execWs struct {
cwd string
}

// Metadata returns information required for the client to interact with the exec websocket.
func (s *execWs) Metadata() any {
fds := shared.Jmap{}
for fd, secret := range s.fds {
Expand All @@ -181,6 +183,7 @@ func (s *execWs) Metadata() any {
}
}

// Connect is the websocket connect hook for the exec operation.
func (s *execWs) Connect(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
Expand Down Expand Up @@ -211,9 +214,9 @@ func (s *execWs) Connect(op *operations.Operation, r *http.Request, w http.Respo
return nil
} else if !found {
return fmt.Errorf("Unknown websocket number")
} else {
return fmt.Errorf("Websocket number already connected")
}

return fmt.Errorf("Websocket number already connected")
}
}

Expand All @@ -222,6 +225,7 @@ func (s *execWs) Connect(op *operations.Operation, r *http.Request, w http.Respo
return os.ErrPermission
}

// Do waits for the websocket to connect, then proxies the VM ttys through the websocket.
func (s *execWs) Do(op *operations.Operation) error {
// Once this function ends ensure that any connected websockets are closed.
defer func() {
Expand All @@ -239,7 +243,6 @@ func (s *execWs) Do(op *operations.Operation) error {
logger.Debug("Waiting for exec websockets to connect")
select {
case <-s.requiredConnectedCtx.Done():
break
case <-time.After(time.Second * 5):
return fmt.Errorf("Timed out waiting for websockets to connect")
}
Expand Down
2 changes: 1 addition & 1 deletion lxd/acme.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func autoRenewCertificate(ctx context.Context, d *Daemon, force bool) error {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.RenewServerCertificate, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.RenewServerCertificate, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating renew server certificate operation", logger.Ctx{"err": err})
return err
Expand Down
17 changes: 11 additions & 6 deletions lxd/api_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ func clusterPutBootstrap(d *Daemon, r *http.Request, req api.ClusterPut) respons
d.localConfig = config
d.globalConfigMu.Unlock()

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterBootstrap, resources, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithResources(resources).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterBootstrap, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -869,7 +870,8 @@ func clusterPutJoin(d *Daemon, r *http.Request, req api.ClusterPut) response.Res
resources := map[string][]api.URL{}
resources["cluster"] = []api.URL{}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterJoin, resources, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithResources(resources).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterJoin, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -1433,7 +1435,8 @@ func clusterNodesPost(d *Daemon, r *http.Request) response.Response {
resources := map[string][]api.URL{}
resources["cluster"] = []api.URL{}

op, err := operations.OperationCreate(s, api.ProjectDefaultName, operations.OperationClassToken, operationtype.ClusterJoinToken, resources, meta, nil, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(api.ProjectDefaultName).WithResources(resources).WithMetadata(meta).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassToken, operationtype.ClusterJoinToken, s.ServerName, s.Events, nil, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -3276,7 +3279,8 @@ func evacuateClusterMember(s *state.State, gateway *cluster.Gateway, r *http.Req
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterMemberEvacuate, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterMemberEvacuate, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -3601,7 +3605,8 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterMemberRestore, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterMemberRestore, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -4492,7 +4497,7 @@ func autoHealClusterTask(d *Daemon) (task.Func, task.Schedule) {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ClusterHeal, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ClusterHeal, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating cluster instances heal operation", logger.Ctx{"err": err})
return
Expand Down
3 changes: 2 additions & 1 deletion lxd/api_project.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,8 @@ func projectPost(d *Daemon, r *http.Request) response.Response {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ProjectRename, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ProjectRename, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lxd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func pruneExpiredBackupsTask(d *Daemon) (task.Func, task.Schedule) {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.BackupsExpire, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.BackupsExpire, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating expired backups operation", logger.Ctx{"err": err})
return
Expand Down
3 changes: 2 additions & 1 deletion lxd/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,8 @@ func certificatesPost(d *Daemon, r *http.Request) response.Response {
meta["expiresAt"] = expiresAt
}

op, err := operations.OperationCreate(s, api.ProjectDefaultName, operations.OperationClassToken, operationtype.CertificateAddToken, nil, meta, nil, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithRequest(r).WithMetadata(meta).WithProjectName(api.ProjectDefaultName)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassToken, operationtype.CertificateAddToken, s.ServerName, s.Events, nil, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down
24 changes: 23 additions & 1 deletion lxd/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,13 @@ func (c *Cluster) Transaction(ctx context.Context, f func(context.Context, *Clus
return c.transaction(ctx, f)
}

// TransactionSQL is identical to Transaction but the hook receives a *sql.Tx instead (for import cycle reasons).
//
// A read lock on c.mu is held for the whole call and released on return. The
// work itself is delegated to transactionSQL, whose error is returned as-is.
func (c *Cluster) TransactionSQL(ctx context.Context, f func(context.Context, *sql.Tx) error) error {
c.mu.RLock()
defer c.mu.RUnlock()
return c.transactionSQL(ctx, f)
}

// EnterExclusive acquires a lock on the cluster db, so any successive call to
// Transaction will block until ExitExclusive has been called.
func (c *Cluster) EnterExclusive() error {
Expand Down Expand Up @@ -384,6 +391,21 @@ func (c *Cluster) transaction(ctx context.Context, f func(context.Context, *Clus
})
}

// transactionSQL runs f inside a transaction on c.db via query.Transaction,
// wrapped in query.Retry. The hook is handed the raw *sql.Tx.
func (c *Cluster) transactionSQL(ctx context.Context, f func(context.Context, *sql.Tx) error) error {
	attempt := func(ctx context.Context) error {
		txErr := query.Transaction(ctx, c.db, f)
		if !errors.Is(txErr, context.DeadlineExceeded) {
			// Success, or a failure other than a timeout: hand it straight back.
			return txErr
		}

		// A timeout most likely means the leader abruptly became unreachable.
		// With the query now cancelled, a leader election should already have
		// happened, so give the transaction one more go in case the global
		// database is reachable again.
		logger.Warn("Transaction timed out. Retrying once", logger.Ctx{"member": c.nodeID, "err": txErr})

		return query.Transaction(ctx, c.db, f)
	}

	return query.Retry(ctx, attempt)
}

// NodeID sets the node ID associated with this cluster instance. It's used for
// backward-compatibility of all db-related APIs that were written before
// clustering and don't accept a node ID, so in those cases we automatically
Expand Down Expand Up @@ -432,7 +454,7 @@ func begin(db *sql.DB) (*sql.Tx, error) {
}

logger.Debugf("DbBegin: DB still locked")
logger.Debugf(logger.GetStack())
logger.Debug(logger.GetStack())
return nil, fmt.Errorf("DB is locked")
}

Expand Down
27 changes: 18 additions & 9 deletions lxd/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,8 @@ func imagesPost(d *Daemon, r *http.Request) response.Response {
}
}

imageOp, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, operationtype.ImageDownload, nil, metadata, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithMetadata(metadata).WithRequest(r)
imageOp, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImageDownload, s.ServerName, s.Events, run, operationOpts)
if err != nil {
cleanup(builddir, post)
return response.InternalError(err)
Expand Down Expand Up @@ -1763,7 +1764,8 @@ func autoUpdateImagesTask(d *Daemon) (task.Func, task.Schedule) {
return autoUpdateImages(ctx, s)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ImagesUpdate, nil, nil, opRun, nil, nil, nil)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImagesUpdate, s.ServerName, s.Events, opRun, operationOpts)
if err != nil {
logger.Error("Failed creating image update operation", logger.Ctx{"err": err})
return
Expand Down Expand Up @@ -2412,7 +2414,8 @@ func pruneExpiredImagesTask(d *Daemon) (task.Func, task.Schedule) {
return pruneExpiredImages(ctx, s, op)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ImagesExpire, nil, nil, opRun, nil, nil, nil)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImagesExpire, s.ServerName, s.Events, opRun, operationOpts)
if err != nil {
logger.Error("Failed creating expired image prune operation", logger.Ctx{"err": err})
return
Expand Down Expand Up @@ -2530,7 +2533,8 @@ func pruneLeftoverImages(s *state.State) {
return nil
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ImagesPruneLeftover, nil, nil, opRun, nil, nil, nil)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImagesPruneLeftover, s.ServerName, s.Events, opRun, operationOpts)
if err != nil {
logger.Error("Failed creating leftover image clean up operation", logger.Ctx{"err": err})
return
Expand Down Expand Up @@ -2889,7 +2893,8 @@ func imageDelete(d *Daemon, r *http.Request) response.Response {
resources := map[string][]api.URL{}
resources["images"] = []api.URL{*api.NewURL().Path(version.APIVersion, "images", details.image.Fingerprint)}

op, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, operationtype.ImageDelete, resources, nil, do, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithResources(resources).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImageDelete, s.ServerName, s.Events, do, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -4400,7 +4405,8 @@ func imageExportPost(d *Daemon, r *http.Request) response.Response {
return nil
}

op, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, operationtype.ImageDownload, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImageDownload, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -4587,7 +4593,8 @@ func imageRefresh(d *Daemon, r *http.Request) response.Response {
return err
}

op, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, operationtype.ImageRefresh, nil, nil, run, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImageRefresh, s.ServerName, s.Events, run, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -4622,7 +4629,8 @@ func autoSyncImagesTask(d *Daemon) (task.Func, task.Schedule) {
return autoSyncImages(ctx, s)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.ImagesSynchronize, nil, nil, opRun, nil, nil, nil)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.ImagesSynchronize, s.ServerName, s.Events, opRun, operationOpts)
if err != nil {
logger.Error("Failed creating image synchronization operation", logger.Ctx{"err": err})
return
Expand Down Expand Up @@ -4825,7 +4833,8 @@ func createTokenResponse(s *state.State, r *http.Request, projectName string, fi
resources := map[string][]api.URL{}
resources["images"] = []api.URL{*api.NewURL().Path(version.APIVersion, "images", fingerprint)}

op, err := operations.OperationCreate(s, projectName, operations.OperationClassToken, operationtype.ImageToken, resources, meta, nil, nil, nil, r)
operationOpts := operations.ClusterOptions(s.DB.Cluster.TransactionSQL).WithProjectName(projectName).WithResources(resources).WithMetadata(meta).WithRequest(r)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassToken, operationtype.ImageToken, s.ServerName, s.Events, nil, operationOpts)
if err != nil {
return response.InternalError(err)
}
Expand Down
4 changes: 2 additions & 2 deletions lxd/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func pruneExpiredAndAutoCreateInstanceSnapshotsTask(d *Daemon) (task.Func, task.
return pruneExpiredInstanceSnapshots(ctx, expiredSnapshotInstances)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.SnapshotsExpire, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.SnapshotsExpire, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating instance snapshots expiry operation", logger.Ctx{"err": err})
} else {
Expand All @@ -690,7 +690,7 @@ func pruneExpiredAndAutoCreateInstanceSnapshotsTask(d *Daemon) (task.Func, task.
return autoCreateInstanceSnapshots(ctx, s, instances)
}

op, err := operations.OperationCreate(s, "", operations.OperationClassTask, operationtype.SnapshotCreate, nil, nil, opRun, nil, nil, nil)
op, err := operations.OperationCreate(s.ShutdownCtx, operations.OperationClassTask, operationtype.SnapshotCreate, s.ServerName, s.Events, opRun, operations.ClusterOptions(s.DB.Cluster.TransactionSQL))
if err != nil {
logger.Error("Failed creating scheduled instance snapshot operation", logger.Ctx{"err": err})
} else {
Expand Down
43 changes: 21 additions & 22 deletions lxd/instance/drivers/driver_lxc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5467,12 +5467,11 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
}

actionScriptOp, err := operations.OperationCreate(
d.state,
d.Project().Name,
d.state.ShutdownCtx,
operations.OperationClassWebsocket,
operationtype.InstanceLiveMigrate,
nil,
nil,
d.state.ServerName,
d.state.Events,
func(op *operations.Operation) error {
result := <-restoreSuccess
if !result {
Expand All @@ -5481,28 +5480,28 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {

return nil
},
nil,
func(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
return fmt.Errorf("Missing action script secret")
}
operations.ClusterOptions(d.state.DB.Cluster.TransactionSQL).
WithProjectName(d.Project().Name).
WithOnConnect(func(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
return fmt.Errorf("Missing action script secret")
}

if secret != actionScriptOpSecret {
return os.ErrPermission
}
if secret != actionScriptOpSecret {
return os.ErrPermission
}

c, err := ws.Upgrader.Upgrade(w, r, nil)
if err != nil {
return err
}
c, err := ws.Upgrader.Upgrade(w, r, nil)
if err != nil {
return err
}

dumpDone <- true
dumpDone <- true

closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
return c.WriteMessage(websocket.CloseMessage, closeMsg)
},
nil,
closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
return c.WriteMessage(websocket.CloseMessage, closeMsg)
}),
)
if err != nil {
_ = os.RemoveAll(checkpointDir)
Expand Down
Loading

0 comments on commit 9653f79

Please sign in to comment.