From d43bfab80fc8eed572567197a43f1d31303be94b Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Mon, 27 Oct 2025 12:52:42 +0100 Subject: [PATCH 1/5] chore: Handle sftp reconnections Signed-off-by: Javier Aliaga --- bindings/sftp/proxy/proxy.go | 111 +++++++++++ bindings/sftp/sftp.go | 55 +++--- bindings/sftp/sftp_client.go | 254 +++++++++++++++++++++++++ bindings/sftp/sftp_integration_test.go | 149 ++++++++++++++- 4 files changed, 540 insertions(+), 29 deletions(-) create mode 100644 bindings/sftp/proxy/proxy.go create mode 100644 bindings/sftp/sftp_client.go diff --git a/bindings/sftp/proxy/proxy.go b/bindings/sftp/proxy/proxy.go new file mode 100644 index 0000000000..00c70a57d3 --- /dev/null +++ b/bindings/sftp/proxy/proxy.go @@ -0,0 +1,111 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sftp + +import ( + "errors" + "io" + "log" + "net" + "sync/atomic" +) + +type Proxy struct { + ListenAddr string + UpstreamAddr string + Client net.Conn + Server net.Conn + ReconnectionCount atomic.Int32 + Listener net.Listener +} + +func (p *Proxy) ListenAndServe() error { + ln, err := net.Listen("tcp", p.ListenAddr) + if err != nil { + log.Fatalf("listen: %v", err) + } + log.Printf("Proxy listening on %s -> %s", p.ListenAddr, p.UpstreamAddr) + p.Listener = ln + + for { + client, err := ln.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return nil + } + log.Printf("accept error: %v", err) + continue + } + go p.handle(client) + } +} + +func (p *Proxy) handle(client net.Conn) { + defer client.Close() + + // Connect to upstream SFTP server + server, err := net.Dial("tcp", p.UpstreamAddr) + if err != nil { + log.Printf("dial upstream: %v", err) + return + } + defer server.Close() + + p.Client = client + p.Server = server + p.ReconnectionCount.Add(1) + errCh := make(chan error, 2) + + // client -> server + go func() { + _, cErr := io.Copy(server, client) + errCh <- cErr + }() + + // server -> client + go func() { + _, cErr := io.Copy(client, server) + errCh <- cErr + }() + + // When either direction ends, close both ends + if err := <-errCh; err != nil && !isUsefullyClosed(err) { + log.Printf("proxy stream ended with error: %v", err) + } +} + +func (p *Proxy) KillServerConn() error { + return p.Server.Close() +} + +func (p *Proxy) Close() { + if p.Client != nil { + _ = p.Client.Close() + } + + if p.Server != nil { + _ = p.Server.Close() + } + + if p.Listener != nil { + _ = p.Listener.Close() + } + + p.ReconnectionCount.Store(0) +} + +// isUsefullyClosed filters common close conditions from logging noise +func isUsefullyClosed(err error) bool { + return err == io.EOF || errors.Is(err, net.ErrClosed) +} diff --git a/bindings/sftp/sftp.go b/bindings/sftp/sftp.go index 211cce85d3..536a224afb 100644 --- a/bindings/sftp/sftp.go +++ b/bindings/sftp/sftp.go @@ -1,3 +1,16 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sftp import ( @@ -25,9 +38,9 @@ const ( // Sftp is a binding for file operations on sftp server. type Sftp struct { - metadata *sftpMetadata - logger logger.Logger - sftpClient *sftpClient.Client + metadata *sftpMetadata + logger logger.Logger + c *Client } // sftpMetadata defines the sftp metadata. @@ -115,19 +128,12 @@ func (sftp *Sftp) Init(_ context.Context, metadata bindings.Metadata) error { HostKeyCallback: hostKeyCallback, } - sshClient, err := ssh.Dial("tcp", m.Address, config) - if err != nil { - return fmt.Errorf("sftp binding error: error create ssh client: %w", err) - } - - newSftpClient, err := sftpClient.NewClient(sshClient) + sftp.metadata = m + sftp.c, err = newClient(m.Address, config) if err != nil { - return fmt.Errorf("sftp binding error: error create sftp client: %w", err) + return fmt.Errorf("sftp binding error: create sftp client error: %w", err) } - sftp.metadata = m - sftp.sftpClient = newSftpClient - return nil } @@ -161,14 +167,9 @@ func (sftp *Sftp) create(_ context.Context, req *bindings.InvokeRequest) (*bindi return nil, fmt.Errorf("sftp binding error: %w", err) } - dir, fileName := sftpClient.Split(path) + c := sftp.c - err = sftp.sftpClient.MkdirAll(dir) - if err != nil { - return nil, fmt.Errorf("sftp binding error: error create dir %s: %w", dir, err) - } - - file, err := sftp.sftpClient.Create(path) + file, fileName, err := c.create(path) if err != nil { return nil, fmt.Errorf("sftp binding error: error create file %s: %w", path, err) } @@ -211,7 +212,9 @@ func (sftp *Sftp) list(_ context.Context, req *bindings.InvokeRequest) (*binding return nil, fmt.Errorf("sftp binding error: %w", err) } - files, err := sftp.sftpClient.ReadDir(path) + c := sftp.c + + files, err := c.list(path) if err != nil { return nil, fmt.Errorf("sftp binding error: error read dir %s: %w", path, err) } @@ -246,7 +249,9 @@ func (sftp *Sftp) get(_ context.Context, req *bindings.InvokeRequest) (*bindings return nil, fmt.Errorf("sftp binding error: %w", err) } - file, err := sftp.sftpClient.Open(path) + c := sftp.c + + file, err := c.get(path) if err != nil { return nil, fmt.Errorf("sftp binding error: error open file %s: %w", path, err) } @@ -272,7 +277,9 @@ func (sftp *Sftp) delete(_ context.Context, req *bindings.InvokeRequest) (*bindi return nil, fmt.Errorf("sftp binding error: %w", err) } - err = sftp.sftpClient.Remove(path) + c := sftp.c + + err = c.delete(path) if err != nil { return nil, fmt.Errorf("sftp binding error: error remove file %s: %w", path, err) } @@ -296,7 +303,7 @@ func (sftp *Sftp) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bin } func (sftp *Sftp) Close() error { - return sftp.sftpClient.Close() + return sftp.c.Close() } func (metadata sftpMetadata) getPath(requestMetadata map[string]string) (path string, err error) { diff --git a/bindings/sftp/sftp_client.go b/bindings/sftp/sftp_client.go new file mode 100644 index 0000000000..4b392b70c9 --- /dev/null +++ b/bindings/sftp/sftp_client.go @@ -0,0 +1,254 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sftp + +import ( + "errors" + "fmt" + "io" + "os" + "strings" + "sync" + + sftpClient "github.com/pkg/sftp" + "golang.org/x/crypto/ssh" +) + +type Client struct { + sshClient *ssh.Client + sftpClient *sftpClient.Client + address string + config *ssh.ClientConfig + lock sync.RWMutex + rLock sync.Mutex +} + +func newClient(address string, config *ssh.ClientConfig) (*Client, error) { + if address == "" || config == nil { + return nil, errors.New("sftp binding error: client not initialized") + } + + sshClient, err := ssh.Dial("tcp", address, config) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error create ssh client: %w", err) + } + + newSftpClient, err := sftpClient.NewClient(sshClient) + if err != nil { + _ = sshClient.Close() + return nil, fmt.Errorf("sftp binding error: error create sftp client: %w", err) + } + + return &Client{ + sshClient: sshClient, + sftpClient: newSftpClient, + address: address, + config: config, + }, nil +} + +func (c *Client) Close() error { + _ = c.sshClient.Close() + c.lock.Lock() + defer c.lock.Unlock() + return c.sftpClient.Close() +} + +func (c *Client) list(path string) ([]os.FileInfo, error) { + var fi []os.FileInfo + + fn := func() error { + var err error + c.lock.RLock() + defer c.lock.RUnlock() + fi, err = c.sftpClient.ReadDir(path) + return err + } + + err := withReconnection(c, fn) + if err != nil { + return nil, err + } + + return fi, nil +} + +func (c *Client) create(path string) (*sftpClient.File, string, error) { + dir, fileName := sftpClient.Split(path) + + var file *sftpClient.File + + createFn := func() error { + c.lock.RLock() + defer c.lock.RUnlock() + cErr := c.sftpClient.MkdirAll(dir) + if cErr != nil { + return fmt.Errorf("sftp binding error: error create dir %s: %w", dir, cErr) + } + + file, cErr = c.sftpClient.Create(path) + if cErr != nil { + return fmt.Errorf("sftp binding error: error create file %s: %w", path, cErr) + } + + return nil + } + + rErr := withReconnection(c, createFn) + if rErr != nil { + return nil, "", rErr + } + + return file, fileName, nil +} + +func (c *Client) get(path string) (*sftpClient.File, error) { + var f *sftpClient.File + + fn := func() error { + var err error + c.lock.RLock() + defer c.lock.RUnlock() + f, err = c.sftpClient.Open(path) + return err + } + + err := withReconnection(c, fn) + if err != nil { + return nil, err + } + + return f, nil +} + +func (c *Client) delete(path string) error { + fn := func() error { + var err error + c.lock.RLock() + defer c.lock.RUnlock() + err = c.sftpClient.Remove(path) + return err + } + + err := withReconnection(c, fn) + if err != nil { + return err + } + + return nil +} + +func (c *Client) ping() error { + c.lock.RLock() + defer c.lock.RUnlock() + _, err := c.sftpClient.Getwd() + if err != nil { + return err + } + return nil +} + +func withReconnection(c *Client, fn func() error) error { + err := fn() + if err == nil { + return nil + } + + if !shouldReconnect(err) { + return err + } + + rErr := doReconnect(c) + if rErr != nil { + return errors.Join(err, rErr) + } + + err = fn() + if err != nil { + return err + } + + return nil +} + +func doReconnect(c *Client) error { + c.rLock.Lock() + defer c.rLock.Unlock() + + err := c.ping() + if !shouldReconnect(err) { + return nil + } + + sshClient, err := ssh.Dial("tcp", c.address, c.config) + if err != nil { + return fmt.Errorf("sftp binding error: error create ssh client: %w", err) + } + + newSftpClient, err := sftpClient.NewClient(sshClient) + if err != nil { + _ = sshClient.Close() + return fmt.Errorf("sftp binding error: error create sftp client: %w", err) + } + + // Swap under short lock; close old clients after unlocking. + c.lock.Lock() + oldSftp := c.sftpClient + oldSSH := c.sshClient + c.sftpClient = newSftpClient + c.sshClient = sshClient + c.lock.Unlock() + + if oldSftp != nil { + _ = oldSftp.Close() + } + if oldSSH != nil { + _ = oldSSH.Close() + } + + return nil +} + +// shouldReconnect returns true if the error looks like a transport-level failure +func shouldReconnect(err error) bool { + if err == nil { + return false + } + + // Network/timeout conditions + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, os.ErrDeadlineExceeded) { + return true + } + + // Common wrapped network error messages + msg := strings.ToLower(err.Error()) + switch { + case strings.Contains(msg, "use of closed network connection"), + strings.Contains(msg, "connection reset by peer"), + strings.Contains(msg, "broken pipe"), + strings.Contains(msg, "connection refused"), + strings.Contains(msg, "network is unreachable"), + strings.Contains(msg, "no such host"): + return true + } + + // SFTP status errors that are logical, not connectivity (avoid reconnect) + if errors.Is(err, sftpClient.ErrSSHFxPermissionDenied) || + errors.Is(err, sftpClient.ErrSSHFxNoSuchFile) || + errors.Is(err, sftpClient.ErrSSHFxOpUnsupported) { + return false + } + + return true +} diff --git a/bindings/sftp/sftp_integration_test.go b/bindings/sftp/sftp_integration_test.go index 178c7535db..be70aec770 100644 --- a/bindings/sftp/sftp_integration_test.go +++ b/bindings/sftp/sftp_integration_test.go @@ -1,3 +1,16 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sftp import ( @@ -9,32 +22,49 @@ import ( "github.com/stretchr/testify/require" "github.com/dapr/components-contrib/bindings" + sftp "github.com/dapr/components-contrib/bindings/sftp/proxy" ) +const ProxySftp = "0.0.0.0:2223" + var connectionStringEnvKey = "DAPR_TEST_SFTP_CONNSTRING" // Run docker from the file location as the upload folder is relative to the test -// docker run -v ./upload:/home/foo/upload -p 2222:22 -d atmoz/sftp foo:pass:1001 +// cd proxy +// docker run --name sftp -v ./upload:/home/foo/upload -p 2222:22 -d atmoz/sftp foo:pass:1001 +// export DAPR_TEST_SFTP_CONNSTRING=localhost:2222 func TestIntegrationCases(t *testing.T) { connectionString := os.Getenv(connectionStringEnvKey) if connectionString == "" { - t.Skipf(`sftp binding integration tests skipped. To enable this test, define the connection string using environment variable '%[1]s' (example 'export %[1]s="localhost:2222")'`, connectionStringEnvKey) + t.Skipf("sftp binding integration skipped. To enable this test, define the connection string using environment variable '%[1]s' (example 'export %[1]s=\"localhost:2222\")'", connectionStringEnvKey) } t.Run("List operation", testListOperation) t.Run("Create operation", testCreateOperation) + t.Run("Reconnections", testReconnect) } func testListOperation(t *testing.T) { + proxy := &sftp.Proxy{ + ListenAddr: ProxySftp, + UpstreamAddr: os.Getenv(connectionStringEnvKey), + } + + defer proxy.Close() + go proxy.ListenAndServe() + c := Sftp{} + m := bindings.Metadata{} + m.Properties = map[string]string{ "rootPath": "/upload", - "address": os.Getenv(connectionStringEnvKey), + "address": ProxySftp, "username": "foo", "password": "pass", "insecureIgnoreHostKey": "true", } + err := c.Init(t.Context(), m) require.NoError(t, err) @@ -45,14 +75,22 @@ func testListOperation(t *testing.T) { var d []listResponse err = json.Unmarshal(r.Data, &d) require.NoError(t, err) + + assert.EqualValues(t, 1, proxy.ReconnectionCount.Load()) } func testCreateOperation(t *testing.T) { + proxy := &sftp.Proxy{ + ListenAddr: ProxySftp, + UpstreamAddr: os.Getenv(connectionStringEnvKey), + } + defer proxy.Close() + go proxy.ListenAndServe() c := Sftp{} m := bindings.Metadata{} m.Properties = map[string]string{ "rootPath": "/upload", - "address": os.Getenv(connectionStringEnvKey), + "address": ProxySftp, "username": "foo", "password": "pass", "insecureIgnoreHostKey": "true", @@ -76,7 +114,108 @@ func testCreateOperation(t *testing.T) { require.NoError(t, err) assert.NotNil(t, r.Data) - file, err := os.Stat("./upload/test.txt") + file, err := os.Stat("./proxy/upload/test.txt") require.NoError(t, err) assert.Equal(t, "test.txt", file.Name()) + assert.EqualValues(t, 1, proxy.ReconnectionCount.Load()) +} + +func testReconnect(t *testing.T) { + proxy := &sftp.Proxy{ + ListenAddr: ProxySftp, + UpstreamAddr: os.Getenv(connectionStringEnvKey), + } + defer proxy.Close() + go proxy.ListenAndServe() + + c := Sftp{} + + m := bindings.Metadata{} + + m.Properties = map[string]string{ + "rootPath": "/upload", + "address": ProxySftp, + "username": "foo", + "password": "pass", + "insecureIgnoreHostKey": "true", + } + + err := c.Init(t.Context(), m) + require.NoError(t, err) + + t.Run("List operation", func(t *testing.T) { + r, err := c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation}) + require.NoError(t, err) + assert.NotNil(t, r.Data) + + _ = proxy.KillServerConn() + + r, err = c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation}) + require.NoError(t, err) + assert.NotNil(t, r.Data) + + var d []listResponse + err = json.Unmarshal(r.Data, &d) + require.NoError(t, err) + + assert.EqualValues(t, 2, proxy.ReconnectionCount.Load()) + }) + + t.Run("List delete - no reconnection", func(t *testing.T) { + numReconnects := proxy.ReconnectionCount.Load() + _, err := c.Invoke(t.Context(), &bindings.InvokeRequest{ + Operation: bindings.DeleteOperation, + Metadata: map[string]string{ + "fileName": "file_does_not_exist.txt", + }, + }) + + require.Error(t, err) + + assert.EqualValues(t, numReconnects, proxy.ReconnectionCount.Load()) + }) + + t.Run("List delete - reconnection", func(t *testing.T) { + numReconnects := proxy.ReconnectionCount.Load() + _ = proxy.KillServerConn() + _, err := c.Invoke(t.Context(), &bindings.InvokeRequest{ + Operation: bindings.DeleteOperation, + Metadata: map[string]string{ + "fileName": "file_does_not_exist.txt", + }, + }) + + require.Error(t, err) + + assert.EqualValues(t, numReconnects+1, proxy.ReconnectionCount.Load()) + }) + + t.Run("List get - no reconnection", func(t *testing.T) { + numReconnects := proxy.ReconnectionCount.Load() + _, err := c.Invoke(t.Context(), &bindings.InvokeRequest{ + Operation: bindings.GetOperation, + Metadata: map[string]string{ + "fileName": "file_does_not_exist.txt", + }, + }) + + require.Error(t, err) + + assert.EqualValues(t, numReconnects, proxy.ReconnectionCount.Load()) + }) + + t.Run("List get - reconnection", func(t *testing.T) { + numReconnects := proxy.ReconnectionCount.Load() + _ = proxy.KillServerConn() + _, err := c.Invoke(t.Context(), &bindings.InvokeRequest{ + Operation: bindings.GetOperation, + Metadata: map[string]string{ + "fileName": "file_does_not_exist.txt", + }, + }) + + require.Error(t, err) + + assert.EqualValues(t, numReconnects+1, proxy.ReconnectionCount.Load()) + }) } From e12e24f07dc74e8d35c54efc26e5867aecaf0def Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 6 Nov 2025 17:38:15 +0100 Subject: [PATCH 2/5] chore: Review comments Signed-off-by: Javier Aliaga --- bindings/sftp/{sftp_client.go => client.go} | 3 ++- bindings/sftp/sftp_integration_test.go | 2 +- {bindings/sftp/proxy => tests/utils/sftpproxy}/proxy.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) rename bindings/sftp/{sftp_client.go => client.go} (98%) rename {bindings/sftp/proxy => tests/utils/sftpproxy}/proxy.go (99%) diff --git a/bindings/sftp/sftp_client.go b/bindings/sftp/client.go similarity index 98% rename from bindings/sftp/sftp_client.go rename to bindings/sftp/client.go index 4b392b70c9..429d2df0c7 100644 --- a/bindings/sftp/sftp_client.go +++ b/bindings/sftp/client.go @@ -20,6 +20,7 @@ import ( "os" "strings" "sync" + "syscall" sftpClient "github.com/pkg/sftp" "golang.org/x/crypto/ssh" @@ -227,7 +228,7 @@ func shouldReconnect(err error) bool { } // Network/timeout conditions - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, os.ErrDeadlineExceeded) { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, syscall.ECONNRESET) { return true } diff --git a/bindings/sftp/sftp_integration_test.go b/bindings/sftp/sftp_integration_test.go index be70aec770..22ae4c6e91 100644 --- a/bindings/sftp/sftp_integration_test.go +++ b/bindings/sftp/sftp_integration_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/require" "github.com/dapr/components-contrib/bindings" - sftp "github.com/dapr/components-contrib/bindings/sftp/proxy" + sftp "github.com/dapr/components-contrib/tests/utils/sftpproxy" ) const ProxySftp = "0.0.0.0:2223" diff --git a/bindings/sftp/proxy/proxy.go b/tests/utils/sftpproxy/proxy.go similarity index 99% rename from bindings/sftp/proxy/proxy.go rename to tests/utils/sftpproxy/proxy.go index 00c70a57d3..3e024d1371 100644 --- a/bindings/sftp/proxy/proxy.go +++ b/tests/utils/sftpproxy/proxy.go @@ -11,7 +11,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package sftp +package sftpproxy import ( "errors" From 2501818a991237e669846be92568d060c304da5d Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Fri, 7 Nov 2025 10:02:26 +0100 Subject: [PATCH 3/5] docs: Add lock explanation Signed-off-by: Javier Aliaga --- bindings/sftp/client.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/bindings/sftp/client.go b/bindings/sftp/client.go index 429d2df0c7..ec5f0ea39b 100644 --- a/bindings/sftp/client.go +++ b/bindings/sftp/client.go @@ -183,6 +183,27 @@ func withReconnection(c *Client, fn func() error) error { return nil } +// 1) c.rLock (sync.Mutex) — reconnect serialization: +// - Ensures only one goroutine performs the reconnect sequence at a time +// (ping/check, dial SSH, create SFTP client), preventing a thundering herd +// of concurrent reconnect attempts. +// - Does NOT protect day-to-day client usage; it only coordinates who +// is allowed to perform a reconnect. +// +// 2) c.lock (sync.RWMutex) — data-plane safety and atomic swap: +// - Guards reads/writes of the active client handles (sshClient, sftpClient). +// - Regular operations hold RLock while using the clients. +// - Reconnect performs a short critical section with Lock to atomically swap +// the client pointers; old clients are closed after unlocking to keep the +// critical section small and avoid blocking readers. +// +// Why not a single RWMutex? +// - If we used only c.lock and held it while dialing/handshaking, all I/O would +// be blocked for the entire network operation, increasing latency and risk of +// contention. Worse, reconnects triggered while a caller holds RLock could +// deadlock or starve the writer. +// - Separating concerns allows: (a) fast, minimal swap under c.lock, and +// (b) serialized reconnect work under c.rLock without blocking readers. func doReconnect(c *Client) error { c.rLock.Lock() defer c.rLock.Unlock() From 90f6d6b1291acf8c965ba00526838b7adcd5ecbd Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Fri, 7 Nov 2025 10:36:52 +0100 Subject: [PATCH 4/5] chore: NewSSHClient Signed-off-by: Javier Aliaga --- bindings/sftp/client.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/bindings/sftp/client.go b/bindings/sftp/client.go index ec5f0ea39b..be9af2a74c 100644 --- a/bindings/sftp/client.go +++ b/bindings/sftp/client.go @@ -40,9 +40,9 @@ func newClient(address string, config *ssh.ClientConfig) (*Client, error) { return nil, errors.New("sftp binding error: client not initialized") } - sshClient, err := ssh.Dial("tcp", address, config) + sshClient, err := newSSHClient(address, config) if err != nil { - return nil, fmt.Errorf("sftp binding error: error create ssh client: %w", err) + return nil, err } newSftpClient, err := sftpClient.NewClient(sshClient) @@ -213,9 +213,9 @@ func doReconnect(c *Client) error { return nil } - sshClient, err := ssh.Dial("tcp", c.address, c.config) + sshClient, err := newSSHClient(c.address, c.config) if err != nil { - return fmt.Errorf("sftp binding error: error create ssh client: %w", err) + return err } newSftpClient, err := sftpClient.NewClient(sshClient) @@ -242,6 +242,14 @@ func doReconnect(c *Client) error { return nil } +func newSSHClient(address string, config *ssh.ClientConfig) (*ssh.Client, error) { + sshClient, err := ssh.Dial("tcp", address, config) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error create ssh client: %w", err) + } + return sshClient, nil +} + // shouldReconnect returns true if the error looks like a transport-level failure func shouldReconnect(err error) bool { if err == nil { From b203628ac101315183adcfa35be86e613c1874df Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Fri, 7 Nov 2025 10:39:13 +0100 Subject: [PATCH 5/5] chore: Do not reconnect if the error is sftp error Signed-off-by: Javier Aliaga --- bindings/sftp/client.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/bindings/sftp/client.go b/bindings/sftp/client.go index be9af2a74c..3c04247ce3 100644 --- a/bindings/sftp/client.go +++ b/bindings/sftp/client.go @@ -16,11 +16,8 @@ package sftp import ( "errors" "fmt" - "io" "os" - "strings" "sync" - "syscall" sftpClient "github.com/pkg/sftp" "golang.org/x/crypto/ssh" @@ -256,23 +253,6 @@ func shouldReconnect(err error) bool { return false } - // Network/timeout conditions - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, syscall.ECONNRESET) { - return true - } - - // Common wrapped network error messages - msg := strings.ToLower(err.Error()) - switch { - case strings.Contains(msg, "use of closed network connection"), - strings.Contains(msg, "connection reset by peer"), - strings.Contains(msg, "broken pipe"), - strings.Contains(msg, "connection refused"), - strings.Contains(msg, "network is unreachable"), - strings.Contains(msg, "no such host"): - return true - } - // SFTP status errors that are logical, not connectivity (avoid reconnect) if errors.Is(err, sftpClient.ErrSSHFxPermissionDenied) || errors.Is(err, sftpClient.ErrSSHFxNoSuchFile) ||