
Commit 2eaae1a

Merge pull request #1679 from aluzzardi/logbroker

Log Management

aluzzardi authored Nov 4, 2016
2 parents 5afda15 + 3058ba9
Showing 30 changed files with 4,534 additions and 46 deletions.
68 changes: 60 additions & 8 deletions agent/agent.go
@@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"golang.org/x/net/context"
@@ -48,18 +49,16 @@ func New(config *Config) (*Agent, error) {
return nil, err
}

worker := newWorker(config.DB, config.Executor)

a := &Agent{
config: config,
worker: worker,
sessionq: make(chan sessionOperation),
started: make(chan struct{}),
stopped: make(chan struct{}),
closed: make(chan struct{}),
ready: make(chan struct{}),
}

a.worker = newWorker(config.DB, config.Executor, a)
return a, nil
}

@@ -147,18 +146,20 @@ func (a *Agent) run(ctx context.Context) {
defer nodeUpdateTicker.Stop()

var (
backoff time.Duration
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
backoff time.Duration
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
subscriptions = map[string]context.CancelFunc{}
)

if err := a.worker.Init(ctx); err != nil {
log.G(ctx).WithError(err).Error("worker initialization failed")
a.err = err
return // fatal?
}
defer a.worker.Close()

// setup a reliable reporter to call back to us.
reporter := newStatusReporter(ctx, a)
@@ -186,6 +187,23 @@ func (a *Agent) run(ctx context.Context) {
if err := a.handleSessionMessage(ctx, msg); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
case sub := <-session.subscriptions:
if sub.Close {
if cancel, ok := subscriptions[sub.ID]; ok {
cancel()
}
delete(subscriptions, sub.ID)
continue
}

if _, ok := subscriptions[sub.ID]; ok {
// Duplicate subscription
continue
}

subCtx, subCancel := context.WithCancel(ctx)
subscriptions[sub.ID] = subCancel
go a.worker.Subscribe(subCtx, sub)
case <-registered:
log.G(ctx).Debugln("agent: registered")
if ready != nil {
@@ -387,6 +405,40 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
}
}

// Publisher returns a LogPublisher for the given subscription
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, error) {
// TODO(stevvooe): The level of coordination here is WAY too much for logs.
// These should only be best effort and really just buffer until a session is
// ready. Ideally, they would use a separate connection completely.

var (
err error
client api.LogBroker_PublishLogsClient
)

err = a.withSession(ctx, func(session *session) error {
client, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
return err
})
if err != nil {
return nil, err
}

return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
select {
case <-ctx.Done():
client.CloseSend()
return ctx.Err()
default:
}

return client.Send(&api.PublishLogsMessage{
SubscriptionID: subscriptionID,
Messages: []api.LogMessage{message},
})
}), nil
}

// nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) {
desc, err := a.config.Executor.Describe(ctx)
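The new Agent.Publisher above opens a PublishLogs stream on the current session and wraps its Send method in an exec.LogPublisherFunc. A minimal usage sketch, assuming it lives in this agent package and reuses its existing imports; the helper name publishOneLine is hypothetical and not part of this change:

func publishOneLine(ctx context.Context, a *Agent, subscriptionID string, line []byte) error {
	// Obtain a publisher backed by the current session's PublishLogs stream.
	publisher, err := a.Publisher(ctx, subscriptionID)
	if err != nil {
		return err
	}
	// Each Publish call sends one PublishLogsMessage carrying a single LogMessage.
	return publisher.Publish(ctx, api.LogMessage{
		Stream: api.LogStreamStdout,
		Data:   line,
	})
}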
39 changes: 39 additions & 0 deletions agent/exec/container/adapter.go
@@ -258,6 +258,45 @@ func (c *containerAdapter) createVolumes(ctx context.Context) error {
return nil
}

func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
apiOptions := types.ContainerLogsOptions{
Follow: options.Follow,
Timestamps: true,
Details: false,
}

if options.Since != nil {
since, err := ptypes.Timestamp(options.Since)
if err != nil {
return nil, err
}
apiOptions.Since = since.Format(time.RFC3339Nano)
}

if options.Tail < 0 {
// See protobuf documentation for details of how this works.
apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
} else if options.Tail > 0 {
return nil, fmt.Errorf("tail relative to start of logs not supported via docker API")
}

if len(options.Streams) == 0 {
// empty == all
apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
} else {
for _, stream := range options.Streams {
switch stream {
case api.LogStreamStdout:
apiOptions.ShowStdout = true
case api.LogStreamStderr:
apiOptions.ShowStderr = true
}
}
}

return c.client.ContainerLogs(ctx, c.container.name(), apiOptions)
}

// TODO(mrjana/stevvooe): There is no proper error code for network not found
// error in engine-api. Resort to string matching until engine-api is fixed.

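In logs above, the Tail option follows the convention referenced in the protobuf documentation: negative values count back from the end of the log (zero means everything), while positive values, which are relative to the start of the log, are rejected because the engine API cannot express them. A hedged illustration of that conversion as a standalone helper; the name tailToEngineAPI and the int64 parameter type are assumptions for illustration only:

func tailToEngineAPI(tail int64) (string, error) {
	if tail < 0 {
		// e.g. tail == -11 maps to "10": show only the last 10 lines.
		return fmt.Sprint(-tail - 1), nil
	}
	if tail > 0 {
		return "", fmt.Errorf("tail relative to start of logs not supported via docker API")
	}
	// tail == 0 leaves the engine API default, which is all lines.
	return "", nil
}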
143 changes: 139 additions & 4 deletions agent/exec/container/controller.go
@@ -1,9 +1,14 @@
package container

import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"strconv"
"strings"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
@@ -12,8 +17,10 @@ import (
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)

// controller implements agent.Controller against docker's API.
@@ -246,14 +253,11 @@ func (r *controller) Start(ctx context.Context) error {
}

// Wait on the container to exit.
func (r *controller) Wait(pctx context.Context) error {
func (r *controller) Wait(ctx context.Context) error {
if err := r.checkClosed(); err != nil {
return err
}

ctx, cancel := context.WithCancel(pctx)
defer cancel()

// check the initial state and report that.
ctnr, err := r.adapter.inspect(ctx)
if err != nil {
@@ -400,6 +404,137 @@ func (r *controller) Remove(ctx context.Context) error {
return nil
}

// waitReady waits for a container to be "ready".
// Ready means it's past the started state.
func (r *controller) waitReady(pctx context.Context) error {
if err := r.checkClosed(); err != nil {
return err
}

ctx, cancel := context.WithCancel(pctx)
defer cancel()

eventq, closed, err := r.adapter.events(ctx)
if err != nil {
return err
}

ctnr, err := r.adapter.inspect(ctx)
if err != nil {
if !isUnknownContainer(err) {
return errors.Wrap(err, "inspect container failed")
}
} else {
switch ctnr.State.Status {
case "running", "exited", "dead":
return nil
}
}

for {
select {
case event := <-eventq:
if !r.matchevent(event) {
continue
}

switch event.Action {
case "start":
return nil
}
case <-closed:
// restart!
eventq, closed, err = r.adapter.events(ctx)
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-r.closed:
return r.err
}
}
}

func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
if err := r.checkClosed(); err != nil {
return err
}

if err := r.waitReady(ctx); err != nil {
return errors.Wrap(err, "container not ready for logs")
}

rc, err := r.adapter.logs(ctx, options)
if err != nil {
return errors.Wrap(err, "failed getting container logs")
}
defer rc.Close()

var (
// use a rate limiter to keep things under control but also provide some
// ability to coalesce messages.
limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s
msgctx = api.LogContext{
NodeID: r.task.NodeID,
ServiceID: r.task.ServiceID,
TaskID: r.task.ID,
}
)

brd := bufio.NewReader(rc)
for {
// so, message header is 8 bytes, treat as uint64, pull stream off MSB
var header uint64
if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
if err == io.EOF {
return nil
}

return errors.Wrap(err, "failed reading log header")
}

stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3))

// limit here to decrease allocation back pressure.
if err := limiter.WaitN(ctx, int(size)); err != nil {
return errors.Wrap(err, "failed rate limiter")
}

buf := make([]byte, size)
_, err := io.ReadFull(brd, buf)
if err != nil {
return errors.Wrap(err, "failed reading buffer")
}

// Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish
parts := bytes.SplitN(buf, []byte(" "), 2)
if len(parts) != 2 {
return fmt.Errorf("invalid timestamp in log message: %v", buf)
}

ts, err := time.Parse(time.RFC3339Nano, string(parts[0]))
if err != nil {
return errors.Wrap(err, "failed to parse timestamp")
}

tsp, err := ptypes.TimestampProto(ts)
if err != nil {
return errors.Wrap(err, "failed to convert timestamp")
}

if err := publisher.Publish(ctx, api.LogMessage{
Context: msgctx,
Timestamp: tsp,
Stream: api.LogStream(stream),

Data: parts[1],
}); err != nil {
return errors.Wrap(err, "failed to publish log message")
}
}
}

// Close the controller and clean up any ephemeral resources.
func (r *controller) Close() error {
select {
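The Logs loop above consumes Docker's multiplexed log stream: each frame starts with an 8-byte big-endian header whose most significant byte identifies the stream and whose remaining bytes carry the payload size, and because the logs are requested with Timestamps: true the payload itself is "<RFC3339Nano timestamp> <data>". A sketch of that frame decoding as a standalone helper, using only the imports added at the top of this file; decodeFrame is hypothetical and not part of this change:

// decodeFrame reads one multiplexed log frame and splits it into its parts.
func decodeFrame(r *bufio.Reader) (stream uint64, ts time.Time, data []byte, err error) {
	var header uint64
	if err = binary.Read(r, binary.BigEndian, &header); err != nil {
		return 0, time.Time{}, nil, err
	}
	stream = (header >> (7 << 3)) & 0xFF         // most significant byte
	size := header & ^(uint64(0xFF) << (7 << 3)) // everything below it

	buf := make([]byte, size)
	if _, err = io.ReadFull(r, buf); err != nil {
		return 0, time.Time{}, nil, err
	}

	// Payload is "<RFC3339Nano timestamp> <data>"; split on the first space.
	parts := bytes.SplitN(buf, []byte(" "), 2)
	if len(parts) != 2 {
		return 0, time.Time{}, nil, fmt.Errorf("invalid timestamp in log message: %v", buf)
	}
	ts, err = time.Parse(time.RFC3339Nano, string(parts[0]))
	return stream, ts, parts[1], err
}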
24 changes: 23 additions & 1 deletion agent/exec/container/controller_integration_test.go
@@ -44,22 +44,44 @@ func TestControllerFlowIntegration(t *testing.T) {
ID: "dockerexec-integration-task-id",
ServiceID: "dockerexec-integration-service-id",
NodeID: "dockerexec-integration-node-id",
ServiceAnnotations: api.Annotations{
Name: "dockerexec-integration",
},
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{
Command: []string{"sh", "-c", "sleep 5"},
Command: []string{"sh", "-c", "sleep 5; echo hello; echo stderr >&2"},
Image: "alpine",
},
},
},
}

var receivedLogs bool
publisher := exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
receivedLogs = true

switch message.Stream {
case api.LogStreamStdout:
assert.Equal(t, "hello\n", string(message.Data))
case api.LogStreamStderr:
assert.Equal(t, "stderr\n", string(message.Data))
}

t.Log(message)
return nil
})

ctlr, err := newController(client, task, nil)
assert.NoError(t, err)
assert.NotNil(t, ctlr)
assert.NoError(t, ctlr.Prepare(ctx))
assert.NoError(t, ctlr.Start(ctx))
assert.NoError(t, ctlr.(exec.ControllerLogs).Logs(ctx, publisher, api.LogSubscriptionOptions{
Follow: true,
}))
assert.NoError(t, ctlr.Wait(ctx))
assert.True(t, receivedLogs)
assert.NoError(t, ctlr.Shutdown(ctx))
assert.NoError(t, ctlr.Remove(ctx))
assert.NoError(t, ctlr.Close())
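The integration test above drives the new ControllerLogs interface with an in-process publisher built from exec.LogPublisherFunc. The same adapter can wrap any function; for instance, a throwaway publisher that just prints each message might look like the following sketch (assuming the usual imports for fmt, context, exec, and api; not part of this change):

func debugPublisher() exec.LogPublisher {
	return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
		// Print "<stream>: <data>" for every published message.
		_, err := fmt.Printf("%s: %s", message.Stream, message.Data)
		return err
	})
}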
(Diffs for the remaining 26 changed files are not rendered.)
