Skip to content

Commit

Permalink
libgit2: improve subtransport logging
Browse files Browse the repository at this point in the history
Debugging connection issues can be extremely difficult,
even more so at scale or when concurrent connections are
required to trigger specific issues.

The changes create a correlation ID for each
SubTransport instance, which allows for greater traceability
when going through all the steps throughout the transport
lifecicle.

Signed-off-by: Paulo Gomes <paulo.gomes@weave.works>
  • Loading branch information
Paulo Gomes committed Jun 13, 2022
1 parent 7a797e3 commit 40a43a4
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 47 deletions.
9 changes: 8 additions & 1 deletion controllers/gitrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

securejoin "github.com/cyphar/filepath-securejoin"
"github.com/fluxcd/pkg/runtime/logger"
"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -159,7 +160,13 @@ func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, o

func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a correlation ID for all transport level logs.
WithValues("cid", uuid.New())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the GitRepository
obj := &sourcev1.GitRepository{}
Expand Down
3 changes: 1 addition & 2 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
feathelper "github.com/fluxcd/pkg/runtime/features"
"github.com/fluxcd/pkg/runtime/testenv"
"github.com/fluxcd/pkg/testserver"
"github.com/go-logr/logr"
"github.com/phayes/freeport"

"github.com/distribution/distribution/v3/configuration"
Expand Down Expand Up @@ -209,7 +208,7 @@ func TestMain(m *testing.M) {

fg := feathelper.FeatureGates{}
fg.SupportedFeatures(features.FeatureGates())
managed.InitManagedTransport(logr.Discard())
managed.InitManagedTransport()

if err := (&GitRepositoryReconciler{
Client: testEnv,
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func main() {
}()

if enabled, _ := features.Enabled(features.GitManagedTransport); enabled {
managed.InitManagedTransport(ctrl.Log.WithName("managed-transport"))
managed.InitManagedTransport()
} else {
if optimize, _ := feathelper.Enabled(features.OptimizedGitClones); optimize {
features.Disable(features.OptimizedGitClones)
Expand Down
52 changes: 42 additions & 10 deletions pkg/git/libgit2/managed/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ package managed

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"errors"
Expand All @@ -55,9 +56,12 @@ import (
"strings"
"sync"

"github.com/fluxcd/pkg/runtime/logger"
pool "github.com/fluxcd/source-controller/internal/transport"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
git2go "github.com/libgit2/git2go/v33"
ctrl "sigs.k8s.io/controller-runtime"
)

var actionSuffixes = []string{
Expand All @@ -81,10 +85,11 @@ func registerManagedHTTP() error {
}

func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
traceLog.Info("[http]: httpSmartSubtransportFactory")
sst := &httpSmartSubtransport{
transport: transport,
httpTransport: pool.NewOrIdle(nil),
ctx: context.Background(),
logger: logr.Discard(),
}

return sst, nil
Expand All @@ -93,6 +98,21 @@ func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Trans
type httpSmartSubtransport struct {
transport *git2go.Transport
httpTransport *http.Transport

// once is used to ensure that logger and ctx is set only once,
// on the initial (or only) Action call. Without this a mutex must
// be applied to ensure that ctx won't be changed, as this would be
// prone to race conditions in the stdout processing goroutine.
once sync.Once
// ctx defines the context to be used across long-running or
// cancellable operations.
// Defaults to context.Background().
ctx context.Context
// logger keeps a Logger instance for logging. This was preferred
// due to the need to have a correlation ID and URL set and
// reused across all log calls.
// If context is not set, this defaults to logr.Discard().
logger logr.Logger
}

func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
Expand Down Expand Up @@ -133,6 +153,15 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
}
t.httpTransport.DisableCompression = false

t.once.Do(func() {
if opts.Context != nil {
t.ctx = opts.Context
t.logger = ctrl.LoggerFrom(t.ctx,
"transportType", "http",
"url", opts.TargetURL)
}
})

client, req, err := createClientRequest(targetURL, action, t.httpTransport, opts.AuthOpts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -176,8 +205,10 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
opts.TargetURL = trimActionSuffix(newURL.String())
AddTransportOptions(transportOptionsURL, *opts)

debugLog.Info("[http]: server responded with redirect",
"newURL", opts.TargetURL, "StatusCode", req.Response.StatusCode)
// show as info, as this should be visible regardless of the
// chosen log-level.
t.logger.Info("server responded with redirect",
"newUrl", opts.TargetURL, "StatusCode", req.Response.StatusCode)
}
}
}
Expand Down Expand Up @@ -270,15 +301,16 @@ func createClientRequest(targetURL string, action git2go.SmartServiceAction,
}

func (t *httpSmartSubtransport) Close() error {
traceLog.Info("[http]: httpSmartSubtransport.Close()")
t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Close()")
return nil
}

func (t *httpSmartSubtransport) Free() {
traceLog.Info("[http]: httpSmartSubtransport.Free()")
t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Free()")

if t.httpTransport != nil {
traceLog.Info("[http]: release http transport back to pool")
t.logger.V(logger.TraceLevel).Info("release http transport back to pool")

pool.Release(t.httpTransport)
t.httpTransport = nil
}
Expand Down Expand Up @@ -345,18 +377,18 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {

func (self *httpSmartSubtransportStream) Free() {
if self.resp != nil {
traceLog.Info("[http]: httpSmartSubtransportStream.Free()")
self.owner.logger.V(logger.TraceLevel).Info("httpSmartSubtransportStream.Free()")

if self.resp.Body != nil {
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
// it should not be a problem to do this more than once.
if _, err := io.Copy(io.Discard, self.resp.Body); err != nil {
traceLog.Error(err, "[http]: cannot discard response body")
self.owner.logger.V(logger.TraceLevel).Error(err, "cannot discard response body")
}

if err := self.resp.Body.Close(); err != nil {
traceLog.Error(err, "[http]: cannot close response body")
self.owner.logger.V(logger.TraceLevel).Error(err, "cannot close response body")
}
}
}
Expand Down Expand Up @@ -399,7 +431,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
req.ContentLength = -1
}

traceLog.Info("[http]: new request", "method", req.Method, "URL", req.URL)
self.owner.logger.V(logger.TraceLevel).Info("new request", "method", req.Method, "postUrl", req.URL)
resp, err = self.client.Do(req)
if err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions pkg/git/libgit2/managed/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
. "github.com/onsi/gomega"

git2go "github.com/libgit2/git2go/v33"
Expand Down Expand Up @@ -170,7 +169,7 @@ func TestHTTPManagedTransport_E2E(t *testing.T) {
defer server.StopHTTP()

// Force managed transport to be enabled
InitManagedTransport(logr.Discard())
InitManagedTransport()

repoPath := "test.git"
err = server.InitRepo("../../testdata/git/repo", git.DefaultBranch, repoPath)
Expand Down Expand Up @@ -253,7 +252,7 @@ func TestHTTPManagedTransport_HandleRedirect(t *testing.T) {
}

// Force managed transport to be enabled
InitManagedTransport(logr.Discard())
InitManagedTransport()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
13 changes: 2 additions & 11 deletions pkg/git/libgit2/managed/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package managed
import (
"sync"
"time"

"github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
)

var (
Expand All @@ -38,9 +35,7 @@ var (
// handshake, put/get).
fullHttpClientTimeOut time.Duration = 10 * time.Minute

debugLog logr.Logger
traceLog logr.Logger
enabled bool
enabled bool
)

// Enabled defines whether the use of Managed Transport is enabled which
Expand All @@ -63,14 +58,10 @@ func Enabled() bool {
//
// This function will only register managed transports once, subsequent calls
// leads to no-op.
func InitManagedTransport(log logr.Logger) error {
func InitManagedTransport() error {
var err error

once.Do(func() {
log.Info("Initializing managed transport")
debugLog = log.V(logger.DebugLevel)
traceLog = log.V(logger.TraceLevel)

if err = registerManagedHTTP(); err != nil {
return
}
Expand Down
Loading

0 comments on commit 40a43a4

Please sign in to comment.