Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use assignments instead of tasks #1508

Merged
merged 11 commits into from
Sep 9, 2016
13 changes: 10 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,16 @@ func (a *Agent) run(ctx context.Context) {
select {
case operation := <-sessionq:
operation.response <- operation.fn(session)
case msg := <-session.tasks:
if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
log.G(ctx).WithError(err).Error("task assignment failed")
case msg := <-session.assignments:
switch msg.Type {
case api.AssignmentsMessage_COMPLETE:
if err := a.worker.AssignTasks(ctx, msg.UpdateTasks); err != nil {
log.G(ctx).WithError(err).Error("failed to synchronize worker assignments")
}
case api.AssignmentsMessage_INCREMENTAL:
if err := a.worker.UpdateTasks(ctx, msg.UpdateTasks, msg.RemoveTasks); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is now an error condition where the agent and dispatcher disagree on the assignment set. The session will need to be restarted to sync everything.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

session.go:watch() deals with that (line 263)

log.G(ctx).WithError(err).Error("failed to update worker assignments")
}
}
case msg := <-session.messages:
if err := a.handleSessionMessage(ctx, msg); err != nil {
Expand Down
96 changes: 71 additions & 25 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/protobuf/ptypes"
Expand Down Expand Up @@ -31,26 +32,26 @@ type session struct {
conn *grpc.ClientConn
addr string

agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
tasks chan *api.TasksMessage
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
assignments chan *api.AssignmentsMessage

registered chan struct{} // closed registration
closed chan struct{}
}

func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session {
s := &session{
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
tasks: make(chan *api.TasksMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
agent: agent,
sessionID: sessionID,
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
assignments: make(chan *api.AssignmentsMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
}
peer, err := agent.config.Managers.Select()
if err != nil {
Expand Down Expand Up @@ -205,22 +206,68 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess
}

func (s *session) watch(ctx context.Context) error {
log.G(ctx).Debugf("(*session).watch")
client := api.NewDispatcherClient(s.conn)
watch, err := client.Tasks(ctx, &api.TasksRequest{
SessionID: s.sessionID})
if err != nil {
return err
}
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"})
log.Debugf("")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what mean for the debugf log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We wanted to log the call to (*session).watch (see https://github.com/docker/swarmkit/pull/1508/files#diff-15ffe95a2da45f70696ffa3c01949601L208), just to indicate that it was starting, but we also wanted those fields to be attached to the logger so it can be used to log errors later. This call to Debugf just logs the fields of the logger without any additional info.

var (
resp *api.AssignmentsMessage
assignmentWatch api.Dispatcher_AssignmentsClient
tasksWatch api.Dispatcher_TasksClient
streamReference string
tasksFallback bool
err error
)

client := api.NewDispatcherClient(s.conn)
for {
resp, err := watch.Recv()
if err != nil {
return err
// If this is the first time we're running the loop, or there was a reference mismatch
// attempt to get the assignmentWatch
if assignmentWatch == nil && !tasksFallback {
assignmentWatch, err = client.Assignments(ctx, &api.AssignmentsRequest{SessionID: s.sessionID})
if err != nil {
return err
}
}
// We have an assignmentWatch, let's try to receive an AssignmentMessage
if assignmentWatch != nil {
// If we get a code = 12 desc = unknown method Assignments, try to use tasks
resp, err = assignmentWatch.Recv()
if err != nil {
if grpc.Code(err) != codes.Unimplemented {
return err
}
tasksFallback = true
assignmentWatch = nil
log.WithError(err).Infof("falling back to Tasks")
}
}

// This code is here for backwards compatibility (so that newer clients can use the
// older method Tasks)
if tasksWatch == nil && tasksFallback {
tasksWatch, err = client.Tasks(ctx, &api.TasksRequest{SessionID: s.sessionID})
if err != nil {
return err
}
}
if tasksWatch != nil {
var taskResp *api.TasksMessage
taskResp, err = tasksWatch.Recv()
if err != nil {
return err
}
resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, UpdateTasks: taskResp.Tasks}
}

// If there seems to be a gap in the stream, let's break out of the inner for and
// re-sync (by calling Assignments again).
if streamReference != "" && streamReference != resp.AppliesTo {
assignmentWatch = nil
} else {
streamReference = resp.ResultsIn
}

select {
case s.tasks <- resp:
case s.assignments <- resp:
case <-s.closed:
return errSessionClosed
case <-ctx.Done():
Expand All @@ -231,7 +278,6 @@ func (s *session) watch(ctx context.Context) error {

// sendTaskStatus uses the current session to send the status of a single task.
func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {

client := api.NewDispatcherClient(s.conn)
if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
SessionID: s.sessionID,
Expand Down
94 changes: 72 additions & 22 deletions agent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ type Worker interface {
// Init prepares the worker for task assignment.
Init(ctx context.Context) error

// Assign the set of tasks to the worker. Tasks outside of this set will be
// removed.
Assign(ctx context.Context, tasks []*api.Task) error
// AssignTasks assigns a complete set of tasks to a worker. Any task not included in
// this set will be removed.
AssignTasks(ctx context.Context, tasks []*api.Task) error

// UpdateTasks updates an incremental set of tasks to the worker. Any task not included
// either in added or removed will remain untouched.
UpdateTasks(ctx context.Context, added []*api.Task, removed []string) error

// Listen to updates about tasks controlled by the worker. When first
// called, the reporter will receive all updates for all tasks controlled
Expand Down Expand Up @@ -86,25 +90,47 @@ func (w *worker) Init(ctx context.Context) error {
})
}

// Assign the set of tasks to the worker. Any tasks not previously known will
// AssignTasks assigns the set of tasks to the worker. Any tasks not previously known will
// be started. Any tasks that are in the task set and already running will be
// updated, if possible. Any tasks currently running on the
// worker outside the task set will be terminated.
func (w *worker) Assign(ctx context.Context, tasks []*api.Task) error {
func (w *worker) AssignTasks(ctx context.Context, tasks []*api.Task) error {
w.mu.Lock()
defer w.mu.Unlock()

log.G(ctx).WithFields(logrus.Fields{
"len(tasks)": len(tasks),
}).Debug("(*worker).AssignTasks")

return reconcileTaskState(ctx, w, tasks, nil, true)
}

// UpdateTasks the set of tasks to the worker.
// Tasks in the added set will be added to the worker, and tasks in the removed set
// will be removed from the worker
func (w *worker) UpdateTasks(ctx context.Context, added []*api.Task, removed []string) error {
w.mu.Lock()
defer w.mu.Unlock()

log.G(ctx).WithFields(logrus.Fields{
"len(added)": len(added),
"len(removed)": len(removed),
}).Debug("(*worker).UpdateTasks")

return reconcileTaskState(ctx, w, added, removed, false)
}

func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, removed []string, fullSnapshot bool) error {
tx, err := w.db.Begin(true)
if err != nil {
log.G(ctx).WithError(err).Error("failed starting transaction against task database")
return err
}
defer tx.Rollback()

log.G(ctx).WithField("len(tasks)", len(tasks)).Debug("(*worker).Assign")
assigned := map[string]struct{}{}

for _, task := range tasks {
for _, task := range added {
log.G(ctx).WithFields(
logrus.Fields{
"task.id": task.ID,
Expand Down Expand Up @@ -135,35 +161,59 @@ func (w *worker) Assign(ctx context.Context, tasks []*api.Task) error {
return err
}
} else {
task.Status = *status // overwrite the stale manager status with ours.
task.Status = *status
}

w.startTask(ctx, tx, task)
}

assigned[task.ID] = struct{}{}
}

for id, tm := range w.taskManagers {
if _, ok := assigned[id]; ok {
continue
closeManager := func(tm *taskManager) {
// when a task is no longer assigned, we shutdown the task manager for
// it and leave cleanup to the sweeper.
if err := tm.Close(); err != nil {
log.G(ctx).WithError(err).Error("error closing task manager")
}
}

ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", id))
if err := SetTaskAssignment(tx, id, false); err != nil {
removeTaskAssignment := func(taskID string) error {
ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", taskID))
if err := SetTaskAssignment(tx, taskID, false); err != nil {
log.G(ctx).WithError(err).Error("error setting task assignment in database")
continue
}
return err
}

// If this was a complete set of assignments, we're going to remove all the remaining
// tasks.
if fullSnapshot {
for id, tm := range w.taskManagers {
if _, ok := assigned[id]; ok {
continue
}

delete(w.taskManagers, id)
err := removeTaskAssignment(id)
if err == nil {
delete(w.taskManagers, id)
go closeManager(tm)
}
}
} else {
// If this was an incremental set of assignments, we're going to remove only the tasks
// in the removed set
for _, taskID := range removed {
err := removeTaskAssignment(taskID)
if err != nil {
continue
}

go func(tm *taskManager) {
// when a task is no longer assigned, we shutdown the task manager for
// it and leave cleanup to the sweeper.
if err := tm.Close(); err != nil {
log.G(ctx).WithError(err).Error("error closing task manager")
tm, ok := w.taskManagers[taskID]
if ok {
delete(w.taskManagers, taskID)
go closeManager(tm)
}
}(tm)
}
}

return tx.Commit()
Expand Down
2 changes: 1 addition & 1 deletion agent/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestWorker(t *testing.T) {
// TODO(stevvooe): There are a few more states here we need to get
// covered to ensure correct during code changes.
} {
assert.NoError(t, worker.Assign(ctx, testcase.taskSet))
assert.NoError(t, worker.AssignTasks(ctx, testcase.taskSet))

var (
tasks []*api.Task
Expand Down
Loading