From cfd21dac774d10c5af36ca3cf93b622f2dcea731 Mon Sep 17 00:00:00 2001
From: dfawley
Date: Fri, 9 Jun 2017 09:24:50 -0700
Subject: [PATCH] Create latency package for realistically simulating network latency (#1286)

---
 benchmark/latency/latency.go      | 298 ++++++++++++++++++++++++++++++
 benchmark/latency/latency_test.go | 287 ++++++++++++++++++++++++++++
 2 files changed, 585 insertions(+)
 create mode 100644 benchmark/latency/latency.go
 create mode 100644 benchmark/latency/latency_test.go

diff --git a/benchmark/latency/latency.go b/benchmark/latency/latency.go
new file mode 100644
index 000000000000..fc70695557a4
--- /dev/null
+++ b/benchmark/latency/latency.go
@@ -0,0 +1,298 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package latency provides wrappers for net.Conn, net.Listener, and
+// net.Dialers, designed to interoperate to inject real-world latency into
+// network connections.
+package latency
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"net"
+	"time"
+
+	"golang.org/x/net/context"
+)
+
+// Dialer is a function matching the signature of net.Dial. Pass one to
+// Network.Dialer to obtain a Dialer whose connections have latency injected.
+type Dialer func(network, address string) (net.Conn, error)
+
+// TimeoutDialer is a function matching the signature of net.DialTimeout. Pass
+// one to Network.TimeoutDialer to obtain a TimeoutDialer whose connections
+// have latency injected.
+type TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)
+
+// ContextDialer is a function matching the signature of
+// net.Dialer.DialContext.
+type ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)
+
+// Network represents a network with the given bandwidth, latency, and MTU
+// (Maximum Transmission Unit) configuration, and can produce wrappers of
+// net.Listeners, net.Conn, and various forms of dialing functions.  The
+// Listeners and Dialers/Conns on both sides of connections must come from this
+// package, but need not be created from the same Network.  Latency is computed
+// when sending (in Write), and is injected when receiving (in Read).  This
+// allows senders' Write calls to be non-blocking, as in real-world
+// applications.
+//
+// Note: Latency is injected by the sender specifying the absolute time data
+// should be available, and the reader delaying until that time arrives to
+// provide the data.  This package attempts to counter-act the effects of clock
+// drift and existing network latency by measuring the delay between the
+// sender's transmission time and the receiver's reception time during startup.
+// No attempt is made to measure the existing bandwidth of the connection.
+type Network struct {
+	// Kbps is the simulated bandwidth. The transmit-time computation treats
+	// one kilobit as 1024 bits, i.e. Kbps*128 bytes per second.
+	Kbps int // Kilobits per second; if non-positive, infinite
+	// Latency is added on top of the bandwidth-derived transmit time of
+	// every packet and is also imposed once when a connection is created.
+	Latency time.Duration // One-way latency (sending); if non-positive, no delay
+	// MTU is the largest payload placed in a single packet; Write splits
+	// larger buffers into several packets.
+	MTU int // Bytes per packet; if non-positive, infinite
+}
+
+// Conn returns a net.Conn that wraps c and injects n's latency into that
+// connection.  This function also imposes latency for connection creation.
+// It blocks while performing a handshake with the peer, which must also be
+// using this package.  If n's Latency is positive but lower than the measured
+// latency in c, an error is returned.  Conn does not close c on error; the
+// caller remains responsible for it.
+func (n *Network) Conn(c net.Conn) (net.Conn, error) {
+	start := now()
+	nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)}
+	// sync handshakes with the peer and sets nc.delay.
+	if err := nc.sync(); err != nil {
+		return nil, err
+	}
+	// Connection creation should take nc.delay measured from start; sleep for
+	// whatever part of that the handshake has not already used up.
+	sleep(start.Add(nc.delay).Sub(now()))
+	return nc, nil
+}
+
+// conn is the net.Conn returned by Network.Conn.  It embeds the wrapped
+// connection, so every net.Conn method other than Read and Write is forwarded
+// to it unchanged.
+type conn struct {
+	net.Conn
+	network *Network
+
+	readBuf     *bytes.Buffer // one packet worth of data received
+	lastSendEnd time.Time     // time the previous Write should be fully on the wire
+	delay       time.Duration // desired latency - measured latency
+}
+
+// header is sent before all data transmitted by the application.
+type header struct {
+	ReadTime int64 // Time the reader is allowed to read this packet (UnixNano)
+	Sz       int32 // Size of the data in the packet
+}
+
+// Write splits p into packets, prefixes each with a header carrying the time
+// at which the reader may release it, and sends them on the wrapped
+// connection.  It never sleeps; the latency is applied by the peer's Read.
+//
+// The returned n counts payload bytes only (headers are excluded).  On error,
+// n is the number of payload bytes written before the failure.
+func (c *conn) Write(p []byte) (n int, err error) {
+	// header.Sz is an int32, so a single packet can never carry more than
+	// this many bytes, even when the MTU is unlimited.
+	const maxPktSize = 1<<31 - 1
+
+	limit := maxPktSize
+	if c.network.MTU > 0 && c.network.MTU < limit {
+		limit = c.network.MTU
+	}
+
+	// If the link has been idle, transmission starts now rather than at the
+	// end of the previous write.
+	tNow := now()
+	if c.lastSendEnd.Before(tNow) {
+		c.lastSendEnd = tNow
+	}
+	for len(p) > 0 {
+		pkt := p
+		if len(pkt) > limit {
+			pkt = pkt[:limit]
+		}
+		p = p[len(pkt):]
+
+		// Packets queue behind each other: each one finishes transmitting
+		// pktTime after the previous one did.
+		c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
+		hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
+		if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
+			return n, err
+		}
+		x, err := c.Conn.Write(pkt)
+		n += x
+		if err != nil {
+			return n, err
+		}
+	}
+	return n, nil
+}
+
+// Read copies buffered packet data into p.  When no data is buffered it first
+// reads the next header and its full payload from the wrapped connection, and
+// sleeps until the header's ReadTime before returning.  A single call never
+// returns data from more than one packet.
+//
+// A header with a non-positive size is reported as an error.
+func (c *conn) Read(p []byte) (n int, err error) {
+	if c.readBuf.Len() == 0 {
+		var hdr header
+		if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil {
+			return 0, err
+		}
+		// Write never emits an empty packet.  Without this check io.CopyN
+		// would copy nothing and succeed, and the empty buffer below would
+		// then surface a misleading io.EOF.
+		if hdr.Sz <= 0 {
+			return 0, fmt.Errorf("malformed packet header: invalid size %d", hdr.Sz)
+		}
+		// Delay the return (success or failure) until the packet's
+		// scheduled arrival time.
+		defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }()
+
+		if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil {
+			return 0, err
+		}
+	}
+	// Read from readBuf.
+	return c.readBuf.Read(p)
+}
+
+// sync does a handshake and then measures the latency on the network in
+// coordination with the other side.
+func (c *conn) sync() error {
+	const (
+		pingMsg  = "syncPing"
+		warmup   = 10               // minimum number of iterations to measure latency
+		giveUp   = 50               // maximum number of iterations to measure latency
+		accuracy = time.Millisecond // req'd accuracy to stop early
+		goodRun  = 3                // stop early if latency within accuracy this many times
+	)
+
+	// syncMsg is exchanged in both directions.  The initiator sets SendT; the
+	// peer fills in RecvT and echoes the message back.
+	type syncMsg struct {
+		SendT int64 // Time sent.  If zero, stop.
+		RecvT int64 // Time received.  If zero, fill in and respond.
+	}
+
+	// A trivial handshake
+	if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil {
+		return err
+	}
+	var ping [8]byte
+	if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil {
+		return err
+	}
+	if string(ping[:]) != pingMsg {
+		return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg)
+	}
+
+	// Both sides are alive and syncing.  Calculate network delay / clock skew.
+	att := 0                  // number of probes sent so far
+	good := 0                 // consecutive samples within accuracy of the running average
+	var latency time.Duration // sum of samples; replaced by the average once done
+	localDone, remoteDone := false, false
+	send := true
+	for !localDone || !remoteDone {
+		if send {
+			if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil {
+				return err
+			}
+			att++
+			send = false
+		}
+
+		// Block until we get a syncMsg
+		m := syncMsg{}
+		if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil {
+			return err
+		}
+
+		if m.RecvT == 0 {
+			// Message initiated from other side.
+			if m.SendT == 0 {
+				remoteDone = true
+				continue
+			}
+			// Send response.
+			m.RecvT = now().UnixNano()
+			if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil {
+				return err
+			}
+			continue
+		}
+
+		// This is the echo of one of our probes.  RecvT comes from the
+		// peer's clock, so the sample includes any clock skew.
+		lag := time.Duration(m.RecvT - m.SendT)
+		latency += lag
+		avgLatency := latency / time.Duration(att)
+		if e := lag - avgLatency; e > -accuracy && e < accuracy {
+			good++
+		} else {
+			good = 0
+		}
+		// Keep probing until warmed up and stable, or until giving up.
+		if att < giveUp && (att < warmup || good < goodRun) {
+			send = true
+			continue
+		}
+		localDone = true
+		latency = avgLatency
+		// Tell the other side we're done.
+		if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil {
+			return err
+		}
+	}
+	if c.network.Latency <= 0 {
+		// No latency requested: leave c.delay at zero.
+		return nil
+	}
+	c.delay = c.network.Latency - latency
+	if c.delay < 0 {
+		return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency)
+	}
+	return nil
+}
+
+// Listener returns a net.Listener that wraps l and injects n's latency in its
+// connections.
+func (n *Network) Listener(l net.Listener) net.Listener {
+	return &listener{Listener: l, network: n}
+}
+
+// listener wraps a net.Listener so that accepted connections are passed
+// through network.Conn.  All other methods are forwarded to the embedded
+// Listener.
+type listener struct {
+	net.Listener
+	network *Network
+}
+
+// Accept waits for the next connection on the wrapped listener and wraps it
+// with the listener's Network.  If wrapping fails, the accepted connection is
+// closed and the wrapping error is returned.
+func (l *listener) Accept() (net.Conn, error) {
+	c, err := l.Listener.Accept()
+	if err != nil {
+		return nil, err
+	}
+	lc, err := l.network.Conn(c)
+	if err != nil {
+		// The caller never sees c, so it must be released here.  A Close
+		// error is ignored: the wrapping error is the one worth reporting.
+		c.Close()
+		return nil, err
+	}
+	return lc, nil
+}
+
+// Dialer returns a Dialer that wraps d and injects n's latency in its
+// connections.  n's Latency is also injected to the connection's creation.
+// If wrapping a dialed connection fails, that connection is closed and the
+// wrapping error is returned.
+func (n *Network) Dialer(d Dialer) Dialer {
+	return func(network, address string) (net.Conn, error) {
+		c, err := d(network, address)
+		if err != nil {
+			return nil, err
+		}
+		lc, err := n.Conn(c)
+		if err != nil {
+			// The caller never sees c, so it must be released here.
+			c.Close()
+			return nil, err
+		}
+		return lc, nil
+	}
+}
+
+// TimeoutDialer returns a TimeoutDialer that wraps d and injects n's latency
+// in its connections.  n's Latency is also injected to the connection's
+// creation.  If wrapping a dialed connection fails, that connection is closed
+// and the wrapping error is returned.
+func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer {
+	return func(network, address string, timeout time.Duration) (net.Conn, error) {
+		c, err := d(network, address, timeout)
+		if err != nil {
+			return nil, err
+		}
+		lc, err := n.Conn(c)
+		if err != nil {
+			// The caller never sees c, so it must be released here.
+			c.Close()
+			return nil, err
+		}
+		return lc, nil
+	}
+}
+
+// ContextDialer returns a ContextDialer that wraps d and injects n's latency
+// in its connections.  n's Latency is also injected to the connection's
+// creation.
+func (n *Network) ContextDialer(d ContextDialer) ContextDialer {
+	return func(ctx context.Context, network, address string) (net.Conn, error) {
+		c, err := d(ctx, network, address)
+		if err != nil {
+			return nil, err
+		}
+		lc, err := n.Conn(c)
+		if err != nil {
+			// The caller never sees c, so it must be released here.  A
+			// Close error is ignored: the wrapping error is the one worth
+			// reporting.
+			c.Close()
+			return nil, err
+		}
+		return lc, nil
+	}
+}
+
+// pktTime returns the time it takes to transmit one packet of data of size b
+// in bytes.  It returns zero when n.Kbps is non-positive (infinite bandwidth).
+// One kilobit is taken to be 1024 bits, so the rate is n.Kbps*128 bytes per
+// second.
+func (n *Network) pktTime(b int) time.Duration {
+	if n.Kbps <= 0 {
+		return time.Duration(0)
+	}
+	// Convert to time.Duration (int64) before multiplying so that a large
+	// Kbps cannot overflow int on 32-bit platforms.
+	return time.Duration(b) * time.Second / (time.Duration(n.Kbps) * (1024 / 8))
+}
+
+// Wrappers for testing
+
+var now = time.Now
+var sleep = time.Sleep
diff --git a/benchmark/latency/latency_test.go b/benchmark/latency/latency_test.go
new file mode 100644
index 000000000000..778efcdfbe91
--- /dev/null
+++ b/benchmark/latency/latency_test.go
@@ -0,0 +1,287 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package latency
+
+import (
+	"bytes"
+	"net"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+)
+
+// bufConn is a net.Conn implemented by a bytes.Buffer (which is a ReadWriter).
+type bufConn struct {
+	*bytes.Buffer
+}
+
+// The remaining net.Conn methods are not needed by these tests.
+func (bufConn) Close() error                       { panic("unimplemented") }
+func (bufConn) LocalAddr() net.Addr                { panic("unimplemented") }
+func (bufConn) RemoteAddr() net.Addr               { panic("unimplemented") }
+func (bufConn) SetDeadline(t time.Time) error      { panic("unimplemented") }
+func (bufConn) SetReadDeadline(t time.Time) error  { panic("unimplemented") }
+func (bufConn) SetWriteDeadline(t time.Time) error { panic("unimplemented") }
+
+// restoreHooks captures the current sleep and now hooks and returns a
+// function that puts them back.  Intended use: defer restoreHooks()().
+func restoreHooks() func() {
+	s := sleep
+	n := now
+	return func() {
+		sleep = s
+		now = n
+	}
+}
+
+// TestConn checks packetization and the per-packet sleep durations of a conn
+// running over an in-memory loopback buffer with a frozen clock.
+func TestConn(t *testing.T) {
+	defer restoreHooks()()
+
+	// Constant time.
+	now = func() time.Time { return time.Unix(123, 456) }
+
+	// Capture sleep times for checking later.
+	var sleepTimes []time.Duration
+	sleep = func(d time.Duration) { sleepTimes = append(sleepTimes, d) }
+
+	// wantSleeps asserts the sleeps recorded since the previous call, then
+	// clears the record.
+	wantSleeps := func(want ...time.Duration) {
+		if !reflect.DeepEqual(want, sleepTimes) {
+			t.Fatalf("sleepTimes = %v; want %v", sleepTimes, want)
+		}
+		sleepTimes = nil
+	}
+
+	latency := 10 * time.Millisecond
+	c, err := (&Network{Kbps: 1, Latency: latency, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
+	if err != nil {
+		t.Fatalf("Unexpected error creating connection: %v", err)
+	}
+	wantSleeps(latency) // Connection creation delay.
+
+	// 1 kbps = 128 Bps.  Divides evenly by 1 second using nanos.
+	byteLatency := time.Duration(time.Second / 128)
+
+	write := func(b []byte) {
+		n, err := c.Write(b)
+		if n != len(b) || err != nil {
+			t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
+		}
+	}
+
+	write([]byte{1, 2, 3, 4, 5}) // One full packet
+	pkt1Time := latency + byteLatency*5
+	write([]byte{6}) // One partial packet
+	pkt2Time := pkt1Time + byteLatency
+	write([]byte{7, 8, 9, 10, 11, 12, 13}) // Two packets
+	pkt3Time := pkt2Time + byteLatency*5
+	pkt4Time := pkt3Time + byteLatency*2
+
+	// No reads, so no sleeps yet.
+	wantSleeps()
+
+	read := func(n int, want []byte) {
+		b := make([]byte, n)
+		if rd, err := c.Read(b); err != nil || rd != len(want) {
+			t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil", n, rd, err, len(want))
+		}
+		if !reflect.DeepEqual(b[:len(want)], want) {
+			t.Fatalf("read %v; want %v", b, want)
+		}
+	}
+
+	read(1, []byte{1})
+	wantSleeps(pkt1Time)
+	read(1, []byte{2})
+	wantSleeps()
+	read(3, []byte{3, 4, 5})
+	wantSleeps()
+	read(2, []byte{6})
+	wantSleeps(pkt2Time)
+	read(2, []byte{7, 8})
+	wantSleeps(pkt3Time)
+	read(10, []byte{9, 10, 11})
+	wantSleeps()
+	read(10, []byte{12, 13})
+	wantSleeps(pkt4Time)
+}
+
+// TestSync stacks a 30ms Network on top of a simulated 20ms connection and
+// expects the outer conn to add only the missing 10ms.
+func TestSync(t *testing.T) {
+	defer restoreHooks()()
+
+	// Infinitely fast CPU: time doesn't pass unless sleep is called.
+	tn := time.Unix(123, 0)
+	now = func() time.Time { return tn }
+	sleep = func(d time.Duration) { tn = tn.Add(d) }
+
+	// Simulate a 20ms latency network, then run sync across that and expect to
+	// measure 20ms latency, or 10ms additional delay for a 30ms network.
+	slowConn, err := (&Network{Kbps: 0, Latency: 20 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
+	if err != nil {
+		t.Fatalf("Unexpected error creating connection: %v", err)
+	}
+	c, err := (&Network{Latency: 30 * time.Millisecond}).Conn(slowConn)
+	if err != nil {
+		t.Fatalf("Unexpected error creating connection: %v", err)
+	}
+	if c.(*conn).delay != 10*time.Millisecond {
+		t.Fatalf("c.delay = %v; want 10ms", c.(*conn).delay)
+	}
+}
+
+// TestSyncTooSlow expects Conn to fail when the requested latency is lower
+// than the latency already present on the wrapped connection.
+func TestSyncTooSlow(t *testing.T) {
+	defer restoreHooks()()
+
+	// Infinitely fast CPU: time doesn't pass unless sleep is called.
+	tn := time.Unix(123, 0)
+	now = func() time.Time { return tn }
+	sleep = func(d time.Duration) { tn = tn.Add(d) }
+
+	// Simulate a 10ms latency network, then attempt to simulate a 5ms latency
+	// network and expect an error.
+	slowConn, err := (&Network{Kbps: 0, Latency: 10 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
+	if err != nil {
+		t.Fatalf("Unexpected error creating connection: %v", err)
+	}
+
+	errWant := "measured network latency (10ms) higher than desired latency (5ms)"
+	if _, err := (&Network{Latency: 5 * time.Millisecond}).Conn(slowConn); err == nil || err.Error() != errWant {
+		t.Fatalf("Conn() = _, %q; want _, %q", err, errWant)
+	}
+}
+
+// TestListenerAndDialer runs a wrapped listener and a wrapped dialer against
+// each other over a real TCP connection, using a fake clock that only
+// advances when sleep is called.
+func TestListenerAndDialer(t *testing.T) {
+	defer restoreHooks()()
+
+	tn := time.Unix(123, 0)
+	startTime := tn
+	mu := &sync.Mutex{}
+	now = func() time.Time {
+		mu.Lock()
+		defer mu.Unlock()
+		return tn
+	}
+
+	n := &Network{Kbps: 2, Latency: 20 * time.Millisecond, MTU: 10}
+	// 2 kbps = .25 kBps = 256 Bps
+	byteLatency := func(n int) time.Duration {
+		return time.Duration(n) * time.Second / 256
+	}
+
+	// Create a real listener and wrap it.
+	l, err := net.Listen("tcp", ":0")
+	if err != nil {
+		t.Fatalf("Unexpected error creating listener: %v", err)
+	}
+	defer l.Close()
+	l = n.Listener(l)
+
+	var serverConn net.Conn
+	var scErr error
+	scDone := make(chan struct{})
+	go func() {
+		serverConn, scErr = l.Accept()
+		close(scDone)
+	}()
+
+	// Create a dialer and use it.
+	clientConn, err := n.TimeoutDialer(net.DialTimeout)("tcp", l.Addr().String(), 2*time.Second)
+	if err != nil {
+		t.Fatalf("Unexpected error dialing: %v", err)
+	}
+	defer clientConn.Close()
+
+	// Block until server's Conn is available.
+	<-scDone
+	if scErr != nil {
+		t.Fatalf("Unexpected error listening: %v", scErr)
+	}
+	defer serverConn.Close()
+
+	// sleep (only) advances tn.   Done after connections established so sync detects zero delay.
+	sleep = func(d time.Duration) {
+		mu.Lock()
+		defer mu.Unlock()
+		if d > 0 {
+			tn = tn.Add(d)
+		}
+	}
+
+	// seq returns the bytes a, a+1, ..., b-1.
+	seq := func(a, b int) []byte {
+		buf := make([]byte, b-a)
+		for i := range buf {
+			buf[i] = byte(i + a)
+		}
+		return buf
+	}
+
+	pkt1 := seq(0, 10)
+	pkt2 := seq(10, 30)
+	pkt3 := seq(30, 35)
+
+	write := func(c net.Conn, b []byte) {
+		n, err := c.Write(b)
+		if n != len(b) || err != nil {
+			t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
+		}
+	}
+
+	write(serverConn, pkt1)
+	write(serverConn, pkt2)
+	write(serverConn, pkt3)
+	write(clientConn, pkt3)
+	write(clientConn, pkt1)
+	write(clientConn, pkt2)
+
+	// Read the clock through the now hook so the access is guarded by mu.
+	if got := now(); !got.Equal(startTime) {
+		t.Fatalf("unexpected sleep in write; tn = %v; want %v", got, startTime)
+	}
+
+	read := func(c net.Conn, n int, want []byte, timeWant time.Time) {
+		b := make([]byte, n)
+		if rd, err := c.Read(b); err != nil || rd != len(want) {
+			t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil (read: %v)", n, rd, err, len(want), b[:rd])
+		}
+		if !reflect.DeepEqual(b[:len(want)], want) {
+			t.Fatalf("read %v; want %v", b, want)
+		}
+		if !tn.Equal(timeWant) {
+			t.Errorf("tn after read(%v) = %v; want %v", want, tn, timeWant)
+		}
+	}
+
+	read(clientConn, len(pkt1)+1, pkt1, startTime.Add(n.Latency+byteLatency(len(pkt1))))
+	read(serverConn, len(pkt3)+1, pkt3, tn) // tn was advanced by the above read; pkt3 is shorter than pkt1
+
+	read(clientConn, len(pkt2), pkt2[:10], startTime.Add(n.Latency+byteLatency(len(pkt1)+10)))
+	read(clientConn, len(pkt2), pkt2[10:], startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2))))
+	read(clientConn, len(pkt3), pkt3, startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2)+len(pkt3))))
+
+	read(serverConn, len(pkt1), pkt1, tn) // tn already past the arrival time due to prior reads
+	read(serverConn, len(pkt2), pkt2[:10], tn)
+	read(serverConn, len(pkt2), pkt2[10:], tn)
+
+	// Sleep awhile and make sure the read happens disregarding previous writes
+	// (lastSendEnd handling).
+	sleep(10 * time.Second)
+	write(clientConn, pkt1)
+	read(serverConn, len(pkt1), pkt1, tn.Add(n.Latency+byteLatency(len(pkt1))))
+
+	// Send, sleep longer than the network delay, then make sure the read happens
+	// instantly.
+	write(serverConn, pkt1)
+	sleep(10 * time.Second)
+	read(clientConn, len(pkt1), pkt1, tn)
+}