diff --git a/vendor.conf b/vendor.conf index 33e2dc84454f4..ab7d99c14673c 100644 --- a/vendor.conf +++ b/vendor.conf @@ -125,7 +125,7 @@ github.com/stevvooe/ttrpc d4528379866b0ce7e9d71f3eb96f0582fc374577 github.com/gogo/googleapis 08a7655d27152912db7aaf4f983275eaf8d128ef # cluster -github.com/docker/swarmkit edd5641391926a50bc5f7040e20b7efc05003c26 +github.com/docker/swarmkit 199cf49cd99690135d99e52a1907ec82e8113c4f github.com/gogo/protobuf v1.0.0 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a github.com/fernet/fernet-go 1b2437bc582b3cfbb341ee5a29f8ef5b42912ff2 diff --git a/vendor/github.com/docker/swarmkit/api/objects.pb.go b/vendor/github.com/docker/swarmkit/api/objects.pb.go index e7c95c51756b6..5ae93011c8fca 100644 --- a/vendor/github.com/docker/swarmkit/api/objects.pb.go +++ b/vendor/github.com/docker/swarmkit/api/objects.pb.go @@ -2014,6 +2014,10 @@ func sozObjects(x uint64) (n int) { type NodeCheckFunc func(t1, t2 *Node) bool +type EventNode interface { + IsEventNode() bool +} + type EventCreateNode struct { Node *Node Checks []NodeCheckFunc @@ -2033,6 +2037,14 @@ func (e EventCreateNode) Matches(apiEvent go_events.Event) bool { return true } +func (e EventCreateNode) IsEventCreate() bool { + return true +} + +func (e EventCreateNode) IsEventNode() bool { + return true +} + type EventUpdateNode struct { Node *Node OldNode *Node @@ -2053,6 +2065,14 @@ func (e EventUpdateNode) Matches(apiEvent go_events.Event) bool { return true } +func (e EventUpdateNode) IsEventUpdate() bool { + return true +} + +func (e EventUpdateNode) IsEventNode() bool { + return true +} + type EventDeleteNode struct { Node *Node Checks []NodeCheckFunc @@ -2071,6 +2091,15 @@ func (e EventDeleteNode) Matches(apiEvent go_events.Event) bool { } return true } + +func (e EventDeleteNode) IsEventDelete() bool { + return true +} + +func (e EventDeleteNode) IsEventNode() bool { + return true +} + func (m *Node) CopyStoreObject() StoreObject { return m.Copy() } @@ -2261,6 +2290,10 @@ func (indexer NodeCustomIndexer) FromObject(obj interface{}) (bool, [][]byte, er type ServiceCheckFunc func(t1, t2 *Service) bool +type EventService interface { + IsEventService() bool +} + type EventCreateService struct { Service *Service Checks []ServiceCheckFunc @@ -2280,6 +2313,14 @@ func (e EventCreateService) Matches(apiEvent go_events.Event) bool { return true } +func (e EventCreateService) IsEventCreate() bool { + return true +} + +func (e EventCreateService) IsEventService() bool { + return true +} + type EventUpdateService struct { Service *Service OldService *Service @@ -2300,6 +2341,14 @@ func (e EventUpdateService) Matches(apiEvent go_events.Event) bool { return true } +func (e EventUpdateService) IsEventUpdate() bool { + return true +} + +func (e EventUpdateService) IsEventService() bool { + return true +} + type EventDeleteService struct { Service *Service Checks []ServiceCheckFunc @@ -2318,6 +2367,15 @@ func (e EventDeleteService) Matches(apiEvent go_events.Event) bool { } return true } + +func (e EventDeleteService) IsEventDelete() bool { + return true +} + +func (e EventDeleteService) IsEventService() bool { + return true +} + func (m *Service) CopyStoreObject() StoreObject { return m.Copy() } @@ -2478,6 +2536,10 @@ func (indexer ServiceCustomIndexer) FromObject(obj interface{}) (bool, [][]byte, type TaskCheckFunc func(t1, t2 *Task) bool +type EventTask interface { + IsEventTask() bool +} + type EventCreateTask struct { Task *Task Checks []TaskCheckFunc @@ -2497,6 +2559,14 @@ func (e EventCreateTask) 
Matches(apiEvent go_events.Event) bool { return true } +func (e EventCreateTask) IsEventCreate() bool { + return true +} + +func (e EventCreateTask) IsEventTask() bool { + return true +} + type EventUpdateTask struct { Task *Task OldTask *Task @@ -2517,6 +2587,14 @@ func (e EventUpdateTask) Matches(apiEvent go_events.Event) bool { return true } +func (e EventUpdateTask) IsEventUpdate() bool { + return true +} + +func (e EventUpdateTask) IsEventTask() bool { + return true +} + type EventDeleteTask struct { Task *Task Checks []TaskCheckFunc @@ -2535,6 +2613,15 @@ func (e EventDeleteTask) Matches(apiEvent go_events.Event) bool { } return true } + +func (e EventDeleteTask) IsEventDelete() bool { + return true +} + +func (e EventDeleteTask) IsEventTask() bool { + return true +} + func (m *Task) CopyStoreObject() StoreObject { return m.Copy() } @@ -2738,6 +2825,10 @@ func (indexer TaskCustomIndexer) FromObject(obj interface{}) (bool, [][]byte, er type NetworkCheckFunc func(t1, t2 *Network) bool +type EventNetwork interface { + IsEventNetwork() bool +} + type EventCreateNetwork struct { Network *Network Checks []NetworkCheckFunc @@ -2757,6 +2848,14 @@ func (e EventCreateNetwork) Matches(apiEvent go_events.Event) bool { return true } +func (e EventCreateNetwork) IsEventCreate() bool { + return true +} + +func (e EventCreateNetwork) IsEventNetwork() bool { + return true +} + type EventUpdateNetwork struct { Network *Network OldNetwork *Network @@ -2777,6 +2876,14 @@ func (e EventUpdateNetwork) Matches(apiEvent go_events.Event) bool { return true } +func (e EventUpdateNetwork) IsEventUpdate() bool { + return true +} + +func (e EventUpdateNetwork) IsEventNetwork() bool { + return true +} + type EventDeleteNetwork struct { Network *Network Checks []NetworkCheckFunc @@ -2795,6 +2902,15 @@ func (e EventDeleteNetwork) Matches(apiEvent go_events.Event) bool { } return true } + +func (e EventDeleteNetwork) IsEventDelete() bool { + return true +} + +func (e EventDeleteNetwork) IsEventNetwork() bool { + return true +} + func (m *Network) CopyStoreObject() StoreObject { return m.Copy() } @@ -2955,6 +3071,10 @@ func (indexer NetworkCustomIndexer) FromObject(obj interface{}) (bool, [][]byte, type ClusterCheckFunc func(t1, t2 *Cluster) bool +type EventCluster interface { + IsEventCluster() bool +} + type EventCreateCluster struct { Cluster *Cluster Checks []ClusterCheckFunc @@ -2974,6 +3094,14 @@ func (e EventCreateCluster) Matches(apiEvent go_events.Event) bool { return true } +func (e EventCreateCluster) IsEventCreate() bool { + return true +} + +func (e EventCreateCluster) IsEventCluster() bool { + return true +} + type EventUpdateCluster struct { Cluster *Cluster OldCluster *Cluster @@ -2994,6 +3122,14 @@ func (e EventUpdateCluster) Matches(apiEvent go_events.Event) bool { return true } +func (e EventUpdateCluster) IsEventUpdate() bool { + return true +} + +func (e EventUpdateCluster) IsEventCluster() bool { + return true +} + type EventDeleteCluster struct { Cluster *Cluster Checks []ClusterCheckFunc @@ -3012,6 +3148,15 @@ func (e EventDeleteCluster) Matches(apiEvent go_events.Event) bool { } return true } + +func (e EventDeleteCluster) IsEventDelete() bool { + return true +} + +func (e EventDeleteCluster) IsEventCluster() bool { + return true +} + func (m *Cluster) CopyStoreObject() StoreObject { return m.Copy() } @@ -3172,6 +3317,10 @@ func (indexer ClusterCustomIndexer) FromObject(obj interface{}) (bool, [][]byte, type SecretCheckFunc func(t1, t2 *Secret) bool +type EventSecret interface { + 
IsEventSecret() bool +} + type EventCreateSecret struct { Secret *Secret Checks []SecretCheckFunc @@ -3191,6 +3340,14 @@ func (e EventCreateSecret) Matches(apiEvent go_events.Event) bool { return true } +func (e EventCreateSecret) IsEventCreate() bool { + return true +} + +func (e EventCreateSecret) IsEventSecret() bool { + return true +} + type EventUpdateSecret struct { Secret *Secret OldSecret *Secret @@ -3211,6 +3368,14 @@ func (e EventUpdateSecret) Matches(apiEvent go_events.Event) bool { return true } +func (e EventUpdateSecret) IsEventUpdate() bool { + return true +} + +func (e EventUpdateSecret) IsEventSecret() bool { + return true +} + type EventDeleteSecret struct { Secret *Secret Checks []SecretCheckFunc @@ -3229,6 +3394,15 @@ func (e EventDeleteSecret) Matches(apiEvent go_events.Event) bool { } return true } + +func (e EventDeleteSecret) IsEventDelete() bool { + return true +} + +func (e EventDeleteSecret) IsEventSecret() bool { + return true +} + func (m *Secret) CopyStoreObject() StoreObject { return m.Copy() } @@ -3389,6 +3563,10 @@ func (indexer SecretCustomIndexer) FromObject(obj interface{}) (bool, [][]byte, type ConfigCheckFunc func(t1, t2 *Config) bool +type EventConfig interface { + IsEventConfig() bool +} + type EventCreateConfig struct { Config *Config Checks []ConfigCheckFunc @@ -3408,6 +3586,14 @@ func (e EventCreateConfig) Matches(apiEvent go_events.Event) bool { return true } +func (e EventCreateConfig) IsEventCreate() bool { + return true +} + +func (e EventCreateConfig) IsEventConfig() bool { + return true +} + type EventUpdateConfig struct { Config *Config OldConfig *Config @@ -3428,6 +3614,14 @@ func (e EventUpdateConfig) Matches(apiEvent go_events.Event) bool { return true } +func (e EventUpdateConfig) IsEventUpdate() bool { + return true +} + +func (e EventUpdateConfig) IsEventConfig() bool { + return true +} + type EventDeleteConfig struct { Config *Config Checks []ConfigCheckFunc @@ -3446,6 +3640,15 @@ func (e EventDeleteConfig) Matches(apiEvent go_events.Event) bool { } return true } + +func (e EventDeleteConfig) IsEventDelete() bool { + return true +} + +func (e EventDeleteConfig) IsEventConfig() bool { + return true +} + func (m *Config) CopyStoreObject() StoreObject { return m.Copy() } @@ -3606,6 +3809,10 @@ func (indexer ConfigCustomIndexer) FromObject(obj interface{}) (bool, [][]byte, type ResourceCheckFunc func(t1, t2 *Resource) bool +type EventResource interface { + IsEventResource() bool +} + type EventCreateResource struct { Resource *Resource Checks []ResourceCheckFunc @@ -3625,6 +3832,14 @@ func (e EventCreateResource) Matches(apiEvent go_events.Event) bool { return true } +func (e EventCreateResource) IsEventCreate() bool { + return true +} + +func (e EventCreateResource) IsEventResource() bool { + return true +} + type EventUpdateResource struct { Resource *Resource OldResource *Resource @@ -3645,6 +3860,14 @@ func (e EventUpdateResource) Matches(apiEvent go_events.Event) bool { return true } +func (e EventUpdateResource) IsEventUpdate() bool { + return true +} + +func (e EventUpdateResource) IsEventResource() bool { + return true +} + type EventDeleteResource struct { Resource *Resource Checks []ResourceCheckFunc @@ -3663,6 +3886,15 @@ func (e EventDeleteResource) Matches(apiEvent go_events.Event) bool { } return true } + +func (e EventDeleteResource) IsEventDelete() bool { + return true +} + +func (e EventDeleteResource) IsEventResource() bool { + return true +} + func (m *Resource) CopyStoreObject() StoreObject { return m.Copy() } @@ 
-3829,6 +4061,10 @@ func (indexer ResourceCustomIndexer) FromObject(obj interface{}) (bool, [][]byte type ExtensionCheckFunc func(t1, t2 *Extension) bool +type EventExtension interface { + IsEventExtension() bool +} + type EventCreateExtension struct { Extension *Extension Checks []ExtensionCheckFunc @@ -3848,6 +4084,14 @@ func (e EventCreateExtension) Matches(apiEvent go_events.Event) bool { return true } +func (e EventCreateExtension) IsEventCreate() bool { + return true +} + +func (e EventCreateExtension) IsEventExtension() bool { + return true +} + type EventUpdateExtension struct { Extension *Extension OldExtension *Extension @@ -3868,6 +4112,14 @@ func (e EventUpdateExtension) Matches(apiEvent go_events.Event) bool { return true } +func (e EventUpdateExtension) IsEventUpdate() bool { + return true +} + +func (e EventUpdateExtension) IsEventExtension() bool { + return true +} + type EventDeleteExtension struct { Extension *Extension Checks []ExtensionCheckFunc @@ -3886,6 +4138,15 @@ func (e EventDeleteExtension) Matches(apiEvent go_events.Event) bool { } return true } + +func (e EventDeleteExtension) IsEventDelete() bool { + return true +} + +func (e EventDeleteExtension) IsEventExtension() bool { + return true +} + func (m *Extension) CopyStoreObject() StoreObject { return m.Copy() } diff --git a/vendor/github.com/docker/swarmkit/api/storeobject.go b/vendor/github.com/docker/swarmkit/api/storeobject.go index 48b50b72dd9d3..d140fa3e0ca21 100644 --- a/vendor/github.com/docker/swarmkit/api/storeobject.go +++ b/vendor/github.com/docker/swarmkit/api/storeobject.go @@ -38,6 +38,21 @@ type Event interface { Matches(events.Event) bool } +// EventCreate is an interface implemented by every creation event type +type EventCreate interface { + IsEventCreate() bool +} + +// EventUpdate is an interface implemented by every update event type +type EventUpdate interface { + IsEventUpdate() bool +} + +// EventDelete is an interface implemented by every delete event type +type EventDelete interface { + IsEventDelete() bool +} + func customIndexer(kind string, annotations *Annotations) (bool, [][]byte, error) { var converted [][]byte diff --git a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go index 6f40fe3e238cc..407ced0b699e6 100644 --- a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go +++ b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go @@ -1145,11 +1145,8 @@ func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*a d.rpcRW.RLock() defer d.rpcRW.RUnlock() - // Its OK to call isRunning() here instead of isRunningLocked() - // because of the rpcRW readlock above. - // TODO(anshul) other uses of isRunningLocked() can probably - // also be removed. - if !d.isRunning() { + // TODO(anshul) Explore if its possible to check context here without locking.
+ if _, err := d.isRunningLocked(); err != nil { return nil, status.Errorf(codes.Aborted, "dispatcher is stopped") } diff --git a/vendor/github.com/docker/swarmkit/manager/metrics/collector.go b/vendor/github.com/docker/swarmkit/manager/metrics/collector.go index d5adcb8306436..384743707d6fc 100644 --- a/vendor/github.com/docker/swarmkit/manager/metrics/collector.go +++ b/vendor/github.com/docker/swarmkit/manager/metrics/collector.go @@ -5,21 +5,36 @@ import ( "strings" + "github.com/docker/go-events" metrics "github.com/docker/go-metrics" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/manager/state/store" ) var ( - ns = metrics.NewNamespace("swarm", "manager", nil) + ns = metrics.NewNamespace("swarm", "manager", nil) + + // counts of the various objects in swarmkit nodesMetric metrics.LabeledGauge + tasksMetric metrics.LabeledGauge + + // none of these objects have state, so they're just regular gauges + servicesMetric metrics.Gauge + networksMetric metrics.Gauge + secretsMetric metrics.Gauge + configsMetric metrics.Gauge ) func init() { nodesMetric = ns.NewLabeledGauge("nodes", "The number of nodes", "", "state") - for _, state := range api.NodeStatus_State_name { - nodesMetric.WithValues(strings.ToLower(state)).Set(0) - } + tasksMetric = ns.NewLabeledGauge("tasks", "The number of tasks in the cluster object store", metrics.Total, "state") + servicesMetric = ns.NewGauge("services", "The number of services in the cluster object store", metrics.Total) + networksMetric = ns.NewGauge("networks", "The number of networks in the cluster object store", metrics.Total) + secretsMetric = ns.NewGauge("secrets", "The number of secrets in the cluster object store", metrics.Total) + configsMetric = ns.NewGauge("configs", "The number of configs in the cluster object store", metrics.Total) + + resetMetrics() + metrics.Register(ns) } @@ -42,20 +57,6 @@ func NewCollector(store *store.MemoryStore) *Collector { } } -func (c *Collector) updateNodeState(prevNode, newNode *api.Node) { - // Skip updates if nothing changed. 
- if prevNode != nil && newNode != nil && prevNode.Status.State == newNode.Status.State { - return - } - - if prevNode != nil { - nodesMetric.WithValues(strings.ToLower(prevNode.Status.State.String())).Dec(1) - } - if newNode != nil { - nodesMetric.WithValues(strings.ToLower(newNode.Status.State.String())).Inc(1) - } -} - // Run contains the collector event loop func (c *Collector) Run(ctx context.Context) error { defer close(c.doneChan) @@ -65,9 +66,46 @@ func (c *Collector) Run(ctx context.Context) error { if err != nil { return err } - for _, node := range nodes { - c.updateNodeState(nil, node) + tasks, err := store.FindTasks(readTx, store.All) + if err != nil { + return err } + services, err := store.FindServices(readTx, store.All) + if err != nil { + return err + } + networks, err := store.FindNetworks(readTx, store.All) + if err != nil { + return err + } + secrets, err := store.FindSecrets(readTx, store.All) + if err != nil { + return err + } + configs, err := store.FindConfigs(readTx, store.All) + if err != nil { + return err + } + + for _, obj := range nodes { + c.handleEvent(obj.EventCreate()) + } + for _, obj := range tasks { + c.handleEvent(obj.EventCreate()) + } + for _, obj := range services { + c.handleEvent(obj.EventCreate()) + } + for _, obj := range networks { + c.handleEvent(obj.EventCreate()) + } + for _, obj := range secrets { + c.handleEvent(obj.EventCreate()) + } + for _, obj := range configs { + c.handleEvent(obj.EventCreate()) + } + return nil }) if err != nil { @@ -78,14 +116,7 @@ func (c *Collector) Run(ctx context.Context) error { for { select { case event := <-watcher: - switch v := event.(type) { - case api.EventCreateNode: - c.updateNodeState(nil, v.Node) - case api.EventUpdateNode: - c.updateNodeState(v.OldNode, v.Node) - case api.EventDeleteNode: - c.updateNodeState(v.Node, nil) - } + c.handleEvent(event) case <-c.stopChan: return nil } @@ -98,7 +129,131 @@ func (c *Collector) Stop() { <-c.doneChan // Clean the metrics on exit. + resetMetrics() +} + +// resetMetrics resets all metrics to their default (base) value +func resetMetrics() { for _, state := range api.NodeStatus_State_name { nodesMetric.WithValues(strings.ToLower(state)).Set(0) } + for _, state := range api.TaskState_name { + tasksMetric.WithValues(strings.ToLower(state)).Set(0) + } + servicesMetric.Set(0) + networksMetric.Set(0) + secretsMetric.Set(0) + configsMetric.Set(0) + +} + +// handleEvent handles a single incoming cluster event. +func (c *Collector) handleEvent(event events.Event) { + switch event.(type) { + case api.EventNode: + c.handleNodeEvent(event) + case api.EventTask: + c.handleTaskEvent(event) + case api.EventService: + c.handleServiceEvent(event) + case api.EventNetwork: + c.handleNetworkEvent(event) + case api.EventSecret: + c.handleSecretsEvent(event) + case api.EventConfig: + c.handleConfigsEvent(event) + } +} + +func (c *Collector) handleNodeEvent(event events.Event) { + var prevNode, newNode *api.Node + + switch v := event.(type) { + case api.EventCreateNode: + prevNode, newNode = nil, v.Node + case api.EventUpdateNode: + prevNode, newNode = v.OldNode, v.Node + case api.EventDeleteNode: + prevNode, newNode = v.Node, nil + } + + // Skip updates if nothing changed. 
+ if prevNode != nil && newNode != nil && prevNode.Status.State == newNode.Status.State { + return + } + + if prevNode != nil { + nodesMetric.WithValues(strings.ToLower(prevNode.Status.State.String())).Dec(1) + } + if newNode != nil { + nodesMetric.WithValues(strings.ToLower(newNode.Status.State.String())).Inc(1) + } + return +} + +func (c *Collector) handleTaskEvent(event events.Event) { + var prevTask, newTask *api.Task + + switch v := event.(type) { + case api.EventCreateTask: + prevTask, newTask = nil, v.Task + case api.EventUpdateTask: + prevTask, newTask = v.OldTask, v.Task + case api.EventDeleteTask: + prevTask, newTask = v.Task, nil + } + + // Skip updates if nothing changed. + if prevTask != nil && newTask != nil && prevTask.Status.State == newTask.Status.State { + return + } + + if prevTask != nil { + tasksMetric.WithValues( + strings.ToLower(prevTask.Status.State.String()), + ).Dec(1) + } + if newTask != nil { + tasksMetric.WithValues( + strings.ToLower(newTask.Status.State.String()), + ).Inc(1) + } + + return +} + +func (c *Collector) handleServiceEvent(event events.Event) { + switch event.(type) { + case api.EventCreateService: + servicesMetric.Inc(1) + case api.EventDeleteService: + servicesMetric.Dec(1) + } +} + +func (c *Collector) handleNetworkEvent(event events.Event) { + switch event.(type) { + case api.EventCreateNetwork: + networksMetric.Inc(1) + case api.EventDeleteNetwork: + networksMetric.Dec(1) + } +} + +func (c *Collector) handleSecretsEvent(event events.Event) { + switch event.(type) { + case api.EventCreateSecret: + secretsMetric.Inc(1) + case api.EventDeleteSecret: + secretsMetric.Dec(1) + } +} + +func (c *Collector) handleConfigsEvent(event events.Event) { + switch event.(type) { + case api.EventCreateConfig: + configsMetric.Inc(1) + case api.EventDeleteConfig: + configsMetric.Dec(1) + } } diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/taskreaper/task_reaper.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/taskreaper/task_reaper.go index 1c93b77fbe62e..e3c2b8265dfac 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/taskreaper/task_reaper.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/taskreaper/task_reaper.go @@ -40,6 +40,12 @@ type TaskReaper struct { cleanup []string stopChan chan struct{} doneChan chan struct{} + + // tickSignal is a channel that, if non-nil and available, will be written + // to to signal that a tick has occurred. its sole purpose is for testing + // code, to verify that take cleanup attempts are happening when they + // should be. + tickSignal chan struct{} } // New creates a new TaskReaper. @@ -115,7 +121,34 @@ func (tr *TaskReaper) Run(ctx context.Context) { // Clean up when we hit TaskHistoryRetentionLimit or when the timer expires, // whichever happens first. + // + // Specifically, the way this should work: + // - Create a timer and immediately stop it. We don't want to fire the + // cleanup routine yet, because we just did a cleanup as part of the + // initialization above. + // - Launch into an event loop + // - When we receive an event, handle the event as needed + // - After receiving the event: + // - If minimum batch size (maxDirty) is exceeded with dirty + cleanup, + // then immediately launch into the cleanup routine + // - Otherwise, if the timer is stopped, start it (reset). 
+ // - If the timer expires and the timer channel is signaled, then Stop the + // timer (so that it will be ready to be started again as needed), and + // execute the cleanup routine (tick) timer := time.NewTimer(reaperBatchingInterval) + timer.Stop() + + // If stop is somehow called AFTER the timer has expired, there will be a + // value in the timer.C channel. If there is such a value, we should drain + // it out. This select statement allows us to drain that value if it's + // present, or continue straight through otherwise. + select { + case <-timer.C: + default: + } + + // keep track with a boolean of whether the timer is currently stopped + isTimerStopped := true // Watch for: // 1. EventCreateTask for cleaning slots, which is the best time to cleanup that node/slot. @@ -153,16 +186,35 @@ func (tr *TaskReaper) Run(ctx context.Context) { } if len(tr.dirty)+len(tr.cleanup) > maxDirty { + // stop the timer, so we don't fire it. if we get another event + // after we do this cleaning, we will reset the timer then timer.Stop() + // if the timer had fired, drain out the value. + select { + case <-timer.C: + default: + } + isTimerStopped = true tr.tick() } else { - timer.Reset(reaperBatchingInterval) + if isTimerStopped { + timer.Reset(reaperBatchingInterval) + isTimerStopped = false + } } case <-timer.C: - timer.Stop() + // we can safely ignore draining off of the timer channel, because + // we already know that the timer has expired. + isTimerStopped = true tr.tick() case <-tr.stopChan: + // even though this doesn't really matter in this context, it's + // good hygiene to drain the value. timer.Stop() + select { + case <-timer.C: + default: + } return } } @@ -170,6 +222,16 @@ func (tr *TaskReaper) Run(ctx context.Context) { // tick performs task history cleanup. func (tr *TaskReaper) tick() { + // this signals that a tick has occurred. it exists solely for testing. + if tr.tickSignal != nil { + // try writing to this channel, but if it's full, fall straight through + // and ignore it. + select { + case tr.tickSignal <- struct{}{}: + default: + } + } + if len(tr.dirty) == 0 && len(tr.cleanup) == 0 { return } @@ -184,10 +246,24 @@ func (tr *TaskReaper) tick() { } // Check history of dirty tasks for cleanup. + // Note: Clean out the dirty set at the end of this tick iteration + // in all but one scenarios (documented below). + // When tick() finishes, the tasks in the slot were either cleaned up, + // or it was skipped because it didn't meet the criteria for cleaning. + // Either way, we can discard the dirty set because future events on + // that slot will cause the task to be readded to the dirty set + // at that point. + // + // The only case when we keep the slot dirty is when there are more + // than one running tasks present for a given slot. + // In that case, we need to keep the slot dirty to allow it to be + // cleaned when tick() is called next and one or more the tasks + // in that slot have stopped running. tr.store.View(func(tx store.ReadTx) { for dirty := range tr.dirty { service := store.GetService(tx, dirty.ServiceID) if service == nil { + delete(tr.dirty, dirty) continue } @@ -211,6 +287,7 @@ func (tr *TaskReaper) tick() { // Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history. 
if taskHistory < 0 { + delete(tr.dirty, dirty) continue } @@ -240,6 +317,7 @@ func (tr *TaskReaper) tick() { } if int64(len(historicTasks)) <= taskHistory { + delete(tr.dirty, dirty) continue } @@ -270,6 +348,12 @@ func (tr *TaskReaper) tick() { } } + // The only case when we keep the slot dirty at the end of tick() + // is when there are more than one running tasks present + // for a given slot. + // In that case, we keep the slot dirty to allow it to be + // cleaned when tick() is called next and one or more of + // the tasks in that slot have stopped running. if runningTasks <= 1 { delete(tr.dirty, dirty) }
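Note (illustrative, not part of the vendored patch): the generated IsEvent* methods above exist so that the marker interfaces added in storeobject.go (EventCreate, EventUpdate, EventDelete, plus the per-object EventNode, EventTask, and so on) can classify any store event without enumerating every concrete struct, which is exactly how the rewritten metrics collector dispatches in handleEvent. A minimal sketch, assuming the vendored swarmkit api and docker/go-events packages:

package main

import (
    "fmt"

    "github.com/docker/go-events"
    "github.com/docker/swarmkit/api"
)

// classify reports the coarse category of a store event by checking the
// marker interfaces rather than switching on every generated Event* type.
func classify(ev events.Event) string {
    switch ev.(type) {
    case api.EventCreate:
        return "create"
    case api.EventUpdate:
        return "update"
    case api.EventDelete:
        return "delete"
    }
    return "unknown"
}

func main() {
    fmt.Println(classify(api.EventCreateNode{}))    // create
    fmt.Println(classify(api.EventUpdateTask{}))    // update
    fmt.Println(classify(api.EventDeleteService{})) // delete
}

This also shows why EventDelete has to declare IsEventDelete() bool: the generated delete events implement the method with a bool result, so an interface method without the return value would never be satisfied and the delete case would never match.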
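The timer handling added to the task reaper's Run loop is the standard stop/drain/reset pattern for time.Timer that its comments describe. A standalone sketch of that pattern with hypothetical values, outside swarmkit:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Create the timer and stop it right away, as the reaper now does:
    // no batched cleanup is pending until an event arrives.
    timer := time.NewTimer(time.Second)
    if !timer.Stop() {
        // Stop does not drain timer.C; discard a stale value if present.
        select {
        case <-timer.C:
        default:
        }
    }
    isTimerStopped := true

    // An event arrived: start the batching interval only if the timer is
    // not already running, mirroring the reaper's event loop.
    if isTimerStopped {
        timer.Reset(50 * time.Millisecond)
        isTimerStopped = false
    }

    <-timer.C // the batching interval elapsed; this is where tick() would run
    fmt.Println("cleanup tick")
}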