Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: filecoin-project/lotus
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: a2c541e3f73fd578668df2158d95575241f4d297
Choose a base ref
..
head repository: filecoin-project/lotus
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 4a6362b3643e18bdef810e7f4c184841d6d19e6c
Choose a head ref
Showing with 40 additions and 34 deletions.
  1. +2 −0 node/builder.go
  2. +32 −33 paychmgr/manager.go
  3. +6 −1 paychmgr/store.go
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
@@ -107,6 +107,7 @@ const (
HandleIncomingMessagesKey

RegisterClientValidatorKey
HandlePaymentChannelManagerKey

// miner
GetParamsKey
@@ -272,6 +273,7 @@ func Online() Option {

Override(new(*paychmgr.Store), paychmgr.NewStore),
Override(new(*paychmgr.Manager), paychmgr.NewManager),
Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager),
Override(new(*market.FundMgr), market.NewFundMgr),
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
),
65 changes: 32 additions & 33 deletions paychmgr/manager.go
Original file line number Diff line number Diff line change
@@ -41,7 +41,9 @@ type StateManagerApi interface {

type Manager struct {
// The Manager context is used to terminate wait operations on shutdown
ctx context.Context
ctx context.Context
shutdown context.CancelFunc

store *Store
sm StateManagerApi
sa *stateAccessor
@@ -56,68 +58,59 @@ type Manager struct {
state full.StateAPI
}

type paychApiImpl struct {
mpool full.MpoolAPI
state full.StateAPI
}

func (p *paychApiImpl) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
return p.state.StateWaitMsg(ctx, msg, confidence)
}

func (p *paychApiImpl) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
return p.mpool.MpoolPushMessage(ctx, msg)
type paychAPIImpl struct {
full.MpoolAPI
full.StateAPI
}

func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manager {
// TODO: What should I use for ctx here? Should it be passed as a parameter?
// Not sure how to do that with dependency injection
ctx := context.TODO()

pm := &Manager{
ctx: ctx,
return &Manager{
store: pchstore,
sm: sm,
sa: &stateAccessor{sm: sm},
channels: make(map[string]*channelAccessor),
// TODO: Is this the correct way to do this or can I do something different
// with dependency injection?
pchapi: &paychApiImpl{mpool: api.MpoolAPI, state: api.StateAPI},
pchapi: &paychAPIImpl{api.MpoolAPI, api.StateAPI},

mpool: api.MpoolAPI,
wallet: api.WalletAPI,
state: api.StateAPI,
}

err := pm.init()
if err != nil {
// TODO: Should we return error from the constructor? Is this possible
// with dependency injection?
log.Errorf("%s", err)
}

return pm
}

// Used by the tests to supply mocks
// newManager is used by the tests to supply mocks
func newManager(sm StateManagerApi, pchstore *Store, pchapi paychApi) (*Manager, error) {
pm := &Manager{
ctx: context.TODO(),
store: pchstore,
sm: sm,
sa: &stateAccessor{sm: sm},
channels: make(map[string]*channelAccessor),
pchapi: pchapi,
}
return pm, pm.init()
return pm, pm.Start(context.Background())
}

// init checks the datastore to see if there are any channels that have
// HandleManager is called by dependency injection to set up hooks
func HandleManager(lc fx.Lifecycle, pm *Manager) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return pm.Start(ctx)
},
OnStop: func(context.Context) error {
return pm.Stop()
},
})
}

// Start checks the datastore to see if there are any channels that have
// outstanding add funds messages, and if so, waits on the messages.
// Outstanding messages can occur if an add funds message was sent
// and then lotus was shut down or crashed before the result was
// received.
func (pm *Manager) init() error {
func (pm *Manager) Start(ctx context.Context) error {
pm.ctx, pm.shutdown = context.WithCancel(ctx)

cis, err := pm.store.WithPendingAddFunds()
if err != nil {
return err
@@ -155,6 +148,12 @@ func (pm *Manager) init() error {
return group.Wait()
}

// Stop shuts down any processes used by the manager
func (pm *Manager) Stop() error {
pm.shutdown()
return nil
}

func (pm *Manager) TrackOutboundChannel(ctx context.Context, ch address.Address) error {
return pm.trackChannel(ctx, ch, DirOutbound)
}
7 changes: 6 additions & 1 deletion paychmgr/store.go
Original file line number Diff line number Diff line change
@@ -130,6 +130,8 @@ func (ps *Store) ListChannels() ([]address.Address, error) {
//return out, nil
}

// findChans loops over all channels, only including those that pass the filter.
// max is the maximum number of channels to return. Set to zero to return unlimited channels.
func (ps *Store) findChans(filter func(*ChannelInfo) bool, max int) ([]ChannelInfo, error) {
res, err := ps.ds.Query(dsq.Query{})
if err != nil {
@@ -165,12 +167,15 @@ func (ps *Store) findChans(filter func(*ChannelInfo) bool, max int) ([]ChannelIn
//}

matches = append(matches, *ci)

// If we've reached the maximum number of matches, return.
// Note that if max is zero we return an unlimited number of matches
// because len(matches) will always be at least 1.
if len(matches) == max {
return matches, nil
}
}

//return addrs, nil
return matches, nil
}