pss: Improve pressure backstop queue handling - no mutex #1695
Conversation
…ocess messages in parallel
looks so much neater than #1680 no?
minor suggestions only
The benchmark results for the channels are rather peculiar.

In fact I don't fully understand the Go benchmark tests. I finally got something more stable with a benchtime of 2s:

$ go test -v -cpu 4 -bench=BenchmarkMessageProcessing -benchtime 2s -run=^$
goos: linux
goarch: amd64
pkg: github.com/ethersphere/swarm/pss
BenchmarkMessageProcessing/FailProb0.00-4   3000000000   0.25 ns/op   0 B/op   0 allocs/op
BenchmarkMessageProcessing/FailProb0.01-4   3000000000   0.25 ns/op   0 B/op   0 allocs/op
BenchmarkMessageProcessing/FailProb0.05-4   1000000000   1.62 ns/op   0 B/op   0 allocs/op

And on the issue-1654 branch with a benchtime of 5s:

$ go test -v -cpu 4 -bench=BenchmarkMessageProcessing -benchtime 5s -run=^$
goos: linux
goarch: amd64
pkg: github.com/ethersphere/swarm/pss
BenchmarkMessageProcessing/0.00_-4   3000000000   0.48 ns/op   0 B/op   0 allocs/op
BenchmarkMessageProcessing/0.01_-4   2000000000   0.32 ns/op   0 B/op   0 allocs/op
BenchmarkMessageProcessing/0.05_-4   2000000000   0.30 ns/op   0 B/op   0 allocs/op

I don't think there is much change in performance between the two implementations, although times vary a lot between executions of the benchmark tests.
pss/pss.go
Outdated
if err != nil {
	log.Error(err.Error())
	metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
	// In any case, if the message
Does this comment belong here?
Removed comment
Fixed in 1b91b66
pss/pss_test.go
Outdated
select {
case outmsg = <-ps.outbox:
default:
if len(processed) > 1 {
superfluous
Removed the processed slice and the checks on it. Fixed in 1b91b66
pss/pss_test.go
Outdated
Data: []byte{0x66, 0x6f, 0x6f},
outboxCapacity := 2

processed := make([]*PssMsg, 0)
no need for this, just time out on successC instead
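For reference, a minimal sketch of the suggested change, assuming a successC channel that is signalled once per processed message (the exact wiring in the test is an assumption here):

```go
// Instead of collecting messages in a processed slice, just wait on successC
// with a timeout and fail the test if nothing arrives in time.
select {
case <-successC:
	// the message was processed
case <-time.After(5 * time.Second):
	t.Fatal("timeout waiting for message to be processed")
}
```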
Fixed in 1b91b66
ps.Stop()

// finish processing message
procChan <- struct{}{}
Shouldn't we instead just check here whether the channel is closed, if that's what the test is about? The error state of a test should not be a panic:
select {
case _, ok := <-procChan:
	if !ok {
		t.Fatal(...)
	}
default:
}
The idea was to close ps/outbox in the middle of processing a message and avoid panics (because of closed channels). Not sure how to test that.
Which channel did you want to test for panic on close?
Well, the process and slots channels, because they are the ones that could be used by a routine processing a message (process if the forwarding failed, in reenqueue(); slots if it succeeded, in free()).
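To make the concern concrete, a rough sketch of the routine in question (names follow the discussion above; the actual code in the PR may differ):

```go
// One goroutine per message taken from the outbox. If the outbox channels are
// closed while this routine is still running, the sends inside reenqueue()
// (on process) or free() (on slots) would hit a closed channel and panic,
// which is the scenario the test tried to cover.
go func(msg *PssMsg) {
	if err := o.forward(msg); err != nil {
		o.reenqueue(msg) // failure path: sends on the process channel
		return
	}
	o.free() // success path: returns a slot token on the slots channel
}(msg)
```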
I'm not sure this should be tested. But if the problem is the attempt to re-enqueue on failure after the channel is closed, then maybe the re-enqueue should check the quit channel with higher priority, and if it is closed not even try to re-enqueue. Or am I missing the point?
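A minimal sketch of that suggestion, assuming a quit channel named quitC on the outbox that is closed on shutdown (the field names here are assumptions, not the PR's actual code):

```go
// reenqueue gives the quit channel priority: if the outbox is already
// shutting down, the failed message is dropped instead of being pushed back
// onto the process channel.
func (o *outbox) reenqueue(msg *PssMsg) {
	select {
	case <-o.quitC:
		return // shutting down, don't touch the process channel at all
	default:
	}
	select {
	case o.process <- msg:
	case <-o.quitC:
	}
}
```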
Maybe we shouldn't test that, but in the early stages of the implementation I had panics when closing, so I added this test to feel safe about it.
I can remove it completely.
Test removed
pss/pss_test.go
Outdated
}

func benchmarkMessageProcessing(b *testing.B, failProb float32) {
	rand.Seed(0)
Please add the b.N loop.
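For reference, the standard shape of a Go benchmark, with the measured work inside the b.N loop (the body is a placeholder, not the actual pss benchmark):

```go
package pss

import "testing"

func BenchmarkMessageProcessing(b *testing.B) {
	// setup that should not be measured goes here
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// enqueue and process one message per iteration; without this loop
		// the benchmark does a fixed amount of work regardless of b.N, which
		// is what produces misleading sub-nanosecond ns/op figures like the
		// ones reported above.
	}
}
```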
That could be the reason for the strange results
Ok, added. Now it seems more stable
this benchmark is mysterious, please rethink :)
Fixed in 1b91b66. Now it is more stable, with these values:
$ go test -v -bench=BenchmarkMessageProcessing -run=^$
goos: linux
goarch: amd64
pkg: github.com/ethersphere/swarm/pss
BenchmarkMessageProcessing/FailProb0.00-4 1 2915534902 ns/op 168677016 B/op 1752294 allocs/op
BenchmarkMessageProcessing/FailProb0.01-4 1 1072452793 ns/op 92210520 B/op 1668221 allocs/op
BenchmarkMessageProcessing/FailProb0.05-4 2 758722005 ns/op 89287224 B/op 1703359 allocs/op
PASS
ok github.com/ethersphere/swarm/pss 6.974s
@zelig comments like that aren't helpful. If you have objections then please share them concisely.
pss/pss_test.go
Outdated
succesForward := func(msg *PssMsg) bool {
	roll := rand.Float32()
	if failProb > roll {
		failed++
race on incrementing
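If the counter were kept, the increment could be made safe for concurrent forward calls with sync/atomic; a sketch only (as noted below, the PR removed the counter instead):

```go
var failed int64 // incremented from concurrently running forward calls

succesForward := func(msg *PssMsg) bool {
	if failProb > rand.Float32() {
		atomic.AddInt64(&failed, 1) // race-free increment
		return false
	}
	return true
}
```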
I will remove the failed counter as we are not using it anymore.
Removed counter in commit dad34db
pss/pss_test.go
Outdated
procChan <- struct{}{}
procChan <- struct{}{}
// Must wait a bit for the routines processing the messages to free the slots
time.Sleep(1 * time.Millisecond)
I really don't like sleeps in tests.
Me neither, but it's the only way I've found to be sure the processing routine finishes freeing the slot. Maybe I should loop with a timeout until that happens.
Sleep removed. Instead, I used a timed wait on the slots channel:
// There should be a slot again in the outbox
select {
case <-ps.outbox.slots:
case <-time.After(2 * time.Second):
	t.Fatalf("timeout waiting for a free slot")
}
looks good to me. Any meaningful comparison with benchmarks?
Do you expect a performance difference?
No, I've been testing both branches and I think performance-wise there is no difference. But code-wise this one is much better.
If we're going with this one, please remove the benchmark after we're done.
Don't you think it is interesting to keep a benchmark in case a future change deteriorates the performance of the outbox processing?
I don't know, really. It's a very shallow benchmark, and I seriously doubt we will be changing this part of the code anytime soon.
Benchmark test removed
* 'master' of github.com:ethersphere/swarm:
  pss: Modularize crypto and remove Whisper. Step 1 - isolate whisper code (ethersphere#1698)
  pss: Improve pressure backstop queue handling - no mutex (ethersphere#1695)
  cmd/swarm-snapshot: if 2 nodes to create snapshot use connectChain (ethersphere#1709)
  network: Add API for Capabilities (ethersphere#1675)
  pss: fixed flaky test that was using a global variable instead of a local one (ethersphere#1702)
  pss: Port tests to `network/simulation` (ethersphere#1682)
  storage: fix hasherstore seen check to happen when error is nil (ethersphere#1700)
  vendor: upgrade go-ethereum to 1.9.2 (ethersphere#1689)
  bzzeth: initial support for bzz-eth protocol (ethersphere#1571)
  network/stream: terminate runUpdateSyncing on peer quit (ethersphere#1696)
  all: first working SWAP version (ethersphere#1554)
  version: update to v0.5.0 unstable (ethersphere#1694)
  chunk, storage: storage with multi chunk Set method (ethersphere#1684)
  chunk, storage: add HasMulti to chunk.Store (ethersphere#1686)
  chunk, shed, storage: chunk.Store GetMulti method (ethersphere#1691)
  api, chunk: progress bar support (ethersphere#1649)
Implementation of parallel processing of forwarded messages, using the solution proposed by @zelig based only on channels.
We have parallelized the message processing inside the main processing loop:
All functions and variables related to the outbox are encapsulated in a new outbox type.
When a new message is received for forwarding we call outbox.enqueue().
Note also that with this implementation the enqueue method blocks on the outbox.process channel, so some tests (which didn't start the ps processing loop) have been modified to pull from that channel.
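A rough sketch of the shape described above, using the names that appear in this description and in the review thread (outbox, enqueue, process, slots, forward); the actual fields and signatures in the PR may differ:

```go
package pss

// outbox encapsulates everything related to the forwarding queue.
type outbox struct {
	process chan *PssMsg  // enqueue blocks here until the processing loop picks the message up
	slots   chan struct{} // free-slot tokens; capacity == outbox capacity
	forward func(msg *PssMsg) error
}

func newOutbox(capacity int, forward func(msg *PssMsg) error) *outbox {
	o := &outbox{
		process: make(chan *PssMsg),
		slots:   make(chan struct{}, capacity),
		forward: forward,
	}
	for i := 0; i < capacity; i++ {
		o.slots <- struct{}{} // all slots start out free
	}
	return o
}

// enqueue is called when a new message is received for forwarding. It blocks
// on the process channel, which is why tests that do not run the processing
// loop have to pull from that channel themselves. (How a full outbox is
// handled, e.g. failing fast when no slot is free, is not shown here.)
func (o *outbox) enqueue(msg *PssMsg) {
	<-o.slots        // take a free slot (pressure backstop)
	o.process <- msg // hand the message to the processing loop
}

// processOutbox is the main processing loop: each message is forwarded in its
// own goroutine, re-enqueued on failure and its slot freed on success.
// Shutdown/quit handling is omitted; see the review discussion above.
func (o *outbox) processOutbox() {
	for msg := range o.process {
		go func(msg *PssMsg) {
			if err := o.forward(msg); err != nil {
				o.process <- msg // forwarding failed: retry later
				return
			}
			o.slots <- struct{}{} // forwarding succeeded: free the slot
		}(msg)
	}
}
```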
A benchmark test (BenchmarkMessageProcessing) has been added to compare performance with the mutex implementation (PR #1680). Also added a new pluggable forward function in pss for testing purposes.
This PR is related to issue #1654.