Skip to content

Commit

Permalink
Add jobs support to CLI
Browse files Browse the repository at this point in the history
* Added two new modes accepted by the `--mode` flag
  * `replicated-job` creates a replicated job
  * `global-job` creates a global job.
* When using `replicated-job` mode, the `replicas` flag sets the
  `TotalCompletions` parameter of the job. This is the total number of
  tasks that will run
* Added a new flag, `max-concurrent`, for use with `replicated-job`
  mode. This flag sets the `MaxConcurrent` parameter of the job, which
  is the maximum number of replicas the job will run simultaneously.
* When using `replicated-job` or `global-job` mode, using any of the
  update parameter flags will result in an error, as jobs cannot be
  updated in the traditional sense.
* Updated the `docker service ls` UI to include the completion status
  (completed vs total tasks) if the service is a job.
* Updated the progress bars UI for service creation and update to
  support jobs. For jobs, there is displayed a bar covering the overall
  progress of the job (the number of tasks completed over the total
  number of tasks to complete).
* Added documentation explaining the use of the new flags, and of jobs
  in general.

Signed-off-by: Drew Erny <derny@mirantis.com>
  • Loading branch information
dperny committed Apr 24, 2020
1 parent 2c37970 commit 9375644
Show file tree
Hide file tree
Showing 14 changed files with 1,126 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cli/command/service/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func newCreateCommand(dockerCli command.Cli) *cobra.Command {
},
}
flags := cmd.Flags()
flags.StringVar(&opts.mode, flagMode, "replicated", "Service mode (replicated or global)")
flags.StringVar(&opts.mode, flagMode, "replicated", "Service mode (replicated, global, replicated-job, or global-job)")
flags.StringVar(&opts.name, flagName, "", "Service name")

