Skip to content

Commit

Permalink
bugfix: repaired orphaned run routine
Browse files Browse the repository at this point in the history
  • Loading branch information
clintjedwards committed Nov 22, 2022
1 parent 082975b commit 3f514c6
Show file tree
Hide file tree
Showing 10 changed files with 449 additions and 497 deletions.
3 changes: 0 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ We need a common way to alert on a PR or something that a task has succeeded or
- Once the pipeline finishes updating the pipeline will switch back to Ready state but the API will not
autoswitch back to active.
3. We need to add "update methods" to pipeline settings which will control the manner in which we roll out updates. Runs will need to include which version of the pipeline has run
- Orphaned run recovery is currently broken.
- Instead of injecting Gofer API tokens by default, allow the user to turn it on per pipeline and possibly even better allow the user to opt out certain tasks from receiving the key.
- Make sure that common tasks also get the same injected vars that other tasks get. This should be a baseline injection that all tasks can expect. Those tasks can then choose to ignore those specific vars.
- Clean up both github triggers and add a github common task.
- common task we can throw in there as a parallel task a the start of each pipeline. It will consume github commit, inform github of the pipeline pending and then query gofer to see when the run has ended. When the run ends the task will then inform github that the run has finished with a particular state.
- Think about making a new task type that you can pass commands to that automatically uses the gofer container. So users can get zero to code ultra-fast.
45 changes: 26 additions & 19 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func NewAPI(config *config.API, storage storage.DB, scheduler scheduler.Engine,

// findOrphans is a repair method that picks up where the gofer service left off if it was shutdown while
// a run was currently in progress.
// go newAPI.findOrphans()
go newAPI.findOrphans()

// These two functions are responsible for gofer's trigger event loop system. The first launches goroutines that
// consumes events from triggers and the latter processes them into pipeline runs.
Expand Down Expand Up @@ -374,8 +374,9 @@ func (api *API) createDefaultNamespace() error {
// While simple on its face this is actually quite non-trivial as it requires delicate figuring out where the run is
// currently in its lifecycle and accounting for any state it could possibly be in.
//
// Gofer identifies runs that haven't fully completed by keeping an in-progress run "cache" and identifying which ones it
// did not get an opportunity to finish tracking.
// Gofer identifies runs that haven't fully completed by searching through and matching run events.
// If an event is missing it's "Completed" event then on startup Gofer considers that run not finished and attempts
// to recover it.
//
// It then asks the scheduler for the last status of the container and appropriately either:
// - If the run is unfinished: Attach the goroutine responsible for monitoring said run.
Expand All @@ -398,7 +399,7 @@ func (api *API) findOrphans() {
for event := range events {
switch event.Kind {
case models.EventKindStartedRun:
// TODO(clintjedwards): This causes the data race alert to be angry,
// This causes the data race alert to be angry,
// but in theory it should be fine as we only read and write from
// the var once. Need to find a way to pass trait objects without
// Go complaining that other things can access them.
Expand Down Expand Up @@ -457,47 +458,53 @@ func (api *API) findOrphans() {
}
}

// repairOrphanRun allows gofer to repair runs that are orphaned from a bug of sudden shutdown.
// repairOrphanRun allows gofer to repair runs that are orphaned from a loss of tracking or sudden shutdown.
//
// - If the run is unfinished: Attach the goroutine responsible for monitoring said run.
// - If the container/task run is still running: Attach state watcher goroutine, truncate logs, attach new log watcher.
// - If the container is in a finished state: Remove from run cache -> update container state -> clear out logs
// - If the container is in a finished state: update container state -> clear out logs
// -> update logs with new logs.
// - If the scheduler has no record of this container ever running then assume the state is unknown.
func (api *API) repairOrphanRun(namespace, pipelineID string, runID int64) error {
pipeline, err := api.db.GetPipeline(nil, namespace, pipelineID)
func (api *API) repairOrphanRun(namespaceID, pipelineID string, runID int64) error {
pipeline, err := api.db.GetPipeline(nil, namespaceID, pipelineID)
if err != nil {
return err
}

run, err := api.db.GetRun(namespaceID, pipelineID, runID)
if err != nil {
return err
}

run, err := api.db.GetRun(namespace, pipelineID, runID)
taskRuns, err := api.db.ListTaskRuns(0, 0, namespaceID, pipelineID, runID)
if err != nil {
return err
}

// In order to manage the orphaned run we will create a new state machine and make it part of that.
runStateMachine := api.newRunStateMachine(&pipeline, &run)

// For each run we also need to handle the individual task runs.
for _, taskrunID := range run.TaskRuns {
taskrun, err := api.db.GetTaskRun(run.Namespace, run.Pipeline, run.ID, taskrunID)
if err != nil {
log.Error().Err(err).Str("pipeline", run.Pipeline).Int64("run", run.ID).
Msg("could not get run status for repair orphan")
continue
}
// For each run we also need to evaluate the individual task runs.
for _, taskrun := range taskRuns {
taskrun := taskrun

// If the task run was actually marked complete in the database. Then we add it to the state machine.
// This is necessary because eventually we will compute whether the run was complete and we'll need the
// state of that run.
if taskrun.State == models.TaskRunStateComplete {
runStateMachine.TaskRuns.Set(taskrun.Task.GetID(), taskrun)
continue
}

// If the taskrun was waiting to be scheduled then we have to make sure it gets scheduled as normal.
if taskrun.State == models.TaskRunStateWaiting || taskrun.State == models.TaskRunStateProcessing {
go runStateMachine.launchTaskRun(taskrun.Task)
go runStateMachine.launchTaskRun(taskrun.Task, false)
continue
}

// If it is unfinished and just need to be tracked then we just add log/state trackers onto it.
// If the task run was in a state where it had been launched and just needs to be tracked then we just
// add log/state trackers onto it.
runStateMachine.TaskRuns.Set(taskrun.Task.GetID(), taskrun)
go runStateMachine.handleLogUpdates(taskContainerID(taskrun.Namespace, taskrun.Pipeline, taskrun.Run, taskrun.ID), taskrun.ID)
go func() {
err = runStateMachine.waitTaskRunFinish(taskContainerID(taskrun.Namespace, taskrun.Pipeline, taskrun.Run, taskrun.ID), taskrun.ID)
Expand Down
39 changes: 22 additions & 17 deletions internal/api/runStateMachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (r *RunStateMachine) executeTaskTree() {
// Launch a new task run for each task found.
for _, task := range r.Pipeline.CustomTasks {
task := task
go r.launchTaskRun(&task)
go r.launchTaskRun(&task, true)
}

for _, taskSettings := range r.Pipeline.CommonTasks {
Expand All @@ -175,7 +175,7 @@ func (r *RunStateMachine) executeTaskTree() {
Settings: taskSettings,
}

go r.launchTaskRun(&task)
go r.launchTaskRun(&task, true)
}

// Finally monitor the entire run until it finishes. This will block until the run has ended.
Expand Down Expand Up @@ -617,9 +617,12 @@ outerLoop:
log.Debug().Strs("removed_files", removedFiles).Msg("removed task run logs")
}

// Launches a brand new task run as part of a larger run for a specific task.
// Registers[^1] and Launches a brand new task run as part of a larger run for a specific task.
// It blocks until the task run has completed.
func (r *RunStateMachine) launchTaskRun(task models.Task) {
//
// [^1]: The register parameter controls whether the task is registered in the database, announces it's creation
// via events. It's useful to turn this off when we're trying to revive a taskRun that is previously lost.
func (r *RunStateMachine) launchTaskRun(task models.Task, register bool) {
// If the task is a common task we need to check that it is in the registry, fill in those registry details,
// and then fail properly if it is not.
commonTask, isCommonTask := task.(*models.CommonTask)
Expand Down Expand Up @@ -660,24 +663,26 @@ func (r *RunStateMachine) launchTaskRun(task models.Task) {

r.TaskRuns.Set(newTaskRun.ID, *newTaskRun)

err := r.API.db.InsertTaskRun(newTaskRun)
if err != nil {
log.Error().Err(err).Msg("could not register task run; db error")
return
}
if register {
err := r.API.db.InsertTaskRun(newTaskRun)
if err != nil {
log.Error().Err(err).Msg("could not register task run; db error")
return
}

// Alert the event bus that a new task run is being started.
go r.API.events.Publish(models.EventCreatedTaskRun{
NamespaceID: r.Pipeline.Namespace,
PipelineID: r.Pipeline.ID,
RunID: r.Run.ID,
TaskRunID: newTaskRun.ID,
})
// Alert the event bus that a new task run is being started.
go r.API.events.Publish(models.EventCreatedTaskRun{
NamespaceID: r.Pipeline.Namespace,
PipelineID: r.Pipeline.ID,
RunID: r.Run.ID,
TaskRunID: newTaskRun.ID,
})
}

envVars := combineVariables(r.Run, task)

// Determine the task run's final variable set and pass them in.
err = r.API.db.UpdateTaskRun(newTaskRun, storage.UpdatableTaskRunFields{
err := r.API.db.UpdateTaskRun(newTaskRun, storage.UpdatableTaskRunFields{
Variables: &envVars,
})
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion internal/storage/migrations/0_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ CREATE TABLE IF NOT EXISTS runs (
state TEXT NOT NULL,
status TEXT NOT NULL,
status_reason TEXT,
task_runs TEXT NOT NULL,
trigger TEXT NOT NULL,
variables TEXT NOT NULL,
store_objects_expired INTEGER NOT NULL CHECK (store_objects_expired IN (0, 1)),
Expand Down
42 changes: 6 additions & 36 deletions internal/storage/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type UpdatableRunFields struct {
State *models.RunState
Status *models.RunStatus
StatusReason *models.RunStatusReason
TaskRuns *[]string
Variables *[]models.Variable
StoreObjectsExpired *bool
}
Expand All @@ -31,7 +30,7 @@ func (db *DB) ListRuns(conn qb.BaseRunner, offset, limit int, namespace, pipelin
}

rows, err := qb.Select("namespace", "pipeline", "id", "started", "ended", "state", "status", "status_reason",
"task_runs", "trigger", "variables", "store_objects_expired").
"trigger", "variables", "store_objects_expired").
From("runs").
Where(qb.Eq{"namespace": namespace, "pipeline": pipeline}).
OrderBy("started DESC").
Expand Down Expand Up @@ -59,13 +58,12 @@ func (db *DB) ListRuns(conn qb.BaseRunner, offset, limit int, namespace, pipelin
var state string
var status string
var statusReasonJSON sql.NullString
var taskRunsJSON string
var triggerJSON string
var variablesJSON string
var storeObjectsExpired bool

err = rows.Scan(&namespace, &pipeline, &id, &started, &ended, &state, &status, &statusReasonJSON,
&taskRunsJSON, &triggerJSON, &variablesJSON, &storeObjectsExpired)
&triggerJSON, &variablesJSON, &storeObjectsExpired)
if err != nil {
return nil, fmt.Errorf("database error occurred: %v; %w", err, ErrInternal)
}
Expand All @@ -79,12 +77,6 @@ func (db *DB) ListRuns(conn qb.BaseRunner, offset, limit int, namespace, pipelin
}
}

taskRuns := []string{}
err = json.Unmarshal([]byte(taskRunsJSON), &taskRuns)
if err != nil {
return nil, fmt.Errorf("database error occurred; could not decode object; %v", err)
}

trigger := models.TriggerInfo{}
err = json.Unmarshal([]byte(triggerJSON), &trigger)
if err != nil {
Expand All @@ -105,7 +97,6 @@ func (db *DB) ListRuns(conn qb.BaseRunner, offset, limit int, namespace, pipelin
run.State = models.RunState(state)
run.Status = models.RunStatus(status)
run.StatusReason = statusReason
run.TaskRuns = taskRuns
run.Trigger = trigger
run.Variables = variables
run.StoreObjectsExpired = storeObjectsExpired
Expand Down Expand Up @@ -137,11 +128,6 @@ func (db *DB) InsertRun(run *models.Run) (int64, error) {
statusReasonJSON = ptr(string(rawJSON))
}

taskRunsJSON, err := json.Marshal(run.TaskRuns)
if err != nil {
return 0, fmt.Errorf("database error occurred; could not encode object; %v", err)
}

triggerJSON, err := json.Marshal(run.Trigger)
if err != nil {
return 0, fmt.Errorf("database error occurred; could not encode object; %v", err)
Expand All @@ -165,9 +151,9 @@ func (db *DB) InsertRun(run *models.Run) (int64, error) {
}

_, err = qb.Insert("runs").Columns("namespace", "pipeline", "id", "started", "ended", "state", "status",
"status_reason", "task_runs", "trigger", "variables", "store_objects_expired").Values(
"status_reason", "trigger", "variables", "store_objects_expired").Values(
run.Namespace, run.Pipeline, nextID, run.Started, run.Ended, run.State, run.Status, statusReasonJSON,
string(taskRunsJSON), string(triggerJSON), string(variablesJSON), run.StoreObjectsExpired,
string(triggerJSON), string(variablesJSON), run.StoreObjectsExpired,
).RunWith(tx).Exec()
if err != nil {
if strings.Contains(err.Error(), "UNIQUE constraint failed") {
Expand All @@ -187,7 +173,7 @@ func (db *DB) InsertRun(run *models.Run) (int64, error) {
}

func (db *DB) GetRun(namespace, pipeline string, run int64) (models.Run, error) {
row := qb.Select("started", "ended", "state", "status", "status_reason", "task_runs", "trigger", "variables",
row := qb.Select("started", "ended", "state", "status", "status_reason", "trigger", "variables",
"store_objects_expired").From("runs").
Where(qb.Eq{"namespace": namespace, "pipeline": pipeline, "id": run}).RunWith(db).QueryRow()

Expand All @@ -196,12 +182,11 @@ func (db *DB) GetRun(namespace, pipeline string, run int64) (models.Run, error)
var state string
var status string
var statusReasonJSON sql.NullString
var taskRunsJSON string
var triggerJSON string
var variablesJSON string
var storeObjectsExpired bool

err := row.Scan(&started, &ended, &state, &status, &statusReasonJSON, &taskRunsJSON, &triggerJSON,
err := row.Scan(&started, &ended, &state, &status, &statusReasonJSON, &triggerJSON,
&variablesJSON, &storeObjectsExpired)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand All @@ -220,12 +205,6 @@ func (db *DB) GetRun(namespace, pipeline string, run int64) (models.Run, error)
}
}

taskRuns := []string{}
err = json.Unmarshal([]byte(taskRunsJSON), &taskRuns)
if err != nil {
return models.Run{}, fmt.Errorf("database error occurred; could not decode object; %v", err)
}

trigger := models.TriggerInfo{}
err = json.Unmarshal([]byte(triggerJSON), &trigger)
if err != nil {
Expand All @@ -248,7 +227,6 @@ func (db *DB) GetRun(namespace, pipeline string, run int64) (models.Run, error)
retrievedRun.State = models.RunState(state)
retrievedRun.Status = models.RunStatus(status)
retrievedRun.StatusReason = statusReason
retrievedRun.TaskRuns = taskRuns
retrievedRun.Trigger = trigger
retrievedRun.Variables = variables
retrievedRun.StoreObjectsExpired = storeObjectsExpired
Expand Down Expand Up @@ -279,14 +257,6 @@ func (db *DB) UpdateRun(namespace, pipeline string, run int64, fields UpdatableR
query = query.Set("status_reason", string(statusReason))
}

if fields.TaskRuns != nil {
taskRuns, err := json.Marshal(fields.TaskRuns)
if err != nil {
return fmt.Errorf("database error occurred; could not encode object; %v", err)
}
query = query.Set("task_runs", taskRuns)
}

if fields.Variables != nil {
variables, err := json.Marshal(fields.Variables)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions internal/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,6 @@ func TestCRUDRuns(t *testing.T) {
Name: "test_trigger_name",
Label: "test_trigger_label",
}, []models.Variable{})
run.TaskRuns = []string{
"test_task_run",
}

runID, err := db.InsertRun(run)
if err != nil {
Expand Down Expand Up @@ -361,9 +358,6 @@ func TestCRUDTaskRuns(t *testing.T) {
Name: "test_trigger_name",
Label: "test_trigger_label",
}, []models.Variable{})
run.TaskRuns = []string{
"test_task_run",
}

runID, err := db.InsertRun(run)
if err != nil {
Expand Down Expand Up @@ -796,9 +790,6 @@ func TestCRUDObjectStorePipelineRuns(t *testing.T) {
Name: "test_trigger_name",
Label: "test_trigger_label",
}, []models.Variable{})
run.TaskRuns = []string{
"test_task_run",
}

runID, err := db.InsertRun(run)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions models/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ type Run struct {
State RunState `json:"state"` // The current state of the run.
Status RunStatus `json:"status"` // The current status of the run.
StatusReason *RunStatusReason `json:"status_reason"` // Contains more information about a run's current status.
TaskRuns []string `json:"task_runs"` // The unique ID of each task run.
Trigger TriggerInfo `json:"trigger"` // Information about which trigger was responsible for the run's execution.
Variables []Variable `json:"variables"` // Environment variables to be injected into each child task run. These are usually injected by the trigger.
StoreObjectsExpired bool `json:"store_objects_expired"` // Tracks whether objects for this run have expired already.
Expand Down Expand Up @@ -111,7 +110,6 @@ func NewRun(namespace, pipeline string, trigger TriggerInfo, variables []Variabl
State: RunStatePending,
Status: RunStatusUnknown,
StatusReason: nil,
TaskRuns: []string{},
Trigger: trigger,
Variables: variables,
StoreObjectsExpired: false,
Expand All @@ -138,7 +136,6 @@ func (r *Run) ToProto() *proto.Run {
State: proto.Run_RunState(proto.Run_RunState_value[string(r.State)]),
Status: proto.Run_RunStatus(proto.Run_RunStatus_value[string(r.Status)]),
StatusReason: statusReason,
TaskRuns: r.TaskRuns,
Trigger: r.Trigger.ToProto(),
Variables: variables,
StoreObjectsExpired: r.StoreObjectsExpired,
Expand Down Expand Up @@ -171,7 +168,6 @@ func (r *Run) FromProto(proto *proto.Run) {
r.State = RunState(proto.State.String())
r.Status = RunStatus(proto.Status.String())
r.StatusReason = statusReason
r.TaskRuns = proto.TaskRuns
r.Trigger = trigger
r.Variables = variables
r.StoreObjectsExpired = proto.StoreObjectsExpired
Expand Down
Loading

0 comments on commit 3f514c6

Please sign in to comment.