-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
mpool.go
124 lines (97 loc) · 3.09 KB
/
mpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package full
import (
"context"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
type MpoolAPI struct {
fx.In
WalletAPI
Chain *store.ChainStore
Mpool *messagepool.MessagePool
}
func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
pending, mpts := a.Mpool.Pending()
haveCids := map[cid.Cid]struct{}{}
for _, m := range pending {
haveCids[m.Cid()] = struct{}{}
}
if ts == nil || mpts.Height() > ts.Height() {
return pending, nil
}
for {
if mpts.Height() == ts.Height() {
if mpts.Equals(ts) {
return pending, nil
}
// different blocks in tipsets
have, err := a.Mpool.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf("getting messages for base ts: %w", err)
}
for _, m := range have {
haveCids[m.Cid()] = struct{}{}
}
}
msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}
for _, m := range msgs {
if _, ok := haveCids[m.Cid()]; ok {
continue
}
haveCids[m.Cid()] = struct{}{}
pending = append(pending, m)
}
if mpts.Height() >= ts.Height() {
return pending, nil
}
ts, err = a.Chain.LoadTipSet(ts.Parents())
if err != nil {
return nil, xerrors.Errorf("loading parent tipset: %w", err)
}
}
}
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return a.Mpool.Push(smsg)
}
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
if msg.Nonce != 0 {
return nil, xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce)
}
return a.Mpool.PushWithNonce(ctx, msg.From, func(from address.Address, nonce uint64) (*types.SignedMessage, error) {
msg.Nonce = nonce
if msg.From.Protocol() == address.ID {
log.Warnf("Push from ID address (%s), adjusting to %s", msg.From, from)
msg.From = from
}
b, err := a.WalletBalance(ctx, msg.From)
if err != nil {
return nil, xerrors.Errorf("mpool push: getting origin balance: %w", err)
}
if b.LessThan(msg.Value) {
return nil, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, msg.Value)
}
return a.WalletSignMessage(ctx, from, msg)
})
}
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return a.Mpool.GetNonce(addr)
}
func (a *MpoolAPI) MpoolSub(ctx context.Context) (<-chan api.MpoolUpdate, error) {
return a.Mpool.Updates(ctx)
}
func (a *MpoolAPI) MpoolEstimateGasPrice(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) {
return a.Mpool.EstimateGasPrice(ctx, nblocksincl, sender, gaslimit, tsk)
}