refactor(cbindings): Thread-safe communication between the main thread and the Waku Thread #1978
Conversation
You can find the image built from this PR at
Built from eb1cd3a

Force-pushed from be942aa to f5e5754, and from f5e5754 to ae48b46.
library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim
```nim
case request[].reqType
of LIFECYCLE:
  waitFor cast[ptr NodeLifecycleRequest](request[].reqContent).process(node)
```
I'm thinking that mixing pointer and async could cause problems but IDK how async works in Nim so 🤷
I guess blocking is the right thing to do here. Is blocking on an async proc the same as a sync proc?
Well, afaik, the `waitFor` is "sync": it keeps the dispatcher making progress there until the `process` is done. The `reqContent` pointer will only be deallocated by the `process` proc when it completes.
`waitFor` turns async code into sync code indeed - but a problem with this setup is that the async loop will only be running while `waitFor` is running - this means that it relies on a steady flow of requests in order to process networking buffers and other waku activity (timers, etc).
To solve this, `process` itself must be `async` and this should be turned into `await process` - the "main" loop then needs to call `poll` / `runForever`.
> To solve this, `process` itself must be `async` and this should be turned into `await process` - the "main" loop then needs to call `poll` / `runForever`
Thanks for the comment @arnetheduck!
The `waitFor` is aimed at making the dispatcher progress so that the request can be handled. That `process` proc is called in nwaku/library/waku_thread/waku_thread.nim, line 57 in ae48b46:

`let resultResponse = InterThreadRequest.process(request, addr node)`

On the other hand, for periods with no requests (no calls to any `libwaku` function), the dispatcher progresses thanks to directly calling `poll` in nwaku/library/waku_thread/waku_thread.nim, line 65 in ae48b46:

`poll()`

Kindly let me know if that's fine from your point of view :)
right - this runs the risk of getting stuck in `poll`, because `poll` itself will block until there is activity, and if there is no activity it will simply block forever - in fact, I'm not sure how this works at all - it should get stuck there and never perform any loop iterations - I'm probably missing some other detail which wakes up `poll`, but this looks like a significant risk with the setup.
There are two ways to solve this: introducing a timer / `sleepAsync` (`waitFor sleepAsync(1.millis)` instead of `poll`) or using a `ThreadSignal`. Introducing a timer is the easier way to make this code correct.
Using a signal has the advantage of using fewer resources, but is slightly more difficult to implement. I recommend leaving it for a separate PR - the way to use that is to create a signal for every channel (this is the way channels are normally implemented, with a "notification mechanism"), and every time an item is added to the channel, the signal is fired - here's a simple example: https://gist.github.com/arnetheduck/b6a7ac8f4b85490d26d464674e09d57d#file-threadsynctask-nim
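For readers less familiar with the pattern, the "signal per channel" idea above can be sketched outside Nim too. Below is a minimal, hypothetical C sketch using a POSIX condition variable as the notification mechanism; the `chan_t` type and function names are illustrative, not nwaku's or nim-taskpools' API:

```c
/* Hypothetical sketch of a channel with a "notification mechanism":
 * every send fires the signal, so the receiver can sleep instead of
 * busy-polling. Names (chan_t, chan_send, chan_recv) are illustrative. */
#include <pthread.h>
#include <stddef.h>

typedef struct {
    void *item;              /* single-slot payload */
    int full;                /* 1 when an item is waiting */
    pthread_mutex_t lock;
    pthread_cond_t signal;   /* the per-channel signal */
} chan_t;

void chan_init(chan_t *c) {
    c->item = NULL;
    c->full = 0;
    pthread_mutex_init(&c->lock, NULL);
    pthread_cond_init(&c->signal, NULL);
}

void chan_send(chan_t *c, void *item) {
    pthread_mutex_lock(&c->lock);
    c->item = item;
    c->full = 1;
    pthread_cond_signal(&c->signal);  /* fire the signal on every send */
    pthread_mutex_unlock(&c->lock);
}

void *chan_recv(chan_t *c) {
    pthread_mutex_lock(&c->lock);
    while (!c->full)                  /* block until signalled */
        pthread_cond_wait(&c->signal, &c->lock);
    void *item = c->item;
    c->full = 0;
    pthread_mutex_unlock(&c->lock);
    return item;
}
```

The receiver consumes no CPU while waiting, which is the resource advantage the comment mentions over a timer-based wake-up.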
"in general" I would look for ways / strive to avoid calling waitFor
in "inner" proc's and rather structure the code in such a way that the waitFor
happens only at the "outer" layer - the waitFor strategy is not incorrect, but it's unusual and runs the risk of accidentally ending up with nested waitFor
calls (calling waitFor
from an async
function) which is not supported.
"in general" I would look for ways / strive to avoid calling
waitFor
in "inner" proc's...
Okay perfect. I enhanced that in my last commit.
> right - this runs the risk of getting stuck in `poll` because `poll` itself will block...

Ok thanks! We'll apply `waitFor sleepAsync(1)` for now and will implement the enhancement with a signal in further PRs. I think the `poll()` doesn't get stuck because the Relay protocol is continuously dispatching network events.
library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim
```nim
var ret = cast[ptr PeerManagementRequest](
    allocShared0(sizeof(PeerManagementRequest)))
ret[].operation = op
ret[].peerMultiAddr = peerMultiAddr
```
`string` is a garbage-collected type - here, a copy must be taken with the shared allocator (and later it must be deallocated) - no garbage-collected types are allowed in objects constructed with `create`: https://status-im.github.io/nim-style-guide/interop.html#garbage-collected-types
(type needs changing to `cstring`)
Okay, this is the trickiest part.
I was assuming that thread-safe communication was ensured by just sending `ptr` types, given that the `ptr` type is not tracked by the GC by default. I also wanted to avoid the overhead of serializing/parsing JSON objects.
My assumption was that the following was safe:

1. `Thread A` creates a `ptr` of the request in the shared space.
2. `Thread A` sends the address of the request object to `Thread B`.
3. `Thread B` handles the request and deallocates the memory from the shared space.

The following is used to communicate between both threads:

`reqChannel: ChannelSPSCSingle[ptr InterThreadRequest]`
`respChannel: ChannelSPSCSingle[ptr InterThreadResponse]`

The `RelayRequest` type is the most complex example:

```nim
type
  RelayRequest* = object
    operation: RelayMsgType
    pubsubTopic: PubsubTopic
    relayEventCallback: WakuRelayHandler
    message: WakuMessage
```

... which is created by `Thread A` in nwaku/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim, line 34 in 78ebb3a:

`var ret = createShared(T)`

... and deallocated by `Thread B` in the same file, line 44 in 78ebb3a:

`defer: deallocShared(self)`

Isn't that a thread-safe approach to sending `ptr` types over?
`peerMultiAddr` is a `string`, which is a garbage-collected type (together with `seq`, `ref` and closures). Both the "main object" and all its fields need to be thread-safe, i.e. non-`ref`.
This means that we manually allocate a copy of everything on `createShared` and release it - I suggest implementing a `destroyShared` function for every `createShared` which deallocates all fields and finally the main object - in the future, these can be turned into proper destructors, but they are good for now.
library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim
You can find the experimental image built from this PR at
```nim
## Waiting for the response
var response: ptr InterThreadResponse
var recvOk = ctx.respChannel.tryRecv(response)
while recvOk == false:
```
instead of this loop, a signal can be used here too
Okay! We'll apply the signal enhancements in a future PR.
When two threads send data to each other, that data cannot contain any GC'ed type (string, seq, ref, closures) at any level.
```nim
resp = ctx.respChannel.tryRecv()
os.sleep(1)
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
```
can it happen that requests are answered out-of-order because of `async`?
> can it happen that requests are answered out-of-order because of `async`?

Good point! I believe this couldn't happen, as requests are attended to sequentially thanks to the `waitFor` in nwaku/library/waku_thread/waku_thread.nim, line 58 in 01793d2:

`waitFor InterThreadRequest.process(request, addr node)`

On the other hand, once we apply the "signal" approach we'll have better synchronization between the two threads.
LGTM! Looking forward to the next round
Description
This PR is part of a PR suite aimed at following Jacek's recommendations re thread-safe communication.
Concretely:

- Use `ChannelSPSCSingle` so that the main thread and the Waku Thread communicate safely.
- Requests are sent as `ptr` whose memory is being allocated in the thread-shared space.

This change is motivated by the following comment: #1865 (comment)
Changes
- Drop the `Channel` type in favour of `ChannelSPSCSingle` ( https://github.com/status-im/nim-taskpools/tree/15e23ef1cf0860330dcc32f50fcce5f840031e28 )
- Add `nim-taskpools` as a new submodule/vendor ( https://github.com/status-im/nim-taskpools/tree/15e23ef1cf0860330dcc32f50fcce5f840031e28 )

Issue
Closes #1878