Skip to content

Commit

Permalink
Merge pull request #1586 from fuweid/enhance_adjust_data_stream_from_…
Browse files Browse the repository at this point in the history
…pull_api

enhance: adjust data stream from pouch pull api
  • Loading branch information
yyb196 authored Jul 6, 2018
2 parents 540f51b + f66c62c commit b58ff0c
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 182 deletions.
2 changes: 1 addition & 1 deletion apis/server/image_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *Server) pullImage(ctx context.Context, rw http.ResponseWriter, req *htt
}
}
// Error information has be sent to client, so no need call resp.Write
if err := s.ImageMgr.PullImage(ctx, image, &authConfig, rw); err != nil {
if err := s.ImageMgr.PullImage(ctx, image, &authConfig, newWriteFlusher(rw)); err != nil {
logrus.Errorf("failed to pull image %s: %v", image, err)
return nil
}
Expand Down
112 changes: 45 additions & 67 deletions cli/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/client"
"github.com/alibaba/pouch/credential"
"github.com/alibaba/pouch/ctrd"
"github.com/alibaba/pouch/pkg/jsonstream"
"github.com/alibaba/pouch/pkg/reference"

"github.com/containerd/containerd/progress"
Expand Down Expand Up @@ -93,62 +93,72 @@ func showProgress(body io.ReadCloser) error {
output = progress.NewWriter(os.Stdout)
}

dec := json.NewDecoder(body)
if _, err := dec.Token(); err != nil {
return fmt.Errorf("failed to read the opening token: %v", err)
}
pos := make(map[string]int)
status := []jsonstream.JSONMessage{}

refStatus := make(map[string]string)
for dec.More() {
var infos []ctrd.ProgressInfo
dec := json.NewDecoder(body)
for {
var (
msg jsonstream.JSONMessage
msgs []jsonstream.JSONMessage
)

if err := dec.Decode(&msg); err != nil {
if err == io.EOF {
break
}
return err
}

if err := dec.Decode(&infos); err != nil {
return fmt.Errorf("failed to decode: %v", err)
change := true
if _, ok := pos[msg.ID]; !ok {
status = append(status, msg)
pos[msg.ID] = len(status) - 1
} else {
change = (status[pos[msg.ID]].Status != msg.Status)
status[pos[msg.ID]] = msg
}

// only display the new status if the stdout is not terminal
if !isTerminal {
newInfos := make([]ctrd.ProgressInfo, 0)
for i, info := range infos {
old, ok := refStatus[info.Ref]
if !ok || info.Status != old {
refStatus[info.Ref] = info.Status
newInfos = append(newInfos, infos[i])
}
// if the status doesn't change, skip to avoid duplicate status
if !change {
continue
}

infos = newInfos
msgs = []jsonstream.JSONMessage{msg}
} else {
msgs = status
}

if err := displayProgressInfos(output, isTerminal, infos, start); err != nil {
if err := displayImageReferenceProgress(output, isTerminal, msgs, start); err != nil {
return fmt.Errorf("failed to display progress: %v", err)
}

if err := output.Flush(); err != nil {
return fmt.Errorf("failed to display progress: %v", err)
}
}

if _, err := dec.Token(); err != nil {
return fmt.Errorf("failed to read the closing token: %v", err)
}
return nil
}

// displayProgressInfos uses tabwriter to show current progress info.
func displayProgressInfos(output io.Writer, isTerminal bool, infos []ctrd.ProgressInfo, start time.Time) error {
// displayImageReferenceProgress uses tabwriter to show current progress status.
func displayImageReferenceProgress(output io.Writer, isTerminal bool, msgs []jsonstream.JSONMessage, start time.Time) error {
var (
tw = tabwriter.NewWriter(output, 1, 8, 1, ' ', 0)
total = int64(0)
tw = tabwriter.NewWriter(output, 1, 8, 1, ' ', 0)
current = int64(0)
)

for _, info := range infos {
if info.ErrorMessage != "" {
return fmt.Errorf(info.ErrorMessage)
for _, msg := range msgs {
if msg.Error != nil {
return fmt.Errorf(msg.Error.Message)
}

total += info.Offset
if _, err := fmt.Fprint(tw, formatProgressInfo(info, isTerminal)); err != nil {
if msg.Detail != nil {
current += msg.Detail.Current
}

status := jsonstream.PullReferenceStatus(!isTerminal, msg)
if _, err := fmt.Fprint(tw, status); err != nil {
return err
}
}
Expand All @@ -157,47 +167,15 @@ func displayProgressInfos(output io.Writer, isTerminal bool, infos []ctrd.Progre
if isTerminal {
_, err := fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
time.Since(start).Seconds(),
progress.Bytes(total),
progress.NewBytesPerSecond(total, time.Since(start)))
progress.Bytes(current),
progress.NewBytesPerSecond(current, time.Since(start)))
if err != nil {
return err
}
}
return tw.Flush()
}

// formatProgressInfo formats ProgressInfo into string.
func formatProgressInfo(info ctrd.ProgressInfo, isTerminal bool) string {
if !isTerminal {
return fmt.Sprintf("%s:\t%s\n", info.Ref, info.Status)
}

switch info.Status {
case "downloading", "uploading":
var bar progress.Bar
if info.Total > 0.0 {
bar = progress.Bar(float64(info.Offset) / float64(info.Total))
}
return fmt.Sprintf("%s:\t%s\t%40r\t%8.8s/%s\t\n",
info.Ref,
info.Status,
bar,
progress.Bytes(info.Offset), progress.Bytes(info.Total))

case "resolving", "waiting":
return fmt.Sprintf("%s:\t%s\t%40r\t\n",
info.Ref,
info.Status,
progress.Bar(0.0))

default:
return fmt.Sprintf("%s:\t%s\t%40r\t\n",
info.Ref,
info.Status,
progress.Bar(1.0))
}
}

// pullExample shows examples in pull command, and is used in auto-generated cli docs.
func pullExample() string {
return `$ pouch images
Expand Down
96 changes: 45 additions & 51 deletions ctrd/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,13 @@ func (c *Client) PullImage(ctx context.Context, ref string, authConfig *types.Au

if err != nil {
// Send Error information to client through stream
messages := []ProgressInfo{
{Code: http.StatusInternalServerError, ErrorMessage: err.Error()},
message := jsonstream.JSONMessage{
Error: &jsonstream.JSONError{
Code: http.StatusInternalServerError,
Message: err.Error(),
},
}
stream.WriteObject(messages)
stream.WriteObject(message)
return nil, err
}

Expand All @@ -165,26 +168,13 @@ func (c *Client) pullImage(ctx context.Context, wrapperCli *WrapperClient, ref s
return img, nil
}

// ProgressInfo represents the status of downloading image.
type ProgressInfo struct {
Ref string
Status string
Offset int64
Total int64
StartedAt time.Time
UpdatedAt time.Time

// For Error handling
Code int // http response code
ErrorMessage string // detail error information
}

// FIXME(fuwei): put the fetchProgress into jsonstream and make it readable.
func (c *Client) fetchProgress(ctx context.Context, wrapperCli *WrapperClient, ongoing *jobs, stream *jsonstream.JSONStream) error {
var (
ticker = time.NewTicker(100 * time.Millisecond)
ticker = time.NewTicker(300 * time.Millisecond)
cs = wrapperCli.client.ContentStore()
start = time.Now()
progresses = map[string]ProgressInfo{}
progresses = map[string]jsonstream.JSONMessage{}
done bool
)
defer ticker.Stop()
Expand All @@ -193,30 +183,33 @@ outer:
for {
select {
case <-ticker.C:
resolved := "resolved"
resolved := jsonstream.PullStatusResolved
if !ongoing.isResolved() {
resolved = "resolving"
resolved = jsonstream.PullStatusResolving
}
progresses[ongoing.name] = ProgressInfo{
Ref: ongoing.name,
progresses[ongoing.name] = jsonstream.JSONMessage{
ID: ongoing.name,
Status: resolved,
Detail: &jsonstream.ProgressDetail{},
}
keys := []string{ongoing.name}

activeSeen := map[string]struct{}{}
if !done {
active, err := cs.ListStatuses(context.TODO(), "")
actives, err := cs.ListStatuses(context.TODO(), "")
if err != nil {
logrus.Errorf("failed to list statuses: %v", err)
continue
}
// update status of active entries!
for _, active := range active {
progresses[active.Ref] = ProgressInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
for _, active := range actives {
progresses[active.Ref] = jsonstream.JSONMessage{
ID: active.Ref,
Status: jsonstream.PullStatusDownloading,
Detail: &jsonstream.ProgressDetail{
Current: active.Offset,
Total: active.Total,
},
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
Expand All @@ -233,54 +226,55 @@ outer:
}

status, ok := progresses[key]
if !done && (!ok || status.Status == "downloading") {
if !done && (!ok || status.Status == jsonstream.PullStatusDownloading) {
info, err := cs.Info(context.TODO(), j.Digest)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.Errorf("failed to get content info: %v", err)
continue outer
} else {
progresses[key] = ProgressInfo{
Ref: key,
Status: "waiting",
progresses[key] = jsonstream.JSONMessage{
ID: key,
Status: jsonstream.PullStatusWaiting,
}
}
} else if info.CreatedAt.After(start) {
progresses[key] = ProgressInfo{
Ref: key,
Status: "done",
Offset: info.Size,
Total: info.Size,
progresses[key] = jsonstream.JSONMessage{
ID: key,
Status: jsonstream.PullStatusDone,
Detail: &jsonstream.ProgressDetail{
Current: info.Size,
Total: info.Size,
},
UpdatedAt: info.CreatedAt,
}
} else {
progresses[key] = ProgressInfo{
Ref: key,
Status: "exists",
progresses[key] = jsonstream.JSONMessage{
ID: key,
Status: jsonstream.PullStatusExists,
}
}
} else if done {
if ok {
if status.Status != "done" && status.Status != "exists" {
status.Status = "done"
if status.Status != jsonstream.PullStatusDone &&
status.Status != jsonstream.PullStatusExists {

status.Status = jsonstream.PullStatusDone
progresses[key] = status
}
} else {
progresses[key] = ProgressInfo{
Ref: key,
Status: "done",
progresses[key] = jsonstream.JSONMessage{
ID: key,
Status: jsonstream.PullStatusDone,
}
}
}
}

var ordered []ProgressInfo
for _, key := range keys {
ordered = append(ordered, progresses[key])
stream.WriteObject(progresses[key])
}

stream.WriteObject(ordered)

if done {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/mgr/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (mgr *ImageManager) PullImage(ctx context.Context, ref string, authConfig *
}

pctx, cancel := context.WithCancel(ctx)
stream := jsonstream.New(out)
stream := jsonstream.New(out, nil)
wait := make(chan struct{})

go func() {
Expand Down
Loading

0 comments on commit b58ff0c

Please sign in to comment.