This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

network: Kademlia Load Balancing #1774

Merged 36 commits into ethersphere:master from the issue-1757 branch on Nov 12, 2019

Conversation

@kortatu (Contributor) commented Sep 18, 2019

The idea is to honor SWIP ethersphere/SWIPs#28. We will distribute load among peers by returning a different peer to the kademlia API users each time one is needed.

Load balancing only makes sense when the resources to choose from are similar for the requester.
We can't balance among all peers in every request. Instead, in each case we must select the peers that are equidistant to the target address and sort them by least used first.
To account for the number of uses we have created KademliaLoadBalancer, which proxies calls to Kademlia and sorts the peers by use.
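For illustration, a minimal sketch of the least-used-first idea (the names and types here are illustrative only, not this PR's actual API):

```go
package main

import (
	"fmt"
	"sort"
)

// lbPeer is an illustrative stand-in for a peer plus its use counter.
type lbPeer struct {
	key  string
	uses int
}

// leastUsedFirst returns the peers of one bin (i.e. peers equidistant from the
// target address) ordered by how often they have already been handed out.
func leastUsedFirst(bin []lbPeer) []lbPeer {
	sorted := append([]lbPeer(nil), bin...) // copy so the caller's slice is untouched
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].uses < sorted[j].uses })
	return sorted
}

func main() {
	bin := []lbPeer{{"a", 12}, {"b", 3}, {"c", 7}}
	fmt.Println(leastUsedFirst(bin)) // [{b 3} {c 7} {a 12}]
}
```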

New peers can't be initialized with 0 uses, because in that case, if the system has been running for a long time, the new peer would be overused until it reached the level of the rest of the peers. Instead, we take the number of uses of the least used peer in the same bin as the new peer.
There could be other alternatives (such as resetting all counters to 0 on each new peer addition, selecting the least used peer globally, etc.).
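As a sketch of that initialization rule (illustrative only, not the actual PR code), the newcomer inherits the lowest use count already present in its bin:

```go
package main

import "fmt"

// initialUseCount returns the use count a newly added peer should start with:
// the minimum count among the peers already in its bin, or 0 if the bin is empty.
func initialUseCount(binUses []int) int {
	if len(binUses) == 0 {
		return 0
	}
	min := binUses[0]
	for _, c := range binUses[1:] {
		if c < min {
			min = c
		}
	}
	return min
}

func main() {
	// Existing peers in the bin have been used 120, 97 and 133 times,
	// so the newcomer starts at 97 instead of 0.
	fmt.Println(initialUseCount([]int{120, 97, 133})) // 97
}
```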

For the moment, the only part of the system using this Load Balancer is PSS forwarding as a proof of concept.

fixes #1757

@nolash (Contributor) commented Sep 19, 2019

@alvaro

> There is no capabilityIndex for the global capability kademlia table, so we need to add another searching color field in the kademlia type:

Actually, we could just move the embedded conns and addrs in kademlia to a capabilityIndex named e.g. "default"? If it makes it easier, I mean.

@nolash (Contributor) commented Sep 19, 2019

jesus never mind, that's exactly what you did

@kortatu (Contributor, Author) commented Sep 19, 2019

> @alvaro
>
> > There is no capabilityIndex for the global capability kademlia table, so we need to add another searching color field in the kademlia type:
>
> Actually, we could just move the embedded conns and addrs in kademlia to a capabilityIndex named e.g. "default"? If it makes it easier, I mean.

Hmm, I like "default" more than "global"

@nolash (Contributor) left a comment


I like this approach.

Resolved review threads: network/kademlia_test.go (2), network/kademlia.go (1)
kortatu self-assigned this on Sep 27, 2019
kortatu requested a review from janos on October 29, 2019 17:20
Resolved review threads: network/kademlia_load_balancer.go (7), network/kademlia_load_balancer_test.go (2)
@acud (Member) commented Oct 30, 2019

@kortatu I had a quick chat with @nolash about this PR. My assumption is that the improvement from merging this should be better retrieval performance; however, it seems that this code is neither explicitly nor implicitly called from the retrieval package (which just calls kademlia.EachConn).

@janos (Member) commented Oct 30, 2019

I am curious whether pubsub ordering is important. This test shows that the ordering of published messages is not preserved on the subscriber's end:

func TestMessagesOrder(t *testing.T) {
    ps := pubsubchannel.New()
    defer ps.Close()

    s := ps.Subscribe()

    for i := 0; i < 1000; i++ {
        ps.Publish(i)
    }
    c := s.ReceiveChannel()

    var n int
    timeout := time.After(2 * time.Second)
loop:
    for {
        select {
        case i, ok := <-c:
            if i != n {
                t.Errorf("got %v, want %v", i, n)
            }
            if !ok {
                break loop
            }
            n++
        case <-timeout:
            break loop
        }
    }
}

By using two independent subscriptions for adding and removing peers in the stats, without preserving order, is it possible that in a highly dynamic environment there is a data race? That is, first a message about a peer being removed is received, resulting in a removal from ResourceUseStats which silently passes, and then a message from the other channel is received about the same peer being added (the same peer that was already removed), adding it back to ResourceUseStats. Is that possible and can it create problems?

(previously posted on the wrong PR)

@janos (Member) commented Oct 30, 2019

I have looked more at the problem with the order of removals in ResourceUseStats. It appears that it can be quite drastic. This test just adds and removes peers one by one; one subtest has a delay between the operations and one does not. The subtest without delays ends with an unexpectedly high resourceUseStats length, 5-9 out of 10. Higher numbers of peers produce proportional results.

It can be easily seen that the problem is in ResourceUseStats.RemoveResource when removing a key that has not yet been added.

func TestResourceUseStats(t *testing.T) {

	testResourceUseStats := func(t *testing.T, delay time.Duration) {
		k := NewKademlia(make([]byte, 32), NewKadParams())
		lb := NewKademliaLoadBalancer(k, false)

		for i := uint64(0); i < 10; i++ {
			a := make([]byte, 8)
			binary.BigEndian.PutUint64(a, i)
			p := NewPeer(&BzzPeer{BzzAddr: NewBzzAddr(a, a)}, nil)
			k.On(p)
			if delay > 0 {
				time.Sleep(delay)
			}
			k.Off(p)
			if delay > 0 {
				time.Sleep(delay)
			}
		}

		lb.Stop()

		count := lb.resourceUseStats.Len()
		if count > 0 {
			t.Errorf("got resourceUseStats %v, want 0, uses: %v", count, lb.resourceUseStats.DumpAllUses())
		}
	}

	t.Run("no delay", func(t *testing.T) {
		testResourceUseStats(t, 0)
	})
	t.Run("1ms delay", func(t *testing.T) {
		testResourceUseStats(t, time.Millisecond)
	})
}
=== RUN   TestResourceUseStats
=== RUN   TestResourceUseStats/no_delay
=== RUN   TestResourceUseStats/1ms_delay
WARN [10-30|17:02:48.033|github.com/ethersphere/swarm/network/pubsubchannel/pubsub.go:95] Subscription closed before message delivery 
WARN [10-30|17:02:48.033|github.com/ethersphere/swarm/network/pubsubchannel/pubsub.go:95] Subscription closed before message delivery 
WARN [10-30|17:02:48.033|github.com/ethersphere/swarm/network/pubsubchannel/pubsub.go:95] Subscription closed before message delivery 
--- FAIL: TestResourceUseStats (0.03s)
    --- FAIL: TestResourceUseStats/no_delay (0.00s)
        /Users/janos/go/src/github.com/ethersphere/swarm/network/kademlia_load_balancer_test.go:56: got resourceUseStats 8, want 0, uses: map[0x0000000000000000:0 0x0000000000000001:0 0x0000000000000002:0 0x0000000000000003:0 0x0000000000000004:0 0x0000000000000005:0 0x0000000000000006:0 0x0000000000000007:0]
    --- PASS: TestResourceUseStats/1ms_delay (0.03s)
FAIL
FAIL	github.com/ethersphere/swarm/network	0.876s

I do not think that we should have such data races. Message delivery time and order by pubsub are not guaranteed, and it is possible to create an invalid ResourceUseStats state under high load in production, depending on goroutine scheduling, which is not deterministic.

@kortatu (Contributor, Author) commented Oct 30, 2019

> @kortatu I had a quick chat with @nolash about this PR. My assumption is that the improvement from merging this should be better retrieval performance; however, it seems that this code is neither explicitly nor implicitly called from the retrieval package (which just calls kademlia.EachConn).

You are right, I exposed this from the start as well. I just implemented it and used it in pss. I didn't use this package in retrieval because I don't have the knowledge to test that part, so if anyone could jump in and help or direct me a bit, that would be great.

kortatu requested a review from acud on October 31, 2019 07:57
@acud (Member) commented Nov 3, 2019

> You are right, I exposed this from the start as well. I just implemented it and used it in pss. I didn't use this package in retrieval because I don't have the knowledge to test that part, so if anyone could jump in and help or direct me a bit, that would be great.

Cool. Thanks for letting me know. Let's try to get this merged and I'll try to jump on this

…a subscription inbox before blocking. This will ensure ordered message delivery if the inbox is never full
@kortatu (Contributor, Author) commented Nov 4, 2019

> I have looked more at the problem with the order of removals in ResourceUseStats. It appears that it can be quite drastic. This test just adds and removes peers one by one; one subtest has a delay between the operations and one does not. The subtest without delays ends with an unexpectedly high resourceUseStats length, 5-9 out of 10. Higher numbers of peers produce proportional results.
>
> It can be easily seen that the problem is in ResourceUseStats.RemoveResource when removing a key that has not yet been added.


> I do not think that we should have such data races. Message delivery time and order by pubsub are not guaranteed, and it is possible to create an invalid ResourceUseStats state under high load in production, depending on goroutine scheduling, which is not deterministic.

Although I think that we don't need to care about message ordering, I have reimplemented both kademlia_load_balancer.go and pubsub.go so that the order of messages is preserved.
First, I used only one subscription (for both On and Off peer signals) with different types of messages, and then I reimplemented the publishing goroutine with an intermediate inbox channel so the publisher does not need to block when publishing.
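Roughly, the shape of that change looks like the following sketch (hypothetical types, not the actual pubsubchannel code): a single subscription carries both on and off signals, and an intermediate buffered inbox lets the publisher return immediately while a single delivery goroutine keeps FIFO order, as long as the inbox never fills up.

```go
package main

import "fmt"

// peerSignal replaces the two separate on/off subscriptions with one message type.
type peerSignal struct {
	key string
	on  bool // true = peer added, false = peer removed
}

type subscription struct {
	inbox chan peerSignal // intermediate buffer written by publish
	out   chan peerSignal // channel the consumer reads from
	quit  chan struct{}
}

func newSubscription(size int) *subscription {
	s := &subscription{
		inbox: make(chan peerSignal, size),
		out:   make(chan peerSignal),
		quit:  make(chan struct{}),
	}
	// A single delivery goroutine drains the inbox, so messages reach the
	// consumer in the order they were published.
	go func() {
		for {
			select {
			case msg := <-s.inbox:
				select {
				case s.out <- msg:
				case <-s.quit:
					return
				}
			case <-s.quit:
				return
			}
		}
	}()
	return s
}

// publish never blocks: if the inbox is full the message is dropped here,
// which is why ordering is only guaranteed while the inbox does not fill up.
func (s *subscription) publish(msg peerSignal) {
	select {
	case s.inbox <- msg:
	default:
	}
}

func main() {
	s := newSubscription(16)
	s.publish(peerSignal{key: "peer-a", on: true})
	s.publish(peerSignal{key: "peer-a", on: false})
	fmt.Println(<-s.out) // {peer-a true}
	fmt.Println(<-s.out) // {peer-a false}
	close(s.quit)
}
```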

@janos (Member) commented Nov 4, 2019

Thanks @kortatu. I am curious why you think that the ordering is not important if it creates a data race?

@janos (Member) left a comment


Thanks @kortatu for fixing the ordering issue.

Should we have a feature flag and retrieval integration implemented in this PR or in a different one? @kortatu @acud

log.Warn("listenOnOffPeers received message is not a on/off peer signal!")
continue
}
//log.Warn("OnOff peer", "key", signal.peer.Key(), "on", signal.on)
Review comment: Remove commented code.

// publishing goroutine. It closes the signal channel whenever it receives the quitC signal
go func(sub *Subscription) {
for {
select {
Review comment: This select should have case <-psc.quitC: with return.

case msg := <-sub.inbox:
log.Debug("Retrieved inbox message", "msg", msg)
select {
case <-psc.quitC:
Review comment: Should this case return and terminate the goroutine?
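Something along the lines of this sketch would address both comments above (minimal hypothetical types standing in for the real pubsub channel): both selects watch quitC and return, so the publishing goroutine cannot leak after the channel is closed.

```go
package main

type message interface{}

type pubSubChannel struct {
	quitC chan struct{}
}

type subscriptionSketch struct {
	inbox   chan message // buffered inbox filled by the publisher
	sigChan chan message // channel handed to the subscriber
}

// deliver drains the subscription inbox until the pubsub channel is closed.
func (psc *pubSubChannel) deliver(sub *subscriptionSketch) {
	go func() {
		for {
			select {
			case <-psc.quitC:
				return // stop when the pubsub channel is closed
			case msg := <-sub.inbox:
				select {
				case sub.sigChan <- msg:
				case <-psc.quitC:
					return // also stop if closed while blocked on delivery
				}
			}
		}
	}()
}

func main() {
	psc := &pubSubChannel{quitC: make(chan struct{})}
	sub := &subscriptionSketch{inbox: make(chan message, 1), sigChan: make(chan message)}
	psc.deliver(sub)
	close(psc.quitC) // the goroutine exits through one of the quitC cases
}
```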

}

// we need to sleep to allow all messages to be received by lb
time.Sleep(100 * time.Millisecond)
Review comment: If this is causing flakiness on Travis, we should replace it with a retry at least.
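One possible shape for that retry, sketched here with an illustrative helper (waitFor and its parameters are not part of this PR): poll the condition at a short interval instead of sleeping a fixed 100ms.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond until it returns true or the timeout elapses.
func waitFor(timeout, interval time.Duration, cond func() bool) error {
	deadline := time.Now().Add(timeout)
	for {
		if cond() {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("condition not met before timeout")
		}
		time.Sleep(interval)
	}
}

func main() {
	start := time.Now()
	// Stand-in condition; in the test it would check that the load balancer
	// has processed all peer signals instead of sleeping unconditionally.
	err := waitFor(time.Second, 5*time.Millisecond, func() bool {
		return time.Since(start) > 20*time.Millisecond
	})
	fmt.Println(err) // <nil>
}
```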

@janos (Member) left a comment


Thank you @kortatu for your hard work and addressing my comments. I appreciate it. LGTM

@acud (Member) commented Nov 8, 2019

@kortatu please fix the panic that shows up in appveyor:

[11-06|11:40:49.317|github.com/ethersphere/swarm/pss/api.go:85]                 caught subscription error in pss sub topic 75d158a6: <nil> 
ERROR[11-06|11:40:49.318|github.com/ethersphere/swarm/p2p/protocols/protocol.go:249] dropping peer with DiscSubprotocolError  peer=bbe01eef67a69b88 reason="hive stopping"
WARN [11-06|11:40:49.319|p2p/simulations/network.go:598]                             Can't stop node                          id=bbe01eef67a69b88 err="server: <nil>, services: map[*network.Bzz:could not save peers to persistence store: could not save peers: leveldb: closed]"
WARN [11-06|11:40:49.839|github.com/ethersphere/swarm/pss/crypto/crypto.go:183]      Size of payload field                    size=1
fatal error: concurrent map read and map write
goroutine 3612 [running]:
runtime.throw(0xd22f2e, 0x21)
	C:/go/src/runtime/panic.go:774 +0x64 fp=0x14890c18 sp=0x14890c04 pc=0x42e844
runtime.mapaccess1_faststr(0xc232e0, 0x18c8cf20, 0x1346d3b0, 0x42, 0x441bff)
	C:/go/src/runtime/map_faststr.go:21 +0x363 fp=0x14890c4c sp=0x14890c18 pc=0x4113b3
github.com/ethersphere/swarm/network/resourceusestats.(*ResourceUseStats).getAllUseCounts(0x1363d050, 0x18dd46a0, 0x1, 0x1, 0x14, 0x14, 0x13444380)
	C:/gopath/src/github.com/ethersphere/swarm/network/resourceusestats/resource_use_stats.go:90 +0x96 fp=0x14890c74 sp=0x14890c4c pc=0x87a6c6
github.com/ethersphere/swarm/network/resourceusestats.(*ResourceUseStats).SortResources(0x1363d050, 0x18dd46a0, 0x1, 0x1, 0x18dd46b0, 0x18dd4601, 0x13405760)
	C:/gopath/src/github.com/ethersphere/swarm/network/resourceusestats/resource_use_stats.go:55 +0x5f fp=0x14890cb4 sp=0x14890c74 pc=0x87a2af
github.com/ethersphere/swarm/network.(*KademliaLoadBalancer).resourcesToLbPeers(0x18c8cf60, 0x18dd46a0, 0x1, 0x1, 0x13444380, 0x3a0008, 0x13394eb0)
	C:/gopath/src/github.com/ethersphere/swarm/network/kademlia_load_balancer.go:139 +0x3c fp=0x14890cd4 sp=0x14890cb4 pc=0x883e0c
github.com/ethersphere/swarm/network.(*KademliaLoadBalancer).peerBinToPeerList(0x18c8cf60, 0x13405780, 0x8, 0xc411c0, 0x1)
	C:/gopath/src/github.com/ethersphere/swarm/network/kademlia_load_balancer.go:135 +0xc9 fp=0x14890d00 sp=0x14890cd4 pc=0x883d89
github.com/ethersphere/swarm/network.(*KademliaLoadBalancer).EachBinDesc.func1(0x13405780, 0x18dd4698)
	C:/gopath/src/github.com/ethersphere/swarm/network/kademlia_load_balancer.go:122 +0x2f fp=0x14890d1c sp=0x14890d00 pc=0x88a7af
github.com/ethersphere/swarm/network.(*Kademlia).eachBinDesc.func1(0x13405760, 0x18dd4690)
	C:/gopath/src/github.com/ethersphere/swarm/network/kademlia.go:689 +0x7d fp=0x14890d30 sp=0x14890d1c pc=0x88919d
github.com/ethersphere/swarm/pot.(*Pot).eachBinDesc(0x18eb2e60, 0xbe4980, 0x13405750, 0x1351a4e8, 0x0, 0x14890db0, 0x13405750)
	C:/gopath/src/github.com/ethersphere/swarm/pot/pot.go:620 +0x366 fp=0x14890d74 sp=0x14890d30 pc=0x878bb6
github.com/ethersphere/swarm/pot.(*Pot).EachBin(0x18eb2e60, 0xbe4980, 0x13405750, 0x1351a4e8, 0x0, 0x14890db0, 0x42d600)
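A sketch of one way the concurrent map access could be fixed (a hypothetical stand-in, not the actual ResourceUseStats type): guard the use-count map with a sync.RWMutex on every read and write.

```go
package main

import (
	"fmt"
	"sync"
)

// useStats is an illustrative stand-in for a use-count registry whose map is
// read and written from different goroutines.
type useStats struct {
	mu   sync.RWMutex
	uses map[string]int
}

func newUseStats() *useStats {
	return &useStats{uses: make(map[string]int)}
}

func (s *useStats) addUse(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.uses[key]++
}

func (s *useStats) useCount(key string) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.uses[key]
}

func main() {
	s := newUseStats()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.addUse("peer-a")
			_ = s.useCount("peer-a") // concurrent reads and writes are now safe
		}()
	}
	wg.Wait()
	fmt.Println(s.useCount("peer-a")) // 10
}
```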

acud merged commit 868b945 into ethersphere:master on Nov 12, 2019
kortatu deleted the issue-1757 branch on November 13, 2019