-
Notifications
You must be signed in to change notification settings - Fork 9
add test for assembler-consenter reconnection #126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add test for assembler-consenter reconnection #126
Conversation
Signed-off-by: Natalie Morad <natalie.morad@ibm.com>
| blockLock sync.Mutex | ||
| decisions chan *common.Block | ||
| stopped chan struct{} | ||
| mu sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a simple mutex is enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mutex no longer needed after the change, removed
| return stream.Send(&orderer.DeliverResponse{ | ||
| Type: &orderer.DeliverResponse_Block{Block: sc.storedBlock}, | ||
| func (sc *stubConsenter) Stop() { | ||
| close(sc.decisions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When stopping we shouldn't close this channel so that we can restart with the decisions that are already in this channel
I would add a shutdown function that closes this channel and calls stop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea to keep it open.
Actually, you don't need to close decisions at all; closing is only to notify consumers that there is no more content. Here the consumer is the Deliver, and it exits with close(stopped).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kept decisions open in Stop() and closed everything in Shutdown()
| blockLock sync.Mutex | ||
| decisions chan *common.Block | ||
| stopped chan struct{} | ||
| mu sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree
| return stream.Send(&orderer.DeliverResponse{ | ||
| Type: &orderer.DeliverResponse_Block{Block: sc.storedBlock}, | ||
| func (sc *stubConsenter) Stop() { | ||
| close(sc.decisions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea to keep it open.
Actually, you don't need to close decisions at all; closing is only to notify consumers that there is no more content. Here the consumer is the Deliver, and it exits with close(stopped).
|
|
||
| sc.server = server | ||
| sc.stopped = make(chan struct{}) | ||
| sc.decisions = make(chan *common.Block, 100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If in Stop() we don't close it, then here we don't need to recreate it.
| func (sc *stubConsenter) Deliver(stream orderer.AtomicBroadcast_DeliverServer) error { | ||
| for { | ||
| sc.mu.RLock() | ||
| decisions := sc.decisions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't close decisions there is no need to access it with a lock.
| sc.storedBlock = &common.Block{ | ||
| sc.mu.Lock() | ||
| defer sc.mu.Unlock() | ||
| sc.decisions <- &common.Block{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the block header as well, especially important is the decision number (Block number). The assembler saves it in the ledger and recovers from it; it is essential. It needs to correspond to what you have in the oba parameter, of course.
node/assembler/stub_batcher_test.go
Outdated
| blockLock sync.Mutex | ||
| batches chan *common.Block | ||
| stopped chan struct{} | ||
| mu sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simple Mutex is enough
| } | ||
|
|
||
| func (sb *stubBatcher) Stop() { | ||
| close(sb.batches) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As in the consenter, no need to close it.
node/assembler/stub_batcher_test.go
Outdated
|
|
||
| sb.server = server | ||
| sb.stopped = make(chan struct{}) | ||
| sb.batches = make(chan *common.Block, 100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to recreate if we don't close it
node/assembler/stub_batcher_test.go
Outdated
| }) | ||
| for { | ||
| sb.mu.RLock() | ||
| batches := sb.batches |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is not recreated, no need to access with a lock
| batcherStub.SetNextBatch(batch2) | ||
| batchersStub[0].SetNextBatch(batch2) | ||
|
|
||
| // restart consenter and send matching decision |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sleep a little bit to see how many log messages we get...
Signed-off-by: Natalie Morad <natalie.morad@ibm.com>
| case <-stopped: | ||
| return nil | ||
| case <-stream.Context().Done(): | ||
| return stream.Context().Err() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed stopped since sc.server.Stop() makes Deliver() stop at this line
tock-ibm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am going to approve with two comments, that can be handled in the next PR.
| // wait for decistion will be sent | ||
| time.Sleep(3 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant sleep between stop and restart, below.
Fix in next PR.
| sc.server.Stop() | ||
| } | ||
|
|
||
| func (sc *stubConsenter) Shutdown() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just make sure you cannot restart a stub that was "Shutdown", rather then just "Stopped".
In Next PR
This PR fixes the comments from #8 and addresses issue #107.