Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockchain: subscriptions API refactoring #3281

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type FakeChain struct {
config.Blockchain
*mempool.Pool
blocksCh []chan *block.Block
blocksCh []chan<- *block.Block
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
Blockheight atomic.Uint32
PoolTxF func(*transaction.Transaction) error
poolTxWithData func(*transaction.Transaction, any, *mempool.Pool) error
Expand Down Expand Up @@ -346,7 +346,7 @@ func (chain *FakeChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool)
}

// SubscribeForBlocks implements the Blockchainer interface.
func (chain *FakeChain) SubscribeForBlocks(ch chan *block.Block) {
func (chain *FakeChain) SubscribeForBlocks(ch chan<- *block.Block) {
chain.blocksCh = append(chain.blocksCh, ch)
}

Expand Down Expand Up @@ -379,7 +379,7 @@ func (chain *FakeChain) VerifyWitness(util.Uint160, hash.Hashable, *transaction.
}

// UnsubscribeFromBlocks implements the Blockchainer interface.
func (chain *FakeChain) UnsubscribeFromBlocks(ch chan *block.Block) {
func (chain *FakeChain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
for i, c := range chain.blocksCh {
if c == ch {
if i < len(chain.blocksCh) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type Ledger interface {
GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error)
ComputeNextBlockValidators() []*keys.PublicKey
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
SubscribeForBlocks(ch chan *coreb.Block)
UnsubscribeFromBlocks(ch chan *coreb.Block)
SubscribeForBlocks(ch chan<- *coreb.Block)
UnsubscribeFromBlocks(ch chan<- *coreb.Block)
GetBaseExecFee() int64
CalculateAttributesFee(tx *transaction.Transaction) int64
interop.Ledger
Expand Down
95 changes: 30 additions & 65 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,42 +1327,42 @@ func (bc *Blockchain) notificationDispatcher() {
// These are just sets of subscribers, though modelled as maps
// for ease of management (not a lot of subscriptions is really
// expected, but maps are convenient for adding/deleting elements).
blockFeed = make(map[chan *block.Block]bool)
headerFeed = make(map[chan *block.Header]bool)
txFeed = make(map[chan *transaction.Transaction]bool)
notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan *state.AppExecResult]bool)
blockFeed = make(map[chan<- *block.Block]bool)
headerFeed = make(map[chan<- *block.Header]bool)
txFeed = make(map[chan<- *transaction.Transaction]bool)
notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan<- *state.AppExecResult]bool)
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
)
for {
select {
case <-bc.stopCh:
return
case sub := <-bc.subCh:
switch ch := sub.(type) {
case chan *block.Header:
case chan<- *block.Header:
headerFeed[ch] = true
case chan *block.Block:
case chan<- *block.Block:
blockFeed[ch] = true
case chan *transaction.Transaction:
case chan<- *transaction.Transaction:
txFeed[ch] = true
case chan *state.ContainedNotificationEvent:
case chan<- *state.ContainedNotificationEvent:
notificationFeed[ch] = true
case chan *state.AppExecResult:
case chan<- *state.AppExecResult:
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
executionFeed[ch] = true
default:
panic(fmt.Sprintf("bad subscription: %T", sub))
}
case unsub := <-bc.unsubCh:
switch ch := unsub.(type) {
case chan *block.Header:
case chan<- *block.Header:
delete(headerFeed, ch)
case chan *block.Block:
case chan<- *block.Block:
delete(blockFeed, ch)
case chan *transaction.Transaction:
case chan<- *transaction.Transaction:
delete(txFeed, ch)
case chan *state.ContainedNotificationEvent:
case chan<- *state.ContainedNotificationEvent:
delete(notificationFeed, ch)
case chan *state.AppExecResult:
case chan<- *state.AppExecResult:
delete(executionFeed, ch)
default:
panic(fmt.Sprintf("bad unsubscription: %T", unsub))
Expand Down Expand Up @@ -2285,7 +2285,7 @@ func (bc *Blockchain) GetConfig() config.Blockchain {
// Make sure it's read from regularly as not reading these events might affect
// other Blockchain functions. Make sure you're not changing the received blocks,
// as it may affect the functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) {
func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) {
bc.subCh <- ch
}

Expand All @@ -2295,7 +2295,7 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) {
// affect other Blockchain functions. Make sure you're not changing the received
// headers, as it may affect the functionality of Blockchain and other
// subscribers.
func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) {
func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan<- *block.Header) {
bc.subCh <- ch
}

Expand All @@ -2305,7 +2305,7 @@ func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) {
// as not reading these events might affect other Blockchain functions. Make sure
// you're not changing the received transactions, as it may affect the
// functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) {
func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) {
bc.subCh <- ch
}

Expand All @@ -2317,7 +2317,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction)
// read from regularly as not reading these events might affect other Blockchain
// functions. Make sure you're not changing the received notification events, as
// it may affect the functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) {
func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) {
bc.subCh <- ch
}

