Skip to content

Commit

Permalink
Merge pull request #2274 from HusterWan/zr/fix-events-connect
Browse files Browse the repository at this point in the history
bugfix:  event api connection may be recycled if keeps idle too long
  • Loading branch information
fuweid authored Sep 26, 2018
2 parents 36d308e + a41b6bd commit 8ac6e9e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
2 changes: 1 addition & 1 deletion apis/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func initRoute(s *Server) http.Handler {
s.addRoute(r, http.MethodGet, "/info", s.info)
s.addRoute(r, http.MethodGet, "/version", s.version)
s.addRoute(r, http.MethodPost, "/auth", s.auth)
s.addRoute(r, http.MethodGet, "/events", s.events)
s.addRoute(r, http.MethodGet, "/events", withCancelHandler(s.events))

// daemon, we still list this API into system manager.
s.addRoute(r, http.MethodPost, "/daemon/update", s.updateDaemon)
Expand Down
8 changes: 5 additions & 3 deletions apis/server/system_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/docker/docker/pkg/ioutils"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func (s *Server) ping(context context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
Expand Down Expand Up @@ -69,9 +70,6 @@ func (s *Server) auth(ctx context.Context, rw http.ResponseWriter, req *http.Req
}

func (s *Server) events(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

rw.Header().Set("Content-Type", "application/json")
output := ioutils.NewWriteFlusher(rw)
defer output.Close()
Expand Down Expand Up @@ -128,6 +126,7 @@ func (s *Server) events(ctx context.Context, rw http.ResponseWriter, req *http.R
select {
case ev := <-eventq:
if err := enc.Encode(ev); err != nil {
logrus.Errorf("encode events got an error: %v", err)
return err
}
case err := <-errq:
Expand All @@ -137,6 +136,9 @@ func (s *Server) events(ctx context.Context, rw http.ResponseWriter, req *http.R
return nil
case <-timeout:
return nil
case <-ctx.Done():
logrus.Debug("client context is cancelled, stop sending events")
return nil
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions cli/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,24 @@ type eventProcessor func(event types.EventsMessage, err error) error
// Each output includes the event type, actor id, name and action.
// Actor attributes are printed at the end if the actor has any.
func printOutput(event types.EventsMessage, output io.Writer) {
// skip empty event message
if event == (types.EventsMessage{}) {
return
}

if event.TimeNano != 0 {
fmt.Fprintf(output, "%s ", time.Unix(0, event.TimeNano).Format(utils.RFC3339NanoFixed))
} else if event.Time != 0 {
fmt.Fprintf(output, "%s ", time.Unix(event.Time, 0).Format(utils.RFC3339NanoFixed))
}

fmt.Fprintf(output, "%s %s %s", event.Type, event.Action, event.Actor.ID)
id := ""
if event.Actor != nil {
id = event.Actor.ID
}
fmt.Fprintf(output, "%s %s %s", event.Type, event.Action, id)

if len(event.Actor.Attributes) > 0 {
if event.Actor != nil && len(event.Actor.Attributes) > 0 {
var attrs []string
var keys []string
for k := range event.Actor.Attributes {
Expand Down
12 changes: 6 additions & 6 deletions ctrd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (c *Client) Cleanup() error {
// collectContainerdEvents collects events generated by containerd.
func (c *Client) collectContainerdEvents() {
ctx := context.Background()
topicsToHandle := []string{TaskOOMEventTopic, ContainersDeleteEventTopic}
topicsToHandle := []string{TaskOOMEventTopic, TaskExitEventTopic}

// set filters for subscribe containerd events,
// now we only care about task and container events.
Expand Down Expand Up @@ -384,15 +384,15 @@ func (c *Client) collectContainerdEvents() {
}

switch e.Topic {
case ContainersDeleteEventTopic:
cDelEvent, ok := out.(*eventstypes.ContainerDelete)
case TaskExitEventTopic:
exitEvent, ok := out.(*eventstypes.TaskExit)
if !ok {
logrus.Warnf("failed to parse %s event: %#v", ContainersDeleteEventTopic, out)
logrus.Warnf("failed to parse %s event: %#v", TaskExitEventTopic, out)
continue
}

action = "die"
containerID = cDelEvent.ID
containerID = exitEvent.ContainerID
attributes["exitcode"] = strconv.Itoa(int(exitEvent.ExitStatus))
case TaskOOMEventTopic:
oomEvent, ok := out.(*eventstypes.TaskOOM)
if !ok {
Expand Down

0 comments on commit 8ac6e9e

Please sign in to comment.