Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 157 additions & 18 deletions core/commands/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ type P2PStreamsOutput struct {
Streams []P2PStreamInfoOutput
}

// P2PForegroundOutput is output type for foreground mode status messages
type P2PForegroundOutput struct {
Status string // "active" or "closing"
Protocol string
Address string
}

const (
allowCustomProtocolOptionName = "allow-custom-protocol"
reportPeerIDOptionName = "report-peer-id"
foregroundOptionName = "foreground"
)

var resolveTimeout = 10 * time.Second
Expand Down Expand Up @@ -83,15 +91,35 @@ var p2pForwardCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Forward connections to libp2p service.",
ShortDescription: `
Forward connections made to <listen-address> to <target-address>.
Forward connections made to <listen-address> to <target-address> via libp2p.

Creates a local TCP listener that tunnels connections through libp2p to a
remote peer's p2p listener. Similar to SSH port forwarding (-L flag).

ARGUMENTS:

<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
<listen-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
<target-address> Remote peer multiaddr (e.g., /p2p/PeerID)

FOREGROUND MODE (--foreground, -f):

<protocol> specifies the libp2p protocol name to use for libp2p
connections and/or handlers. It must be prefixed with '` + P2PProtoPrefix + `'.
By default, the forwarder runs in the daemon and the command returns
immediately. Use --foreground to block until interrupted:

Example:
ipfs p2p forward ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/4567 /p2p/QmPeer
- Forward connections to 127.0.0.1:4567 to '` + P2PProtoPrefix + `myproto' service on /p2p/QmPeer
- Ctrl+C or SIGTERM: Removes the forwarder and exits
- 'ipfs p2p close': Removes the forwarder and exits
- Daemon shutdown: Forwarder is automatically removed

Useful for systemd services or scripts that need cleanup on exit.

EXAMPLES:

# Persistent forwarder (command returns immediately)
ipfs p2p forward /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID

# Temporary forwarder (removed when command exits)
ipfs p2p forward -f /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
`,
},
Arguments: []cmds.Argument{
Expand All @@ -101,6 +129,7 @@ Example:
},
Options: []cmds.Option{
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; forwarder is removed when command exits"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
Expand Down Expand Up @@ -130,7 +159,51 @@ Example:
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}

return forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
listener, err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
if err != nil {
return err
}

foreground, _ := req.Options[foregroundOptionName].(bool)
if foreground {
if err := res.Emit(&P2PForegroundOutput{
Status: "active",
Protocol: protoOpt,
Address: listenOpt,
}); err != nil {
return err
}
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
// or listener removal (ipfs p2p close)
select {
case <-req.Context.Done():
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
n.P2P.ListenersLocal.Close(func(l p2p.Listener) bool {
return l == listener
})
return nil
case <-listener.Done():
// Closed via "ipfs p2p close" - emit closing message
return res.Emit(&P2PForegroundOutput{
Status: "closing",
Protocol: protoOpt,
Address: listenOpt,
})
}
}

return nil
},
Type: P2PForegroundOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
if out.Status == "active" {
fmt.Fprintf(w, "Forwarding %s to %s, waiting for interrupt...\n", out.Protocol, out.Address)
} else if out.Status == "closing" {
fmt.Fprintf(w, "Received interrupt, removing forwarder for %s\n", out.Protocol)
}
return nil
}),
},
}

Expand Down Expand Up @@ -185,14 +258,38 @@ var p2pListenCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Create libp2p service.",
ShortDescription: `
Create libp2p service and forward connections made to <target-address>.
Create a libp2p protocol handler that forwards incoming connections to
<target-address>.

<protocol> specifies the libp2p handler name. It must be prefixed with '` + P2PProtoPrefix + `'.
When a remote peer connects using 'ipfs p2p forward', the connection is
forwarded to your local service. Similar to SSH port forwarding (server side).

Example:
ipfs p2p listen ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/1234
- Forward connections to 'myproto' libp2p service to 127.0.0.1:1234
ARGUMENTS:

<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
<target-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)

FOREGROUND MODE (--foreground, -f):

By default, the listener runs in the daemon and the command returns
immediately. Use --foreground to block until interrupted:

- Ctrl+C or SIGTERM: Removes the listener and exits
- 'ipfs p2p close': Removes the listener and exits
- Daemon shutdown: Listener is automatically removed

Useful for systemd services or scripts that need cleanup on exit.

EXAMPLES:

# Persistent listener (command returns immediately)
ipfs p2p listen /x/myapp /ip4/127.0.0.1/tcp/3000

# Temporary listener (removed when command exits)
ipfs p2p listen -f /x/myapp /ip4/127.0.0.1/tcp/3000

# Report connecting peer ID to the target application
ipfs p2p listen -r /x/myapp /ip4/127.0.0.1/tcp/3000
`,
},
Arguments: []cmds.Argument{
Expand All @@ -202,6 +299,7 @@ Example:
Options: []cmds.Option{
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
cmds.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"),
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; listener is removed when command exits"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
Expand Down Expand Up @@ -231,8 +329,51 @@ Example:
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}

_, err = n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
return err
listener, err := n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
if err != nil {
return err
}

foreground, _ := req.Options[foregroundOptionName].(bool)
if foreground {
if err := res.Emit(&P2PForegroundOutput{
Status: "active",
Protocol: protoOpt,
Address: targetOpt,
}); err != nil {
return err
}
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
// or listener removal (ipfs p2p close)
select {
case <-req.Context.Done():
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
n.P2P.ListenersP2P.Close(func(l p2p.Listener) bool {
return l == listener
})
return nil
case <-listener.Done():
// Closed via "ipfs p2p close" - emit closing message
return res.Emit(&P2PForegroundOutput{
Status: "closing",
Protocol: protoOpt,
Address: targetOpt,
})
}
}

return nil
},
Type: P2PForegroundOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
if out.Status == "active" {
fmt.Fprintf(w, "Listening on %s, forwarding to %s, waiting for interrupt...\n", out.Protocol, out.Address)
} else if out.Status == "closing" {
fmt.Fprintf(w, "Received interrupt, removing listener for %s\n", out.Protocol)
}
return nil
}),
},
}

