diff --git a/pkg/git/libgit2/managed/const.go b/pkg/git/libgit2/managed/const.go new file mode 100644 index 000000000..f41035da7 --- /dev/null +++ b/pkg/git/libgit2/managed/const.go @@ -0,0 +1,27 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package managed + +const ( + // URLMaxLength represents the max length, in bytes, for the entire URL + // when cloning Git repositories via HTTP(S). + URLMaxLength = 2048 + + // PathMaxLength represents the max length, in bytes, for the path element + // when cloning Git repositories via SSH. + PathMaxLength = 4096 +) diff --git a/pkg/git/libgit2/managed/http.go b/pkg/git/libgit2/managed/http.go index 04e1c54b1..09c0ee26a 100644 --- a/pkg/git/libgit2/managed/http.go +++ b/pkg/git/libgit2/managed/http.go @@ -171,6 +171,10 @@ func createClientRequest(targetUrl string, action git2go.SmartServiceAction, t * } } + if len(finalUrl) > URLMaxLength { + return nil, nil, fmt.Errorf("URL exceeds the max length (%d)", URLMaxLength) + } + client := &http.Client{ Transport: t, Timeout: fullHttpClientTimeOut, diff --git a/pkg/git/libgit2/managed/ssh.go b/pkg/git/libgit2/managed/ssh.go index 31dd6cdfe..a36ac1660 100644 --- a/pkg/git/libgit2/managed/ssh.go +++ b/pkg/git/libgit2/managed/ssh.go @@ -101,11 +101,18 @@ type sshSmartSubtransport struct { stdin io.WriteCloser stdout io.Reader currentStream *sshSmartSubtransportStream + ckey string + addr string } // aMux is the read-write mutex to control access to sshClients. 
var aMux sync.RWMutex

+type cachedClient struct {
+	*ssh.Client
+	activeSessions uint16 // streams currently relying on this connection
+}
+
 // sshClients stores active ssh clients/connections to be reused.
 //
 // Once opened, connections will be kept cached until an error occurs
@@ -113,7 +120,7 @@ var aMux sync.RWMutex
 // a follow-up cache miss.
 //
 // The key must be based on cacheKey, refer to that function's comments.
-var sshClients map[string]*ssh.Client = make(map[string]*ssh.Client)
+var sshClients map[string]*cachedClient = make(map[string]*cachedClient)

 func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
 	runtime.LockOSThread()
@@ -124,13 +131,20 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
 		return nil, err
 	}

+	if len(u.Path) > PathMaxLength {
+		return nil, fmt.Errorf("path exceeds the max length (%d)", PathMaxLength)
+	}
+
+	// decode URI's path
+	uPath, err := url.PathUnescape(u.Path)
+	if err != nil {
+		return nil, err
+	}
+
 	// Escape \ and '.
-	uPath := strings.Replace(u.Path, `\`, `\\`, -1)
+	uPath = strings.Replace(uPath, `\`, `\\`, -1)
 	uPath = strings.Replace(uPath, `'`, `\'`, -1)

-	// TODO: Add percentage decode similar to libgit2.
-	// Refer: https://github.com/libgit2/libgit2/blob/358a60e1b46000ea99ef10b4dd709e92f75ff74b/src/str.c#L455-L481
-
 	var cmd string
 	switch action {
 	case git2go.SmartServiceActionUploadpackLs, git2go.SmartServiceActionUploadpack:
@@ -138,9 +152,8 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
 			if t.lastAction == git2go.SmartServiceActionUploadpackLs {
 				return t.currentStream, nil
 			}
-			if err := t.Close(); err != nil {
-				traceLog.Error(err, "[ssh]: error cleaning up previous stream")
-			}
+			// Disregard errors from previous stream, further details inside Close().
+			_ = t.Close()
 		}
 		cmd = fmt.Sprintf("git-upload-pack '%s'", uPath)

@@ -149,9 +162,8 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
 			if t.lastAction == git2go.SmartServiceActionReceivepackLs {
 				return t.currentStream, nil
 			}
-			if err := t.Close(); err != nil {
-				traceLog.Error(err, "[ssh]: error cleaning up previous stream")
-			}
+			// Disregard errors from previous stream, further details inside Close().
+			_ = t.Close()
 		}
 		cmd = fmt.Sprintf("git-receive-pack '%s'", uPath)

@@ -159,7 +171,7 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
 		return nil, fmt.Errorf("unexpected action: %v", action)
 	}

