Skip to content

Commit

Permalink
Generate access event on unlock
Browse files Browse the repository at this point in the history
  • Loading branch information
wizeguyy committed Oct 23, 2024
1 parent 178ed39 commit 5d743e7
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 75 deletions.
5 changes: 5 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,3 +788,8 @@ func SetBlockHashForQi(blockHash Hash, nodeLocation Location) Hash {
blockHash[3] |= 0x80 // 10000000 in binary (set first bit to 1)
return blockHash
}

type Unlock struct {
Addr InternalAddress
Amt *big.Int
}
11 changes: 6 additions & 5 deletions core/bodydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,21 @@ func NewBodyDb(db ethdb.Database, engine consensus.Engine, hc *HeaderChain, chai
}

// Append
func (bc *BodyDb) Append(block *types.WorkObject) ([]*types.Log, error) {
func (bc *BodyDb) Append(block *types.WorkObject) ([]*types.Log, []common.Unlock, error) {
startLock := time.Now()

batch := bc.db.NewBatch()
stateApply := time.Now()
locktime := time.Since(startLock)
nodeCtx := bc.NodeCtx()
var logs []*types.Log
var unlocks []common.Unlock
var err error
if nodeCtx == common.ZONE_CTX && bc.ProcessingState() {
// Process our block
logs, err = bc.processor.Apply(batch, block)
logs, unlocks, err = bc.processor.Apply(batch, block)
if err != nil {
return nil, err
return nil, nil, err
}
rawdb.WriteTxLookupEntriesByBlock(batch, block, nodeCtx)
}
Expand All @@ -103,9 +104,9 @@ func (bc *BodyDb) Append(block *types.WorkObject) ([]*types.Log, error) {

bc.logger.WithField("apply state", common.PrettyDuration(time.Since(stateApply))).Debug("Time taken to")
if err = batch.Write(); err != nil {
return nil, err
return nil, nil, err
}
return logs, nil
return logs, unlocks, nil
}

func (bc *BodyDb) ProcessingState() bool {
Expand Down
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,10 @@ func (c *Core) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
return c.sl.miner.worker.pendingLogsFeed.Subscribe(ch)
}

func (c *Core) SubscribeUnlocks(ch chan<- UnlocksEvent) event.Subscription {
return c.sl.hc.unlocksFeed.Subscribe(ch)
}

// SubscribePendingBlock starts delivering the pending block to the given channel.
func (c *Core) SubscribePendingHeader(ch chan<- *types.WorkObject) event.Subscription {
return c.sl.miner.SubscribePendingHeader(ch)
Expand Down
5 changes: 5 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ type ChainEvent struct {
Entropy *big.Int
}

type UnlocksEvent struct {
Hash common.Hash
Unlocks []common.Unlock
}

type ChainSideEvent struct {
Blocks []*types.WorkObject
}
Expand Down
16 changes: 15 additions & 1 deletion core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type HeaderChain struct {
currentExpansionNumber uint8

chainHeadFeed event.Feed
unlocksFeed event.Feed
chainSideFeed event.Feed
scope event.SubscriptionScope

Expand Down Expand Up @@ -423,10 +424,18 @@ func (hc *HeaderChain) setStateProcessing() bool {
func (hc *HeaderChain) AppendBlock(block *types.WorkObject) error {
blockappend := time.Now()
// Append block else revert header append
logs, err := hc.bc.Append(block)
logs, unlocks, err := hc.bc.Append(block)
if err != nil {
return err
}
if unlocks != nil && len(unlocks) > 0 {
go func() {
hc.unlocksFeed.Send(UnlocksEvent{
Hash: block.Hash(),
Unlocks: unlocks,
})
}()
}
hc.logger.WithField("append block", common.PrettyDuration(time.Since(blockappend))).Debug("Time taken to")

if len(logs) > 0 {
Expand Down Expand Up @@ -1177,6 +1186,11 @@ func (hc *HeaderChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.S
return hc.scope.Track(hc.chainHeadFeed.Subscribe(ch))
}

// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
func (hc *HeaderChain) SubscribeUnlocksEvent(ch chan<- UnlocksEvent) event.Subscription {
return hc.scope.Track(hc.unlocksFeed.Subscribe(ch))
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (hc *HeaderChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return hc.scope.Track(hc.chainSideFeed.Subscribe(ch))
Expand Down
139 changes: 75 additions & 64 deletions core/state_processor.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@ func (w *worker) prepareWork(genParams *generateParams, wo *types.WorkObject) (*
return nil, err
}

if err := RedeemLockedQuai(w.hc, proposedWo, parent, env.state); err != nil {
if err, _ := RedeemLockedQuai(w.hc, proposedWo, parent, env.state); err != nil {
w.logger.WithField("err", err).Error("Failed to redeem locked Quai")
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions quai/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,14 @@ func (b *QuaiAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event
return b.quai.core.SubscribePendingLogs(ch)
}

func (b *QuaiAPIBackend) SubscribeUnlocksEvent(ch chan<- core.UnlocksEvent) event.Subscription {
nodeCtx := b.quai.core.NodeCtx()
if nodeCtx != common.ZONE_CTX {
return nil
}
return b.quai.core.SubscribeUnlocks(ch)
}

func (b *QuaiAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return b.quai.Core().SubscribeChainEvent(ch)
}
Expand Down
19 changes: 17 additions & 2 deletions quai/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
quai "github.com/dominant-strategies/go-quai"
"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/common/hexutil"
"github.com/dominant-strategies/go-quai/core"
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/ethdb"
"github.com/dominant-strategies/go-quai/event"
Expand Down Expand Up @@ -317,6 +318,10 @@ func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) (
}

rpcSub := notifier.CreateSubscription()
internalAddr, err := addr.InternalAddress()
if err != nil {
return nil, err
}

go func() {
defer func() {
Expand All @@ -331,15 +336,16 @@ func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) (
api.activeSubscriptions += 1
headers := make(chan *types.WorkObject)
headersSub := api.events.SubscribeNewHeads(headers)

unlocks := make(chan core.UnlocksEvent)
unlocksSub := api.events.SubscribeUnlocks(unlocks)
for {
select {
case h := <-headers:
// Marshal the header data
hash := h.Hash()
nodeLocation := api.backend.NodeLocation()
nodeCtx := nodeLocation.Context()
if block, err := api.backend.GetBlock(h.Hash(), h.NumberU64(nodeCtx)); err != nil {
if block, err := api.backend.GetBlock(hash, h.NumberU64(nodeCtx)); err != nil {
for _, tx := range block.Transactions() {
// Check for external accesses
if tx.To() == &addr || tx.From(nodeLocation) == &addr {
Expand All @@ -355,11 +361,20 @@ func (api *PublicFilterAPI) Accesses(ctx context.Context, addr common.Address) (
}
}
}
case u := <-unlocks:
for _, unlock := range u.Unlocks {
if unlock.Addr == internalAddr {
notifier.Notify(rpcSub.ID, u.Hash)
break
}
}
case <-rpcSub.Err():
headersSub.Unsubscribe()
unlocksSub.Unsubscribe()
return
case <-notifier.Closed():
headersSub.Unsubscribe()
unlocksSub.Unsubscribe()
return
}
}
Expand Down
1 change: 1 addition & 0 deletions quai/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Backend interface {
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingHeaderEvent(ch chan<- *types.WorkObject) event.Subscription
SubscribeUnlocksEvent(ch chan<- core.UnlocksEvent) event.Subscription
ProcessingState() bool
NodeLocation() common.Location
NodeCtx() int
Expand Down
37 changes: 35 additions & 2 deletions quai/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// UnlocksSubscription queries balances that are recently unlocked
UnlocksSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
)
Expand All @@ -64,7 +66,8 @@ const (
// logsChanSize is the size of channel listening to LogsEvent.
logsChanSize = 10
// chainEvChanSize is the size of channel listening to ChainEvent.
chainEvChanSize = 10
chainEvChanSize = 10
unlocksEvChanSize = 10
)

type subscription struct {
Expand All @@ -75,6 +78,7 @@ type subscription struct {
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.WorkObject
unlocks chan core.UnlocksEvent
header chan *types.WorkObject
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
Expand All @@ -92,6 +96,7 @@ type EventSystem struct {
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event
unlocksSub event.Subscription // Subscription for new unlocks event

// Channels
install chan *subscription // install filter for event notification
Expand All @@ -101,6 +106,7 @@ type EventSystem struct {
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
unlocksCh chan core.UnlocksEvent // Channel to receive newly unlocked coinbases
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -119,6 +125,7 @@ func NewEventSystem(backend Backend) *EventSystem {
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
unlocksCh: make(chan core.UnlocksEvent, unlocksEvChanSize),
}

nodeCtx := backend.NodeCtx()
Expand All @@ -127,12 +134,13 @@ func NewEventSystem(backend Backend) *EventSystem {
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
m.unlocksSub = m.backend.SubscribeUnlocksEvent(m.unlocksCh)
}
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)

// Make sure none of the subscriptions are empty
if nodeCtx == common.ZONE_CTX && backend.ProcessingState() {
if m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
if m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil || m.unlocksSub == nil {
backend.Logger().Fatal("Subscribe for event system failed")
}
} else {
Expand Down Expand Up @@ -173,6 +181,7 @@ func (sub *Subscription) Unsubscribe() {
case <-sub.f.logs:
case <-sub.f.hashes:
case <-sub.f.headers:
case <-sub.f.unlocks:
}
}

Expand Down Expand Up @@ -296,6 +305,21 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.WorkObject) *Subscr
return es.subscribe(sub)
}

// SubscribeUnlocks creates a subscription that writes the recently unlocked balances
func (es *EventSystem) SubscribeUnlocks(unlocks chan core.UnlocksEvent) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: UnlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
unlocks: unlocks,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribePendingTxs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
Expand Down Expand Up @@ -384,6 +408,12 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
}
}

func (es *EventSystem) handleUnlocksEvent(filters filterIndex, ev core.UnlocksEvent) {
for _, f := range filters[UnlocksSubscription] {
f.unlocks <- ev
}
}

// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
defer func() {
Expand All @@ -401,6 +431,7 @@ func (es *EventSystem) eventLoop() {
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.pendingLogsSub.Unsubscribe()
es.unlocksSub.Unsubscribe()
}
es.chainSub.Unsubscribe()

Expand Down Expand Up @@ -457,6 +488,8 @@ func (es *EventSystem) handleZoneEventLoop(index filterIndex) {
es.handleRemovedLogs(index, ev)
case ev := <-es.pendingLogsCh:
es.handlePendingLogs(index, ev)
case ev := <-es.unlocksCh:
es.handleUnlocksEvent(index, ev)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
Expand Down
5 changes: 5 additions & 0 deletions quai/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type testBackend struct {
pendingLogsFeed event.Feed
chainFeed event.Feed
pendingHeaderFeed event.Feed
unlocksFeed event.Feed
}

func (b *testBackend) ChainDb() ethdb.Database {
Expand Down Expand Up @@ -182,6 +183,10 @@ func (b *testBackend) SubscribePendingHeaderEvent(ch chan<- *types.WorkObject) e
return b.pendingHeaderFeed.Subscribe(ch)
}

func (b *testBackend) SubscribeUnlocksEvent(ch chan<- core.UnlocksEvent) event.Subscription {
return b.unlocksFeed.Subscribe(ch)
}

// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
func TestPendingTxFilter(t *testing.T) {
t.Skip("Todo: Fix broken test")
Expand Down

0 comments on commit 5d743e7

Please sign in to comment.