-
Notifications
You must be signed in to change notification settings - Fork 54
/
conn.go
275 lines (233 loc) · 7.84 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
package measurexlite
//
// Conn tracing
//
import (
"fmt"
"net"
"time"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)
// MaybeClose is a convenience function for closing a [net.Conn] when it is not nil.
func MaybeClose(conn net.Conn) (err error) {
if conn != nil {
err = conn.Close()
}
return
}
// MaybeWrapNetConn implements model.Trace.MaybeWrapNetConn.
func (tx *Trace) MaybeWrapNetConn(conn net.Conn) net.Conn {
return &connTrace{
Conn: conn,
tx: tx,
}
}
// connTrace is a trace-aware net.Conn.
type connTrace struct {
// Implementation note: it seems safe to use embedding here because net.Conn
// is an interface from the standard library that we don't control
net.Conn
tx *Trace
}
var _ net.Conn = &connTrace{}
type remoteAddrProvider interface {
RemoteAddr() net.Addr
}
func safeRemoteAddrNetwork(rap remoteAddrProvider) (result string) {
if addr := rap.RemoteAddr(); addr != nil {
result = addr.Network()
}
return result
}
func safeRemoteAddrString(rap remoteAddrProvider) (result string) {
if addr := rap.RemoteAddr(); addr != nil {
result = addr.String()
}
return result
}
// Read implements net.Conn.Read and saves network events.
func (c *connTrace) Read(b []byte) (int, error) {
// collect preliminary stats when the connection is surely active
network := safeRemoteAddrNetwork(c)
addr := safeRemoteAddrString(c)
started := c.tx.TimeSince(c.tx.ZeroTime())
// perform the underlying network operation
count, err := c.Conn.Read(b)
// emit the network event
finished := c.tx.TimeSince(c.tx.ZeroTime())
select {
case c.tx.networkEvent <- NewArchivalNetworkEvent(
c.tx.Index(), started, netxlite.ReadOperation, network, addr, count,
err, finished, c.tx.tags...):
default: // buffer is full
}
// update per receiver statistics
c.tx.updateBytesReceivedMapNetConn(network, addr, count)
// return to the caller
return count, err
}
// updateBytesReceivedMapNetConn updates the [*Trace] bytes received map for a [net.Conn].
func (tx *Trace) updateBytesReceivedMapNetConn(network, address string, count int) {
// normalize the network name
switch network {
case "udp", "udp4", "udp6":
network = "udp"
case "tcp", "tcp4", "tcp6":
network = "tcp"
}
// create the key for inserting inside the map
key := fmt.Sprintf("%s %s", address, network)
// lock and insert into the map
tx.bytesReceivedMu.Lock()
tx.bytesReceivedMap[key] += int64(count)
tx.bytesReceivedMu.Unlock()
}
// CloneBytesReceivedMap returns a clone of the internal bytes received map. The key
// of the map is a string following the "EPNT_ADDRESS PROTO" pattern where the "EPNT_ADDRESS"
// contains the endpoint address and "PROTO" is "tcp" or "udp".
func (tx *Trace) CloneBytesReceivedMap() (out map[string]int64) {
out = make(map[string]int64)
tx.bytesReceivedMu.Lock()
for key, value := range tx.bytesReceivedMap {
out[key] = value
}
tx.bytesReceivedMu.Unlock()
return
}
// Write implements net.Conn.Write and saves network events.
func (c *connTrace) Write(b []byte) (int, error) {
network := safeRemoteAddrNetwork(c)
addr := safeRemoteAddrString(c)
started := c.tx.TimeSince(c.tx.ZeroTime())
count, err := c.Conn.Write(b)
finished := c.tx.TimeSince(c.tx.ZeroTime())
select {
case c.tx.networkEvent <- NewArchivalNetworkEvent(
c.tx.Index(), started, netxlite.WriteOperation, network, addr, count,
err, finished, c.tx.tags...):
default: // buffer is full
}
return count, err
}
// MaybeCloseUDPLikeConn is a convenience function for closing a [model.UDPLikeConn] when it is not nil.
func MaybeCloseUDPLikeConn(conn model.UDPLikeConn) (err error) {
if conn != nil {
err = conn.Close()
}
return
}
// MaybeWrapUDPLikeConn implements model.Trace.MaybeWrapUDPLikeConn.
func (tx *Trace) MaybeWrapUDPLikeConn(conn model.UDPLikeConn) model.UDPLikeConn {
return &udpLikeConnTrace{
UDPLikeConn: conn,
tx: tx,
}
}
// udpLikeConnTrace is a trace-aware model.UDPLikeConn.
type udpLikeConnTrace struct {
// Implementation note: it seems ~safe to use embedding here because model.UDPLikeConn
// contains fields deriving from how quic-go/quic-go uses the standard library
model.UDPLikeConn
tx *Trace
}
// Read implements model.UDPLikeConn.ReadFrom and saves network events.
func (c *udpLikeConnTrace) ReadFrom(b []byte) (int, net.Addr, error) {
// record when we started measuring
started := c.tx.TimeSince(c.tx.ZeroTime())
// perform the network operation
count, addr, err := c.UDPLikeConn.ReadFrom(b)
// emit the network event
finished := c.tx.TimeSince(c.tx.ZeroTime())
address := addrStringIfNotNil(addr)
select {
case c.tx.networkEvent <- NewArchivalNetworkEvent(
c.tx.Index(), started, netxlite.ReadFromOperation, "udp", address, count,
err, finished, c.tx.tags...):
default: // buffer is full
}
// possibly collect a download speed sample
c.tx.maybeUpdateBytesReceivedMapUDPLikeConn(addr, count)
// return results to the caller
return count, addr, err
}
// maybeUpdateBytesReceivedMapUDPLikeConn updates the [*Trace] bytes received map for a [model.UDPLikeConn].
func (tx *Trace) maybeUpdateBytesReceivedMapUDPLikeConn(addr net.Addr, count int) {
// Implementation note: the address may be nil if the operation failed given that we don't
// have a fixed peer address for UDP connections
if addr != nil {
tx.updateBytesReceivedMapNetConn(addr.Network(), addr.String(), count)
}
}
// Write implements model.UDPLikeConn.WriteTo and saves network events.
func (c *udpLikeConnTrace) WriteTo(b []byte, addr net.Addr) (int, error) {
started := c.tx.TimeSince(c.tx.ZeroTime())
address := addr.String()
count, err := c.UDPLikeConn.WriteTo(b, addr)
finished := c.tx.TimeSince(c.tx.ZeroTime())
select {
case c.tx.networkEvent <- NewArchivalNetworkEvent(
c.tx.Index(), started, netxlite.WriteToOperation, "udp", address, count,
err, finished, c.tx.tags...):
default: // buffer is full
}
return count, err
}
// addrStringIfNotNil returns the string of the given addr
// unless the addr is nil, in which case it returns an empty string.
func addrStringIfNotNil(addr net.Addr) (out string) {
if addr != nil {
out = addr.String()
}
return
}
// NewArchivalNetworkEvent creates a new model.ArchivalNetworkEvent.
func NewArchivalNetworkEvent(index int64, started time.Duration, operation string,
network string, address string, count int, err error, finished time.Duration,
tags ...string) *model.ArchivalNetworkEvent {
return &model.ArchivalNetworkEvent{
Address: address,
Failure: NewFailure(err),
NumBytes: int64(count),
Operation: operation,
Proto: network,
T0: started.Seconds(),
T: finished.Seconds(),
TransactionID: index,
Tags: copyAndNormalizeTags(tags),
}
}
// NewAnnotationArchivalNetworkEvent is a simplified NewArchivalNetworkEvent
// where we create a simple annotation without attached I/O info.
func NewAnnotationArchivalNetworkEvent(
index int64, time time.Duration, operation string, tags ...string) *model.ArchivalNetworkEvent {
return NewArchivalNetworkEvent(index, time, operation, "", "", 0, nil, time, tags...)
}
// NetworkEvents drains the network events buffered inside the NetworkEvent channel.
func (tx *Trace) NetworkEvents() (out []*model.ArchivalNetworkEvent) {
for {
select {
case ev := <-tx.networkEvent:
out = append(out, ev)
default:
return // done
}
}
}
// FirstNetworkEventOrNil drains the network events buffered inside the NetworkEvents channel
// and returns the first NetworkEvent, if any. Otherwise, it returns nil.
func (tx *Trace) FirstNetworkEventOrNil() *model.ArchivalNetworkEvent {
ev := tx.NetworkEvents()
if len(ev) < 1 {
return nil
}
return ev[0]
}
// copyAndNormalizeTags ensures that we map nil tags to []string
// and that we return a copy of the tags.
func copyAndNormalizeTags(tags []string) []string {
if len(tags) <= 0 {
tags = []string{}
}
return append([]string{}, tags...)
}