Expand Down Expand Up @@ -271,11 +412,9 @@ func checkPort(target ma.Multiaddr) error {
}

// forwardLocal forwards local connections to a libp2p service
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) error {
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) (p2p.Listener, error) {
ps.AddAddrs(addr.ID, addr.Addrs, pstore.TempAddrTTL)
// TODO: return some info
_, err := p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
return err
return p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
}

const (
Expand Down
11 changes: 10 additions & 1 deletion docs/changelogs/v0.40.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team.
- [Overview](#overview)
- [🔦 Highlights](#-highlights)
- [Routing V1 HTTP API now exposed by default](#routing-v1-http-api-now-exposed-by-default)
- [Track total size when adding pins](#track-total-size-when-adding-pins]
- [Track total size when adding pins](#track-total-size-when-adding-pins)
- [`ipfs p2p` foreground mode](#ipfs-p2p-foreground-mode)
- [📝 Changelog](#-changelog)
- [👨‍👩‍👧‍👦 Contributors](#-contributors)

Expand All @@ -32,6 +33,14 @@ Example output:
Fetched/Processed 336 nodes (83 MB)
```

#### `ipfs p2p` foreground mode

The `ipfs p2p listen` and `ipfs p2p forward` commands now support a `--foreground` (`-f`) flag that keeps the command running until interrupted. When the command exits (via Ctrl+C, SIGTERM, or daemon shutdown), the listener/forwarder is automatically removed.

Without `--foreground`, the commands return immediately and the listener persists in the daemon (existing behavior).

Run `ipfs p2p listen --help` or `ipfs p2p forward --help` for details and examples.

### 📝 Changelog

### 👨‍👩‍👧‍👦 Contributors
2 changes: 2 additions & 0 deletions docs/experimental-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /p2p/$SERVER_ID
You should now be able to connect to your ssh server through a libp2p connection
with `ssh [user]@127.0.0.1 -p 2222`.

> **Tip:** Both commands support `--foreground` (`-f`) flag for blocking behavior.
> Run `ipfs p2p listen --help` or `ipfs p2p forward --help` for details.

### Road to being a real feature

Expand Down
14 changes: 8 additions & 6 deletions p2p/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type Listener interface {

// close closes the listener. Does not affect child streams
close()

// Done returns a channel that is closed when the listener is closed.
// This allows callers to detect when a listener has been removed.
Done() <-chan struct{}
}

// Listeners manages a group of Listener implementations,
Expand Down Expand Up @@ -73,15 +77,13 @@ func (r *Listeners) Register(l Listener) error {
return nil
}

// Close removes and closes all listeners for which matchFunc returns true.
// Returns the number of listeners closed.
func (r *Listeners) Close(matchFunc func(listener Listener) bool) int {
todo := make([]Listener, 0)
var todo []Listener
r.Lock()
for _, l := range r.Listeners {
if !matchFunc(l) {
continue
}

if _, ok := r.Listeners[l.key()]; ok {
if matchFunc(l) {
delete(r.Listeners, l.key())
todo = append(todo, l)
}
Expand Down
7 changes: 7 additions & 0 deletions p2p/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type localListener struct {
peer peer.ID

listener manet.Listener
done chan struct{}
}

// ForwardLocal creates new P2P stream to a remote listener.
Expand All @@ -32,6 +33,7 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I
p2p: p2p,
proto: proto,
peer: peer,
done: make(chan struct{}),
}

maListener, err := manet.Listen(bindAddr)
Expand Down Expand Up @@ -98,6 +100,11 @@ func (l *localListener) setupStream(local manet.Conn) {

func (l *localListener) close() {
l.listener.Close()
close(l.done)
}

func (l *localListener) Done() <-chan struct{} {
return l.done
}

func (l *localListener) Protocol() protocol.ID {
Expand Down
11 changes: 10 additions & 1 deletion p2p/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type remoteListener struct {
// reportRemote if set to true makes the handler send '<base58 remote peerid>\n'
// to target before any data is forwarded
reportRemote bool

done chan struct{}
}

// ForwardRemote creates new p2p listener.
Expand All @@ -36,6 +38,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu
addr: addr,

reportRemote: reportRemote,
done: make(chan struct{}),
}

if err := p2p.ListenersP2P.Register(listener); err != nil {
Expand Down Expand Up @@ -99,7 +102,13 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr {
return l.addr
}

func (l *remoteListener) close() {}
func (l *remoteListener) close() {
close(l.done)
}

func (l *remoteListener) Done() <-chan struct{} {
return l.done
}

func (l *remoteListener) key() protocol.ID {
return l.proto
Expand Down
Loading
Loading