diff --git a/controllers/gitrepository_controller.go b/controllers/gitrepository_controller.go index 912b28982..b16930c88 100644 --- a/controllers/gitrepository_controller.go +++ b/controllers/gitrepository_controller.go @@ -29,6 +29,7 @@ import ( securejoin "github.com/cyphar/filepath-securejoin" "github.com/fluxcd/pkg/runtime/logger" + "github.com/google/uuid" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -159,7 +160,13 @@ func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, o func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) { start := time.Now() - log := ctrl.LoggerFrom(ctx) + log := ctrl.LoggerFrom(ctx). + // Sets a correlation ID for all transport level logs. + WithValues("cid", uuid.New()) + + // logger will be associated to the new context that is + // returned from ctrl.LoggerInto. + ctx = ctrl.LoggerInto(ctx, log) // Fetch the GitRepository obj := &sourcev1.GitRepository{} diff --git a/controllers/suite_test.go b/controllers/suite_test.go index a8ccb8039..a633bbcdc 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -40,7 +40,6 @@ import ( feathelper "github.com/fluxcd/pkg/runtime/features" "github.com/fluxcd/pkg/runtime/testenv" "github.com/fluxcd/pkg/testserver" - "github.com/go-logr/logr" "github.com/phayes/freeport" "github.com/distribution/distribution/v3/configuration" @@ -209,7 +208,7 @@ func TestMain(m *testing.M) { fg := feathelper.FeatureGates{} fg.SupportedFeatures(features.FeatureGates()) - managed.InitManagedTransport(logr.Discard()) + managed.InitManagedTransport() if err := (&GitRepositoryReconciler{ Client: testEnv, diff --git a/main.go b/main.go index 83d3cd429..75dd6d34f 100644 --- a/main.go +++ b/main.go @@ -311,7 +311,7 @@ func main() { }() if enabled, _ := features.Enabled(features.GitManagedTransport); enabled { - managed.InitManagedTransport(ctrl.Log.WithName("managed-transport")) + managed.InitManagedTransport() } else { if optimize, _ := feathelper.Enabled(features.OptimizedGitClones); optimize { features.Disable(features.OptimizedGitClones) diff --git a/pkg/git/libgit2/managed/http.go b/pkg/git/libgit2/managed/http.go index 32630768f..4330a1fd7 100644 --- a/pkg/git/libgit2/managed/http.go +++ b/pkg/git/libgit2/managed/http.go @@ -45,6 +45,7 @@ package managed import ( "bytes" + "context" "crypto/tls" "crypto/x509" "errors" @@ -55,9 +56,12 @@ import ( "strings" "sync" + "github.com/fluxcd/pkg/runtime/logger" pool "github.com/fluxcd/source-controller/internal/transport" "github.com/fluxcd/source-controller/pkg/git" + "github.com/go-logr/logr" git2go "github.com/libgit2/git2go/v33" + ctrl "sigs.k8s.io/controller-runtime" ) var actionSuffixes = []string{ @@ -81,10 +85,11 @@ func registerManagedHTTP() error { } func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) { - traceLog.Info("[http]: httpSmartSubtransportFactory") sst := &httpSmartSubtransport{ transport: transport, httpTransport: pool.NewOrIdle(nil), + ctx: context.Background(), + logger: logr.Discard(), } return sst, nil @@ -93,6 +98,21 @@ func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Trans type httpSmartSubtransport struct { transport *git2go.Transport httpTransport *http.Transport + + // once is used to ensure that logger and context is set only once, + // on the initial (or only) Action call. Without this a mutex must + // be applied to ensure that ctx won't be changed, as this would be + // prone to race conditions in the stdout processing goroutine. + once sync.Once + // ctx defines the context to be used across long-running or + // cancellable operations. + // Defaults to context.Background(). + ctx context.Context + // logger keeps a logger instance for logging. This was preferred + // due to the need to have a correlation ID and URL set and + // reused across all log calls. + // If context is not set, this defaults to logr.Discard(). + logger logr.Logger } func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) { @@ -133,6 +153,15 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go } t.httpTransport.DisableCompression = false + t.once.Do(func() { + if opts.Context != nil { + t.ctx = opts.Context + t.logger = ctrl.LoggerFrom(t.ctx, + "transportType", "http", + "url", opts.TargetURL) + } + }) + client, req, err := createClientRequest(targetURL, action, t.httpTransport, opts.AuthOpts) if err != nil { return nil, err @@ -176,8 +205,10 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go opts.TargetURL = trimActionSuffix(newURL.String()) AddTransportOptions(transportOptionsURL, *opts) - debugLog.Info("[http]: server responded with redirect", - "newURL", opts.TargetURL, "StatusCode", req.Response.StatusCode) + // show as info, as this should be visible regardless of the + // chosen log-level. + t.logger.Info("server responded with redirect", + "newUrl", opts.TargetURL, "StatusCode", req.Response.StatusCode) } } } @@ -270,15 +301,16 @@ func createClientRequest(targetURL string, action git2go.SmartServiceAction, } func (t *httpSmartSubtransport) Close() error { - traceLog.Info("[http]: httpSmartSubtransport.Close()") + t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Close()") return nil } func (t *httpSmartSubtransport) Free() { - traceLog.Info("[http]: httpSmartSubtransport.Free()") + t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Free()") if t.httpTransport != nil { - traceLog.Info("[http]: release http transport back to pool") + t.logger.V(logger.TraceLevel).Info("release http transport back to pool") + pool.Release(t.httpTransport) t.httpTransport = nil } @@ -345,18 +377,18 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) { func (self *httpSmartSubtransportStream) Free() { if self.resp != nil { - traceLog.Info("[http]: httpSmartSubtransportStream.Free()") + self.owner.logger.V(logger.TraceLevel).Info("httpSmartSubtransportStream.Free()") if self.resp.Body != nil { // ensure body is fully processed and closed // for increased likelihood of transport reuse in HTTP/1.x. // it should not be a problem to do this more than once. if _, err := io.Copy(io.Discard, self.resp.Body); err != nil { - traceLog.Error(err, "[http]: cannot discard response body") + self.owner.logger.V(logger.TraceLevel).Error(err, "cannot discard response body") } if err := self.resp.Body.Close(); err != nil { - traceLog.Error(err, "[http]: cannot close response body") + self.owner.logger.V(logger.TraceLevel).Error(err, "cannot close response body") } } } @@ -399,7 +431,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error { req.ContentLength = -1 } - traceLog.Info("[http]: new request", "method", req.Method, "URL", req.URL) + self.owner.logger.V(logger.TraceLevel).Info("new request", "method", req.Method, "postUrl", req.URL) resp, err = self.client.Do(req) if err != nil { return err diff --git a/pkg/git/libgit2/managed/http_test.go b/pkg/git/libgit2/managed/http_test.go index 0aeeba45f..be47a690d 100644 --- a/pkg/git/libgit2/managed/http_test.go +++ b/pkg/git/libgit2/managed/http_test.go @@ -25,7 +25,6 @@ import ( "github.com/fluxcd/pkg/gittestserver" "github.com/fluxcd/source-controller/pkg/git" - "github.com/go-logr/logr" . "github.com/onsi/gomega" git2go "github.com/libgit2/git2go/v33" @@ -170,7 +169,7 @@ func TestHTTPManagedTransport_E2E(t *testing.T) { defer server.StopHTTP() // Force managed transport to be enabled - InitManagedTransport(logr.Discard()) + InitManagedTransport() repoPath := "test.git" err = server.InitRepo("../../testdata/git/repo", git.DefaultBranch, repoPath) @@ -253,7 +252,7 @@ func TestHTTPManagedTransport_HandleRedirect(t *testing.T) { } // Force managed transport to be enabled - InitManagedTransport(logr.Discard()) + InitManagedTransport() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/git/libgit2/managed/init.go b/pkg/git/libgit2/managed/init.go index 1d0370d4d..f452f1142 100644 --- a/pkg/git/libgit2/managed/init.go +++ b/pkg/git/libgit2/managed/init.go @@ -19,9 +19,6 @@ package managed import ( "sync" "time" - - "github.com/fluxcd/pkg/runtime/logger" - "github.com/go-logr/logr" ) var ( @@ -38,9 +35,7 @@ var ( // handshake, put/get). fullHttpClientTimeOut time.Duration = 10 * time.Minute - debugLog logr.Logger - traceLog logr.Logger - enabled bool + enabled bool ) // Enabled defines whether the use of Managed Transport is enabled which @@ -63,14 +58,10 @@ func Enabled() bool { // // This function will only register managed transports once, subsequent calls // leads to no-op. -func InitManagedTransport(log logr.Logger) error { +func InitManagedTransport() error { var err error once.Do(func() { - log.Info("Initializing managed transport") - debugLog = log.V(logger.DebugLevel) - traceLog = log.V(logger.TraceLevel) - if err = registerManagedHTTP(); err != nil { return } diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index b990dd0af..024a586a2 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -58,8 +58,11 @@ import ( "golang.org/x/crypto/ssh" "golang.org/x/net/proxy" + ctrl "sigs.k8s.io/controller-runtime" + "github.com/fluxcd/pkg/runtime/logger" "github.com/fluxcd/source-controller/pkg/git" + "github.com/go-logr/logr" git2go "github.com/libgit2/git2go/v33" ) @@ -79,17 +82,32 @@ func registerManagedSSH() error { func sshSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) { return &sshSmartSubtransport{ transport: transport, + ctx: context.Background(), + logger: logr.Discard(), }, nil } type sshSmartSubtransport struct { transport *git2go.Transport + // once is used to ensure that logger and context is set only once, + // on the initial (or only) Action call. Without this a mutex must + // be applied to ensure that ctx won't be changed, as this would be + // prone to race conditions in the stdout processing goroutine. + once sync.Once + // ctx defines the context to be used across long-running or + // cancellable operations. + // Defaults to context.Background(). + ctx context.Context + // logger keeps a logger instance for logging. This was preferred + // due to the need to have a correlation ID and Address set and + // reused across all log calls. + // If context is not set, this defaults to logr.Discard(). + logger logr.Logger + lastAction git2go.SmartServiceAction stdin io.WriteCloser stdout io.Reader - addr string - ctx context.Context con connection } @@ -111,8 +129,6 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go. return nil, fmt.Errorf("could not find transport options for object: %s", transportOptionsURL) } - t.ctx = opts.Context - u, err := url.Parse(opts.TargetURL) if err != nil { return nil, err @@ -158,7 +174,16 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go. if u.Port() != "" { port = u.Port() } - t.addr = net.JoinHostPort(u.Hostname(), port) + addr := net.JoinHostPort(u.Hostname(), port) + + t.once.Do(func() { + if opts.Context != nil { + t.ctx = opts.Context + t.logger = ctrl.LoggerFrom(t.ctx, + "transportType", "ssh", + "addr", addr) + } + }) sshConfig, err := createClientConfig(opts.AuthOpts) if err != nil { @@ -191,12 +216,12 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go. } t.con.m.RUnlock() - err = t.createConn(t.addr, sshConfig) + err = t.createConn(addr, sshConfig) if err != nil { return nil, err } - traceLog.Info("[ssh]: creating new ssh session") + t.logger.V(logger.TraceLevel).Info("creating new ssh session") if t.con.session, err = t.con.client.NewSession(); err != nil { return nil, err } @@ -222,8 +247,8 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go. // In case this goroutine panics, handle recovery. if r := recover(); r != nil { - traceLog.Error(errors.New(r.(string)), - "[ssh]: recovered from libgit2 ssh smart subtransport panic", "address", t.addr) + t.logger.V(logger.TraceLevel).Error(errors.New(r.(string)), + "recovered from libgit2 ssh smart subtransport panic") } }() @@ -259,7 +284,7 @@ func (t *sshSmartSubtransport) Action(transportOptionsURL string, action git2go. } }() - traceLog.Info("[ssh]: run on remote", "cmd", cmd) + t.logger.V(logger.TraceLevel).Info("run on remote", "cmd", cmd) if err := t.con.session.Start(cmd); err != nil { return nil, err } @@ -276,6 +301,7 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf ctx, cancel := context.WithTimeout(context.TODO(), sshConnectionTimeOut) defer cancel() + t.logger.V(logger.TraceLevel).Info("dial connection") conn, err := proxy.Dial(ctx, "tcp", addr) if err != nil { return err @@ -303,9 +329,10 @@ func (t *sshSmartSubtransport) createConn(addr string, sshConfig *ssh.ClientConf // may impair the transport to have successful actions on a new // SmartSubTransport (i.e. unreleased resources, staled connections). func (t *sshSmartSubtransport) Close() error { - traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr) + t.logger.V(logger.TraceLevel).Info("sshSmartSubtransport.Close()") t.con.m.Lock() defer t.con.m.Unlock() + t.con.currentStream = nil if t.con.client != nil && t.stdin != nil { _ = t.stdin.Close() @@ -313,13 +340,14 @@ func (t *sshSmartSubtransport) Close() error { t.stdin = nil if t.con.session != nil { - traceLog.Info("[ssh]: session.Close()", "server", t.addr) + t.logger.V(logger.TraceLevel).Info("session.Close()") _ = t.con.session.Close() } t.con.session = nil if t.con.client != nil { _ = t.con.client.Close() + t.logger.V(logger.TraceLevel).Info("close client") } t.con.connected = false @@ -343,7 +371,6 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) { } func (stream *sshSmartSubtransportStream) Free() { - traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()") } func createClientConfig(authOpts *git.AuthOptions) (*ssh.ClientConfig, error) { diff --git a/pkg/git/libgit2/managed/ssh_test.go b/pkg/git/libgit2/managed/ssh_test.go index 81b83f3cc..0d18c1a83 100644 --- a/pkg/git/libgit2/managed/ssh_test.go +++ b/pkg/git/libgit2/managed/ssh_test.go @@ -23,7 +23,6 @@ import ( "github.com/fluxcd/pkg/ssh" "github.com/fluxcd/source-controller/pkg/git" - "github.com/go-logr/logr" . "github.com/onsi/gomega" "github.com/fluxcd/pkg/gittestserver" @@ -89,7 +88,7 @@ func TestSSHManagedTransport_E2E(t *testing.T) { server.StartSSH() }() defer server.StopSSH() - InitManagedTransport(logr.Discard()) + InitManagedTransport() kp, err := ssh.NewEd25519Generator().Generate() g.Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/git/libgit2/managed_test.go b/pkg/git/libgit2/managed_test.go index deda75618..a99fe906e 100644 --- a/pkg/git/libgit2/managed_test.go +++ b/pkg/git/libgit2/managed_test.go @@ -30,7 +30,6 @@ import ( "github.com/fluxcd/gitkit" "github.com/fluxcd/pkg/gittestserver" "github.com/fluxcd/pkg/ssh" - "github.com/go-logr/logr" feathelper "github.com/fluxcd/pkg/runtime/features" . "github.com/onsi/gomega" @@ -471,5 +470,5 @@ func getTransportOptionsURL(transport git.TransportType) string { func enableManagedTransport() { fg := feathelper.FeatureGates{} fg.SupportedFeatures(features.FeatureGates()) - managed.InitManagedTransport(logr.Discard()) + managed.InitManagedTransport() } diff --git a/pkg/git/strategy/proxy/strategy_proxy_test.go b/pkg/git/strategy/proxy/strategy_proxy_test.go index 2e83c6602..6f0564eff 100644 --- a/pkg/git/strategy/proxy/strategy_proxy_test.go +++ b/pkg/git/strategy/proxy/strategy_proxy_test.go @@ -30,7 +30,6 @@ import ( "github.com/elazarl/goproxy" "github.com/fluxcd/pkg/gittestserver" feathelper "github.com/fluxcd/pkg/runtime/features" - "github.com/go-logr/logr" . "github.com/onsi/gomega" "github.com/fluxcd/source-controller/internal/features" @@ -50,7 +49,7 @@ func TestCheckoutStrategyForImplementation_Proxied(t *testing.T) { fg := feathelper.FeatureGates{} fg.SupportedFeatures(features.FeatureGates()) - managed.InitManagedTransport(logr.Discard()) + managed.InitManagedTransport() type cleanupFunc func()