diff --git a/agent/agent.go b/agent/agent.go index a5750e22ca..5972f8cb70 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -168,9 +168,16 @@ func (a *Agent) run(ctx context.Context) { select { case operation := <-sessionq: operation.response <- operation.fn(session) - case msg := <-session.tasks: - if err := a.worker.Assign(ctx, msg.Tasks); err != nil { - log.G(ctx).WithError(err).Error("task assignment failed") + case msg := <-session.assignments: + switch msg.Type { + case api.AssignmentsMessage_COMPLETE: + if err := a.worker.AssignTasks(ctx, msg.UpdateTasks); err != nil { + log.G(ctx).WithError(err).Error("failed to synchronize worker assignments") + } + case api.AssignmentsMessage_INCREMENTAL: + if err := a.worker.UpdateTasks(ctx, msg.UpdateTasks, msg.RemoveTasks); err != nil { + log.G(ctx).WithError(err).Error("failed to update worker assignments") + } } case msg := <-session.messages: if err := a.handleSessionMessage(ctx, msg); err != nil { diff --git a/agent/session.go b/agent/session.go index c720a54f0c..b4afc0d6f0 100644 --- a/agent/session.go +++ b/agent/session.go @@ -4,6 +4,7 @@ import ( "errors" "time" + "github.com/Sirupsen/logrus" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/protobuf/ptypes" @@ -31,12 +32,12 @@ type session struct { conn *grpc.ClientConn addr string - agent *Agent - sessionID string - session api.Dispatcher_SessionClient - errs chan error - messages chan *api.SessionMessage - tasks chan *api.TasksMessage + agent *Agent + sessionID string + session api.Dispatcher_SessionClient + errs chan error + messages chan *api.SessionMessage + assignments chan *api.AssignmentsMessage registered chan struct{} // closed registration closed chan struct{} @@ -44,13 +45,13 @@ type session struct { func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session { s := &session{ - agent: agent, - sessionID: sessionID, - errs: make(chan error, 1), - messages: make(chan *api.SessionMessage), - tasks: make(chan *api.TasksMessage), - registered: make(chan struct{}), - closed: make(chan struct{}), + agent: agent, + sessionID: sessionID, + errs: make(chan error, 1), + messages: make(chan *api.SessionMessage), + assignments: make(chan *api.AssignmentsMessage), + registered: make(chan struct{}), + closed: make(chan struct{}), } peer, err := agent.config.Managers.Select() if err != nil { @@ -205,22 +206,68 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess } func (s *session) watch(ctx context.Context) error { - log.G(ctx).Debugf("(*session).watch") - client := api.NewDispatcherClient(s.conn) - watch, err := client.Tasks(ctx, &api.TasksRequest{ - SessionID: s.sessionID}) - if err != nil { - return err - } + log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"}) + log.Debugf("") + var ( + resp *api.AssignmentsMessage + assignmentWatch api.Dispatcher_AssignmentsClient + tasksWatch api.Dispatcher_TasksClient + streamReference string + tasksFallback bool + err error + ) + client := api.NewDispatcherClient(s.conn) for { - resp, err := watch.Recv() - if err != nil { - return err + // If this is the first time we're running the loop, or there was a reference mismatch + // attempt to get the assignmentWatch + if assignmentWatch == nil && !tasksFallback { + assignmentWatch, err = client.Assignments(ctx, &api.AssignmentsRequest{SessionID: s.sessionID}) + if err != nil { + return err + } + } + // We have an assignmentWatch, let's try to receive an AssignmentMessage + if assignmentWatch != nil { + // If we get a code = 12 desc = unknown method Assignments, try to use tasks + resp, err = assignmentWatch.Recv() + if err != nil { + if grpc.Code(err) != codes.Unimplemented { + return err + } + tasksFallback = true + assignmentWatch = nil + log.WithError(err).Infof("falling back to Tasks") + } + } + + // This code is here for backwards compatibility (so that newer clients can use the + // older method Tasks) + if tasksWatch == nil && tasksFallback { + tasksWatch, err = client.Tasks(ctx, &api.TasksRequest{SessionID: s.sessionID}) + if err != nil { + return err + } + } + if tasksWatch != nil { + var taskResp *api.TasksMessage + taskResp, err = tasksWatch.Recv() + if err != nil { + return err + } + resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, UpdateTasks: taskResp.Tasks} + } + + // If there seems to be a gap in the stream, let's break out of the inner for and + // re-sync (by calling Assignments again). + if streamReference != "" && streamReference != resp.AppliesTo { + assignmentWatch = nil + } else { + streamReference = resp.ResultsIn } select { - case s.tasks <- resp: + case s.assignments <- resp: case <-s.closed: return errSessionClosed case <-ctx.Done(): @@ -231,7 +278,6 @@ func (s *session) watch(ctx context.Context) error { // sendTaskStatus uses the current session to send the status of a single task. func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error { - client := api.NewDispatcherClient(s.conn) if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{ SessionID: s.sessionID, diff --git a/agent/worker.go b/agent/worker.go index 80e9ab07ab..f19c9c957b 100644 --- a/agent/worker.go +++ b/agent/worker.go @@ -17,9 +17,13 @@ type Worker interface { // Init prepares the worker for task assignment. Init(ctx context.Context) error - // Assign the set of tasks to the worker. Tasks outside of this set will be - // removed. - Assign(ctx context.Context, tasks []*api.Task) error + // AssignTasks assigns a complete set of tasks to a worker. Any task not included in + // this set will be removed. + AssignTasks(ctx context.Context, tasks []*api.Task) error + + // UpdateTasks updates an incremental set of tasks to the worker. Any task not included + // either in added or removed will remain untouched. + UpdateTasks(ctx context.Context, added []*api.Task, removed []string) error // Listen to updates about tasks controlled by the worker. When first // called, the reporter will receive all updates for all tasks controlled @@ -86,14 +90,37 @@ func (w *worker) Init(ctx context.Context) error { }) } -// Assign the set of tasks to the worker. Any tasks not previously known will +// AssignTasks assigns the set of tasks to the worker. Any tasks not previously known will // be started. Any tasks that are in the task set and already running will be // updated, if possible. Any tasks currently running on the // worker outside the task set will be terminated. -func (w *worker) Assign(ctx context.Context, tasks []*api.Task) error { +func (w *worker) AssignTasks(ctx context.Context, tasks []*api.Task) error { w.mu.Lock() defer w.mu.Unlock() + log.G(ctx).WithFields(logrus.Fields{ + "len(tasks)": len(tasks), + }).Debug("(*worker).AssignTasks") + + return reconcileTaskState(ctx, w, tasks, nil, true) +} + +// UpdateTasks the set of tasks to the worker. +// Tasks in the added set will be added to the worker, and tasks in the removed set +// will be removed from the worker +func (w *worker) UpdateTasks(ctx context.Context, added []*api.Task, removed []string) error { + w.mu.Lock() + defer w.mu.Unlock() + + log.G(ctx).WithFields(logrus.Fields{ + "len(added)": len(added), + "len(removed)": len(removed), + }).Debug("(*worker).UpdateTasks") + + return reconcileTaskState(ctx, w, added, removed, false) +} + +func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, removed []string, fullSnapshot bool) error { tx, err := w.db.Begin(true) if err != nil { log.G(ctx).WithError(err).Error("failed starting transaction against task database") @@ -101,10 +128,9 @@ func (w *worker) Assign(ctx context.Context, tasks []*api.Task) error { } defer tx.Rollback() - log.G(ctx).WithField("len(tasks)", len(tasks)).Debug("(*worker).Assign") assigned := map[string]struct{}{} - for _, task := range tasks { + for _, task := range added { log.G(ctx).WithFields( logrus.Fields{ "task.id": task.ID, @@ -135,35 +161,59 @@ func (w *worker) Assign(ctx context.Context, tasks []*api.Task) error { return err } } else { - task.Status = *status // overwrite the stale manager status with ours. + task.Status = *status } - w.startTask(ctx, tx, task) } assigned[task.ID] = struct{}{} } - for id, tm := range w.taskManagers { - if _, ok := assigned[id]; ok { - continue + closeManager := func(tm *taskManager) { + // when a task is no longer assigned, we shutdown the task manager for + // it and leave cleanup to the sweeper. + if err := tm.Close(); err != nil { + log.G(ctx).WithError(err).Error("error closing task manager") } + } - ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", id)) - if err := SetTaskAssignment(tx, id, false); err != nil { + removeTaskAssignment := func(taskID string) error { + ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", taskID)) + if err := SetTaskAssignment(tx, taskID, false); err != nil { log.G(ctx).WithError(err).Error("error setting task assignment in database") - continue } + return err + } + + // If this was a complete set of assignments, we're going to remove all the remaining + // tasks. + if fullSnapshot { + for id, tm := range w.taskManagers { + if _, ok := assigned[id]; ok { + continue + } - delete(w.taskManagers, id) + err := removeTaskAssignment(id) + if err == nil { + delete(w.taskManagers, id) + go closeManager(tm) + } + } + } else { + // If this was an incremental set of assignments, we're going to remove only the tasks + // in the removed set + for _, taskID := range removed { + err := removeTaskAssignment(taskID) + if err != nil { + continue + } - go func(tm *taskManager) { - // when a task is no longer assigned, we shutdown the task manager for - // it and leave cleanup to the sweeper. - if err := tm.Close(); err != nil { - log.G(ctx).WithError(err).Error("error closing task manager") + tm, ok := w.taskManagers[taskID] + if ok { + delete(w.taskManagers, taskID) + go closeManager(tm) } - }(tm) + } } return tx.Commit() diff --git a/agent/worker_test.go b/agent/worker_test.go index d554b88f57..9504074203 100644 --- a/agent/worker_test.go +++ b/agent/worker_test.go @@ -53,7 +53,7 @@ func TestWorker(t *testing.T) { // TODO(stevvooe): There are a few more states here we need to get // covered to ensure correct during code changes. } { - assert.NoError(t, worker.Assign(ctx, testcase.taskSet)) + assert.NoError(t, worker.AssignTasks(ctx, testcase.taskSet)) var ( tasks []*api.Task diff --git a/api/dispatcher.pb.go b/api/dispatcher.pb.go index 818933dd31..4e4b7f1b55 100644 --- a/api/dispatcher.pb.go +++ b/api/dispatcher.pb.go @@ -34,6 +34,31 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +// AssignmentType specifies whether this assignment message carries +// the full state, or is an update to an existing state. +type AssignmentsMessage_Type int32 + +const ( + AssignmentsMessage_COMPLETE AssignmentsMessage_Type = 0 + AssignmentsMessage_INCREMENTAL AssignmentsMessage_Type = 1 +) + +var AssignmentsMessage_Type_name = map[int32]string{ + 0: "COMPLETE", + 1: "INCREMENTAL", +} +var AssignmentsMessage_Type_value = map[string]int32{ + "COMPLETE": 0, + "INCREMENTAL": 1, +} + +func (x AssignmentsMessage_Type) String() string { + return proto.EnumName(AssignmentsMessage_Type_name, int32(x)) +} +func (AssignmentsMessage_Type) EnumDescriptor() ([]byte, []int) { + return fileDescriptorDispatcher, []int{9, 0} +} + // SessionRequest starts a session. type SessionRequest struct { Description *NodeDescription `protobuf:"bytes,1,opt,name=description" json:"description,omitempty"` @@ -180,6 +205,41 @@ func (m *TasksMessage) Reset() { *m = TasksMessage{} } func (*TasksMessage) ProtoMessage() {} func (*TasksMessage) Descriptor() ([]byte, []int) { return fileDescriptorDispatcher, []int{7} } +type AssignmentsRequest struct { + SessionID string `protobuf:"bytes,1,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` +} + +func (m *AssignmentsRequest) Reset() { *m = AssignmentsRequest{} } +func (*AssignmentsRequest) ProtoMessage() {} +func (*AssignmentsRequest) Descriptor() ([]byte, []int) { return fileDescriptorDispatcher, []int{8} } + +type AssignmentsMessage struct { + Type AssignmentsMessage_Type `protobuf:"varint,1,opt,name=type,proto3,enum=docker.swarmkit.v1.AssignmentsMessage_Type" json:"type,omitempty"` + // AppliesTo references the previous ResultsIn value, to chain + // incremental updates together. For the first update in a stream, + // AppliesTo is empty. If AppliesTo does not match the previously + // received ResultsIn, the consumer of the stream should start a new + // Assignments stream to re-sync. + AppliesTo string `protobuf:"bytes,2,opt,name=applies_to,json=appliesTo,proto3" json:"applies_to,omitempty"` + // ResultsIn identifies the result of this assignments message, to + // match against the next message's AppliesTo value and protect + // against missed messages. + ResultsIn string `protobuf:"bytes,3,opt,name=results_in,json=resultsIn,proto3" json:"results_in,omitempty"` + // UpdateTasks is a set of new or updated tasks to run on this node. + // In the first assignments message, it contains all of the tasks + // to run on this node. Tasks outside of this set running on the node + // should be terminated. + UpdateTasks []*Task `protobuf:"bytes,4,rep,name=update_tasks,json=updateTasks" json:"update_tasks,omitempty"` + // RemoveTasks is a set of previously-assigned task IDs to remove from the + // assignment set. It is not used in the first assignments message of + // a stream. + RemoveTasks []string `protobuf:"bytes,5,rep,name=remove_tasks,json=removeTasks" json:"remove_tasks,omitempty"` +} + +func (m *AssignmentsMessage) Reset() { *m = AssignmentsMessage{} } +func (*AssignmentsMessage) ProtoMessage() {} +func (*AssignmentsMessage) Descriptor() ([]byte, []int) { return fileDescriptorDispatcher, []int{9} } + func init() { proto.RegisterType((*SessionRequest)(nil), "docker.swarmkit.v1.SessionRequest") proto.RegisterType((*SessionMessage)(nil), "docker.swarmkit.v1.SessionMessage") @@ -190,6 +250,9 @@ func init() { proto.RegisterType((*UpdateTaskStatusResponse)(nil), "docker.swarmkit.v1.UpdateTaskStatusResponse") proto.RegisterType((*TasksRequest)(nil), "docker.swarmkit.v1.TasksRequest") proto.RegisterType((*TasksMessage)(nil), "docker.swarmkit.v1.TasksMessage") + proto.RegisterType((*AssignmentsRequest)(nil), "docker.swarmkit.v1.AssignmentsRequest") + proto.RegisterType((*AssignmentsMessage)(nil), "docker.swarmkit.v1.AssignmentsMessage") + proto.RegisterEnum("docker.swarmkit.v1.AssignmentsMessage_Type", AssignmentsMessage_Type_name, AssignmentsMessage_Type_value) } type authenticatedWrapperDispatcherServer struct { @@ -236,6 +299,14 @@ func (p *authenticatedWrapperDispatcherServer) Tasks(r *TasksRequest, stream Dis return p.local.Tasks(r, stream) } +func (p *authenticatedWrapperDispatcherServer) Assignments(r *AssignmentsRequest, stream Dispatcher_AssignmentsServer) error { + + if err := p.authorize(stream.Context(), []string{"swarm-worker", "swarm-manager"}); err != nil { + return err + } + return p.local.Assignments(r, stream) +} + func (m *SessionRequest) Copy() *SessionRequest { if m == nil { return nil @@ -371,6 +442,46 @@ func (m *TasksMessage) Copy() *TasksMessage { return o } +func (m *AssignmentsRequest) Copy() *AssignmentsRequest { + if m == nil { + return nil + } + + o := &AssignmentsRequest{ + SessionID: m.SessionID, + } + + return o +} + +func (m *AssignmentsMessage) Copy() *AssignmentsMessage { + if m == nil { + return nil + } + + o := &AssignmentsMessage{ + Type: m.Type, + AppliesTo: m.AppliesTo, + ResultsIn: m.ResultsIn, + } + + if m.UpdateTasks != nil { + o.UpdateTasks = make([]*Task, 0, len(m.UpdateTasks)) + for _, v := range m.UpdateTasks { + o.UpdateTasks = append(o.UpdateTasks, v.Copy()) + } + } + + if m.RemoveTasks != nil { + o.RemoveTasks = make([]string, 0, len(m.RemoveTasks)) + for _, v := range m.RemoveTasks { + o.RemoveTasks = append(o.RemoveTasks, v) + } + } + + return o +} + func (this *SessionRequest) GoString() string { if this == nil { return "nil" @@ -480,6 +591,32 @@ func (this *TasksMessage) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *AssignmentsRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&api.AssignmentsRequest{") + s = append(s, "SessionID: "+fmt.Sprintf("%#v", this.SessionID)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *AssignmentsMessage) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 9) + s = append(s, "&api.AssignmentsMessage{") + s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") + s = append(s, "AppliesTo: "+fmt.Sprintf("%#v", this.AppliesTo)+",\n") + s = append(s, "ResultsIn: "+fmt.Sprintf("%#v", this.ResultsIn)+",\n") + if this.UpdateTasks != nil { + s = append(s, "UpdateTasks: "+fmt.Sprintf("%#v", this.UpdateTasks)+",\n") + } + s = append(s, "RemoveTasks: "+fmt.Sprintf("%#v", this.RemoveTasks)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringDispatcher(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -541,6 +678,11 @@ type DispatcherClient interface { // of tasks which should be run on node, if task is not present in that list, // it should be terminated. Tasks(ctx context.Context, in *TasksRequest, opts ...grpc.CallOption) (Dispatcher_TasksClient, error) + // Assignments is a stream of assignments such as tasks and secrets for node. + // The first message in the stream contains all of the tasks and secrets + // that are relevant to the node. Future messages in the stream are updates to + // the set of assignments. + Assignments(ctx context.Context, in *AssignmentsRequest, opts ...grpc.CallOption) (Dispatcher_AssignmentsClient, error) } type dispatcherClient struct { @@ -633,6 +775,38 @@ func (x *dispatcherTasksClient) Recv() (*TasksMessage, error) { return m, nil } +func (c *dispatcherClient) Assignments(ctx context.Context, in *AssignmentsRequest, opts ...grpc.CallOption) (Dispatcher_AssignmentsClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Dispatcher_serviceDesc.Streams[2], c.cc, "/docker.swarmkit.v1.Dispatcher/Assignments", opts...) + if err != nil { + return nil, err + } + x := &dispatcherAssignmentsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Dispatcher_AssignmentsClient interface { + Recv() (*AssignmentsMessage, error) + grpc.ClientStream +} + +type dispatcherAssignmentsClient struct { + grpc.ClientStream +} + +func (x *dispatcherAssignmentsClient) Recv() (*AssignmentsMessage, error) { + m := new(AssignmentsMessage) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Server API for Dispatcher service type DispatcherServer interface { @@ -660,6 +834,11 @@ type DispatcherServer interface { // of tasks which should be run on node, if task is not present in that list, // it should be terminated. Tasks(*TasksRequest, Dispatcher_TasksServer) error + // Assignments is a stream of assignments such as tasks and secrets for node. + // The first message in the stream contains all of the tasks and secrets + // that are relevant to the node. Future messages in the stream are updates to + // the set of assignments. + Assignments(*AssignmentsRequest, Dispatcher_AssignmentsServer) error } func RegisterDispatcherServer(s *grpc.Server, srv DispatcherServer) { @@ -744,6 +923,27 @@ func (x *dispatcherTasksServer) Send(m *TasksMessage) error { return x.ServerStream.SendMsg(m) } +func _Dispatcher_Assignments_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(AssignmentsRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(DispatcherServer).Assignments(m, &dispatcherAssignmentsServer{stream}) +} + +type Dispatcher_AssignmentsServer interface { + Send(*AssignmentsMessage) error + grpc.ServerStream +} + +type dispatcherAssignmentsServer struct { + grpc.ServerStream +} + +func (x *dispatcherAssignmentsServer) Send(m *AssignmentsMessage) error { + return x.ServerStream.SendMsg(m) +} + var _Dispatcher_serviceDesc = grpc.ServiceDesc{ ServiceName: "docker.swarmkit.v1.Dispatcher", HandlerType: (*DispatcherServer)(nil), @@ -768,6 +968,11 @@ var _Dispatcher_serviceDesc = grpc.ServiceDesc{ Handler: _Dispatcher_Tasks_Handler, ServerStreams: true, }, + { + StreamName: "Assignments", + Handler: _Dispatcher_Assignments_Handler, + ServerStreams: true, + }, }, } @@ -1055,6 +1260,92 @@ func (m *TasksMessage) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *AssignmentsRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *AssignmentsRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.SessionID) > 0 { + data[i] = 0xa + i++ + i = encodeVarintDispatcher(data, i, uint64(len(m.SessionID))) + i += copy(data[i:], m.SessionID) + } + return i, nil +} + +func (m *AssignmentsMessage) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *AssignmentsMessage) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Type != 0 { + data[i] = 0x8 + i++ + i = encodeVarintDispatcher(data, i, uint64(m.Type)) + } + if len(m.AppliesTo) > 0 { + data[i] = 0x12 + i++ + i = encodeVarintDispatcher(data, i, uint64(len(m.AppliesTo))) + i += copy(data[i:], m.AppliesTo) + } + if len(m.ResultsIn) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintDispatcher(data, i, uint64(len(m.ResultsIn))) + i += copy(data[i:], m.ResultsIn) + } + if len(m.UpdateTasks) > 0 { + for _, msg := range m.UpdateTasks { + data[i] = 0x22 + i++ + i = encodeVarintDispatcher(data, i, uint64(msg.Size())) + n, err := msg.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.RemoveTasks) > 0 { + for _, s := range m.RemoveTasks { + data[i] = 0x2a + i++ + l = len(s) + for l >= 1<<7 { + data[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + data[i] = uint8(l) + i++ + i += copy(data[i:], s) + } + } + return i, nil +} + func encodeFixed64Dispatcher(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) @@ -1280,6 +1571,53 @@ func (p *raftProxyDispatcherServer) Tasks(r *TasksRequest, stream Dispatcher_Tas return nil } +func (p *raftProxyDispatcherServer) Assignments(r *AssignmentsRequest, stream Dispatcher_AssignmentsServer) error { + + if p.cluster.IsLeader() { + return p.local.Assignments(r, stream) + } + ctx, err := p.runCtxMods(stream.Context()) + if err != nil { + return err + } + conn, err := p.connSelector.Conn() + if err != nil { + return err + } + + defer func() { + if err != nil { + errStr := err.Error() + if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) || + strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) || + strings.Contains(errStr, "connection error") || + grpc.Code(err) == codes.Internal { + p.connSelector.Reset() + } + } + }() + + clientStream, err := NewDispatcherClient(conn).Assignments(ctx, r) + + if err != nil { + return err + } + + for { + msg, err := clientStream.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + if err := stream.Send(msg); err != nil { + return err + } + } + return nil +} + func (m *SessionRequest) Size() (n int) { var l int _ = l @@ -1396,6 +1734,45 @@ func (m *TasksMessage) Size() (n int) { return n } +func (m *AssignmentsRequest) Size() (n int) { + var l int + _ = l + l = len(m.SessionID) + if l > 0 { + n += 1 + l + sovDispatcher(uint64(l)) + } + return n +} + +func (m *AssignmentsMessage) Size() (n int) { + var l int + _ = l + if m.Type != 0 { + n += 1 + sovDispatcher(uint64(m.Type)) + } + l = len(m.AppliesTo) + if l > 0 { + n += 1 + l + sovDispatcher(uint64(l)) + } + l = len(m.ResultsIn) + if l > 0 { + n += 1 + l + sovDispatcher(uint64(l)) + } + if len(m.UpdateTasks) > 0 { + for _, e := range m.UpdateTasks { + l = e.Size() + n += 1 + l + sovDispatcher(uint64(l)) + } + } + if len(m.RemoveTasks) > 0 { + for _, s := range m.RemoveTasks { + l = len(s) + n += 1 + l + sovDispatcher(uint64(l)) + } + } + return n +} + func sovDispatcher(x uint64) (n int) { for { n++ @@ -1504,6 +1881,30 @@ func (this *TasksMessage) String() string { }, "") return s } +func (this *AssignmentsRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&AssignmentsRequest{`, + `SessionID:` + fmt.Sprintf("%v", this.SessionID) + `,`, + `}`, + }, "") + return s +} +func (this *AssignmentsMessage) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&AssignmentsMessage{`, + `Type:` + fmt.Sprintf("%v", this.Type) + `,`, + `AppliesTo:` + fmt.Sprintf("%v", this.AppliesTo) + `,`, + `ResultsIn:` + fmt.Sprintf("%v", this.ResultsIn) + `,`, + `UpdateTasks:` + strings.Replace(fmt.Sprintf("%v", this.UpdateTasks), "Task", "Task", 1) + `,`, + `RemoveTasks:` + fmt.Sprintf("%v", this.RemoveTasks) + `,`, + `}`, + }, "") + return s +} func valueToStringDispatcher(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -2389,6 +2790,272 @@ func (m *TasksMessage) Unmarshal(data []byte) error { } return nil } +func (m *AssignmentsRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AssignmentsRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AssignmentsRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SessionID", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDispatcher + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SessionID = string(data[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDispatcher(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDispatcher + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AssignmentsMessage) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AssignmentsMessage: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AssignmentsMessage: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Type |= (AssignmentsMessage_Type(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field AppliesTo", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDispatcher + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.AppliesTo = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ResultsIn", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDispatcher + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ResultsIn = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UpdateTasks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthDispatcher + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.UpdateTasks = append(m.UpdateTasks, &Task{}) + if err := m.UpdateTasks[len(m.UpdateTasks)-1].Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RemoveTasks", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDispatcher + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RemoveTasks = append(m.RemoveTasks, string(data[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDispatcher(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDispatcher + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipDispatcher(data []byte) (n int, err error) { l := len(data) iNdEx := 0 @@ -2495,46 +3162,57 @@ var ( ) var fileDescriptorDispatcher = []byte{ - // 645 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x95, 0xdf, 0x6a, 0x13, 0x4f, - 0x14, 0xc7, 0x3b, 0x69, 0x9a, 0xfe, 0x72, 0xd2, 0xfe, 0x88, 0x63, 0xb1, 0xcb, 0x52, 0xb7, 0x71, - 0xab, 0x50, 0xb0, 0x6e, 0x35, 0x82, 0x17, 0x52, 0x44, 0x42, 0x0a, 0x86, 0xe2, 0x1f, 0xb6, 0x6a, - 0x2f, 0xcb, 0x24, 0x7b, 0x48, 0xd7, 0xd8, 0x9d, 0x75, 0x66, 0x62, 0xcd, 0x85, 0x20, 0x88, 0xb7, - 0x22, 0x5e, 0xf9, 0x14, 0x3e, 0x47, 0xf1, 0xca, 0x4b, 0xaf, 0x8a, 0xcd, 0x03, 0x88, 0x8f, 0x20, - 0xbb, 0x3b, 0x9b, 0xd6, 0x74, 0x53, 0x9b, 0x5e, 0x65, 0xfe, 0x7c, 0xcf, 0xf7, 0x7c, 0x38, 0xe7, - 0x4c, 0x16, 0xca, 0x9e, 0x2f, 0x43, 0xa6, 0x5a, 0x3b, 0x28, 0x9c, 0x50, 0x70, 0xc5, 0x29, 0xf5, - 0x78, 0xab, 0x83, 0xc2, 0x91, 0x7b, 0x4c, 0xec, 0x76, 0x7c, 0xe5, 0xbc, 0xbe, 0x65, 0x96, 0x54, - 0x2f, 0x44, 0x99, 0x08, 0xcc, 0x59, 0xde, 0x7c, 0x81, 0x2d, 0x95, 0x6e, 0xe7, 0xda, 0xbc, 0xcd, - 0xe3, 0xe5, 0x6a, 0xb4, 0xd2, 0xa7, 0x17, 0xc3, 0x97, 0xdd, 0xb6, 0x1f, 0xac, 0x26, 0x3f, 0xfa, - 0x70, 0xde, 0xeb, 0x0a, 0xa6, 0x7c, 0x1e, 0xac, 0xa6, 0x8b, 0xe4, 0xc2, 0xfe, 0x40, 0xe0, 0xff, - 0x4d, 0x94, 0xd2, 0xe7, 0x81, 0x8b, 0xaf, 0xba, 0x28, 0x15, 0x5d, 0x87, 0x92, 0x87, 0xb2, 0x25, - 0xfc, 0x30, 0xd2, 0x19, 0xa4, 0x42, 0x96, 0x4b, 0xd5, 0x25, 0xe7, 0x24, 0x9c, 0xf3, 0x88, 0x7b, - 0x58, 0x3f, 0x92, 0xba, 0xc7, 0xe3, 0xe8, 0x0a, 0x80, 0x4c, 0x8c, 0xb7, 0x7d, 0xcf, 0xc8, 0x55, - 0xc8, 0x72, 0xb1, 0x36, 0xdb, 0x3f, 0x58, 0x2c, 0xea, 0x74, 0x8d, 0xba, 0x5b, 0xd4, 0x82, 0x86, - 0x67, 0xbf, 0xcf, 0x0d, 0x38, 0x1e, 0xa2, 0x94, 0xac, 0x8d, 0x43, 0x06, 0xe4, 0x74, 0x03, 0xba, - 0x02, 0xf9, 0x80, 0x7b, 0x18, 0x27, 0x2a, 0x55, 0x8d, 0x51, 0xb8, 0x6e, 0xac, 0xa2, 0x6b, 0xf0, - 0xdf, 0x2e, 0x0b, 0x58, 0x1b, 0x85, 0x34, 0x26, 0x2b, 0x93, 0xcb, 0xa5, 0x6a, 0x25, 0x2b, 0x62, - 0x0b, 0xfd, 0xf6, 0x8e, 0x42, 0xef, 0x09, 0xa2, 0x70, 0x07, 0x11, 0x74, 0x0b, 0x2e, 0x05, 0xa8, - 0xf6, 0xb8, 0xe8, 0x6c, 0x37, 0x39, 0x57, 0x52, 0x09, 0x16, 0x6e, 0x77, 0xb0, 0x27, 0x8d, 0x7c, - 0xec, 0x75, 0x25, 0xcb, 0x6b, 0x3d, 0x68, 0x89, 0x5e, 0x5c, 0x9a, 0x0d, 0xec, 0xb9, 0x73, 0xda, - 0xa0, 0x96, 0xc6, 0x6f, 0x60, 0x4f, 0xda, 0xf7, 0xa1, 0xfc, 0x00, 0x99, 0x50, 0x4d, 0x64, 0x2a, - 0x6d, 0xc7, 0x58, 0x65, 0xb0, 0x1f, 0xc3, 0x85, 0x63, 0x0e, 0x32, 0xe4, 0x81, 0x44, 0x7a, 0x17, - 0x0a, 0x21, 0x0a, 0x9f, 0x7b, 0xba, 0x99, 0x0b, 0x59, 0x7c, 0x75, 0x3d, 0x18, 0xb5, 0xfc, 0xfe, - 0xc1, 0xe2, 0x84, 0xab, 0x23, 0xec, 0x4f, 0x39, 0x98, 0x7f, 0x16, 0x7a, 0x4c, 0xe1, 0x53, 0x26, - 0x3b, 0x9b, 0x8a, 0xa9, 0xae, 0x3c, 0x17, 0x1a, 0x7d, 0x0e, 0xd3, 0xdd, 0xd8, 0x28, 0x2d, 0xf9, - 0x5a, 0x16, 0xc6, 0x88, 0x5c, 0xce, 0xd1, 0x49, 0xa2, 0x70, 0x53, 0x33, 0x93, 0x43, 0x79, 0xf8, - 0x92, 0x2e, 0xc1, 0xb4, 0x62, 0xb2, 0x73, 0x84, 0x05, 0xfd, 0x83, 0xc5, 0x42, 0x24, 0x6b, 0xd4, - 0xdd, 0x42, 0x74, 0xd5, 0xf0, 0xe8, 0x1d, 0x28, 0xc8, 0x38, 0x48, 0x0f, 0x8d, 0x95, 0xc5, 0x73, - 0x8c, 0x44, 0xab, 0x6d, 0x13, 0x8c, 0x93, 0x94, 0x49, 0xa9, 0xed, 0x35, 0x98, 0x89, 0x4e, 0xcf, - 0x57, 0x22, 0xfb, 0x9e, 0x8e, 0x4e, 0x9f, 0x80, 0x03, 0x53, 0x11, 0xab, 0x34, 0x48, 0x5c, 0x30, - 0x63, 0x14, 0xa0, 0x9b, 0xc8, 0xaa, 0x1f, 0xf3, 0x00, 0xf5, 0xc1, 0xdf, 0x0a, 0x7d, 0x03, 0xd3, - 0x3a, 0x0d, 0xb5, 0xb3, 0x42, 0xff, 0x7e, 0xf8, 0xe6, 0x69, 0x1a, 0x4d, 0x64, 0x2f, 0x7d, 0xfb, - 0xfa, 0xeb, 0x4b, 0xee, 0x32, 0xcc, 0xc4, 0x9a, 0x1b, 0xd1, 0x08, 0xa3, 0x80, 0xd9, 0x64, 0xa7, - 0x1f, 0xc8, 0x4d, 0x42, 0xdf, 0x42, 0x71, 0x30, 0x86, 0xf4, 0x6a, 0x96, 0xef, 0xf0, 0x9c, 0x9b, - 0xd7, 0xfe, 0xa1, 0xd2, 0x05, 0x3e, 0x0b, 0x00, 0xfd, 0x4c, 0xa0, 0x3c, 0xdc, 0x22, 0x7a, 0x7d, - 0x8c, 0x71, 0x33, 0x57, 0xce, 0x26, 0x1e, 0x07, 0x4a, 0xc0, 0x54, 0xdc, 0x5c, 0x5a, 0x19, 0xd5, - 0xc6, 0x41, 0xf6, 0xd1, 0x8a, 0xf1, 0xfa, 0x50, 0x5b, 0xd8, 0x3f, 0xb4, 0x26, 0x7e, 0x1c, 0x5a, - 0x13, 0xbf, 0x0f, 0x2d, 0xf2, 0xae, 0x6f, 0x91, 0xfd, 0xbe, 0x45, 0xbe, 0xf7, 0x2d, 0xf2, 0xb3, - 0x6f, 0x91, 0x66, 0x21, 0xfe, 0x06, 0xdc, 0xfe, 0x13, 0x00, 0x00, 0xff, 0xff, 0xa3, 0xfc, 0x50, - 0xc8, 0x8b, 0x06, 0x00, 0x00, + // 820 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x55, 0x4d, 0x6f, 0x1b, 0x45, + 0x18, 0xce, 0x38, 0x8e, 0x53, 0xbf, 0xeb, 0x14, 0x33, 0x54, 0x74, 0x65, 0xb5, 0x1b, 0x77, 0x43, + 0x23, 0x4b, 0x0d, 0x9b, 0x62, 0x24, 0x0e, 0x10, 0x01, 0x75, 0x6d, 0x09, 0xab, 0x4d, 0x5a, 0x6d, + 0x0d, 0x3d, 0x5a, 0x6b, 0xef, 0x2b, 0x77, 0x71, 0xbc, 0xb3, 0xcc, 0xcc, 0xb6, 0xf8, 0x80, 0x84, + 0x04, 0x48, 0x1c, 0x11, 0xa7, 0x8a, 0x1f, 0xc1, 0xef, 0x88, 0x38, 0x71, 0xe4, 0x14, 0x11, 0xff, + 0x00, 0xc4, 0x4f, 0xa8, 0x76, 0x77, 0xd6, 0x71, 0x9d, 0x75, 0xe2, 0xe4, 0xe4, 0xd9, 0x77, 0x9e, + 0xe7, 0x99, 0x47, 0xef, 0x97, 0xa1, 0xec, 0x7a, 0x22, 0x70, 0x64, 0xff, 0x05, 0x72, 0x2b, 0xe0, + 0x4c, 0x32, 0x4a, 0x5d, 0xd6, 0x1f, 0x22, 0xb7, 0xc4, 0x2b, 0x87, 0x8f, 0x86, 0x9e, 0xb4, 0x5e, + 0x7e, 0x54, 0xd1, 0xe4, 0x38, 0x40, 0x91, 0x00, 0x2a, 0x1b, 0xac, 0xf7, 0x2d, 0xf6, 0x65, 0xfa, + 0x79, 0x63, 0xc0, 0x06, 0x2c, 0x3e, 0xee, 0x46, 0x27, 0x15, 0x7d, 0x2f, 0x38, 0x0c, 0x07, 0x9e, + 0xbf, 0x9b, 0xfc, 0xa8, 0xe0, 0x4d, 0x37, 0xe4, 0x8e, 0xf4, 0x98, 0xbf, 0x9b, 0x1e, 0x92, 0x0b, + 0xf3, 0x17, 0x02, 0xd7, 0x9f, 0xa1, 0x10, 0x1e, 0xf3, 0x6d, 0xfc, 0x2e, 0x44, 0x21, 0x69, 0x0b, + 0x34, 0x17, 0x45, 0x9f, 0x7b, 0x41, 0x84, 0xd3, 0x49, 0x95, 0xd4, 0xb4, 0xfa, 0x96, 0x75, 0xd6, + 0x9c, 0x75, 0xc0, 0x5c, 0x6c, 0x9e, 0x42, 0xed, 0x59, 0x1e, 0xdd, 0x01, 0x10, 0x89, 0x70, 0xd7, + 0x73, 0xf5, 0x5c, 0x95, 0xd4, 0x8a, 0x8d, 0x8d, 0xc9, 0xf1, 0x66, 0x51, 0x3d, 0xd7, 0x6e, 0xda, + 0x45, 0x05, 0x68, 0xbb, 0xe6, 0x4f, 0xb9, 0xa9, 0x8f, 0x7d, 0x14, 0xc2, 0x19, 0xe0, 0x9c, 0x00, + 0x39, 0x5f, 0x80, 0xee, 0x40, 0xde, 0x67, 0x2e, 0xc6, 0x0f, 0x69, 0x75, 0x7d, 0x91, 0x5d, 0x3b, + 0x46, 0xd1, 0x3d, 0xb8, 0x36, 0x72, 0x7c, 0x67, 0x80, 0x5c, 0xe8, 0xab, 0xd5, 0xd5, 0x9a, 0x56, + 0xaf, 0x66, 0x31, 0x9e, 0xa3, 0x37, 0x78, 0x21, 0xd1, 0x7d, 0x8a, 0xc8, 0xed, 0x29, 0x83, 0x3e, + 0x87, 0xf7, 0x7d, 0x94, 0xaf, 0x18, 0x1f, 0x76, 0x7b, 0x8c, 0x49, 0x21, 0xb9, 0x13, 0x74, 0x87, + 0x38, 0x16, 0x7a, 0x3e, 0xd6, 0xba, 0x93, 0xa5, 0xd5, 0xf2, 0xfb, 0x7c, 0x1c, 0xa7, 0xe6, 0x11, + 0x8e, 0xed, 0x1b, 0x4a, 0xa0, 0x91, 0xf2, 0x1f, 0xe1, 0x58, 0x98, 0x5f, 0x42, 0xf9, 0x2b, 0x74, + 0xb8, 0xec, 0xa1, 0x23, 0xd3, 0x72, 0x5c, 0x2a, 0x0d, 0xe6, 0x13, 0x78, 0x77, 0x46, 0x41, 0x04, + 0xcc, 0x17, 0x48, 0x3f, 0x85, 0x42, 0x80, 0xdc, 0x63, 0xae, 0x2a, 0xe6, 0xad, 0x2c, 0x7f, 0x4d, + 0xd5, 0x18, 0x8d, 0xfc, 0xd1, 0xf1, 0xe6, 0x8a, 0xad, 0x18, 0xe6, 0x6f, 0x39, 0xb8, 0xf9, 0x75, + 0xe0, 0x3a, 0x12, 0x3b, 0x8e, 0x18, 0x3e, 0x93, 0x8e, 0x0c, 0xc5, 0x95, 0xac, 0xd1, 0x6f, 0x60, + 0x3d, 0x8c, 0x85, 0xd2, 0x94, 0xef, 0x65, 0xd9, 0x58, 0xf0, 0x96, 0x75, 0x1a, 0x49, 0x10, 0x76, + 0x2a, 0x56, 0x61, 0x50, 0x9e, 0xbf, 0xa4, 0x5b, 0xb0, 0x2e, 0x1d, 0x31, 0x3c, 0xb5, 0x05, 0x93, + 0xe3, 0xcd, 0x42, 0x04, 0x6b, 0x37, 0xed, 0x42, 0x74, 0xd5, 0x76, 0xe9, 0x27, 0x50, 0x10, 0x31, + 0x49, 0x35, 0x8d, 0x91, 0xe5, 0x67, 0xc6, 0x89, 0x42, 0x9b, 0x15, 0xd0, 0xcf, 0xba, 0x4c, 0x52, + 0x6d, 0xee, 0x41, 0x29, 0x8a, 0x5e, 0x2d, 0x45, 0xe6, 0xe7, 0x8a, 0x9d, 0x8e, 0x80, 0x05, 0x6b, + 0x91, 0x57, 0xa1, 0x93, 0x38, 0x61, 0xfa, 0x22, 0x83, 0x76, 0x02, 0x33, 0x1b, 0x40, 0x1f, 0x08, + 0xe1, 0x0d, 0xfc, 0x11, 0xfa, 0xf2, 0x8a, 0x1e, 0xfe, 0xc8, 0xbd, 0x25, 0x92, 0x5a, 0xf9, 0x02, + 0xf2, 0xd1, 0x2a, 0x8a, 0xe9, 0xd7, 0xeb, 0xf7, 0xb2, 0x9c, 0x9c, 0x65, 0x59, 0x9d, 0x71, 0x80, + 0x76, 0x4c, 0xa4, 0xb7, 0x01, 0x9c, 0x20, 0x38, 0xf4, 0x50, 0x74, 0x25, 0x4b, 0xf6, 0x81, 0x5d, + 0x54, 0x91, 0x0e, 0x8b, 0xae, 0x39, 0x8a, 0xf0, 0x50, 0x8a, 0xae, 0xe7, 0xeb, 0xab, 0xc9, 0xb5, + 0x8a, 0xb4, 0x7d, 0xfa, 0x19, 0x94, 0x92, 0x7a, 0x77, 0x93, 0x84, 0xe4, 0x2f, 0x48, 0x88, 0x16, + 0x4e, 0x2b, 0x24, 0xe8, 0x1d, 0x28, 0x71, 0x1c, 0xb1, 0x97, 0x29, 0x79, 0xad, 0xba, 0x5a, 0x2b, + 0xda, 0x5a, 0x12, 0x8b, 0x21, 0xe6, 0x5d, 0xc8, 0x47, 0x5e, 0x69, 0x09, 0xae, 0x3d, 0x7c, 0xb2, + 0xff, 0xf4, 0x71, 0xab, 0xd3, 0x2a, 0xaf, 0xd0, 0x77, 0x40, 0x6b, 0x1f, 0x3c, 0xb4, 0x5b, 0xfb, + 0xad, 0x83, 0xce, 0x83, 0xc7, 0x65, 0x52, 0x7f, 0xbd, 0x06, 0xd0, 0x9c, 0xee, 0x6d, 0xfa, 0x3d, + 0xac, 0xab, 0x1c, 0x52, 0x33, 0xcb, 0xca, 0xdb, 0x9b, 0xb5, 0x72, 0x1e, 0x46, 0x65, 0xcc, 0xdc, + 0xfa, 0xeb, 0xcf, 0xff, 0x5e, 0xe7, 0x6e, 0x43, 0x29, 0xc6, 0x7c, 0x18, 0xed, 0x08, 0xe4, 0xb0, + 0x91, 0x7c, 0xa9, 0x0d, 0x74, 0x9f, 0xd0, 0x1f, 0xa0, 0x38, 0x9d, 0x73, 0xfa, 0x41, 0x96, 0xee, + 0xfc, 0x22, 0xa9, 0xdc, 0xbd, 0x00, 0xa5, 0x3a, 0x78, 0x19, 0x03, 0xf4, 0x77, 0x02, 0xe5, 0xf9, + 0x19, 0xa0, 0xf7, 0x2e, 0x31, 0xcf, 0x95, 0x9d, 0xe5, 0xc0, 0x97, 0x31, 0x15, 0xc2, 0x5a, 0x52, + 0xef, 0xea, 0xa2, 0xb6, 0x98, 0xbe, 0xbe, 0x18, 0x91, 0xd6, 0x61, 0x7b, 0x89, 0x17, 0x7f, 0xcd, + 0x91, 0xfb, 0x84, 0xfe, 0x4c, 0x40, 0x9b, 0x69, 0x7d, 0xba, 0x7d, 0xc1, 0x6c, 0xa4, 0x1e, 0xb6, + 0x97, 0x9b, 0xa1, 0x25, 0x3b, 0xa2, 0x71, 0xeb, 0xe8, 0xc4, 0x58, 0xf9, 0xe7, 0xc4, 0x58, 0xf9, + 0xff, 0xc4, 0x20, 0x3f, 0x4e, 0x0c, 0x72, 0x34, 0x31, 0xc8, 0xdf, 0x13, 0x83, 0xfc, 0x3b, 0x31, + 0x48, 0xaf, 0x10, 0xff, 0xdd, 0x7f, 0xfc, 0x26, 0x00, 0x00, 0xff, 0xff, 0xf9, 0xb7, 0x47, 0x6b, + 0x76, 0x08, 0x00, 0x00, } diff --git a/api/dispatcher.proto b/api/dispatcher.proto index acb8c72c33..40c1e33804 100644 --- a/api/dispatcher.proto +++ b/api/dispatcher.proto @@ -47,13 +47,22 @@ service Dispatcher { // maybe dispatch, al likes this // it should be terminated. rpc Tasks(TasksRequest) returns (stream TasksMessage) { option (docker.protobuf.plugin.tls_authorization) = { roles: "swarm-worker" roles: "swarm-manager" }; + option deprecated = true; + }; + + // Assignments is a stream of assignments such as tasks and secrets for node. + // The first message in the stream contains all of the tasks and secrets + // that are relevant to the node. Future messages in the stream are updates to + // the set of assignments. + rpc Assignments(AssignmentsRequest) returns (stream AssignmentsMessage) { + option (docker.protobuf.plugin.tls_authorization) = { roles: "swarm-worker" roles: "swarm-manager" }; }; } // SessionRequest starts a session. message SessionRequest { NodeDescription description = 1; - // SessionID can be provided to attempt resuming an exising session. If the + // SessionID can be provided to attempt resuming an exising session. If the // SessionID is empty or invalid, a new SessionID will be assigned. // // See SessionMessage.SessionID for details. @@ -115,7 +124,7 @@ message SessionMessage { repeated WeightedPeer managers = 3; // Symmetric encryption key distributed by the lead manager. Used by agents - // for securing network bootstrapping and communication. + // for securing network bootstrapping and communication. repeated EncryptionKey network_bootstrap_keys = 4; } @@ -157,3 +166,40 @@ message TasksMessage { repeated Task tasks = 1; } +message AssignmentsRequest { + string session_id = 1 [(gogoproto.customname) = "SessionID"]; +} + +message AssignmentsMessage { + // AssignmentType specifies whether this assignment message carries + // the full state, or is an update to an existing state. + enum Type { + COMPLETE = 0; + INCREMENTAL = 1; + } + + Type type = 1; + + // AppliesTo references the previous ResultsIn value, to chain + // incremental updates together. For the first update in a stream, + // AppliesTo is empty. If AppliesTo does not match the previously + // received ResultsIn, the consumer of the stream should start a new + // Assignments stream to re-sync. + string applies_to = 2; + + // ResultsIn identifies the result of this assignments message, to + // match against the next message's AppliesTo value and protect + // against missed messages. + string results_in = 3; + + // UpdateTasks is a set of new or updated tasks to run on this node. + // In the first assignments message, it contains all of the tasks + // to run on this node. Tasks outside of this set running on the node + // should be terminated. + repeated Task update_tasks = 4; + + // RemoveTasks is a set of previously-assigned task IDs to remove from the + // assignment set. It is not used in the first assignments message of + // a stream. + repeated string remove_tasks = 5; +} diff --git a/api/raft.pb.go b/api/raft.pb.go index 1cb2e3173f..96180d389c 100644 --- a/api/raft.pb.go +++ b/api/raft.pb.go @@ -163,7 +163,7 @@ func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequ func (*InternalRaftRequest) ProtoMessage() {} func (*InternalRaftRequest) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{9} } -// StoreAction defines a taret and operation to apply on the storage system. +// StoreAction defines a target and operation to apply on the storage system. type StoreAction struct { Action StoreActionKind `protobuf:"varint,1,opt,name=action,proto3,enum=docker.swarmkit.v1.StoreActionKind" json:"action,omitempty"` // Types that are valid to be assigned to Target: diff --git a/api/raft.proto b/api/raft.proto index 911de323f4..e5a0ffb4d8 100644 --- a/api/raft.proto +++ b/api/raft.proto @@ -115,7 +115,7 @@ enum StoreActionKind { STORE_ACTION_REMOVE = 3 [(gogoproto.enumvalue_customname) = "StoreActionKindRemove"]; } -// StoreAction defines a taret and operation to apply on the storage system. +// StoreAction defines a target and operation to apply on the storage system. message StoreAction { StoreActionKind action = 1; oneof target { diff --git a/api/types.pb.go b/api/types.pb.go index dbd8286cdc..60d2a04441 100644 --- a/api/types.pb.go +++ b/api/types.pb.go @@ -121,6 +121,8 @@ UpdateTaskStatusResponse TasksRequest TasksMessage + AssignmentsRequest + AssignmentsMessage NodeCertificateStatusRequest NodeCertificateStatusResponse IssueNodeCertificateRequest diff --git a/manager/dispatcher/dispatcher.go b/manager/dispatcher/dispatcher.go index 791f7a7882..7622a00fc9 100644 --- a/manager/dispatcher/dispatcher.go +++ b/manager/dispatcher/dispatcher.go @@ -3,6 +3,7 @@ package dispatcher import ( "errors" "fmt" + "strconv" "sync" "time" @@ -41,6 +42,9 @@ const ( // into a single transaction. A fraction of a second feels about // right. maxBatchInterval = 100 * time.Millisecond + + modificationBatchLimit = 100 + batchingWaitTime = 100 * time.Millisecond ) var ( @@ -668,14 +672,10 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe } // bursty events should be processed in batches and sent out snapshot - const ( - modificationBatchLimit = 200 - eventPausedGap = 50 * time.Millisecond - ) var ( - modificationCnt int - eventPausedTimer *time.Timer - eventPausedTimeout <-chan time.Time + modificationCnt int + batchingTimer *time.Timer + batchingTimeout <-chan time.Time ) batchingLoop: @@ -703,13 +703,189 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe delete(tasksMap, v.Task.ID) modificationCnt++ } - if eventPausedTimer != nil { - eventPausedTimer.Reset(eventPausedGap) + if batchingTimer != nil { + batchingTimer.Reset(batchingWaitTime) } else { - eventPausedTimer = time.NewTimer(eventPausedGap) - eventPausedTimeout = eventPausedTimer.C + batchingTimer = time.NewTimer(batchingWaitTime) + batchingTimeout = batchingTimer.C + } + case <-batchingTimeout: + break batchingLoop + case <-stream.Context().Done(): + return stream.Context().Err() + case <-d.ctx.Done(): + return d.ctx.Err() + } + } + + if batchingTimer != nil { + batchingTimer.Stop() + } + } +} + +// Assignments is a stream of assignments for a node. Each message contains +// either full list of tasks and secrets for the node, or an incremental update. +func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error { + nodeInfo, err := ca.RemoteNode(stream.Context()) + if err != nil { + return err + } + nodeID := nodeInfo.NodeID + + if err := d.isRunningLocked(); err != nil { + return err + } + + fields := logrus.Fields{ + "node.id": nodeID, + "node.session": r.SessionID, + "method": "(*Dispatcher).Assignments", + } + if nodeInfo.ForwardedBy != nil { + fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID + } + log := log.G(stream.Context()).WithFields(fields) + log.Debugf("") + + if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil { + return err + } + + var ( + sequence int64 + appliesTo string + initial api.AssignmentsMessage + ) + tasksMap := make(map[string]*api.Task) + + sendMessage := func(msg api.AssignmentsMessage, assignmentType api.AssignmentsMessage_Type) error { + sequence++ + msg.AppliesTo = appliesTo + msg.ResultsIn = strconv.FormatInt(sequence, 10) + appliesTo = msg.ResultsIn + msg.Type = assignmentType + + if err := stream.Send(&msg); err != nil { + return err + } + return nil + } + + // TODO(aaronl): Also send node secrets that should be exposed to + // this node. + nodeTasks, cancel, err := store.ViewAndWatch( + d.store, + func(readTx store.ReadTx) error { + tasks, err := store.FindTasks(readTx, store.ByNodeID(nodeID)) + if err != nil { + return err + } + + for _, t := range tasks { + // We only care about tasks that are ASSIGNED or + // higher. If the state is below ASSIGNED, the + // task may not meet the constraints for this + // node, so we have to be careful about sending + // secrets associated with it. + if t.Status.State < api.TaskStateAssigned { + continue + } + + tasksMap[t.ID] = t + initial.UpdateTasks = append(initial.UpdateTasks, t) + } + return nil + }, + state.EventUpdateTask{Task: &api.Task{NodeID: nodeID}, + Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}}, + state.EventDeleteTask{Task: &api.Task{NodeID: nodeID}, + Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}}, + ) + if err != nil { + return err + } + defer cancel() + + if err := sendMessage(initial, api.AssignmentsMessage_COMPLETE); err != nil { + return err + } + + for { + // Check for session expiration + if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil { + return err + } + + // bursty events should be processed in batches and sent out together + var ( + update api.AssignmentsMessage + modificationCnt int + batchingTimer *time.Timer + batchingTimeout <-chan time.Time + updateTasks = make(map[string]*api.Task) + removeTasks = make(map[string]struct{}) + ) + + oneModification := func() { + modificationCnt++ + + if batchingTimer != nil { + batchingTimer.Reset(batchingWaitTime) + } else { + batchingTimer = time.NewTimer(batchingWaitTime) + batchingTimeout = batchingTimer.C + } + } + + // The batching loop waits for 50 ms after the most recent + // change, or until modificationBatchLimit is reached. The + // worst case latency is modificationBatchLimit * batchingWaitTime, + // which is 10 seconds. + batchingLoop: + for modificationCnt < modificationBatchLimit { + select { + case event := <-nodeTasks: + switch v := event.(type) { + // We don't monitor EventCreateTask because tasks are + // never created in the ASSIGNED state. First tasks are + // created by the orchestrator, then the scheduler moves + // them to ASSIGNED. If this ever changes, we will need + // to monitor task creations as well. + case state.EventUpdateTask: + // We only care about tasks that are ASSIGNED or + // higher. + if v.Task.Status.State < api.TaskStateAssigned { + continue + } + + if oldTask, exists := tasksMap[v.Task.ID]; exists { + // States ASSIGNED and below are set by the orchestrator/scheduler, + // not the agent, so tasks in these states need to be sent to the + // agent even if nothing else has changed. + if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned { + // this update should not trigger a task change for the agent + tasksMap[v.Task.ID] = v.Task + continue + } + } + tasksMap[v.Task.ID] = v.Task + updateTasks[v.Task.ID] = v.Task + + oneModification() + case state.EventDeleteTask: + + if _, exists := tasksMap[v.Task.ID]; !exists { + continue + } + + removeTasks[v.Task.ID] = struct{}{} + + delete(tasksMap, v.Task.ID) + + oneModification() } - case <-eventPausedTimeout: + case <-batchingTimeout: break batchingLoop case <-stream.Context().Done(): return stream.Context().Err() @@ -718,8 +894,22 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe } } - if eventPausedTimer != nil { - eventPausedTimer.Stop() + if batchingTimer != nil { + batchingTimer.Stop() + } + + if modificationCnt > 0 { + for id, task := range updateTasks { + if _, ok := removeTasks[id]; !ok { + update.UpdateTasks = append(update.UpdateTasks, task) + } + } + for id := range removeTasks { + update.RemoveTasks = append(update.RemoveTasks, id) + } + if err := sendMessage(update, api.AssignmentsMessage_INCREMENTAL); err != nil { + return err + } } } } diff --git a/manager/dispatcher/dispatcher_test.go b/manager/dispatcher/dispatcher_test.go index 9ca1fb3dfc..5f961ab0d5 100644 --- a/manager/dispatcher/dispatcher_test.go +++ b/manager/dispatcher/dispatcher_test.go @@ -360,7 +360,7 @@ func TestTasks(t *testing.T) { { // without correct SessionID should fail - stream, err := gd.Clients[0].Tasks(context.Background(), &api.TasksRequest{}) + stream, err := gd.Clients[0].Assignments(context.Background(), &api.AssignmentsRequest{}) assert.NoError(t, err) assert.NotNil(t, stream) resp, err := stream.Recv() @@ -369,7 +369,7 @@ func TestTasks(t *testing.T) { assert.Equal(t, grpc.Code(err), codes.InvalidArgument) } - stream, err := gd.Clients[0].Tasks(context.Background(), &api.TasksRequest{SessionID: expectedSessionID}) + stream, err := gd.Clients[0].Assignments(context.Background(), &api.AssignmentsRequest{SessionID: expectedSessionID}) assert.NoError(t, err) time.Sleep(100 * time.Millisecond) @@ -377,19 +377,26 @@ func TestTasks(t *testing.T) { resp, err := stream.Recv() assert.NoError(t, err) // initially no tasks - assert.Equal(t, 0, len(resp.Tasks)) + assert.Equal(t, 0, len(resp.UpdateTasks)) + // Creating the tasks will not create an event for assignments err = gd.Store.Update(func(tx store.Tx) error { assert.NoError(t, store.CreateTask(tx, testTask1)) assert.NoError(t, store.CreateTask(tx, testTask2)) return nil }) assert.NoError(t, err) + err = gd.Store.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, testTask1)) + assert.NoError(t, store.UpdateTask(tx, testTask2)) + return nil + }) + assert.NoError(t, err) resp, err = stream.Recv() assert.NoError(t, err) - assert.Equal(t, len(resp.Tasks), 2) - assert.True(t, resp.Tasks[0].ID == "testTask1" && resp.Tasks[1].ID == "testTask2" || resp.Tasks[0].ID == "testTask2" && resp.Tasks[1].ID == "testTask1") + assert.Equal(t, len(resp.UpdateTasks), 2) + assert.True(t, resp.UpdateTasks[0].ID == "testTask1" && resp.UpdateTasks[1].ID == "testTask2" || resp.UpdateTasks[0].ID == "testTask2" && resp.UpdateTasks[1].ID == "testTask1") err = gd.Store.Update(func(tx store.Tx) error { assert.NoError(t, store.UpdateTask(tx, &api.Task{ @@ -404,8 +411,8 @@ func TestTasks(t *testing.T) { resp, err = stream.Recv() assert.NoError(t, err) - assert.Equal(t, len(resp.Tasks), 2) - for _, task := range resp.Tasks { + assert.Equal(t, 1, len(resp.UpdateTasks)) + for _, task := range resp.UpdateTasks { if task.ID == "testTask1" { assert.Equal(t, task.DesiredState, api.TaskStateRunning) } @@ -420,7 +427,7 @@ func TestTasks(t *testing.T) { resp, err = stream.Recv() assert.NoError(t, err) - assert.Equal(t, len(resp.Tasks), 0) + assert.Equal(t, len(resp.UpdateTasks), 0) } func TestTasksStatusChange(t *testing.T) { @@ -456,7 +463,7 @@ func TestTasksStatusChange(t *testing.T) { { // without correct SessionID should fail - stream, err := gd.Clients[0].Tasks(context.Background(), &api.TasksRequest{}) + stream, err := gd.Clients[0].Assignments(context.Background(), &api.AssignmentsRequest{}) assert.NoError(t, err) assert.NotNil(t, stream) resp, err := stream.Recv() @@ -465,7 +472,7 @@ func TestTasksStatusChange(t *testing.T) { assert.Equal(t, grpc.Code(err), codes.InvalidArgument) } - stream, err := gd.Clients[0].Tasks(context.Background(), &api.TasksRequest{SessionID: expectedSessionID}) + stream, err := gd.Clients[0].Assignments(context.Background(), &api.AssignmentsRequest{SessionID: expectedSessionID}) assert.NoError(t, err) time.Sleep(100 * time.Millisecond) @@ -473,19 +480,26 @@ func TestTasksStatusChange(t *testing.T) { resp, err := stream.Recv() assert.NoError(t, err) // initially no tasks - assert.Equal(t, 0, len(resp.Tasks)) + assert.Equal(t, 0, len(resp.UpdateTasks)) + // Creating the tasks will not create an event for assignments err = gd.Store.Update(func(tx store.Tx) error { assert.NoError(t, store.CreateTask(tx, testTask1)) assert.NoError(t, store.CreateTask(tx, testTask2)) return nil }) assert.NoError(t, err) + err = gd.Store.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, testTask1)) + assert.NoError(t, store.UpdateTask(tx, testTask2)) + return nil + }) + assert.NoError(t, err) resp, err = stream.Recv() assert.NoError(t, err) - assert.Equal(t, len(resp.Tasks), 2) - assert.True(t, resp.Tasks[0].ID == "testTask1" && resp.Tasks[1].ID == "testTask2" || resp.Tasks[0].ID == "testTask2" && resp.Tasks[1].ID == "testTask1") + assert.Equal(t, len(resp.UpdateTasks), 2) + assert.True(t, resp.UpdateTasks[0].ID == "testTask1" && resp.UpdateTasks[1].ID == "testTask2" || resp.UpdateTasks[0].ID == "testTask2" && resp.UpdateTasks[1].ID == "testTask1") err = gd.Store.Update(func(tx store.Tx) error { assert.NoError(t, store.UpdateTask(tx, &api.Task{ @@ -542,20 +556,27 @@ func TestTasksBatch(t *testing.T) { Status: api.TaskStatus{State: api.TaskStateAssigned}, } - stream, err := gd.Clients[0].Tasks(context.Background(), &api.TasksRequest{SessionID: expectedSessionID}) + stream, err := gd.Clients[0].Assignments(context.Background(), &api.AssignmentsRequest{SessionID: expectedSessionID}) assert.NoError(t, err) resp, err := stream.Recv() assert.NoError(t, err) // initially no tasks - assert.Equal(t, 0, len(resp.Tasks)) + assert.Equal(t, 0, len(resp.UpdateTasks)) + // Create, Update and Delete tasks. err = gd.Store.Update(func(tx store.Tx) error { assert.NoError(t, store.CreateTask(tx, testTask1)) assert.NoError(t, store.CreateTask(tx, testTask2)) return nil }) assert.NoError(t, err) + err = gd.Store.Update(func(tx store.Tx) error { + assert.NoError(t, store.UpdateTask(tx, testTask1)) + assert.NoError(t, store.UpdateTask(tx, testTask2)) + return nil + }) + assert.NoError(t, err) err = gd.Store.Update(func(tx store.Tx) error { assert.NoError(t, store.DeleteTask(tx, testTask1.ID)) @@ -567,7 +588,8 @@ func TestTasksBatch(t *testing.T) { resp, err = stream.Recv() assert.NoError(t, err) // all tasks have been deleted - assert.Equal(t, len(resp.Tasks), 0) + assert.Equal(t, len(resp.UpdateTasks), 0) + assert.Equal(t, len(resp.RemoveTasks), 2) } func TestTasksNoCert(t *testing.T) { @@ -575,7 +597,7 @@ func TestTasksNoCert(t *testing.T) { assert.NoError(t, err) defer gd.Close() - stream, err := gd.Clients[2].Tasks(context.Background(), &api.TasksRequest{}) + stream, err := gd.Clients[2].Assignments(context.Background(), &api.AssignmentsRequest{}) assert.NoError(t, err) assert.NotNil(t, stream) resp, err := stream.Recv()