-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
dial.go
189 lines (149 loc) · 4.74 KB
/
dial.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package client
import (
"context"
"fmt"
"time"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
ma "github.com/multiformats/go-multiaddr"
)
const maxMessageSize = 4096
var DialTimeout = time.Minute
var DialRelayTimeout = 5 * time.Second
// relay protocol errors; used for signalling deduplication
type relayError struct {
err string
}
func (e relayError) Error() string {
return e.err
}
func newRelayError(t string, args ...interface{}) error {
return relayError{err: fmt.Sprintf(t, args...)}
}
func isRelayError(err error) bool {
_, ok := err.(relayError)
return ok
}
// dialer
func (c *Client) dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, error) {
// split /a/p2p-circuit/b into (/a, /p2p-circuit/b)
relayaddr, destaddr := ma.SplitFunc(a, func(c ma.Component) bool {
return c.Protocol().Code == ma.P_CIRCUIT
})
// If the address contained no /p2p-circuit part, the second part is nil.
if destaddr == nil {
return nil, fmt.Errorf("%s is not a relay address", a)
}
if relayaddr == nil {
return nil, fmt.Errorf("can't dial a p2p-circuit without specifying a relay: %s", a)
}
dinfo := peer.AddrInfo{ID: p}
// Strip the /p2p-circuit prefix from the destaddr so that we can pass the destination address
// (if present) for active relays
_, destaddr = ma.SplitFirst(destaddr)
if destaddr != nil {
dinfo.Addrs = append(dinfo.Addrs, destaddr)
}
rinfo, err := peer.AddrInfoFromP2pAddr(relayaddr)
if err != nil {
return nil, fmt.Errorf("error parsing relay multiaddr '%s': %w", relayaddr, err)
}
// deduplicate active relay dials to the same peer
retry:
c.mx.Lock()
dedup, active := c.activeDials[p]
if !active {
dedup = &completion{ch: make(chan struct{}), relay: rinfo.ID}
c.activeDials[p] = dedup
}
c.mx.Unlock()
if active {
select {
case <-dedup.ch:
if dedup.err != nil {
if dedup.relay != rinfo.ID {
// different relay, retry
goto retry
}
if !isRelayError(dedup.err) {
// not a relay protocol error, retry
goto retry
}
// don't try the same relay if it failed to connect with a protocol error
return nil, fmt.Errorf("concurrent active dial through the same relay failed with a protocol error")
}
return nil, fmt.Errorf("concurrent active dial succeeded")
case <-ctx.Done():
return nil, ctx.Err()
}
}
conn, err := c.dialPeer(ctx, *rinfo, dinfo)
c.mx.Lock()
dedup.err = err
close(dedup.ch)
delete(c.activeDials, p)
c.mx.Unlock()
return conn, err
}
func (c *Client) dialPeer(ctx context.Context, relay, dest peer.AddrInfo) (*Conn, error) {
log.Debugf("dialing peer %s through relay %s", dest.ID, relay.ID)
if len(relay.Addrs) > 0 {
c.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, peerstore.TempAddrTTL)
}
dialCtx, cancel := context.WithTimeout(ctx, DialRelayTimeout)
defer cancel()
s, err := c.host.NewStream(dialCtx, relay.ID, proto.ProtoIDv2Hop)
if err != nil {
return nil, fmt.Errorf("error opening hop stream to relay: %w", err)
}
return c.connect(s, dest)
}
func (c *Client) connect(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
if err := s.Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways); err != nil {
s.Reset()
return nil, err
}
defer s.Scope().ReleaseMemory(maxMessageSize)
rd := util.NewDelimitedReader(s, maxMessageSize)
wr := util.NewDelimitedWriter(s)
defer rd.Close()
var msg pbv2.HopMessage
msg.Type = pbv2.HopMessage_CONNECT.Enum()
msg.Peer = util.PeerInfoToPeerV2(dest)
s.SetDeadline(time.Now().Add(DialTimeout))
err := wr.WriteMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
msg.Reset()
err = rd.ReadMsg(&msg)
if err != nil {
s.Reset()
return nil, err
}
s.SetDeadline(time.Time{})
if msg.GetType() != pbv2.HopMessage_STATUS {
s.Reset()
return nil, newRelayError("unexpected relay response; not a status message (%d)", msg.GetType())
}
status := msg.GetStatus()
if status != pbv2.Status_OK {
s.Reset()
return nil, newRelayError("error opening relay circuit: %s (%d)", pbv2.Status_name[int32(status)], status)
}
// check for a limit provided by the relay; if the limit is not nil, then this is a limited
// relay connection and we mark the connection as transient.
var stat network.ConnStats
if limit := msg.GetLimit(); limit != nil {
stat.Limited = true
stat.Extra = make(map[interface{}]interface{})
stat.Extra[StatLimitDuration] = time.Duration(limit.GetDuration()) * time.Second
stat.Extra[StatLimitData] = limit.GetData()
}
return &Conn{stream: s, remote: dest, stat: stat, client: c}, nil
}