Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the possibility to delay UDP messages #118

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 78 additions & 12 deletions network/udp/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,23 @@ import (
"log"
"net"
"sync"
"time"

"github.com/ConsenSys/handel"
h "github.com/ConsenSys/handel"
"github.com/ConsenSys/handel/network"
)

// Network is a handel.Network implementation using UDP as its transport layer
type Network interface {
Send(identities []h.Identity, packet *h.Packet)
RegisterListener(listener h.Listener)
getListeners() []handel.Listener
Stop()
}

// UDPNetwork is a handel.Network implementation using UDP as its transport layer
// listens on 0.0.0.0
type Network struct {
type UDPNetwork struct {
sync.RWMutex
udpSock *net.UDPConn
listeners []h.Listener
Expand All @@ -31,8 +39,65 @@ type Network struct {
rcvd int
}

type delayedPacket struct {
toSendAt time.Time
identities []h.Identity
data h.Packet
}

type DelayedUDPNetwork struct {
network Network
delay time.Duration
in chan *delayedPacket
}

func (n *DelayedUDPNetwork) Send(identities []h.Identity, packet *h.Packet) {
n.in <- &delayedPacket{time.Now().Add(n.delay), identities,*packet}
}

func (n *DelayedUDPNetwork) backgroundSend() {
for dp := range n.in {
delta := dp.toSendAt.Sub(time.Now())
if delta.Nanoseconds() > time.Millisecond.Nanoseconds() {
// Not really useful to sleep for less than 1ms
time.Sleep(delta)
}
n.network.Send(dp.identities, &dp.data)
}
}

func (n *DelayedUDPNetwork) Stop() {
n.network.Stop()
close(n.in)
}

func (n *DelayedUDPNetwork) RegisterListener(listener h.Listener) {
n.network.RegisterListener(listener)
}

func (n *DelayedUDPNetwork) getListeners() []handel.Listener{
return n.network.getListeners()
}

func NewDelayedUDPNetwork(delay time.Duration, addr string, enc network.Encoding) (*DelayedUDPNetwork, error) {
n, err := NewNetwork(addr, enc)
if err != nil {
return nil, err
}

res := &DelayedUDPNetwork{
n,
delay,
make(chan *delayedPacket, 10000),
}

go res.backgroundSend()

return res, nil
}

// NewNetwork creates Network baked by udp protocol
func NewNetwork(addr string, enc network.Encoding) (*Network, error) {
func NewNetwork(addr string, enc network.Encoding) (Network, error) {
_, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
Expand All @@ -50,22 +115,23 @@ func NewNetwork(addr string, enc network.Encoding) (*Network, error) {
return nil, err
}

udpNet := &Network{
udpNet := &UDPNetwork{
udpSock: udpSock,
enc: enc,
newPacket: make(chan *handel.Packet, 20000),
process: make(chan *handel.Packet, 100),
ready: make(chan bool, 1),
done: make(chan bool, 1),
}

go udpNet.handler()
go udpNet.loop()
go udpNet.dispatchLoop()
return udpNet, nil
}

// Stop closes
func (udpNet *Network) Stop() {
func (udpNet *UDPNetwork) Stop() {
udpNet.Lock()
defer udpNet.Unlock()
if udpNet.quit {
Expand All @@ -77,14 +143,14 @@ func (udpNet *Network) Stop() {
}

//RegisterListener registers listener for processing incoming packets
func (udpNet *Network) RegisterListener(listener h.Listener) {
func (udpNet *UDPNetwork) RegisterListener(listener h.Listener) {
udpNet.Lock()
defer udpNet.Unlock()
udpNet.listeners = append(udpNet.listeners, listener)
}

//Send sends a packet to supplied identities
func (udpNet *Network) Send(identities []h.Identity, packet *h.Packet) {
func (udpNet *UDPNetwork) Send(identities []h.Identity, packet *h.Packet) {
udpNet.Lock()
udpNet.sent += len(identities)
udpNet.Unlock()
Expand All @@ -93,7 +159,7 @@ func (udpNet *Network) Send(identities []h.Identity, packet *h.Packet) {
}
}

func (udpNet *Network) send(identity h.Identity, packet *h.Packet) {
func (udpNet *UDPNetwork) send(identity h.Identity, packet *h.Packet) {
addr := identity.Address()
udpAddr, err := net.ResolveUDPAddr("udp4", addr)
if err != nil {
Expand Down Expand Up @@ -121,7 +187,7 @@ func (udpNet *Network) send(identity h.Identity, packet *h.Packet) {
//fmt.Printf("%s -> sending packet to %s\n", udpSock.LocalAddr().String(), addr)
}

func (udpNet *Network) handler() {
func (udpNet *UDPNetwork) handler() {
enc := udpNet.enc
for {
//udpNet.quit and udpNet.listeners have to be guarded by a read lock
Expand All @@ -145,7 +211,7 @@ func (udpNet *Network) handler() {
}
}

func (udpNet *Network) loop() {
func (udpNet *UDPNetwork) loop() {
pendings := list.New()
var ready = false
send := func() {
Expand Down Expand Up @@ -179,14 +245,14 @@ func (udpNet *Network) loop() {
}
}

func (udpNet *Network) getListeners() []handel.Listener {
func (udpNet *UDPNetwork) getListeners() []handel.Listener {
udpNet.RLock()
defer udpNet.RUnlock()
udpNet.rcvd++
return udpNet.listeners
}

func (udpNet *Network) dispatchLoop() {
func (udpNet *UDPNetwork) dispatchLoop() {
dispatch := func(p *handel.Packet) {
listeners := udpNet.getListeners()
for _, listener := range listeners {
Expand Down
2 changes: 1 addition & 1 deletion network/udp/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestUDPNetwork(t *testing.T) {
n1, err := NewNetwork("127.0.0.1:3000", network.NewGOBEncoding())
n1, err := NewDelayedUDPNetwork(1*time.Millisecond, "127.0.0.1:3000", network.NewGOBEncoding())
require.NoError(t, err)
n2, err := NewNetwork("127.0.0.1:3001", network.NewGOBEncoding())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion simul/bad.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Network = "udp"
Network = "delayed_udp"
Curve = "bn256/cf"
Encoding = "gob"
MonitorPort = 10000
Expand Down
2 changes: 1 addition & 1 deletion simul/config_example.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Network = "udp"
Network = "delayed_udp"
Curve = "bn256/cf"
Encoding = "gob"
MonitorPort = 9980
Expand Down
2 changes: 2 additions & 0 deletions simul/lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ func (c *Config) selectNetwork(id handel.Identity) (handel.Network, error) {
switch c.Network {
case "udp":
return udp.NewNetwork(id.Address(), encoding)
case "delayed_udp":
return udp.NewDelayedUDPNetwork(2000* time.Millisecond, id.Address(), encoding)
case "quic-test-insecure":
cfg := quic.NewInsecureTestConfig()
return quic.NewNetwork(id.Address(), encoding, cfg)
Expand Down
4 changes: 2 additions & 2 deletions simul/lib/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type SyncMaster struct {
exp int
probExp int // probabilistically expected nb,i.e. 95% of exp
total int
n *udp.Network
n udp.Network
states map[int]*state
}

Expand Down Expand Up @@ -193,7 +193,7 @@ type SyncSlave struct {
sync.Mutex
own string
master string
net *udp.Network
net udp.Network
ids []int
states map[int]*slaveState
}
Expand Down