addServiceFlags(flags, opts, buildServiceDefaultFlagMapping())
Expand Down
29 changes: 28 additions & 1 deletion cli/command/service/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,10 @@ func (c *serviceContext) Mode() string {
return "global"
case c.service.Spec.Mode.Replicated != nil:
return "replicated"
case c.service.Spec.Mode.ReplicatedJob != nil:
return "replicated job"
case c.service.Spec.Mode.GlobalJob != nil:
return "global job"
default:
return ""
}
Expand All @@ -604,10 +608,33 @@ func (c *serviceContext) Mode() string {
func (c *serviceContext) Replicas() string {
s := &c.service

var running, desired uint64
var running, desired, completed uint64
if s.ServiceStatus != nil {
running = c.service.ServiceStatus.RunningTasks
desired = c.service.ServiceStatus.DesiredTasks
completed = c.service.ServiceStatus.CompletedTasks
}
// for jobs, we will not include the max per node, even if it is set. jobs
// include instead the progress of the job as a whole, in addition to the
// current running state. the system respects max per node, but if we
// included it in the list output, the lines for jobs would be entirely too
// long and make the UI look bad.
if s.Spec.Mode.ReplicatedJob != nil {
return fmt.Sprintf(
"%d/%d (%d/%d completed)",
running, desired, completed, *s.Spec.Mode.ReplicatedJob.TotalCompletions,
)
}
if s.Spec.Mode.GlobalJob != nil {
// for global jobs, we need to do a little math. desired tasks are only
// the tasks that have not yet actually reached the Completed state.
// Completed tasks have reached the completed state. the TOTAL number
// of tasks to run is the sum of the tasks desired to still complete,
// and the tasks actually completed.
return fmt.Sprintf(
"%d/%d (%d/%d completed)",
running, desired, completed, desired+completed,
)
}
if r := c.maxReplicas(); r > 0 {
return fmt.Sprintf("%d/%d (max %d per node)", running, desired, r)
Expand Down
50 changes: 50 additions & 0 deletions cli/command/service/formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ import (
)

func TestServiceContextWrite(t *testing.T) {
var (
// we need a pair of variables for setting the job parameters, because
// those parameters take pointers to uint64, which we can't make as a
// literal
varThree uint64 = 3
varTen uint64 = 10
)
cases := []struct {
context formatter.Context
expected string
Expand All @@ -38,6 +45,8 @@ func TestServiceContextWrite(t *testing.T) {
01_baz baz global 1/3 *:80->8080/tcp
04_qux2 qux2 replicated 3/3 (max 2 per node)
03_qux10 qux10 replicated 2/3 (max 1 per node)
05_job1 zarp1 replicated job 2/3 (5/10 completed)
06_job2 zarp2 global job 1/1 (3/4 completed)
`,
},
{
Expand All @@ -46,6 +55,8 @@ func TestServiceContextWrite(t *testing.T) {
01_baz
04_qux2
03_qux10
05_job1
06_job2
`,
},
{
Expand All @@ -55,6 +66,8 @@ bar replicated
baz global
qux2 replicated
qux10 replicated
zarp1 replicated job
zarp2 global job
`,
},
{
Expand All @@ -64,6 +77,8 @@ bar
baz
qux2
qux10
zarp1
zarp2
`,
},
// Raw Format
Expand All @@ -77,6 +92,8 @@ qux10
id: 01_baz
id: 04_qux2
id: 03_qux10
id: 05_job1
id: 06_job2
`,
},
// Custom Format
Expand All @@ -86,6 +103,8 @@ id: 03_qux10
baz
qux2
qux10
zarp1
zarp2
`,
},
}
Expand Down Expand Up @@ -170,6 +189,37 @@ qux10
DesiredTasks: 3,
},
},
{
ID: "05_job1",
Spec: swarm.ServiceSpec{
Annotations: swarm.Annotations{Name: "zarp1"},
Mode: swarm.ServiceMode{
ReplicatedJob: &swarm.ReplicatedJob{
MaxConcurrent: &varThree,
TotalCompletions: &varTen,
},
},
},
ServiceStatus: &swarm.ServiceStatus{
RunningTasks: 2,
DesiredTasks: 3,
CompletedTasks: 5,
},
},
{
ID: "06_job2",
Spec: swarm.ServiceSpec{
Annotations: swarm.Annotations{Name: "zarp2"},
Mode: swarm.ServiceMode{
GlobalJob: &swarm.GlobalJob{},
},
},
ServiceStatus: &swarm.ServiceStatus{
RunningTasks: 1,
DesiredTasks: 1,
CompletedTasks: 3,
},
},
}
out := bytes.NewBufferString("")
testcase.context.Output = out
Expand Down
2 changes: 2 additions & 0 deletions cli/command/service/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func AppendServiceStatus(ctx context.Context, c client.APIClient, services []swa
status := map[string]*swarm.ServiceStatus{}
taskFilter := filters.NewArgs()
for i, s := range services {
// there is no need in this switch to check for job modes. jobs are not
// supported until after ServiceStatus was introduced.
switch {
case s.ServiceStatus != nil:
// Server already returned service-status, so we don't
Expand Down
80 changes: 69 additions & 11 deletions cli/command/service/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,9 @@ type serviceOptions struct {
resources resourceOptions
stopGrace opts.DurationOpt

replicas Uint64Opt
mode string
replicas Uint64Opt
mode string
maxConcurrent Uint64Opt

restartPolicy restartPolicyOptions
constraints opts.ListOpts
Expand Down Expand Up @@ -554,18 +555,45 @@ func (options *serviceOptions) ToServiceMode() (swarm.ServiceMode, error) {
switch options.mode {
case "global":
if options.replicas.Value() != nil {
return serviceMode, errors.Errorf("replicas can only be used with replicated mode")
return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode")
}

if options.maxReplicas > 0 {
return serviceMode, errors.New("replicas-max-per-node can only be used with replicated mode")
return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode")
}
if options.maxConcurrent.Value() != nil {
return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
}

serviceMode.Global = &swarm.GlobalService{}
case "replicated":
if options.maxConcurrent.Value() != nil {
return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
}

serviceMode.Replicated = &swarm.ReplicatedService{
Replicas: options.replicas.Value(),
}
case "replicated-job":
concurrent := options.maxConcurrent.Value()
if concurrent == nil {
concurrent = options.replicas.Value()
}
serviceMode.ReplicatedJob = &swarm.ReplicatedJob{
MaxConcurrent: concurrent,
TotalCompletions: options.replicas.Value(),
}
case "global-job":
if options.maxReplicas > 0 {
return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode")
}
if options.maxConcurrent.Value() != nil {
return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode")
}
if options.replicas.Value() != nil {
return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode")
}
serviceMode.GlobalJob = &swarm.GlobalJob{}
default:
return serviceMode, errors.Errorf("Unknown mode: %s, only replicated and global supported", options.mode)
}
Expand All @@ -579,14 +607,13 @@ func (options *serviceOptions) ToStopGracePeriod(flags *pflag.FlagSet) *time.Dur
return nil
}

func (options *serviceOptions) ToService(ctx context.Context, apiClient client.NetworkAPIClient, flags *pflag.FlagSet) (swarm.ServiceSpec, error) {
var service swarm.ServiceSpec

// makeEnv gets the environment variables from the command line options and
// returns a slice of strings to use in the service spec when doing ToService
func (options *serviceOptions) makeEnv() ([]string, error) {
envVariables, err := opts.ReadKVEnvStrings(options.envFile.GetAll(), options.env.GetAll())
if err != nil {
return service, err
return nil, err
}

currentEnv := make([]string, 0, len(envVariables))
for _, env := range envVariables { // need to process each var, in order
k := strings.SplitN(env, "=", 2)[0]
Expand All @@ -601,6 +628,24 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
currentEnv = append(currentEnv, env)
}

return currentEnv, nil
}

// ToService takes the set of flags passed to the command and converts them
// into a service spec.
//
// Takes an API client as the second argument in order to resolve network names
// from the flags into network IDs.
//
// Returns an error if any flags are invalid or contradictory.
func (options *serviceOptions) ToService(ctx context.Context, apiClient client.NetworkAPIClient, flags *pflag.FlagSet) (swarm.ServiceSpec, error) {
var service swarm.ServiceSpec

currentEnv, err := options.makeEnv()
if err != nil {
return service, err
}

healthConfig, err := options.healthcheck.toHealthConfig()
if err != nil {
return service, err
Expand All @@ -611,6 +656,16 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
return service, err
}

updateConfig := options.update.updateConfig(flags)
rollbackConfig := options.rollback.rollbackConfig(flags)

// update and rollback configuration is not supported for jobs. If these
// flags are not set, then the values will be nil. If they are non-nil,
// then return an error.
if (serviceMode.ReplicatedJob != nil || serviceMode.GlobalJob != nil) && (updateConfig != nil || rollbackConfig != nil) {
return service, errors.Errorf("update and rollback configuration is not supported for jobs")
}

networks := convertNetworks(options.networks)
for i, net := range networks {
nwID, err := resolveNetworkID(ctx, apiClient, net.Target)
Expand Down Expand Up @@ -671,8 +726,8 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N
LogDriver: options.logDriver.toLogDriver(),
},
Mode: serviceMode,
UpdateConfig: options.update.updateConfig(flags),
RollbackConfig: options.rollback.rollbackConfig(flags),
UpdateConfig: updateConfig,
RollbackConfig: rollbackConfig,
EndpointSpec: options.endpoint.ToEndpointSpec(),
}

Expand Down Expand Up @@ -769,6 +824,8 @@ func addServiceFlags(flags *pflag.FlagSet, opts *serviceOptions, defaultFlagValu

flags.Var(&opts.stopGrace, flagStopGracePeriod, flagDesc(flagStopGracePeriod, "Time to wait before force killing a container (ns|us|ms|s|m|h)"))
flags.Var(&opts.replicas, flagReplicas, "Number of tasks")
flags.Var(&opts.maxConcurrent, flagConcurrent, "Number of job tasks to run concurrently (default equal to --replicas)")
flags.SetAnnotation(flagConcurrent, "version", []string{"1.41"})
flags.Uint64Var(&opts.maxReplicas, flagMaxReplicas, defaultFlagValues.getUint64(flagMaxReplicas), "Maximum number of tasks per node (default 0 = unlimited)")
flags.SetAnnotation(flagMaxReplicas, "version", []string{"1.40"})

Expand Down Expand Up @@ -878,6 +935,7 @@ const (
flagLimitCPU = "limit-cpu"
flagLimitMemory = "limit-memory"
flagMaxReplicas = "replicas-max-per-node"
flagConcurrent = "max-concurrent"
flagMode = "mode"
flagMount = "mount"
flagMountRemove = "mount-rm"
Expand Down
2 changes: 1 addition & 1 deletion cli/command/service/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestToServiceMaxReplicasGlobalModeConflict(t *testing.T) {
maxReplicas: 1,
}
_, err := opt.ToServiceMode()
assert.Error(t, err, "replicas-max-per-node can only be used with replicated mode")
assert.Error(t, err, "replicas-max-per-node can only be used with replicated or replicated-job mode")
}

func TestToServiceSysCtls(t *testing.T) {
Expand Down
Loading

0 comments on commit 9375644

Please sign in to comment.