Skip to content

Commit

Permalink
libgit2: improve subtransport logging
Browse files Browse the repository at this point in the history
Debugging connection issues can be extremely difficult,
even more so at scale or when concurrent connections are
required to trigger specific issues.

The changes create a correlation ID for each
SubTransport instance, which allows for greater traceability
when going through all the steps throughout the transport
lifecicle.

Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
  • Loading branch information
Paulo Gomes committed Jun 10, 2022
1 parent f6a389c commit 6a4741c
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 46 deletions.
3 changes: 1 addition & 2 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
feathelper "github.com/fluxcd/pkg/runtime/features"
"github.com/fluxcd/pkg/runtime/testenv"
"github.com/fluxcd/pkg/testserver"
"github.com/go-logr/logr"
"github.com/phayes/freeport"

"github.com/distribution/distribution/v3/configuration"
Expand Down Expand Up @@ -209,7 +208,7 @@ func TestMain(m *testing.M) {

fg := feathelper.FeatureGates{}
fg.SupportedFeatures(features.FeatureGates())
managed.InitManagedTransport(logr.Discard())
managed.InitManagedTransport()

if err := (&GitRepositoryReconciler{
Client: testEnv,
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func main() {
}()

if enabled, _ := features.Enabled(features.GitManagedTransport); enabled {
managed.InitManagedTransport(ctrl.Log.WithName("managed-transport"))
managed.InitManagedTransport()
} else {
if optimize, _ := feathelper.Enabled(features.OptimizedGitClones); optimize {
features.Disable(features.OptimizedGitClones)
Expand Down
53 changes: 43 additions & 10 deletions pkg/git/libgit2/managed/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ package managed

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"errors"
Expand All @@ -55,8 +56,11 @@ import (
"strings"
"sync"

"github.com/fluxcd/pkg/runtime/logger"
pool "github.com/fluxcd/source-controller/internal/transport"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
"github.com/google/uuid"
git2go "github.com/libgit2/git2go/v33"
)

Expand All @@ -81,10 +85,11 @@ func registerManagedHTTP() error {
}

func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
traceLog.Info("[http]: httpSmartSubtransportFactory")
sst := &httpSmartSubtransport{
transport: transport,
httpTransport: pool.NewOrIdle(nil),
ctx: context.Background(),
logger: logr.Discard(),
}

return sst, nil
Expand All @@ -93,6 +98,21 @@ func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Trans
type httpSmartSubtransport struct {
transport *git2go.Transport
httpTransport *http.Transport

// once is used to ensure that logger and context is set only once,
// on the initial (or only) Action call. Without this a mutex must
// be applied to ensure that ctx won't be changed, as this would be
// prone to race conditions in the stdout processing goroutine.
once sync.Once
// ctx defines the context to be used across long-running or
// cancellable operations.
// Defaults to context.Background().
ctx context.Context
// logger keeps a logger instance for logging. This was preferred
// due to the need to have a correlation ID and URL set and
// reused across all log calls.
// If context is not set, this defaults to logr.Discard().
logger logr.Logger
}

func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
Expand Down Expand Up @@ -133,6 +153,16 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
}
t.httpTransport.DisableCompression = false

t.once.Do(func() {
if opts.Context != nil {
t.ctx = opts.Context
t.logger = logr.FromContextOrDiscard(t.ctx).
WithValues("url", opts.TargetURL).
// Sets a correlation ID for all transport level logs.
WithValues("cid", uuid.New())
}
})

client, req, err := createClientRequest(targetURL, action, t.httpTransport, opts.AuthOpts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -176,8 +206,10 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
opts.TargetURL = trimActionSuffix(newURL.String())
AddTransportOptions(transportOptionsURL, *opts)

debugLog.Info("[http]: server responded with redirect",
"newURL", opts.TargetURL, "StatusCode", req.Response.StatusCode)
// show as info, as this should be visible regardless of the
// chosen log-level.
t.logger.Info("[http]: server responded with redirect",
"newUrl", opts.TargetURL, "StatusCode", req.Response.StatusCode)
}
}
}
Expand Down Expand Up @@ -270,15 +302,16 @@ func createClientRequest(targetURL string, action git2go.SmartServiceAction,
}

func (t *httpSmartSubtransport) Close() error {
traceLog.Info("[http]: httpSmartSubtransport.Close()")
t.logger.V(logger.TraceLevel).Info("[http]: httpSmartSubtransport.Close()")
return nil
}

func (t *httpSmartSubtransport) Free() {
traceLog.Info("[http]: httpSmartSubtransport.Free()")
t.logger.V(logger.TraceLevel).Info("[http]: httpSmartSubtransport.Free()")

if t.httpTransport != nil {
traceLog.Info("[http]: release http transport back to pool")
t.logger.V(logger.TraceLevel).Info("[http]: release http transport back to pool")

pool.Release(t.httpTransport)
t.httpTransport = nil
}
Expand Down Expand Up @@ -345,18 +378,18 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {

func (self *httpSmartSubtransportStream) Free() {
if self.resp != nil {
traceLog.Info("[http]: httpSmartSubtransportStream.Free()")
self.owner.logger.V(logger.TraceLevel).Info("[http]: httpSmartSubtransportStream.Free()")

if self.resp.Body != nil {
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
// it should not be a problem to do this more than once.
if _, err := io.Copy(io.Discard, self.resp.Body); err != nil {
traceLog.Error(err, "[http]: cannot discard response body")
self.owner.logger.V(logger.TraceLevel).Error(err, "[http]: cannot discard response body")
}

if err := self.resp.Body.Close(); err != nil {
traceLog.Error(err, "[http]: cannot close response body")
self.owner.logger.V(logger.TraceLevel).Error(err, "[http]: cannot close response body")
}
}
}
Expand Down Expand Up @@ -399,7 +432,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
req.ContentLength = -1
}

traceLog.Info("[http]: new request", "method", req.Method, "URL", req.URL)
self.owner.logger.V(logger.TraceLevel).Info("[http]: new request", "method", req.Method, "postUrl", req.URL)
resp, err = self.client.Do(req)
if err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions pkg/git/libgit2/managed/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
. "github.com/onsi/gomega"

git2go "github.com/libgit2/git2go/v33"
Expand Down Expand Up @@ -170,7 +169,7 @@ func TestHTTPManagedTransport_E2E(t *testing.T) {
defer server.StopHTTP()

// Force managed transport to be enabled
InitManagedTransport(logr.Discard())
InitManagedTransport()

repoPath := "test.git"
err = server.InitRepo("../../testdata/git/repo", git.DefaultBranch, repoPath)
Expand Down Expand Up @@ -253,7 +252,7 @@ func TestHTTPManagedTransport_HandleRedirect(t *testing.T) {
}

// Force managed transport to be enabled
InitManagedTransport(logr.Discard())
InitManagedTransport()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
13 changes: 2 additions & 11 deletions pkg/git/libgit2/managed/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package managed
import (
"sync"
"time"

"github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
)

var (
Expand All @@ -38,9 +35,7 @@ var (
// handshake, put/get).
fullHttpClientTimeOut time.Duration = 10 * time.Minute

debugLog logr.Logger
traceLog logr.Logger
enabled bool
enabled bool
)

// Enabled defines whether the use of Managed Transport is enabled which
Expand All @@ -63,14 +58,10 @@ func Enabled() bool {
//
// This function will only register managed transports once, subsequent calls
// leads to no-op.
func InitManagedTransport(log logr.Logger) error {
func InitManagedTransport() error {
var err error

once.Do(func() {
log.Info("Initializing managed transport")
debugLog = log.V(logger.DebugLevel)
traceLog = log.V(logger.TraceLevel)

if err = registerManagedHTTP(); err != nil {
return
}
Expand Down
54 changes: 41 additions & 13 deletions pkg/git/libgit2/managed/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ import (
"golang.org/x/crypto/ssh"
"golang.org/x/net/proxy"

"github.com/fluxcd/pkg/runtime/logger"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
"github.com/google/uuid"
git2go "github.com/libgit2/git2go/v33"
)

Expand All @@ -79,17 +82,32 @@ func registerManagedSSH() error {
func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
return &sshSmartSubtransport{
transport: transport,
logger: logr.Discard(),
ctx: context.Background(),
}, nil
}

type sshSmartSubtransport struct {
transport *git2go.Transport

// once is used to ensure that logger and context is set only once,
// on the initial (or only) Action call. Without this a mutex must
// be applied to ensure that ctx won't be changed, as this would be
// prone to race conditions in the stdout processing goroutine.
once sync.Once
// ctx defines the context to be used across long-running or
// cancellable operations.
// Defaults to context.Background().
ctx context.Context
// logger keeps a logger instance for logging. This was preferred
// due to the need to have a correlation ID and Address set and
// reused across all log calls.
// If context is not set, this defaults to logr.Discard().
logger logr.Logger

lastAction git2go.SmartServiceAction
stdin io.WriteCloser
stdout io.Reader
addr string
ctx context.Context

con connection
}
Expand All @@ -111,8 +129,6 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
return nil, fmt.Errorf("could not find transport options for object: %s", transportOptionsURL)
}

t.ctx = opts.Context

u, err := url.Parse(opts.TargetURL)
if err != nil {
return nil, err
Expand Down Expand Up @@ -158,7 +174,17 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
if u.Port() != "" {
port = u.Port()
}
t.addr = net.JoinHostPort(u.Hostname(), port)
addr := net.JoinHostPort(u.Hostname(), port)

once.Do(func() {
if opts.Context != nil {
t.ctx = opts.Context
t.logger = logr.FromContextOrDiscard(t.ctx).
WithValues("addr", addr).
// Sets a correlation ID for all transport level logs.
WithValues("cid", uuid.New())
}
})

sshConfig, err := createClientConfig(opts.AuthOpts)
if err != nil {
Expand Down Expand Up @@ -191,12 +217,12 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
}
t.con.m.RUnlock()

err = t.createConn(t.addr, sshConfig)
err = t.createConn(addr, sshConfig)
if err != nil {
return nil, err
}

traceLog.Info("[ssh]: creating new ssh session")
t.logger.V(logger.TraceLevel).Info("[ssh]: creating new ssh session")
if t.con.session, err = t.con.client.NewSession(); err != nil {
return nil, err
}
Expand All @@ -222,8 +248,8 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.

// In case this goroutine panics, handle recovery.
if r := recover(); r != nil {
traceLog.Error(errors.New(r.(string)),
"[ssh]: recovered from libgit2 ssh smart subtransport panic", "address", t.addr)
t.logger.V(logger.TraceLevel).Error(errors.New(r.(string)),
"[ssh]: recovered from libgit2 ssh smart subtransport panic")
}
}()

Expand Down Expand Up @@ -259,7 +285,7 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go.
}
}()

traceLog.Info("[ssh]: run on remote", "cmd", cmd)
t.logger.V(logger.TraceLevel).Info("[ssh]: run on remote", "cmd", cmd)
if err := t.con.session.Start(cmd); err != nil {
return nil, err
}
Expand All @@ -276,6 +302,7 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
ctx, cancel := context.WithTimeout(context.TODO(), sshConnectionTimeOut)
defer cancel()

t.logger.V(logger.TraceLevel).Info("[ssh] dial connection")
conn, err := proxy.Dial(ctx, "tcp", addr)
if err != nil {
return err
Expand Down Expand Up @@ -303,23 +330,25 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf
// may impair the transport to have successful actions on a new
// SmartSubTransport (i.e. unreleased resources, staled connections).
func (t *sshSmartSubtransport) Close() error {
traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr)
t.logger.V(logger.TraceLevel).Info("[ssh]: sshSmartSubtransport.Close()")
t.con.m.Lock()
defer t.con.m.Unlock()

t.con.currentStream = nil
if t.con.client != nil && t.stdin != nil {
_ = t.stdin.Close()
}
t.stdin = nil

if t.con.session != nil {
traceLog.Info("[ssh]: session.Close()", "server", t.addr)
t.logger.V(logger.TraceLevel).Info("[ssh]: session.Close()")
_ = t.con.session.Close()
}
t.con.session = nil

if t.con.client != nil {
_ = t.con.client.Close()
t.logger.V(logger.TraceLevel).Info("[ssh] close client")
}

t.con.connected = false
Expand All @@ -343,7 +372,6 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) {
}

func (stream *sshSmartSubtransportStream) Free() {
traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
}

func createClientConfig(authOpts *git.AuthOptions) (*ssh.ClientConfig, error) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/git/libgit2/managed/ssh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/fluxcd/pkg/ssh"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
. "github.com/onsi/gomega"

"github.com/fluxcd/pkg/gittestserver"
Expand Down Expand Up @@ -89,7 +88,7 @@ func TestSSHManagedTransport_E2E(t *testing.T) {
server.StartSSH()
}()
defer server.StopSSH()
InitManagedTransport(logr.Discard())
InitManagedTransport()

kp, err := ssh.NewEd25519Generator().Generate()
g.Expect(err).ToNot(HaveOccurred())
Expand Down
Loading

0 comments on commit 6a4741c

Please sign in to comment.