-	cred, err := t.transport.SmartCredentials("", git2go.CredentialTypeSSHKey|git2go.CredentialTypeSSHMemory)
+	cred, err := t.transport.SmartCredentials("", git2go.CredentialTypeSSHMemory)
 	if err != nil {
 		return nil, err
 	}
@@ -171,11 +183,14 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
 		port = u.Port()
 	}
 	addr = fmt.Sprintf("%s:%s", u.Hostname(), port)
+	t.addr = addr

 	ckey, sshConfig, err := cacheKeyAndConfig(addr, cred)
 	if err != nil {
 		return nil, err
 	}
+	t.ckey = ckey
+
 	sshConfig.HostKeyCallback = func(hostname string, remote net.Addr, key ssh.PublicKey) error {
 		marshaledKey := key.Marshal()
 		cert := &git2go.Certificate{
@@ -193,51 +208,47 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
 		return t.transport.SmartCertificateCheck(cert, true, hostname)
 	}

-	aMux.RLock()
+	var cacheHit bool
+	aMux.Lock()
 	if c, ok := sshClients[ckey]; ok {
 		traceLog.Info("[ssh]: cache hit", "remoteAddress", addr)
-		t.client = c
+		t.client = c.Client
+		cacheHit = true
+		c.activeSessions++
 	}
-	aMux.RUnlock()
+	aMux.Unlock()

 	if t.client == nil {
+		cacheHit = false
 		traceLog.Info("[ssh]: cache miss", "remoteAddress", addr)
-
-		aMux.Lock()
-		defer aMux.Unlock()
-
-		// In some scenarios the ssh handshake can hang indefinitely at
-		// golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop.
-		//
-		// xref: https://github.com/golang/go/issues/51926
-		done := make(chan error, 1)
-		go func() {
-			t.client, err = ssh.Dial("tcp", addr, sshConfig)
-			done <- err
-		}()
-
-		dialTimeout := sshConfig.Timeout + (30 * time.Second)
-
-		select {
-		case doneErr := <-done:
-			if doneErr != nil {
-				err = fmt.Errorf("ssh.Dial: %w", doneErr)
-			}
-		case <-time.After(dialTimeout):
-			err = fmt.Errorf("timed out waiting for ssh.Dial after %s", dialTimeout)
-		}
-
+		err := t.createConn(ckey, addr, sshConfig)
 		if err != nil {
 			return nil, err
 		}
-
-		sshClients[ckey] = t.client
 	}

 	traceLog.Info("[ssh]: creating new ssh session")
 	if t.session, err = t.client.NewSession(); err != nil {
 		discardCachedSshClient(ckey)
-		return nil, err
+
+		// if the current connection was cached, we can try again
+		// as this may be a stale connection.
+		if !cacheHit {
+			return nil, err
+		}
+
+		traceLog.Info("[ssh]: cached connection was stale, retrying...")
+		err = t.createConn(ckey, addr, sshConfig)
+		if err != nil {
+			return nil, err
+		}
+
+		traceLog.Info("[ssh]: creating new ssh session with new connection")
+		t.session, err = t.client.NewSession()
+		if err != nil {
+			discardCachedSshClient(ckey)
+			return nil, err
+		}
 	}

 	if t.stdin, err = t.session.StdinPipe(); err != nil {
@@ -264,28 +275,83 @@ func (t *sshSmartSubtransport) Action(urlString string, action git2go.SmartServi
 	return t.currentStream, nil
 }

-func (t *sshSmartSubtransport) Close() error {
-	var returnErr error
+// createConn dials addr and points t.client at the connection to use
+// for ckey: the new connection, or the one another goroutine cached
+// while this dial was in flight. The entry's activeSessions counts it.
+//
+// The dial is bounded by a timeout, as the ssh handshake can hang
+// indefinitely at golang.org/x/crypto/ssh.(*handshakeTransport).kexLoop.
+// xref: https://github.com/golang/go/issues/51926
+func (t *sshSmartSubtransport) createConn(ckey, addr string, sshConfig *ssh.ClientConfig) error {
+	// Results travel over done only: the dial goroutine may outlive this call.
+	type dialResult struct {
+		client *ssh.Client
+		err    error
+	}
+	done := make(chan dialResult, 1)
+	go func() {
+		c, err := ssh.Dial("tcp", addr, sshConfig)
+		done <- dialResult{client: c, err: err}
+	}()
+
+	dialTimeout := sshConfig.Timeout + (30 * time.Second)
+
+	var c *ssh.Client
+	select {
+	case res := <-done:
+		if res.err != nil {
+			return fmt.Errorf("ssh.Dial: %w", res.err)
+		}
+		c = res.client
+	case <-time.After(dialTimeout):
+		return fmt.Errorf("timed out waiting for ssh.Dial after %s", dialTimeout)
+	}
+
+	// Mutex is set here to avoid the network latency being
+	// absorbed by all competing goroutines.
+	aMux.Lock()
+	defer aMux.Unlock()
+
+	// A different goroutine won the race: dispose of the connection
+	// dialed here and share the cached one instead.
+	if cached, ok := sshClients[ckey]; ok {
+		go func() {
+			_ = c.Close()
+		}()
+		cached.activeSessions++
+		t.client = cached.Client
+		return nil
+	}
+
+	t.client = c
+	sshClients[ckey] = &cachedClient{Client: c, activeSessions: 1}
+	return nil
+}

-	traceLog.Info("[ssh]: sshSmartSubtransport.Close()")
+// Close closes the smart subtransport.
+//
+// This is called internally ahead of a new action, and also
+// upstream by the transport handler:
+// https://github.com/libgit2/git2go/blob/0e8009f00a65034d196c67b1cdd82af6f12c34d3/transport.go#L409
+//
+// Avoid returning errors, but focus on releasing anything that
+// may impair the transport to have successful actions on a new
+// SmartSubTransport (i.e. unreleased resources, stale connections).
+func (t *sshSmartSubtransport) Close() error {
+	traceLog.Info("[ssh]: sshSmartSubtransport.Close()", "server", t.addr)
 	t.currentStream = nil
 	if t.client != nil && t.stdin != nil {
-		if err := t.stdin.Close(); err != nil {
-			returnErr = fmt.Errorf("cannot close stdin: %w", err)
-		}
+		_ = t.stdin.Close()
 	}
 	t.client = nil

 	if t.session != nil {
-		traceLog.Info("[ssh]: skipping session.wait")
-		traceLog.Info("[ssh]: session.Close()")
-		if err := t.session.Close(); err != nil {
-			returnErr = fmt.Errorf("cannot close session: %w", err)
-		}
+		traceLog.Info("[ssh]: session.Close()", "server", t.addr)
+		_ = t.session.Close()
 	}
 	t.session = nil

-	return returnErr
+	return nil
 }

 func (t *sshSmartSubtransport) Free() {
@@ -306,6 +372,14 @@ func (stream *sshSmartSubtransportStream) Write(buf []byte) (int, error) {

 func (stream *sshSmartSubtransportStream) Free() {
 	traceLog.Info("[ssh]: sshSmartSubtransportStream.Free()")
+	if stream.owner == nil {
+		return
+	}
+
+	// Release this stream's hold on the cached connection, if any.
+	if stream.owner.ckey != "" {
+		decrementActiveSessionIfFound(stream.owner.ckey)
+	}
 }

 func cacheKeyAndConfig(remoteAddress string, cred *git2go.Credential) (string, *ssh.ClientConfig, error) {
@@ -376,8 +450,47 @@ func discardCachedSshClient(key string) {
 	aMux.Lock()
 	defer aMux.Unlock()

-	if _, found := sshClients[key]; found {
-		traceLog.Info("[ssh]: discard cached ssh client")
+	if v, found := sshClients[key]; found {
+		traceLog.Info("[ssh]: discard cached ssh client", "activeSessions", v.activeSessions)
+		closeConn := func() {
+			// run as async goroutine to minimise mutex time in immediate closures.
+			go func() {
+				if v.Client != nil {
+					_ = v.Client.Close()
+				}
+			}()
+		}
+
+		// if no active sessions for this connection, close it right away.
+		// otherwise, it may be used by other processes, so remove from cache,
+		// and schedule a delayed closure.
+		if v.activeSessions == 0 {
+			traceLog.Info("[ssh]: closing connection")
+			closeConn()
+		} else {
+			go func() {
+				// the delay must account for in-flight operations
+				// that depend on this connection.
+				time.Sleep(120 * time.Second)
+				traceLog.Info("[ssh]: closing connection after delay")
+				closeConn()
+			}()
+		}
 		delete(sshClients, key)
 	}
 }
+
+// decrementActiveSessionIfFound decrements the active session count of
+// the cached client stored under key, if there is one.
+//
+// The counter is unsigned, so it is never taken below zero: wrapping
+// around would make an idle connection look busy to
+// discardCachedSshClient.
+func decrementActiveSessionIfFound(key string) {
+	aMux.Lock()
+	defer aMux.Unlock()
+
+	if v, found := sshClients[key]; found && v.activeSessions > 0 {
+		v.activeSessions--
+	}
+}