Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Block as needed inside core.App until started #560

Merged
merged 3 commits into from
Nov 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

This changelog is a work in progress and may contain notes for versions which have not actually been released. Check the [Releases](https://github.com/0xProject/0x-mesh/releases) page to see full release notes and more information about the latest released versions.

## v6.1.2-beta

### Bug fixes 🐞

- Fixed a bug which could cause Mesh to crash with a nil pointer exception if RPC requests are sent too quickly during/immediately after start up ([#560](https://github.com/0xProject/0x-mesh/pull/560)).


## v6.1.1-beta

### Bug fixes 🐞
Expand Down
30 changes: 30 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ type App struct {
ethRPCClient ethrpcclient.Client
orderSelector *orderSelector
db *meshdb.MeshDB

// started is closed to signal that the App has been started. Some methods
// will block until after the App is started.
started chan struct{}
}

func New(config Config) (*App, error) {
Expand Down Expand Up @@ -282,6 +286,7 @@ func New(config Config) (*App, error) {
}

app := &App{
started: make(chan struct{}),
config: config,
privKey: privKey,
peerID: peerID,
Expand Down Expand Up @@ -446,6 +451,12 @@ func (app *App) Start(ctx context.Context) error {
}

// Initialize the p2p node.
// Note(albrow): The main reason that we need to use a `started` channel in
// some methods is that we cannot call p2p.New without passing in a context
// (due to how libp2p works). This means that before app.Start is called,
// app.node will be nil and attempting to call any methods on app.node will
// panic with a nil pointer exception. All the other fields of core.App that
// we need to use will have already been initialized and are ready to use.
bootstrapList := p2p.DefaultBootstrapList
if app.config.BootstrapList != "" {
bootstrapList = strings.Split(app.config.BootstrapList, ",")
Expand Down Expand Up @@ -493,6 +504,10 @@ func (app *App) Start(ctx context.Context) error {
app.periodicallyLogStats(innerCtx)
}()

// Signal that the app has been started.
log.Info("core.App was started")
close(app.started)

// If any error channel returns a non-nil error, we cancel the inner context
// and return the error. Note that this means we only return the first error
// that occurs.
Expand Down Expand Up @@ -530,6 +545,8 @@ func (app *App) Start(ctx context.Context) error {
}

func (app *App) periodicallyCheckForNewAddrs(ctx context.Context, startingAddrs []ma.Multiaddr) {
<-app.started

// TODO(albrow): There might be a more efficient way to do this if we have access to
// an event bus. See: https://github.com/libp2p/go-libp2p/issues/467
seenAddrs := stringset.New()
Expand Down Expand Up @@ -570,6 +587,8 @@ func (e ErrSnapshotNotFound) Error() string {
// continue to make requests supplying the `snapshotID` returned from the first request. After 1 minute of not
// received further requests referencing a specific snapshot, the snapshot expires and can no longer be used.
func (app *App) GetOrders(page, perPage int, snapshotID string) (*rpc.GetOrdersResponse, error) {
<-app.started

ordersInfos := []*rpc.OrderInfo{}
if perPage <= 0 {
return &rpc.GetOrdersResponse{
Expand Down Expand Up @@ -643,6 +662,8 @@ func (app *App) GetOrders(page, perPage int, snapshotID string) (*rpc.GetOrdersR
// they will only be removed if they become unfillable and will not be removed
// due to having a high expiration time or any incentive mechanisms.
func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage, pinned bool) (*ordervalidator.ValidationResults, error) {
<-app.started

allValidationResults := &ordervalidator.ValidationResults{
Accepted: []*ordervalidator.AcceptedOrderInfo{},
Rejected: []*ordervalidator.RejectedOrderInfo{},
Expand Down Expand Up @@ -752,6 +773,8 @@ func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage, pinned bool) (*ord

// shareOrder immediately shares the given order on the GossipSub network.
func (app *App) shareOrder(order *zeroex.SignedOrder) error {
<-app.started

encoded, err := encodeOrder(order)
if err != nil {
return err
Expand All @@ -761,11 +784,15 @@ func (app *App) shareOrder(order *zeroex.SignedOrder) error {

// AddPeer can be used to manually connect to a new peer.
func (app *App) AddPeer(peerInfo peerstore.PeerInfo) error {
<-app.started

return app.node.Connect(peerInfo, peerConnectTimeout)
}

// GetStats retrieves stats about the Mesh node
func (app *App) GetStats() (*rpc.GetStatsResponse, error) {
<-app.started

latestBlockHeader, err := app.blockWatcher.GetLatestBlockProcessed()
if err != nil {
return nil, err
Expand Down Expand Up @@ -815,6 +842,8 @@ func (app *App) GetStats() (*rpc.GetStatsResponse, error) {
}

func (app *App) periodicallyLogStats(ctx context.Context) {
<-app.started

ticker := time.NewTicker(logStatsInterval)
for {
select {
Expand Down Expand Up @@ -849,6 +878,7 @@ func (app *App) periodicallyLogStats(ctx context.Context) {

// SubscribeToOrderEvents let's one subscribe to order events emitted by the OrderWatcher
func (app *App) SubscribeToOrderEvents(sink chan<- []*zeroex.OrderEvent) event.Subscription {
// app.orderWatcher is guaranteed to be initialized. No need to wait.
subscription := app.orderWatcher.Subscribe(sink)
return subscription
}
Expand Down
1 change: 0 additions & 1 deletion ethereum/blockwatch/block_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ type Watcher struct {
blockScope event.SubscriptionScope // Subscription scope tracking current live listeners
wasStartedOnce bool // Whether the block watcher has previously been started
pollingInterval time.Duration
ticker *time.Ticker
withLogs bool
topics []common.Hash
mu sync.RWMutex
Expand Down