diff --git a/core/commands/p2p.go b/core/commands/p2p.go index 1fbdc8a2861..cc30cd19848 100644 --- a/core/commands/p2p.go +++ b/core/commands/p2p.go @@ -50,9 +50,17 @@ type P2PStreamsOutput struct { Streams []P2PStreamInfoOutput } +// P2PForegroundOutput is output type for foreground mode status messages +type P2PForegroundOutput struct { + Status string // "active" or "closing" + Protocol string + Address string +} + const ( allowCustomProtocolOptionName = "allow-custom-protocol" reportPeerIDOptionName = "report-peer-id" + foregroundOptionName = "foreground" ) var resolveTimeout = 10 * time.Second @@ -83,15 +91,35 @@ var p2pForwardCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Forward connections to libp2p service.", ShortDescription: ` -Forward connections made to to . +Forward connections made to to via libp2p. + +Creates a local TCP listener that tunnels connections through libp2p to a +remote peer's p2p listener. Similar to SSH port forwarding (-L flag). + +ARGUMENTS: + + Protocol name (must start with '` + P2PProtoPrefix + `') + Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000) + Remote peer multiaddr (e.g., /p2p/PeerID) + +FOREGROUND MODE (--foreground, -f): - specifies the libp2p protocol name to use for libp2p -connections and/or handlers. It must be prefixed with '` + P2PProtoPrefix + `'. + By default, the forwarder runs in the daemon and the command returns + immediately. Use --foreground to block until interrupted: -Example: - ipfs p2p forward ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/4567 /p2p/QmPeer - - Forward connections to 127.0.0.1:4567 to '` + P2PProtoPrefix + `myproto' service on /p2p/QmPeer + - Ctrl+C or SIGTERM: Removes the forwarder and exits + - 'ipfs p2p close': Removes the forwarder and exits + - Daemon shutdown: Forwarder is automatically removed + Useful for systemd services or scripts that need cleanup on exit. + +EXAMPLES: + + # Persistent forwarder (command returns immediately) + ipfs p2p forward /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID + + # Temporary forwarder (removed when command exits) + ipfs p2p forward -f /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID `, }, Arguments: []cmds.Argument{ @@ -101,6 +129,7 @@ Example: }, Options: []cmds.Option{ cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"), + cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; forwarder is removed when command exits"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { n, err := p2pGetNode(env) @@ -130,7 +159,51 @@ Example: return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace") } - return forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets) + listener, err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets) + if err != nil { + return err + } + + foreground, _ := req.Options[foregroundOptionName].(bool) + if foreground { + if err := res.Emit(&P2PForegroundOutput{ + Status: "active", + Protocol: protoOpt, + Address: listenOpt, + }); err != nil { + return err + } + // Wait for either context cancellation (Ctrl+C/daemon shutdown) + // or listener removal (ipfs p2p close) + select { + case <-req.Context.Done(): + // SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing) + n.P2P.ListenersLocal.Close(func(l p2p.Listener) bool { + return l == listener + }) + return nil + case <-listener.Done(): + // Closed via "ipfs p2p close" - emit closing message + return res.Emit(&P2PForegroundOutput{ + Status: "closing", + Protocol: protoOpt, + Address: listenOpt, + }) + } + } + + return nil + }, + Type: P2PForegroundOutput{}, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error { + if out.Status == "active" { + fmt.Fprintf(w, "Forwarding %s to %s, waiting for interrupt...\n", out.Protocol, out.Address) + } else if out.Status == "closing" { + fmt.Fprintf(w, "Received interrupt, removing forwarder for %s\n", out.Protocol) + } + return nil + }), }, } @@ -185,14 +258,38 @@ var p2pListenCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Create libp2p service.", ShortDescription: ` -Create libp2p service and forward connections made to . +Create a libp2p protocol handler that forwards incoming connections to +. - specifies the libp2p handler name. It must be prefixed with '` + P2PProtoPrefix + `'. +When a remote peer connects using 'ipfs p2p forward', the connection is +forwarded to your local service. Similar to SSH port forwarding (server side). -Example: - ipfs p2p listen ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/1234 - - Forward connections to 'myproto' libp2p service to 127.0.0.1:1234 +ARGUMENTS: + Protocol name (must start with '` + P2PProtoPrefix + `') + Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000) + +FOREGROUND MODE (--foreground, -f): + + By default, the listener runs in the daemon and the command returns + immediately. Use --foreground to block until interrupted: + + - Ctrl+C or SIGTERM: Removes the listener and exits + - 'ipfs p2p close': Removes the listener and exits + - Daemon shutdown: Listener is automatically removed + + Useful for systemd services or scripts that need cleanup on exit. + +EXAMPLES: + + # Persistent listener (command returns immediately) + ipfs p2p listen /x/myapp /ip4/127.0.0.1/tcp/3000 + + # Temporary listener (removed when command exits) + ipfs p2p listen -f /x/myapp /ip4/127.0.0.1/tcp/3000 + + # Report connecting peer ID to the target application + ipfs p2p listen -r /x/myapp /ip4/127.0.0.1/tcp/3000 `, }, Arguments: []cmds.Argument{ @@ -202,6 +299,7 @@ Example: Options: []cmds.Option{ cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"), cmds.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"), + cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; listener is removed when command exits"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { n, err := p2pGetNode(env) @@ -231,8 +329,51 @@ Example: return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace") } - _, err = n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID) - return err + listener, err := n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID) + if err != nil { + return err + } + + foreground, _ := req.Options[foregroundOptionName].(bool) + if foreground { + if err := res.Emit(&P2PForegroundOutput{ + Status: "active", + Protocol: protoOpt, + Address: targetOpt, + }); err != nil { + return err + } + // Wait for either context cancellation (Ctrl+C/daemon shutdown) + // or listener removal (ipfs p2p close) + select { + case <-req.Context.Done(): + // SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing) + n.P2P.ListenersP2P.Close(func(l p2p.Listener) bool { + return l == listener + }) + return nil + case <-listener.Done(): + // Closed via "ipfs p2p close" - emit closing message + return res.Emit(&P2PForegroundOutput{ + Status: "closing", + Protocol: protoOpt, + Address: targetOpt, + }) + } + } + + return nil + }, + Type: P2PForegroundOutput{}, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error { + if out.Status == "active" { + fmt.Fprintf(w, "Listening on %s, forwarding to %s, waiting for interrupt...\n", out.Protocol, out.Address) + } else if out.Status == "closing" { + fmt.Fprintf(w, "Received interrupt, removing listener for %s\n", out.Protocol) + } + return nil + }), }, } @@ -271,11 +412,9 @@ func checkPort(target ma.Multiaddr) error { } // forwardLocal forwards local connections to a libp2p service -func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) error { +func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) (p2p.Listener, error) { ps.AddAddrs(addr.ID, addr.Addrs, pstore.TempAddrTTL) - // TODO: return some info - _, err := p.ForwardLocal(ctx, addr.ID, proto, bindAddr) - return err + return p.ForwardLocal(ctx, addr.ID, proto, bindAddr) } const ( diff --git a/docs/changelogs/v0.40.md b/docs/changelogs/v0.40.md index 29780937f4b..1db7ec31bdc 100644 --- a/docs/changelogs/v0.40.md +++ b/docs/changelogs/v0.40.md @@ -11,7 +11,8 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team. - [Overview](#overview) - [๐Ÿ”ฆ Highlights](#-highlights) - [Routing V1 HTTP API now exposed by default](#routing-v1-http-api-now-exposed-by-default) - - [Track total size when adding pins](#track-total-size-when-adding-pins] + - [Track total size when adding pins](#track-total-size-when-adding-pins) + - [`ipfs p2p` foreground mode](#ipfs-p2p-foreground-mode) - [๐Ÿ“ Changelog](#-changelog) - [๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Contributors](#-contributors) @@ -32,6 +33,14 @@ Example output: Fetched/Processed 336 nodes (83 MB) ``` +#### `ipfs p2p` foreground mode + +The `ipfs p2p listen` and `ipfs p2p forward` commands now support a `--foreground` (`-f`) flag that keeps the command running until interrupted. When the command exits (via Ctrl+C, SIGTERM, or daemon shutdown), the listener/forwarder is automatically removed. + +Without `--foreground`, the commands return immediately and the listener persists in the daemon (existing behavior). + +Run `ipfs p2p listen --help` or `ipfs p2p forward --help` for details and examples. + ### ๐Ÿ“ Changelog ### ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Contributors diff --git a/docs/experimental-features.md b/docs/experimental-features.md index ad3fbdfed59..228cf5abf7d 100644 --- a/docs/experimental-features.md +++ b/docs/experimental-features.md @@ -308,6 +308,8 @@ ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /p2p/$SERVER_ID You should now be able to connect to your ssh server through a libp2p connection with `ssh [user]@127.0.0.1 -p 2222`. +> **Tip:** Both commands support `--foreground` (`-f`) flag for blocking behavior. +> Run `ipfs p2p listen --help` or `ipfs p2p forward --help` for details. ### Road to being a real feature diff --git a/p2p/listener.go b/p2p/listener.go index f5942ffa0a3..823f68e8116 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -20,6 +20,10 @@ type Listener interface { // close closes the listener. Does not affect child streams close() + + // Done returns a channel that is closed when the listener is closed. + // This allows callers to detect when a listener has been removed. + Done() <-chan struct{} } // Listeners manages a group of Listener implementations, @@ -73,15 +77,13 @@ func (r *Listeners) Register(l Listener) error { return nil } +// Close removes and closes all listeners for which matchFunc returns true. +// Returns the number of listeners closed. func (r *Listeners) Close(matchFunc func(listener Listener) bool) int { - todo := make([]Listener, 0) + var todo []Listener r.Lock() for _, l := range r.Listeners { - if !matchFunc(l) { - continue - } - - if _, ok := r.Listeners[l.key()]; ok { + if matchFunc(l) { delete(r.Listeners, l.key()) todo = append(todo, l) } diff --git a/p2p/local.go b/p2p/local.go index 98028c5d4aa..31f70e5fca1 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -23,6 +23,7 @@ type localListener struct { peer peer.ID listener manet.Listener + done chan struct{} } // ForwardLocal creates new P2P stream to a remote listener. @@ -32,6 +33,7 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I p2p: p2p, proto: proto, peer: peer, + done: make(chan struct{}), } maListener, err := manet.Listen(bindAddr) @@ -98,6 +100,11 @@ func (l *localListener) setupStream(local manet.Conn) { func (l *localListener) close() { l.listener.Close() + close(l.done) +} + +func (l *localListener) Done() <-chan struct{} { + return l.done } func (l *localListener) Protocol() protocol.ID { diff --git a/p2p/remote.go b/p2p/remote.go index b867cb313f1..fb7b7ccbae1 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -25,6 +25,8 @@ type remoteListener struct { // reportRemote if set to true makes the handler send '\n' // to target before any data is forwarded reportRemote bool + + done chan struct{} } // ForwardRemote creates new p2p listener. @@ -36,6 +38,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu addr: addr, reportRemote: reportRemote, + done: make(chan struct{}), } if err := p2p.ListenersP2P.Register(listener); err != nil { @@ -99,7 +102,13 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr { return l.addr } -func (l *remoteListener) close() {} +func (l *remoteListener) close() { + close(l.done) +} + +func (l *remoteListener) Done() <-chan struct{} { + return l.done +} func (l *remoteListener) key() protocol.ID { return l.proto diff --git a/test/cli/p2p_test.go b/test/cli/p2p_test.go new file mode 100644 index 00000000000..2400d7d8bb4 --- /dev/null +++ b/test/cli/p2p_test.go @@ -0,0 +1,430 @@ +package cli + +import ( + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "os/exec" + "slices" + "syscall" + "testing" + "time" + + "github.com/ipfs/kubo/core/commands" + "github.com/ipfs/kubo/test/cli/harness" + "github.com/stretchr/testify/require" +) + +// waitForListenerCount waits until the node has exactly the expected number of listeners. +func waitForListenerCount(t *testing.T, node *harness.Node, expectedCount int) { + t.Helper() + require.Eventually(t, func() bool { + lsOut := node.IPFS("p2p", "ls", "--enc=json") + var lsResult commands.P2PLsOutput + if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil { + return false + } + return len(lsResult.Listeners) == expectedCount + }, 5*time.Second, 100*time.Millisecond, "expected %d listeners", expectedCount) +} + +// waitForListenerProtocol waits until the node has a listener with the given protocol. +func waitForListenerProtocol(t *testing.T, node *harness.Node, protocol string) { + t.Helper() + require.Eventually(t, func() bool { + lsOut := node.IPFS("p2p", "ls", "--enc=json") + var lsResult commands.P2PLsOutput + if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil { + return false + } + return slices.ContainsFunc(lsResult.Listeners, func(l commands.P2PListenerInfoOutput) bool { + return l.Protocol == protocol + }) + }, 5*time.Second, 100*time.Millisecond, "expected listener with protocol %s", protocol) +} + +func TestP2PForeground(t *testing.T) { + t.Parallel() + + t.Run("listen foreground creates listener and removes on interrupt", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // Start foreground listener asynchronously + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/fgtest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for listener to be created + waitForListenerProtocol(t, node, "/x/fgtest") + + // Send SIGTERM + _ = res.Cmd.Process.Signal(syscall.SIGTERM) + _ = res.Cmd.Wait() + + // Wait for listener to be removed + waitForListenerCount(t, node, 0) + }) + + t.Run("listen foreground text output on SIGTERM", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // Run without --enc=json to test actual text output users see + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/sigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + waitForListenerProtocol(t, node, "/x/sigterm") + + _ = res.Cmd.Process.Signal(syscall.SIGTERM) + _ = res.Cmd.Wait() + + // Verify stdout shows "waiting for interrupt" message + stdout := res.Stdout.String() + require.Contains(t, stdout, "waiting for interrupt") + + // Note: "Received interrupt, removing listener" message is NOT visible to CLI on SIGTERM + // because the command runs in the daemon via RPC and the response stream closes before + // the message can be emitted. The important behavior is verified in the first test: + // the listener IS removed when SIGTERM is sent. + }) + + t.Run("forward foreground creates forwarder and removes on interrupt", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + forwardPort := harness.NewRandPort() + + // Start foreground forwarder asynchronously on node 0 + res := nodes[0].Runner.Run(harness.RunRequest{ + Path: nodes[0].IPFSBin, + Args: []string{"p2p", "forward", "--foreground", "/x/fgfwd", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for forwarder to be created + waitForListenerCount(t, nodes[0], 1) + + // Send SIGTERM + _ = res.Cmd.Process.Signal(syscall.SIGTERM) + _ = res.Cmd.Wait() + + // Wait for forwarder to be removed + waitForListenerCount(t, nodes[0], 0) + }) + + t.Run("forward foreground text output on SIGTERM", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + forwardPort := harness.NewRandPort() + + // Run without --enc=json to test actual text output users see + res := nodes[0].Runner.Run(harness.RunRequest{ + Path: nodes[0].IPFSBin, + Args: []string{"p2p", "forward", "--foreground", "/x/fwdsigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + waitForListenerCount(t, nodes[0], 1) + + _ = res.Cmd.Process.Signal(syscall.SIGTERM) + _ = res.Cmd.Wait() + + // Verify stdout shows "waiting for interrupt" message + stdout := res.Stdout.String() + require.Contains(t, stdout, "waiting for interrupt") + + // Note: "Received interrupt, removing forwarder" message is NOT visible to CLI on SIGTERM + // because the response stream closes before the message can be emitted. + }) + + t.Run("listen without foreground returns immediately and persists", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // This should return immediately (not block) + node.IPFS("p2p", "listen", "/x/nofg", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)) + + // Listener should still exist + waitForListenerProtocol(t, node, "/x/nofg") + + // Clean up + node.IPFS("p2p", "close", "-p", "/x/nofg") + }) + + t.Run("listen foreground text output on p2p close", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // Run without --enc=json to test actual text output users see + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/closetest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for listener to be created + waitForListenerProtocol(t, node, "/x/closetest") + + // Close the listener via ipfs p2p close command + node.IPFS("p2p", "close", "-p", "/x/closetest") + + // Wait for foreground command to exit (it should exit quickly after close) + done := make(chan error, 1) + go func() { + done <- res.Cmd.Wait() + }() + + select { + case <-done: + // Good - command exited + case <-time.After(5 * time.Second): + _ = res.Cmd.Process.Kill() + t.Fatal("foreground command did not exit after listener was closed via ipfs p2p close") + } + + // Wait for listener to be removed + waitForListenerCount(t, node, 0) + + // Verify text output shows BOTH messages when closed via p2p close + // (unlike SIGTERM, the stream is still open so "Received interrupt" is emitted) + out := res.Stdout.String() + require.Contains(t, out, "waiting for interrupt") + require.Contains(t, out, "Received interrupt, removing listener") + }) + + t.Run("forward foreground text output on p2p close", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + forwardPort := harness.NewRandPort() + + // Run without --enc=json to test actual text output users see + res := nodes[0].Runner.Run(harness.RunRequest{ + Path: nodes[0].IPFSBin, + Args: []string{"p2p", "forward", "--foreground", "/x/fwdclose", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for forwarder to be created + waitForListenerCount(t, nodes[0], 1) + + // Close the forwarder via ipfs p2p close command + nodes[0].IPFS("p2p", "close", "-a") + + // Wait for foreground command to exit + done := make(chan error, 1) + go func() { + done <- res.Cmd.Wait() + }() + + select { + case <-done: + // Good - command exited + case <-time.After(5 * time.Second): + _ = res.Cmd.Process.Kill() + t.Fatal("foreground command did not exit after forwarder was closed via ipfs p2p close") + } + + // Wait for forwarder to be removed + waitForListenerCount(t, nodes[0], 0) + + // Verify text output shows BOTH messages when closed via p2p close + out := res.Stdout.String() + require.Contains(t, out, "waiting for interrupt") + require.Contains(t, out, "Received interrupt, removing forwarder") + }) + + t.Run("listen foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + httpServerPort := harness.NewRandPort() + forwardPort := harness.NewRandPort() + + // Start HTTP server + expectedBody := "Hello from p2p tunnel!" + httpServer := &http.Server{ + Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort), + Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(expectedBody)) + }), + } + listener, err := net.Listen("tcp", httpServer.Addr) + require.NoError(t, err) + go func() { _ = httpServer.Serve(listener) }() + defer httpServer.Close() + + // Node 0: listen --foreground + listenRes := nodes[0].Runner.Run(harness.RunRequest{ + Path: nodes[0].IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, listenRes.Err) + + // Wait for listener to be created + waitForListenerProtocol(t, nodes[0], "/x/httptest") + + // Node 1: forward (non-foreground) + nodes[1].IPFS("p2p", "forward", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/"+nodes[0].PeerID().String()) + + // Verify data flows through tunnel + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort)) + require.NoError(t, err) + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + require.NoError(t, err) + require.Equal(t, expectedBody, string(body)) + + // Clean up forwarder on node 1 + nodes[1].IPFS("p2p", "close", "-a") + + // SIGTERM the listen --foreground command + _ = listenRes.Cmd.Process.Signal(syscall.SIGTERM) + _ = listenRes.Cmd.Wait() + + // Wait for listener to be removed on node 0 + waitForListenerCount(t, nodes[0], 0) + }) + + t.Run("forward foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + httpServerPort := harness.NewRandPort() + forwardPort := harness.NewRandPort() + + // Start HTTP server + expectedBody := "Hello from forward foreground tunnel!" + httpServer := &http.Server{ + Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort), + Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(expectedBody)) + }), + } + listener, err := net.Listen("tcp", httpServer.Addr) + require.NoError(t, err) + go func() { _ = httpServer.Serve(listener) }() + defer httpServer.Close() + + // Node 0: listen (non-foreground) + nodes[0].IPFS("p2p", "listen", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort)) + + // Node 1: forward --foreground + forwardRes := nodes[1].Runner.Run(harness.RunRequest{ + Path: nodes[1].IPFSBin, + Args: []string{"p2p", "forward", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[0].PeerID().String()}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, forwardRes.Err) + + // Wait for forwarder to be created + waitForListenerCount(t, nodes[1], 1) + + // Verify data flows through tunnel + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort)) + require.NoError(t, err) + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + require.NoError(t, err) + require.Equal(t, expectedBody, string(body)) + + // SIGTERM the forward --foreground command + _ = forwardRes.Cmd.Process.Signal(syscall.SIGTERM) + _ = forwardRes.Cmd.Wait() + + // Wait for forwarder to be removed on node 1 + waitForListenerCount(t, nodes[1], 0) + + // Clean up listener on node 0 + nodes[0].IPFS("p2p", "close", "-a") + }) + + t.Run("foreground command exits when daemon shuts down", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // Start foreground listener + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/daemontest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for listener to be created + waitForListenerProtocol(t, node, "/x/daemontest") + + // Stop the daemon + node.StopDaemon() + + // Wait for foreground command to exit + done := make(chan error, 1) + go func() { + done <- res.Cmd.Wait() + }() + + select { + case <-done: + // Good - foreground command exited when daemon stopped + case <-time.After(5 * time.Second): + _ = res.Cmd.Process.Kill() + t.Fatal("foreground command did not exit when daemon was stopped") + } + }) +}