Skip to content

Commit

Permalink
feat: more beautiful progress bar
Browse files Browse the repository at this point in the history
Signed-off-by: joyceliu <joyceliu@yunify.com>
  • Loading branch information
joyceliu committed Aug 2, 2024
1 parent 0b3aae2 commit 031c8da
Show file tree
Hide file tree
Showing 24 changed files with 291 additions and 81 deletions.
2 changes: 1 addition & 1 deletion builtin/roles/addons/cni/tasks/kubeovn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
- name: Install kubeovn
command: |
helm install kubeovn /etc/kubernetes/cni/kubeovn-{{ .kubeovn_version }}.tgz --set replicaCount={{ .cni.kubeovn.replica }} \
{{- $ips := list -}}
{{ $ips := list }}
{{- range .groups.kube_control_plane -}}
{{- $ips = append $ips (index $.inventory_hosts . "internal_ipv4") -}}
{{- end -}}
Expand Down
2 changes: 1 addition & 1 deletion builtin/roles/install/etcd/templates/etcd.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ETCD_LISTEN_PEER_URLS={{ printf "https://%s:2380" .internal_ipv4 }}
ETCD_NAME={{ .inventory_name }}
ETCD_PROXY=off
ETCD_ENABLE_V2=true
{{- $ips := list -}}
{{ $ips := list }}
{{- range .groups.etcd -}}
{{- $ips = append $ips (printf "%s=https://%s:2380" (index $.inventory_hosts . "inventory_name") (index $.inventory_hosts . "internal_ipv4")) -}}
{{- end -}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
register: docker_install_version

- name: Install docker
when: or (.docker_install_version.stderr | ne "") (not .docker_install_version.stdout | hasPrefix (printf "Docker version %s," .docker_version))
when: or (.docker_install_version.stderr | ne "") (.docker_install_version.stdout | hasPrefix (printf "Docker version %s," .docker_version) | not)
block:
- name: Sync docker binary to remote
copy:
Expand Down
2 changes: 1 addition & 1 deletion builtin/roles/install/image-registry/tasks/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
ignore_errors: true
command: systemctl status harbor.service
register: image_registry_service
- include_tasks: install_registry.yaml
- include_tasks: install_harbor.yaml
when: .image_registry_service.stderr | ne ""

- name: Install registry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
{{- if and .cri.docker.bridge_ip (ne .cri.docker.bridge_ip "") }}
"bip": "{{ .cri.docker.bridge_ip }}",
{{- end }}
"exec-opts": ["native.cgroupdriver={{ .cri.cgroup_driver }}"]
"exec-opts": ["native.cgroupdriver={{ .cri.cgroup_driver | default "systemd" }}"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ spec:
value: "true"
- name: bgp_routerid
value: |
{{- $ips := list -}}
{{ $ips := list }}
{{- range .groups.kube_control_plane -}}
{{- $ips = append $ips (index $.inventory_hosts . "internal_ipv4") -}}
{{- end -}}
Expand All @@ -43,7 +43,7 @@ spec:
value: "65000"
- name: bgp_peers
value: |
{{- $ips := list -}}
{{ $ips := list }}
{{- range .groups.kube_control_plane -}}
{{- $ips = append $ips (printf "%s:65000::false" (index $.inventory_hosts . "internal_ipv4")) -}}
{{- end -}}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/opencontainers/image-spec v1.1.0
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.13.6
github.com/schollz/progressbar/v3 v3.14.3
github.com/schollz/progressbar/v3 v3.14.5
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -115,8 +115,8 @@ require (
golang.org/x/net v0.20.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.17.0 // indirect
Expand Down
14 changes: 6 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/schollz/progressbar/v3 v3.14.3 h1:oOuWW19ka12wxYU1XblR4n16wF/2Y1dBLMarMo6p4xU=
github.com/schollz/progressbar/v3 v3.14.3/go.mod h1:aT3UQ7yGm+2ZjeXPqsjTenwL3ddUiuZ0kfQ/2tHlyNI=
github.com/schollz/progressbar/v3 v3.14.5 h1:97RrSxbBASxQuZN9yemnyGrFZ/swnG6IrEe2R0BseX8=
github.com/schollz/progressbar/v3 v3.14.5/go.mod h1:Nrzpuw3Nl0srLY0VlTvC4V6RL50pcEymjy6qyJAaLa0=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
Expand Down Expand Up @@ -341,18 +341,16 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA=
golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
130 changes: 75 additions & 55 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/schollz/progressbar/v3"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -56,9 +59,10 @@ func NewTaskExecutor(client ctrlclient.Client, pipeline *kubekeyv1.Pipeline) Tas
}

return &executor{
client: client,
pipeline: pipeline,
variable: v,
client: client,
pipeline: pipeline,
variable: v,
logOutput: os.Stdout,
}
}

Expand All @@ -67,6 +71,8 @@ type executor struct {

pipeline *kubekeyv1.Pipeline
variable variable.Variable

logOutput io.Writer
}

type execBlockOptions struct {
Expand Down Expand Up @@ -234,6 +240,7 @@ func (e executor) getGatherFact(ctx context.Context, hostname string, vars varia
return nil, nil
}

// execBlock loop block and generate task.
func (e executor) execBlock(ctx context.Context, options execBlockOptions) error {
for _, at := range options.blocks {
if !kkcorev1.JoinTag(at.Taggable, options.tags).IsEnabled(e.pipeline.Spec.Tags, e.pipeline.Spec.SkipTags) {
Expand Down Expand Up @@ -343,7 +350,7 @@ func (e executor) execBlock(ctx context.Context, options execBlockOptions) error
}

for {
klog.Infof("[Task %s] task exec \"%s\" begin for %v times", ctrlclient.ObjectKeyFromObject(task), task.Spec.Name, task.Status.RestartCount+1)
fmt.Fprintf(e.logOutput, "%s [Task %s] \"%s\":\"%s\" exec:%v times\n", time.Now().Format(time.RFC3339), ctrlclient.ObjectKeyFromObject(task), task.Annotations[kubekeyv1alpha1.TaskAnnotationRole], task.Spec.Name, task.Status.RestartCount+1)
// exec task
task.Status.Phase = kubekeyv1alpha1.TaskPhaseRunning
if err := e.client.Status().Update(ctx, task); err != nil {
Expand All @@ -362,7 +369,6 @@ func (e executor) execBlock(ctx context.Context, options execBlockOptions) error
break
}
}
klog.Infof("[Task %s] task exec \"%s\" end status is %s", ctrlclient.ObjectKeyFromObject(task), task.Spec.Name, task.Status.Phase)
e.pipeline.Status.TaskResult.Total++
switch task.Status.Phase {
case kubekeyv1alpha1.TaskPhaseSuccess:
Expand Down Expand Up @@ -395,26 +401,16 @@ func (e executor) execBlock(ctx context.Context, options execBlockOptions) error
return nil
}

// executeTask parallel in each host.
func (e executor) executeTask(ctx context.Context, task *kubekeyv1alpha1.Task, options execBlockOptions) error {
// check task host results
wg := &wait.Group{}
task.Status.HostResults = make([]kubekeyv1alpha1.TaskHostResult, len(task.Spec.Hosts))

for i, h := range task.Spec.Hosts {
wg.StartWithContext(ctx, func(ctx context.Context) {
// task result
var stdout, stderr string

// progress bar for task
var bar = progressbar.NewOptions(1,
progressbar.OptionEnableColorCodes(true),
progressbar.OptionSetDescription(fmt.Sprintf("[%s] running...", h)),
progressbar.OptionOnCompletion(func() {
if _, err := os.Stdout.WriteString("\n"); err != nil {
klog.ErrorS(err, "failed to write output", "host", h)
}
}),
progressbar.OptionShowElapsedTimeOnFinish(),
progressbar.OptionSetPredictTime(false),
)
defer func() {
if task.Spec.Register != "" {
var stdoutResult any = stdout
Expand All @@ -434,47 +430,75 @@ func (e executor) executeTask(ctx context.Context, task *kubekeyv1alpha1.Task, o
return
}
}

switch {
case stderr != "": // failed
bar.Describe(fmt.Sprintf("[%s] failed", h))
if err := bar.Finish(); err != nil {
klog.ErrorS(err, "fail to finish bar")
}
klog.Errorf("[Task %s] run failed: %s", ctrlclient.ObjectKeyFromObject(task), stderr)
case stdout == "skip": // skip
bar.Describe(fmt.Sprintf("[%s] skip", h))
if err := bar.Finish(); err != nil {
klog.ErrorS(err, "fail to finish bar")
}
default: //success
bar.Describe(fmt.Sprintf("[%s] success", h))
if err := bar.Finish(); err != nil {
klog.ErrorS(err, "fail to finish bar")
}
if stderr != "" && task.Spec.IgnoreError != nil && *task.Spec.IgnoreError {
klog.V(4).ErrorS(fmt.Errorf("%s", stderr), "task run failed", "task", ctrlclient.ObjectKeyFromObject(task))
} else if stderr != "" {
klog.ErrorS(fmt.Errorf("%s", stderr), "task run failed", "task", ctrlclient.ObjectKeyFromObject(task))
}

// fill result
task.Status.HostResults[i] = kubekeyv1alpha1.TaskHostResult{
Host: h,
Stdout: stdout,
StdErr: stderr,
}
}()

// task log
// placeholder format task log
var placeholder string
if hostNameMaxLen, err := e.variable.Get(variable.GetHostMaxLength()); err == nil {
placeholder = strings.Repeat(" ", hostNameMaxLen.(int)-len(h))
}
// progress bar for task
var bar = progressbar.NewOptions(-1,
progressbar.OptionSetWriter(e.logOutput),
progressbar.OptionSpinnerType(59),
progressbar.OptionEnableColorCodes(true),
progressbar.OptionSetDescription(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[36mrunning\033[0m", h, placeholder)),
progressbar.OptionOnCompletion(func() {
if _, err := os.Stdout.WriteString("\n"); err != nil {
klog.ErrorS(err, "failed to write output", "host", h)
}
}),
progressbar.OptionShowElapsedTimeOnFinish(),
progressbar.OptionSetPredictTime(false),
)
go func() {
for !bar.IsFinished() {
if err := bar.Add(1); err != nil {
return
}
time.Sleep(100 * time.Millisecond)
}
}()
defer func() {
switch {
case stderr != "":
if task.Spec.IgnoreError != nil && *task.Spec.IgnoreError { // ignore
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mignore \033[0m ", h, placeholder))
} else { // failed
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[31mfailed \033[0m ", h, placeholder))
}
case stdout == "skip": // skip
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34mskip \033[0m", h, placeholder))
default: //success
bar.Describe(fmt.Sprintf("[\033[36m%s\033[0m]%s \033[34msuccess\033[0m", h, placeholder))
}
if err := bar.Finish(); err != nil {
klog.ErrorS(err, "finish bar error")
}
}()
// task execute
ha, err := e.variable.Get(variable.GetAllVariable(h))
if err != nil {
stderr = fmt.Sprintf("get variable error: %v", err)
return
}
// execute module with loop
loop, err := e.execLoop(ctx, ha.(map[string]any), task)
loop, err := e.parseLoop(ctx, ha.(map[string]any), task)
if err != nil {
stderr = fmt.Sprintf("parse loop vars error: %v", err)
return
}
bar.ChangeMax(len(loop)*3 + 1)

// check when condition
if len(task.Spec.When) > 0 {
ok, err := tmpl.ParseBool(ha.(map[string]any), task.Spec.When)
Expand All @@ -487,7 +511,7 @@ func (e executor) executeTask(ctx context.Context, task *kubekeyv1alpha1.Task, o
return
}
}

// if loop is empty. execute once, and the item is null
for _, item := range loop {
// set item to runtime variable
if err := e.variable.Merge(variable.MergeRuntimeVariable(h, map[string]any{
Expand All @@ -496,34 +520,25 @@ func (e executor) executeTask(ctx context.Context, task *kubekeyv1alpha1.Task, o
stderr = fmt.Sprintf("set loop item to variable error: %v", err)
return
}
if err := bar.Add(1); err != nil {
klog.ErrorS(err, "fail to add bar")
}
stdout, stderr = e.executeModule(ctx, task, modules.ExecOptions{
Args: task.Spec.Module.Args,
Host: h,
Variable: e.variable,
Task: *task,
Pipeline: *e.pipeline,
})
if err := bar.Add(1); err != nil {
klog.ErrorS(err, "fail to add bar")
}
// delete item
if err := e.variable.Merge(variable.MergeRuntimeVariable(h, map[string]any{
_const.VariableItem: nil,
})); err != nil {
stderr = fmt.Sprintf("clean loop item to variable error: %v", err)
return
}
if err := bar.Add(1); err != nil {
klog.ErrorS(err, "fail to add bar")
}
}
})
}
wg.Wait()

// host result for task
task.Status.Phase = kubekeyv1alpha1.TaskPhaseSuccess
for _, data := range task.Status.HostResults {
if data.StdErr != "" {
Expand All @@ -539,7 +554,11 @@ func (e executor) executeTask(ctx context.Context, task *kubekeyv1alpha1.Task, o
return nil
}

func (e executor) execLoop(ctx context.Context, ha map[string]any, task *kubekeyv1alpha1.Task) ([]any, error) {
// parseLoop parse loop to slice. if loop contains template string. convert it.
// loop is json string. try convertor to string slice by json.
// loop is normal string. set it to empty slice and return.
// loop is string slice. return it.
func (e executor) parseLoop(ctx context.Context, ha map[string]any, task *kubekeyv1alpha1.Task) ([]any, error) {
switch {
case task.Spec.Loop.Raw == nil:
// loop is not set. add one element to execute once module.
Expand All @@ -549,13 +568,14 @@ func (e executor) execLoop(ctx context.Context, ha map[string]any, task *kubekey
}
}

// executeModule find register module and execute it.
func (e executor) executeModule(ctx context.Context, task *kubekeyv1alpha1.Task, opts modules.ExecOptions) (string, string) {
// get all variable. which contains item.
lg, err := opts.Variable.Get(variable.GetAllVariable(opts.Host))
if err != nil {
klog.V(5).ErrorS(err, "get location variable error", "task", ctrlclient.ObjectKeyFromObject(task))
return "", err.Error()
}

// check failed when condition
if len(task.Spec.FailedWhen) > 0 {
ok, err := tmpl.ParseBool(lg.(map[string]any), task.Spec.FailedWhen)
Expand All @@ -571,7 +591,7 @@ func (e executor) executeModule(ctx context.Context, task *kubekeyv1alpha1.Task,
return modules.FindModule(task.Spec.Module.Name)(ctx, opts)
}

// merge defined variable to host variable
// mergeVariable to runtime variable
func (e executor) mergeVariable(ctx context.Context, v variable.Variable, vd map[string]any, hosts ...string) error {
if len(vd) == 0 {
// skip
Expand Down
Loading

0 comments on commit 031c8da

Please sign in to comment.