Expand All @@ -2327,80 +2327,45 @@ func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotifica
// reading these events might affect other Blockchain functions. Make sure you're
// not changing the received execution results, as it may affect the
// functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) {
func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
bc.subCh <- ch
}

// UnsubscribeFromBlocks unsubscribes given channel from new block notifications,
// you can close it afterwards. Passing non-subscribed channel is a no-op, but
// the method can read from this channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromBlocks(ch chan *block.Block) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) {
bc.unsubCh <- ch
Comment on lines -2337 to +2338
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, that's the case @roman-khimov told about in #2884 (comment). We can't remove this unsubloop code, because starting from the unsubscription moment all data sent to this channel should be discarded.

}

// UnsubscribeFromHeadersOfAddedBlocks unsubscribes given channel from new
// block's header notifications, you can close it afterwards. Passing
// non-subscribed channel is a no-op, but the method can read from this
// channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
func (bc *Blockchain) UnsubscribeFromHeadersOfAddedBlocks(ch chan<- *block.Header) {
bc.unsubCh <- ch
}

// UnsubscribeFromTransactions unsubscribes given channel from new transaction
// notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op, but the method can read from this channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromTransactions(ch chan *transaction.Transaction) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) {
bc.unsubCh <- ch
}

// UnsubscribeFromNotifications unsubscribes given channel from new
// execution-generated notifications, you can close it afterwards. Passing
// non-subscribed channel is a no-op, but the method can read from this channel
// (discarding any read data).
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) {
bc.unsubCh <- ch
}

// UnsubscribeFromExecutions unsubscribes given channel from new execution
// notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op, but the method can read from this channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromExecutions(ch chan *state.AppExecResult) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) {
bc.unsubCh <- ch
}

// CalculateClaimable calculates the amount of GAS generated by owning specified
Expand Down
4 changes: 2 additions & 2 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ type (
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
PoolTxWithData(t *transaction.Transaction, data any, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data any) error) error
RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block))
SubscribeForBlocks(ch chan *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block)
SubscribeForBlocks(ch chan<- *block.Block)
UnsubscribeFromBlocks(ch chan<- *block.Block)
}

// Service is a service abstraction (oracle, state root, consensus, etc).
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/notary/notary.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type (
BlockHeight() uint32
GetMaxVerificationGAS() int64
GetNotaryContractScriptHash() util.Uint160
SubscribeForBlocks(ch chan *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block)
SubscribeForBlocks(ch chan<- *block.Block)
UnsubscribeFromBlocks(ch chan<- *block.Block)
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/services/rpcsrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,16 @@ type (
HeaderHeight() uint32
InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error
P2PSigExtensionsEnabled() bool
SubscribeForBlocks(ch chan *block.Block)
SubscribeForHeadersOfAddedBlocks(ch chan *block.Header)
SubscribeForExecutions(ch chan *state.AppExecResult)
SubscribeForNotifications(ch chan *state.ContainedNotificationEvent)
SubscribeForTransactions(ch chan *transaction.Transaction)
UnsubscribeFromBlocks(ch chan *block.Block)
UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header)
UnsubscribeFromExecutions(ch chan *state.AppExecResult)
UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent)
UnsubscribeFromTransactions(ch chan *transaction.Transaction)
SubscribeForBlocks(ch chan<- *block.Block)
SubscribeForHeadersOfAddedBlocks(ch chan<- *block.Header)
SubscribeForExecutions(ch chan<- *state.AppExecResult)
SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent)
SubscribeForTransactions(ch chan<- *transaction.Transaction)
UnsubscribeFromBlocks(ch chan<- *block.Block)
UnsubscribeFromHeadersOfAddedBlocks(ch chan<- *block.Header)
UnsubscribeFromExecutions(ch chan<- *state.AppExecResult)
UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent)
UnsubscribeFromTransactions(ch chan<- *transaction.Transaction)
VerifyTx(*transaction.Transaction) error
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
mempool.Feer // fee interface
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/stateroot/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type (
GetConfig() config.Blockchain
GetDesignatedByRole(role noderoles.Role) (keys.PublicKeys, uint32, error)
HeaderHeight() uint32
SubscribeForBlocks(ch chan *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block)
SubscribeForBlocks(ch chan<- *block.Block)
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
UnsubscribeFromBlocks(ch chan<- *block.Block)
}

// Service represents a state root service.
Expand Down
Loading