Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Merge pull request #671 from GabyCT/topic/tracer
Browse files Browse the repository at this point in the history
tracing: Wrapper for tracing functions
  • Loading branch information
GabyCT authored Oct 22, 2019
2 parents 3e62565 + d3e66bf commit cc6866a
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 70 deletions.
86 changes: 41 additions & 45 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/opencontainers/runc/libcontainer/configs"
_ "github.com/opencontainers/runc/libcontainer/nsenter"
"github.com/opencontainers/runtime-spec/specs-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -175,9 +174,6 @@ var logsVSockPort = uint32(0)
// commType is used to denote the communication channel type used.
type commType int

// agentSpan is used to denote the span tracing that is being used.
type agentSpan opentracing.Span

const (
// virtio-serial channel
serialCh commType = iota
Expand Down Expand Up @@ -242,7 +238,7 @@ func (p *process) closePostExitFDs() {
}
}

func (c *container) trace(name string) (agentSpan, context.Context) {
func (c *container) trace(name string) (*agentSpan, context.Context) {
if c.ctx == nil {
agentLog.WithField("type", "bug").Error("trace called before context set")
c.ctx = context.Background()
Expand All @@ -259,16 +255,16 @@ func (c *container) setProcess(process *process) {

func (c *container) deleteProcess(execID string) {
span, _ := c.trace("deleteProcess")
span.SetTag("exec-id", execID)
defer span.Finish()
span.setTag("exec-id", execID)
defer span.finish()
c.Lock()
delete(c.processes, execID)
c.Unlock()
}

func (c *container) removeContainer() error {
span, _ := c.trace("removeContainer")
defer span.Finish()
defer span.finish()
// This will terminates all processes related to this container, and
// destroy the container right after. But this will error in case the
// container in not in the right state.
Expand All @@ -291,15 +287,15 @@ func (c *container) getProcess(execID string) (*process, error) {
return proc, nil
}

func (s *sandbox) trace(name string) (agentSpan, context.Context) {
func (s *sandbox) trace(name string) (*agentSpan, context.Context) {
if s.ctx == nil {
agentLog.WithField("type", "bug").Error("trace called before context set")
s.ctx = context.Background()
}

span, ctx := trace(s.ctx, "sandbox", name)

span.SetTag("sandbox", s.id)
span.setTag("sandbox", s.id)

return span, ctx
}
Expand Down Expand Up @@ -327,8 +323,8 @@ func (s *sandbox) setSandboxStorage(path string) bool {
// for any OCI hooks
func (s *sandbox) scanGuestHooks(guestHookPath string) {
span, _ := s.trace("scanGuestHooks")
span.SetTag("guest-hook-path", guestHookPath)
defer span.Finish()
span.setTag("guest-hook-path", guestHookPath)
defer span.finish()

fieldLogger := agentLog.WithField("oci-hook-path", guestHookPath)
fieldLogger.Info("Scanning guest filesystem for OCI hooks")
Expand All @@ -348,7 +344,7 @@ func (s *sandbox) scanGuestHooks(guestHookPath string) {
// found to the OCI spec
func (s *sandbox) addGuestHooks(spec *specs.Spec) {
span, _ := s.trace("addGuestHooks")
defer span.Finish()
defer span.finish()

if spec == nil {
return
Expand All @@ -374,8 +370,8 @@ func (s *sandbox) addGuestHooks(spec *specs.Spec) {
// acquiring a lock on sandbox.
func (s *sandbox) unSetSandboxStorage(path string) (bool, error) {
span, _ := s.trace("unSetSandboxStorage")
span.SetTag("path", path)
defer span.Finish()
span.setTag("path", path)
defer span.finish()

if sbs, ok := s.storages[path]; ok {
sbs.refCount--
Expand All @@ -397,8 +393,8 @@ func (s *sandbox) unSetSandboxStorage(path string) (bool, error) {
// acquiring a lock on sandbox.
func (s *sandbox) removeSandboxStorage(path string) error {
span, _ := s.trace("removeSandboxStorage")
span.SetTag("path", path)
defer span.Finish()
span.setTag("path", path)
defer span.finish()

err := removeMounts([]string{path})
if err != nil {
Expand All @@ -419,8 +415,8 @@ func (s *sandbox) removeSandboxStorage(path string) error {
// acquiring a lock on sandbox.
func (s *sandbox) unsetAndRemoveSandboxStorage(path string) error {
span, _ := s.trace("unsetAndRemoveSandboxStorage")
span.SetTag("path", path)
defer span.Finish()
span.setTag("path", path)
defer span.finish()

removeSbs, err := s.unSetSandboxStorage(path)
if err != nil {
Expand Down Expand Up @@ -455,9 +451,9 @@ func (s *sandbox) setContainer(ctx context.Context, id string, ctr *container) {
s.ctx = ctx

span, _ := s.trace("setContainer")
span.SetTag("id", id)
span.SetTag("container", ctr.id)
defer span.Finish()
span.setTag("id", id)
span.setTag("container", ctr.id)
defer span.finish()

s.Lock()
s.containers[id] = ctr
Expand All @@ -466,8 +462,8 @@ func (s *sandbox) setContainer(ctx context.Context, id string, ctr *container) {

func (s *sandbox) deleteContainer(id string) {
span, _ := s.trace("deleteContainer")
span.SetTag("container", id)
defer span.Finish()
span.setTag("container", id)
defer span.finish()

s.Lock()

Expand Down Expand Up @@ -554,7 +550,7 @@ func (s *sandbox) readStdio(cid, execID string, length int, stdout bool) ([]byte

func (s *sandbox) setupSharedNamespaces(ctx context.Context) error {
span, _ := trace(ctx, "sandbox", "setupSharedNamespaces")
defer span.Finish()
defer span.finish()

// Set up shared IPC namespace
ns, err := setupPersistentNs(nsTypeIPC)
Expand All @@ -575,7 +571,7 @@ func (s *sandbox) setupSharedNamespaces(ctx context.Context) error {

func (s *sandbox) unmountSharedNamespaces() error {
span, _ := s.trace("unmountSharedNamespaces")
defer span.Finish()
defer span.finish()

if err := unix.Unmount(s.sharedIPCNs.path, unix.MNT_DETACH); err != nil {
return err
Expand All @@ -592,7 +588,7 @@ func (s *sandbox) unmountSharedNamespaces() error {
// be destroyed if other processes are terminated.
func (s *sandbox) setupSharedPidNs() error {
span, _ := s.trace("setupSharedPidNs")
defer span.Finish()
defer span.finish()

cmd := &exec.Cmd{
Path: selfBinPath,
Expand Down Expand Up @@ -620,7 +616,7 @@ func (s *sandbox) setupSharedPidNs() error {

func (s *sandbox) teardownSharedPidNs() error {
span, _ := s.trace("teardownSharedPidNs")
defer span.Finish()
defer span.finish()

if !s.sandboxPidNs {
// We are not in a case where we have created a pause process.
Expand Down Expand Up @@ -648,7 +644,7 @@ func (s *sandbox) teardownSharedPidNs() error {

func (s *sandbox) waitForStopServer() {
span, _ := s.trace("waitForStopServer")
defer span.Finish()
defer span.finish()

fieldLogger := agentLog.WithField("subsystem", "stopserverwatcher")

Expand Down Expand Up @@ -685,7 +681,7 @@ func (s *sandbox) waitForStopServer() {

fieldLogger.Info("Force stopping the server now")

span.SetTag("forced", true)
span.setTag("forced", true)
s.stopGRPC()
}

Expand Down Expand Up @@ -714,11 +710,11 @@ func (s *sandbox) listenToUdevEvents() {
}

span, _ := trace(rootContext, "udev", "udev event")
span.SetTag("udev-action", uEv.Action)
span.SetTag("udev-name", uEv.DevName)
span.SetTag("udev-path", uEv.DevPath)
span.SetTag("udev-subsystem", uEv.SubSystem)
span.SetTag("udev-seqno", uEv.SeqNum)
span.setTag("udev-action", uEv.Action)
span.setTag("udev-name", uEv.DevName)
span.setTag("udev-path", uEv.DevPath)
span.setTag("udev-subsystem", uEv.SubSystem)
span.setTag("udev-seqno", uEv.SeqNum)

fieldLogger = fieldLogger.WithFields(logrus.Fields{
"uevent-action": uEv.Action,
Expand Down Expand Up @@ -781,7 +777,7 @@ func (s *sandbox) listenToUdevEvents() {
}
}

span.Finish()
span.finish()
}
}

Expand Down Expand Up @@ -832,7 +828,7 @@ func (s *sandbox) signalHandlerLoop(sigCh chan os.Signal, errCh chan error) {

func (s *sandbox) setupSignalHandler() error {
span, _ := s.trace("setupSignalHandler")
defer span.Finish()
defer span.finish()

sigCh := make(chan os.Signal, 512)
signal.Notify(sigCh, unix.SIGCHLD)
Expand Down Expand Up @@ -1011,7 +1007,7 @@ func (s *sandbox) initLogger(ctx context.Context) error {

func (s *sandbox) initChannel() error {
span, ctx := s.trace("initChannel")
defer span.Finish()
defer span.finish()

c, err := newChannel(ctx)
if err != nil {
Expand All @@ -1031,20 +1027,20 @@ func makeUnaryInterceptor() grpc.UnaryServerInterceptor {

grpcCall := info.FullMethod
var ctx context.Context
var span agentSpan
var span *agentSpan

if tracing {
ctx = getGRPCContext()
span, _ = trace(ctx, "gRPC", grpcCall)
span.SetTag("grpc-method-type", "unary")
span.setTag("grpc-method-type", "unary")

if strings.HasSuffix(grpcCall, "/ReadStdout") || strings.HasSuffix(grpcCall, "/WriteStdin") {
// Add a tag to allow filtering of those calls dealing
// input and output. These tend to be very long and
// being able to filter them out allows the
// performance of "core" calls to be determined
// without the "noise" of these calls.
span.SetTag("api-category", "interactive")
span.setTag("api-category", "interactive")
}
} else {
// Just log call details
Expand Down Expand Up @@ -1079,7 +1075,7 @@ func makeUnaryInterceptor() grpc.UnaryServerInterceptor {
// - Tracing was enabled but the handler (StopTracing()) disabled it.
// - Tracing was disabled but the handler (StartTracing()) enabled it.
if span != nil {
span.Finish()
span.finish()
}

if stopTracingCalled {
Expand All @@ -1092,7 +1088,7 @@ func makeUnaryInterceptor() grpc.UnaryServerInterceptor {

func (s *sandbox) startGRPC() {
span, _ := s.trace("startGRPC")
defer span.Finish()
defer span.finish()

// Save the context for gRPC calls. They are provided with a different
// context, but we need them to use our context as it contains trace
Expand All @@ -1111,9 +1107,9 @@ func (s *sandbox) startGRPC() {
if collatedTrace {
// "collated" tracing (allow agent traces to be
// associated with runtime-initiated traces.
tracer := span.Tracer()
tracer := span.tracer()

serverOpts = append(serverOpts, grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))
serverOpts = append(serverOpts, grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer.tracer)))
} else {
// Enable interceptor whether tracing is enabled or not. This
// is necessary to support StartTracing() and StopTracing()
Expand Down
12 changes: 6 additions & 6 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type channel interface {
// If there are neither vsocks nor serial ports, an error is returned.
func newChannel(ctx context.Context) (channel, error) {
span, _ := trace(ctx, "channel", "newChannel")
defer span.Finish()
defer span.finish()

var serialErr error
var vsockErr error
Expand Down Expand Up @@ -91,13 +91,13 @@ func newChannel(ctx context.Context) (channel, error) {

func checkForSerialChannel(ctx context.Context) (*serialChannel, error) {
span, _ := trace(ctx, "channel", "checkForSerialChannel")
defer span.Finish()
defer span.finish()

// Check serial port path
serialPath, serialErr := findVirtualSerialPath(serialChannelName)
if serialErr == nil {
span.SetTag("channel-type", "serial")
span.SetTag("serial-path", serialPath)
span.setTag("channel-type", "serial")
span.setTag("serial-path", serialPath)
agentLog.Debug("Serial channel type detected")
return &serialChannel{serialPath: serialPath}, nil
}
Expand All @@ -107,7 +107,7 @@ func checkForSerialChannel(ctx context.Context) (*serialChannel, error) {

func checkForVsockChannel(ctx context.Context) (*vSockChannel, error) {
span, _ := trace(ctx, "channel", "checkForVsockChannel")
defer span.Finish()
defer span.finish()

// check vsock path
if _, err := os.Stat(vSockDevPath); err != nil {
Expand All @@ -116,7 +116,7 @@ func checkForVsockChannel(ctx context.Context) (*vSockChannel, error) {

vSockSupported, vsockErr := isAFVSockSupportedFunc()
if vSockSupported && vsockErr == nil {
span.SetTag("channel-type", "vsock")
span.setTag("channel-type", "vsock")
agentLog.Debug("Vsock channel type detected")
return &vSockChannel{}, nil
}
Expand Down
6 changes: 3 additions & 3 deletions mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ func mountStorage(storage pb.Storage) error {
// each storage.
func addStorages(ctx context.Context, storages []*pb.Storage, s *sandbox) (mounts []string, err error) {
span, ctx := trace(ctx, "mount", "addStorages")
span.SetTag("sandbox", s.id)
defer span.Finish()
span.setTag("sandbox", s.id)
defer span.finish()

var mountList []string
var storageList []string
Expand Down Expand Up @@ -392,7 +392,7 @@ func addStorages(ctx context.Context, storages []*pb.Storage, s *sandbox) (mount
// code to each driver.
handlerSpan, _ := trace(ctx, "mount", storage.Driver)
mountPoint, err := devHandler(ctx, *storage, s)
handlerSpan.Finish()
handlerSpan.finish()

if _, ok := s.storages[storage.MountPoint]; ok {
storageList = append([]string{storage.MountPoint}, storageList...)
Expand Down
2 changes: 1 addition & 1 deletion network.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func (s *sandbox) removeNetwork() error {
// Bring up localhost network interface.
func (s *sandbox) handleLocalhost() error {
span, _ := s.trace("handleLocalhost")
defer span.Finish()
defer span.finish()

// If not running as the init daemon, there is nothing to do as the
// localhost interface will already exist.
Expand Down
Loading

0 comments on commit cc6866a

Please sign in to comment.