Skip to content

Commit

Permalink
[network] Gossip topic subscribe back pressure (#203)
Browse files Browse the repository at this point in the history
# Description

Fixes the following issues:

* `Topic` and `*pubsub.Subscription` are not closed on exit
* Every received message spawns a new goroutine

# Changes include

- [x] Bugfix (non-breaking change that solves an issue)
- [ ] Hotfix (change that solves an urgent issue, and requires immediate
attention)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (change that is not backwards-compatible and/or
changes current functionality)

# Checklist

- [x] I have assigned this PR to myself
- [x] I have added at least 1 reviewer
- [x] I have added the relevant labels
- [ ] I have updated the official documentation
- [ ] I have added sufficient documentation in code

## Testing

- [x] I have tested this code with the official test suite
- [ ] I have tested this code manually
  • Loading branch information
0xcb9ff9 authored Oct 11, 2022
1 parent 3b344b7 commit a840ee8
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 20 deletions.
19 changes: 19 additions & 0 deletions consensus/ibft/ibft.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"reflect"
"time"

"go.uber.org/atomic"

"github.com/dogechain-lab/dogechain/consensus"
"github.com/dogechain-lab/dogechain/consensus/ibft/proto"
"github.com/dogechain-lab/dogechain/contracts/upgrader"
Expand Down Expand Up @@ -74,6 +76,7 @@ type Ibft struct {
blockchain blockchainInterface // Interface exposed by the blockchain layer
executor *state.Executor // Reference to the state executor
closeCh chan struct{} // Channel for closing
isClosed *atomic.Bool

validatorKey *ecdsa.PrivateKey // Private key for the validator
validatorKeyAddr types.Address
Expand Down Expand Up @@ -161,6 +164,7 @@ func Factory(
blockchain: params.Blockchain,
executor: params.Executor,
closeCh: make(chan struct{}),
isClosed: atomic.NewBool(false),
txpool: params.Txpool,
state: &currentState{},
network: params.Network,
Expand Down Expand Up @@ -230,6 +234,7 @@ func (i *Ibft) GetSyncProgression() *progress.Progression {

// transport is the messaging abstraction the IBFT consensus uses to exchange
// consensus messages and to release the messaging resources on shutdown.
type transport interface {
// Gossip sends msg to the network and returns any error from the send.
Gossip(msg *proto.MessageReq) error
// Close releases the resources held by the transport. Ibft.Close calls it
// as part of shutting the consensus down.
Close() error
}

// Define the IBFT libp2p protocol
Expand All @@ -244,6 +249,10 @@ func (g *gossipTransport) Gossip(msg *proto.MessageReq) error {
return g.topic.Publish(msg)
}

// Close shuts down the gossip transport by closing its topic and returns
// whatever error the topic's Close reports.
func (g *gossipTransport) Close() error {
return g.topic.Close()
}

// GetIBFTForks returns IBFT fork configurations from chain config
func GetIBFTForks(ibftConfig map[string]interface{}) ([]IBFTFork, error) {
// no fork, only specifying IBFT type in chain config
Expand Down Expand Up @@ -1422,6 +1431,14 @@ func (i *Ibft) IsLastOfEpoch(number uint64) bool {

// Close closes the IBFT consensus mechanism, and does write back to disk
func (i *Ibft) Close() error {
if i.isClosed.Load() {
i.logger.Error("IBFT consensus is Closed")

return nil
}

i.isClosed.Store(true)

close(i.closeCh)

if i.config.Path != "" {
Expand All @@ -1432,6 +1449,8 @@ func (i *Ibft) Close() error {
}
}

i.transport.Close()

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions consensus/ibft/ibft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/dogechain-lab/dogechain/types"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
anypb "google.golang.org/protobuf/types/known/anypb"
)

Expand Down Expand Up @@ -1211,6 +1212,7 @@ func newMockIbft(t *testing.T, accounts []string, account string) *mockIbft {
validatorKey: addr.priv,
validatorKeyAddr: addr.Address(),
closeCh: make(chan struct{}),
isClosed: atomic.NewBool(false),
updateCh: make(chan struct{}),
operator: &operator{},
state: newState(),
Expand Down Expand Up @@ -1269,6 +1271,7 @@ func newMockIBFTWithMockBlockchain(
validatorKey: addr.priv,
validatorKeyAddr: addr.Address(),
closeCh: make(chan struct{}),
isClosed: atomic.NewBool(false),
updateCh: make(chan struct{}),
operator: &operator{},
state: newState(),
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ require (
lukechampine.com/blake3 v1.1.7 // indirect
)

require go.uber.org/atomic v1.9.0

require (
github.com/armon/go-radix v1.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -176,7 +178,6 @@ require (
github.com/valyala/fasthttp v1.4.0 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/mod v0.5.1 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
Expand Down
91 changes: 73 additions & 18 deletions network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package network
import (
"context"
"reflect"
"runtime"
"sync"
"time"

"github.com/dogechain-lab/dogechain/helper/common"

"github.com/hashicorp/go-hclog"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -16,12 +21,17 @@ const (
subscribeOutputBufferSize = 1024
)

// workerNum is the number of handler worker goroutines readLoop starts for a
// subscription, and also the buffer size of the work queue feeding them.
// It is derived from runtime.NumCPU() passed through common.Max(…, 1) and
// common.Min(…, 32) — presumably clamping it to [1, 32]; confirm against
// helper/common.
var workerNum = int(common.Min(common.Max(uint64(runtime.NumCPU()), 1), 32))

type Topic struct {
logger hclog.Logger

topic *pubsub.Topic
typ reflect.Type
closeCh chan struct{}
topic *pubsub.Topic
typ reflect.Type

wg sync.WaitGroup
unsubscribeCh chan struct{}
}

func (t *Topic) createObj() proto.Message {
Expand Down Expand Up @@ -53,32 +63,74 @@ func (t *Topic) Subscribe(handler func(obj interface{})) error {
return nil
}

// Close stops the topic's read loops and then closes the underlying pubsub
// topic, returning the error from that final close.
//
// NOTE(review): unsubscribeCh is closed unconditionally, so calling Close a
// second time would panic on the already-closed channel — callers must ensure
// Close is invoked at most once.
func (t *Topic) Close() error {
// Closing the channel signals every readLoop selecting on it to unsubscribe.
close(t.unsubscribeCh)
// Wait for the read loops registered on wg to finish before closing the topic.
t.wg.Wait()

return t.topic.Close()
}

func (t *Topic) readLoop(sub *pubsub.Subscription, handler func(obj interface{})) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

workqueue := make(chan proto.Message, workerNum)
defer close(workqueue)

go func() {
<-t.closeCh
cancelFn()
}()
t.wg.Add(1)
defer t.wg.Done()

for i := 0; i < workerNum; i++ {
go func() {
for {
obj, ok := <-workqueue
if !ok {
return
}

handler(obj)
}
}()
}

for {
msg, err := sub.Next(ctx)
if err != nil {
t.logger.Error("failed to get topic", "err", err)
select {
case <-t.unsubscribeCh:
// send cancel timeout
timeoutCtx, cancelTimeoutFn := context.WithTimeout(ctx, 30*time.Second)
defer cancelTimeoutFn()

continue
}
go func() {
sub.Cancel()

// cancelTimeoutFn() is idempotent, so it's safe to call it multiple times
// https://stackoverflow.com/questions/59858033/is-cancel-so-mandatory-for-context
cancelTimeoutFn()
}()

// wait for completion or timeout
<-timeoutCtx.Done()

return

default:
msg, err := sub.Next(ctx)
if err != nil {
t.logger.Error("failed to get topic", "err", err)

continue
}

go func() {
obj := t.createObj()
if err := proto.Unmarshal(msg.Data, obj); err != nil {
t.logger.Error("failed to unmarshal topic", "err", err)
t.logger.Error("unmarshal message from", "peer", msg.GetFrom())

return
continue
}

handler(obj)
}()
workqueue <- obj
}
}
}

Expand All @@ -90,8 +142,11 @@ func (s *Server) NewTopic(protoID string, obj proto.Message) (*Topic, error) {

tt := &Topic{
logger: s.logger.Named(protoID),
topic: topic,
typ: reflect.TypeOf(obj).Elem(),

topic: topic,
typ: reflect.TypeOf(obj).Elem(),

unsubscribeCh: make(chan struct{}),
}

return tt, nil
Expand Down
Loading

0 comments on commit a840ee8

Please sign in to comment.