swarm/pss/pss.go
		time.Sleep(time.Millisecond * 100)
		return
	}
	msg := self.outbox[self.outboxFirstCursor]
I am pretty sure that slices are not safe even for concurrent read. Shouldn't this be locked with the outboxMu?
But it's not a slice, it's just a pointer (*PssMsg). You mean maps?
Right you are: https://blog.golang.org/go-maps-in-action
outbox []*PssMsg
You are reading from the outbox slice in dequeue(), while at the same time outbox could be getting modified in enqueue (in another goroutine).
Ah. Maybe they're called slices, not arrays, in Go? Of course, this is not a map, sorry.
So, I'm not sure if this is a problem here. The array size will not be changed, and the data is just a pointer, whose write would be a single instruction anyway, I assume?
Also, it will only write if the First and Last cursors do not hold the same value (thus pointing to the same array element). The First cursor is only incremented after the read.
What do you think?
Yes, in Go you have slices, arrays and maps. A slice is like an array, but a bit different.
Both slices and maps are not safe for concurrent use, even when you are only reading from them. You can see it reported here:
WARNING: DATA RACE
Read at 0x00c420892000 by goroutine 38:
github.com/ethereum/go-ethereum/swarm/pss.(*Pss).dequeue()
/Users/nonsense/code/src/github.com/ethereum/go-ethereum/swarm/pss/pss.go:703 +0xc7
github.com/ethereum/go-ethereum/swarm/pss.(*Pss).Start.func2()
/Users/nonsense/code/src/github.com/ethereum/go-ethereum/swarm/pss/pss.go:186 +0x15a
Previous write at 0x00c420892000 by goroutine 197:
[failed to restore the stack]
Anytime you have more than one goroutine accessing a slice or a map, you have to lock the slice or the map. I think the only exception is if you KNOW that you are going to be reading different indices within a slice (for a map this doesn't apply).
Bottom line - better lock any slice or map that you use concurrently :)
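For reference, a minimal sketch (not the pss code) of the pattern being recommended here: every access to the shared slice, reads included, goes through the same mutex, so the race detector (go run -race) has nothing to report.

package main

import (
	"fmt"
	"sync"
)

// queue guards its slice with a mutex so concurrent enqueue/dequeue
// calls do not race on the slice header or its elements.
type queue struct {
	mu    sync.Mutex
	items []string
}

func (q *queue) enqueue(s string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, s)
}

func (q *queue) dequeue() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return "", false
	}
	s := q.items[0]
	q.items = q.items[1:]
	return s, true
}

func main() {
	q := &queue{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); q.enqueue("hello") }()
	go func() { defer wg.Done(); q.dequeue() }()
	wg.Wait()
	fmt.Println("done, no race reported")
}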
"I think the only exception is if you KNOW that you are going to be reading different indices within a slice"
Wow, crazy. You mean it's actually a problem to read the same index of a slice? What exactly is the problem, do you know?
What is the traditional equivalent of a Go slice? Is it analogous to a dynamic array? As in, are all dynamic arrays in Go "slices," and all static arrays in Go "arrays"?
You always have a static array behind a slice. However, you can change the size of the slice, and the Go runtime will kick in if you go out of bounds.
A slice is just a pointer to an array, plus a length and a capacity, that's all. Two slices can share the same static array.
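To illustrate (a minimal sketch, not from the PR): a slice header is a pointer into a backing array plus a length and a capacity, and two slices can alias the same array.

package main

import "fmt"

func main() {
	backing := [4]int{1, 2, 3, 4} // the static array behind the slices
	a := backing[0:2]             // slice header: pointer into backing, len 2, cap 4
	b := backing[0:4]             // a second slice over the same array
	a[0] = 99                     // a write through one slice...
	fmt.Println(b[0])             // ...is visible through the other: prints 99
	fmt.Println(len(a), cap(a))   // prints: 2 4
}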
@nonsense I just added the lock; messing with atomics here seemed to require several calls, which in the end would probably not contribute to efficiency, but rather to obscurity.
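For context, the locked ring buffer roughly follows this pattern (a sketch reconstructed from the snippets above, with a placeholder message type, not the exact PR code):

package main

import (
	"errors"
	"fmt"
	"sync"
)

const defaultOutboxQueueSize = 100

type PssMsg struct{} // placeholder for the real message type

type outbox struct {
	mu    sync.Mutex
	msgs  [defaultOutboxQueueSize]*PssMsg
	first int // next slot to read (dequeue)
	last  int // next slot to write (enqueue)
}

// enqueue adds a message, or reports an error when the ring is full.
func (o *outbox) enqueue(msg *PssMsg) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	next := (o.last + 1) % defaultOutboxQueueSize
	if next == o.first {
		return errors.New("outbox full")
	}
	o.msgs[o.last] = msg
	o.last = next
	return nil
}

// dequeue removes the oldest message, or returns nil when the ring is empty.
func (o *outbox) dequeue() *PssMsg {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.first == o.last {
		return nil
	}
	msg := o.msgs[o.first]
	o.first = (o.first + 1) % defaultOutboxQueueSize
	return msg
}

func main() {
	o := &outbox{}
	_ = o.enqueue(&PssMsg{})
	fmt.Println(o.dequeue() != nil) // true
	fmt.Println(o.dequeue() != nil) // false, the queue is empty
}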
swarm/pss/pss.go
	log.Warn(fmt.Sprintf("could not store message %v to cache: %v", msg, err))
}
if self.checkFwdCache(nil, digest) {
	log.Trace(fmt.Sprintf("pss relay block-cache match: FROM %x TO %x", self.Overlay.BaseAddr(), common.ToHex(msg.To)))
Nitpick, but it is easier to trace if the logs are structured. For example:
log.Trace("pss relay block-cache match", "from", self.Overlay.BaseAddr(), "to", common.ToHex(msg.To))
Thanks, I'll fix it in the next PR for deduplication (because this line is moved there).
swarm/pss/pss.go
self.outboxMu.Lock()
defer self.outboxMu.Unlock()
nextPos := (self.outboxLastCursor + 1) % defaultOutboxQueueSize
Why is all this not a simple buffered channel? That is the way you do a queue: no lock needed, no ticker needed.
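A minimal sketch of that suggestion (hypothetical names, not the actual pss API): the outbox becomes a buffered channel, enqueue is a send, and a single loop drains it.

package main

import (
	"fmt"
	"time"
)

type PssMsg struct{ To []byte } // placeholder for the real message type

// forward is a stand-in for the real forwarding call; it returns an error on failure.
func forward(msg *PssMsg) error {
	fmt.Printf("forwarding to %x\n", msg.To)
	return nil
}

func main() {
	// the outbox is just a buffered channel: no mutex, cursors or ticker
	outbox := make(chan *PssMsg, 100)

	// forwarding loop, started once when the service starts
	go func() {
		for msg := range outbox {
			if forward(msg) != nil {
				outbox <- msg // requeue on failure (blocks if the buffer is full)
			}
		}
	}()

	// enqueue is a plain channel send
	outbox <- &PssMsg{To: []byte{0x12, 0x34}}
	time.Sleep(100 * time.Millisecond) // let the loop drain the queue in this demo
}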
swarm/pss/pss.go
msg := self.outbox[self.outboxFirstCursor]
self.outboxFirstCursor = (self.outboxFirstCursor + 1) % defaultOutboxQueueSize
if self.forward(msg) != nil {
	self.enqueue(msg)
Maybe here the enqueue should be in a goroutine, slightly delayed; otherwise dequeueing should be immediate, i.e. use a channel.
@zelig Yeah, you're right. Channels are better. I've updated and pushed. The code is simpler with them, of course. But now, if we go over capacity with the channels, we will have blocking calls instead of errors reporting back. I don't know how important it is to know whether we could enqueue a message or not, or what the risks of (more) deadlocks are. Those channels everywhere tend to make stack traces a nightmare to read.
I decided to check the performance too, and channels are much faster unless you have a low capacity and are running async (try running with …). With pss, though, it should be in the thousands at least, so it has no relevance to this case.
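If the error-on-full behaviour matters, one option (a sketch, not what the PR does) is a non-blocking send with select/default, so a full channel reports an error instead of blocking:

package main

import (
	"errors"
	"fmt"
)

type PssMsg struct{} // placeholder for the real message type

// enqueue reports an error instead of blocking when the buffered channel is full.
func enqueue(outbox chan *PssMsg, msg *PssMsg) error {
	select {
	case outbox <- msg:
		return nil
	default:
		return errors.New("outbox full")
	}
}

func main() {
	outbox := make(chan *PssMsg, 1)
	fmt.Println(enqueue(outbox, &PssMsg{})) // <nil>
	fmt.Println(enqueue(outbox, &PssMsg{})) // outbox full
}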
swarm/pss/pss.go
func (self *Pss) isMsgExpired(msg *PssMsg) bool {
	msgexp := time.Unix(int64(msg.Expire), 0)
	// if msgexp.Before(time.Now()) {
remove commented code
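For illustration only (placeholder type, and assuming msg.Expire is a Unix timestamp in seconds; the real function may differ), the check without the commented-out line could read:

package main

import (
	"fmt"
	"time"
)

type PssMsg struct{ Expire uint32 } // placeholder; only the Expire field matters here

// isMsgExpired reports whether the message's expiry timestamp is in the past.
func isMsgExpired(msg *PssMsg) bool {
	msgexp := time.Unix(int64(msg.Expire), 0)
	return msgexp.Before(time.Now())
}

func main() {
	old := &PssMsg{Expire: uint32(time.Now().Add(-time.Hour).Unix())}
	fmt.Println(isMsgExpired(old)) // true
}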
Implements a queue manager to enable resending when forwarding fails. Messages are not forwarded right away, but put in a queue which is in turn fetched by a loop started when the service starts.
Commits:
swarm/pss: WIP outbox
swarm/pss: Add read mutexes
swarm/pss: Implement queue as channel
swarm/pss: Remove commented code
#339
Previous calls to forward now call enqueue, which adds the message to a queue for sending. Sending (forwarding) is done by a loop, started with the service, that calls dequeue.
The (handshake) symkey memory cleaner was not a loop due to a previous omission. Adding the loop revealed a bug where directly added symkeys weren't protected from deletion.