Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clarify channels and mutexes #1421

Merged
merged 4 commits into from
Aug 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 16 additions & 43 deletions go/enclave/components/batch_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,8 @@ type batchRegistry struct {
storage storage.Storage
logger gethlog.Logger

// Channel on which batches will be pushed. It is held by another caller outside the
// batch registry.
batchSubscription *chan *core.Batch
// Channel for pushing batch height numbers which are needed in order
// to figure out what events to send to subscribers.
eventSubscription *chan uint64

subscriptionMutex sync.Mutex
batchesCallback func(*core.Batch)
callbackMutex sync.RWMutex
}

func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegistry {
Expand All @@ -38,48 +32,27 @@ func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegis
}
}

func (br *batchRegistry) SubscribeForEvents() chan uint64 {
evSub := make(chan uint64)
br.eventSubscription = &evSub
return *br.eventSubscription
func (br *batchRegistry) SubscribeForBatches(callback func(*core.Batch)) {
br.callbackMutex.Lock()
defer br.callbackMutex.Unlock()
br.batchesCallback = callback
}

func (br *batchRegistry) UnsubscribeFromEvents() {
br.eventSubscription = nil
}

func (br *batchRegistry) NotifySubscribers(batch *core.Batch) {
defer br.logger.Debug("Sending batch and events", log.BatchHashKey, batch.Hash(), log.DurationKey, measure.NewStopwatch())

br.subscriptionMutex.Lock()
subscriptionChan := br.batchSubscription
eventChan := br.eventSubscription
br.subscriptionMutex.Unlock()
func (br *batchRegistry) UnsubscribeFromBatches() {
br.callbackMutex.Lock()
defer br.callbackMutex.Unlock()

if subscriptionChan != nil {
*subscriptionChan <- batch
}

if br.eventSubscription != nil {
*eventChan <- batch.NumberU64()
}
br.batchesCallback = nil
}

func (br *batchRegistry) Subscribe() chan *core.Batch {
br.subscriptionMutex.Lock()
defer br.subscriptionMutex.Unlock()
subChannel := make(chan *core.Batch)
func (br *batchRegistry) NotifySubscribers(batch *core.Batch) {
br.callbackMutex.RLock()
defer br.callbackMutex.RUnlock()

br.batchSubscription = &subChannel
return *br.batchSubscription
}
defer br.logger.Debug("Sending batch and events", log.BatchHashKey, batch.Hash(), log.DurationKey, measure.NewStopwatch())

func (br *batchRegistry) Unsubscribe() {
br.subscriptionMutex.Lock()
defer br.subscriptionMutex.Unlock()
if br.batchSubscription != nil {
close(*br.batchSubscription)
br.batchSubscription = nil
if br.batchesCallback != nil {
go br.batchesCallback(batch)
}
}

Expand Down
12 changes: 3 additions & 9 deletions go/enclave/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,9 @@ type BatchRegistry interface {
// rather than its stateDB only.
GetBatchAtHeight(height gethrpc.BlockNumber) (*core.Batch, error)

// Subscribe - creates and returns a channel that will be used to push any newly created batches
// to the subscriber.
Subscribe() chan *core.Batch
// Unsubscribe - informs the registry that the subscriber is no longer listening, allowing it to
// gracefully terminate any streaming and stop queueing new batches.
Unsubscribe()

SubscribeForEvents() chan uint64
UnsubscribeFromEvents()
// SubscribeForBatches - register a callback for new batches
SubscribeForBatches(func(*core.Batch))
UnsubscribeFromBatches()

NotifySubscribers(batch *core.Batch)

Expand Down
76 changes: 8 additions & 68 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (e *enclaveImpl) sendBatch(batch *core.Batch, outChannel chan common.Stream

func (e *enclaveImpl) sendEvents(batchHead uint64, outChannel chan common.StreamL2UpdatesResponse) {
e.logger.Info("Send Events", "batchHead", batchHead)
logs, err := e.subscriptionLogs(big.NewInt(int64(batchHead)))
logs, err := e.subscriptionManager.GetSubscribedLogsForBatch(big.NewInt(int64(batchHead)))
if err != nil {
e.logger.Error("Error while getting subscription logs", log.ErrKey, err)
return
Expand All @@ -360,32 +360,6 @@ func (e *enclaveImpl) sendEvents(batchHead uint64, outChannel chan common.Stream
}
}

func (e *enclaveImpl) sendBatchesFromSubscription(l2UpdatesChannel chan common.StreamL2UpdatesResponse) {
batchChan := e.registry.Subscribe()
for {
batch, ok := <-batchChan
if !ok {
e.logger.Warn("batch channel closed - stopping stream")
break
}

e.sendBatch(batch, l2UpdatesChannel)
}
}

func (e *enclaveImpl) sendEventsFromSubscription(l2UpdatesChannel chan common.StreamL2UpdatesResponse) {
eventChan := e.registry.SubscribeForEvents()
for {
eventsHead, ok := <-eventChan
if !ok {
e.logger.Warn("events channel closed - stopping stream")
break
}

e.sendEvents(eventsHead, l2UpdatesChannel)
}
}

func (e *enclaveImpl) StreamL2Updates() (chan common.StreamL2UpdatesResponse, func()) {
l2UpdatesChannel := make(chan common.StreamL2UpdatesResponse, 100)

Expand All @@ -394,12 +368,13 @@ func (e *enclaveImpl) StreamL2Updates() (chan common.StreamL2UpdatesResponse, fu
return l2UpdatesChannel, func() {}
}

go e.sendBatchesFromSubscription(l2UpdatesChannel)
go e.sendEventsFromSubscription(l2UpdatesChannel)
e.registry.SubscribeForBatches(func(batch *core.Batch) {
e.sendBatch(batch, l2UpdatesChannel)
e.sendEvents(batch.NumberU64(), l2UpdatesChannel)
})

return l2UpdatesChannel, func() {
e.registry.Unsubscribe()
e.registry.UnsubscribeFromEvents()
e.registry.UnsubscribeFromBatches()
}
}

Expand Down Expand Up @@ -991,7 +966,7 @@ func (e *enclaveImpl) GetCode(address gethcommon.Address, batchHash *common.L2Ba

func (e *enclaveImpl) Subscribe(id gethrpc.ID, encryptedSubscription common.EncryptedParamsLogSubscription) common.SystemError {
if e.stopControl.IsStopping() {
return responses.ToInternalError(fmt.Errorf("requested Subscribe with the enclave stopping"))
return responses.ToInternalError(fmt.Errorf("requested SubscribeForBatches with the enclave stopping"))
}

return e.subscriptionManager.AddSubscription(id, encryptedSubscription)
Expand All @@ -1018,7 +993,7 @@ func (e *enclaveImpl) Stop() common.SystemError {
}

if e.registry != nil {
e.registry.Unsubscribe()
e.registry.UnsubscribeFromBatches()
}

time.Sleep(time.Second)
Expand Down Expand Up @@ -1539,41 +1514,6 @@ func extractGetLogsParams(paramList []interface{}) (*filters.FilterCriteria, *ge
return &filter, &forAddress, nil
}

// Retrieves and encrypts the logs for the block.
func (e *enclaveImpl) subscriptionLogs(upToBatchNr *big.Int) (common.EncryptedSubscriptionLogs, error) {
result := map[gethrpc.ID][]*types.Log{}

// Go through each subscription and collect the logs
err := e.subscriptionManager.ForEachSubscription(func(id gethrpc.ID, subscription *common.LogSubscription, previousHead *big.Int) error {
// 1. fetch the logs since the last request
from := big.NewInt(previousHead.Int64() + 1)
to := upToBatchNr

if from.Cmp(to) > 0 {
e.logger.Warn(fmt.Sprintf("Skipping subscription step id=%s: [%d, %d]", id, from, to))
return nil
}

logs, err := e.storage.FilterLogs(subscription.Account, from, to, nil, subscription.Filter.Addresses, subscription.Filter.Topics)
e.logger.Info(fmt.Sprintf("Subscription id=%s: [%d, %d]. Logs %d, Err: %s", id, from, to, len(logs), err))
if err != nil {
return err
}

// 2. store the current l2Head in the Subscription
e.subscriptionManager.SetLastHead(id, to)
result[id] = logs
return nil
})
if err != nil {
e.logger.Error("Could not retrieve subscription logs", log.ErrKey, err)
return nil, err
}

// Encrypt the results
return e.subscriptionManager.EncryptLogs(result)
}

func (e *enclaveImpl) rejectBlockErr(cause error) *errutil.BlockRejectError {
var hash common.L1BlockHash
l1Head, err := e.storage.FetchHeadBlock()
Expand Down
83 changes: 60 additions & 23 deletions go/enclave/events/subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"math/big"
"sync"

"github.com/obscuronet/go-obscuro/go/common/log"

"github.com/obscuronet/go-obscuro/go/enclave/storage"

"github.com/ethereum/go-ethereum/core/state"
Expand Down Expand Up @@ -36,8 +38,9 @@ type SubscriptionManager struct {

subscriptions map[gethrpc.ID]*common.LogSubscription
lastHead map[gethrpc.ID]*big.Int // This is the batch height up to which events were returned to the user
subscriptionMutex *sync.RWMutex
logger gethlog.Logger
subscriptionMutex *sync.RWMutex // the mutex guards the subscriptions/lastHead pair

logger gethlog.Logger
}

func NewSubscriptionManager(rpcEncryptionManager *rpc.EncryptionManager, storage storage.Storage, logger gethlog.Logger) *SubscriptionManager {
Expand All @@ -52,23 +55,9 @@ func NewSubscriptionManager(rpcEncryptionManager *rpc.EncryptionManager, storage
}
}

func (s *SubscriptionManager) ForEachSubscription(f func(gethrpc.ID, *common.LogSubscription, *big.Int) error) error {
// grab a write lock because the function will mutate the lastHead map
s.subscriptionMutex.Lock()
defer s.subscriptionMutex.Unlock()

for id, subscription := range s.subscriptions {
err := f(id, subscription, s.lastHead[id])
if err != nil {
return err
}
}
return nil
}

// SetLastHead - only call with a write lock on the subscription mutex
func (s *SubscriptionManager) SetLastHead(id gethrpc.ID, nr *big.Int) {
s.lastHead[id] = nr
s.lastHead[id] = big.NewInt(nr.Int64())
}

// AddSubscription adds a log subscription to the enclave under the given ID, provided the request is authenticated
Expand All @@ -91,8 +80,6 @@ func (s *SubscriptionManager) AddSubscription(id gethrpc.ID, encryptedSubscripti
}
subscription.VkHandler = encryptor

s.subscriptionMutex.Lock()
defer s.subscriptionMutex.Unlock()
startAt := subscription.Filter.FromBlock
// Set the subscription to start from the current head if a specific start is not specified
if startAt == nil || startAt.Int64() < 0 {
Expand All @@ -103,6 +90,8 @@ func (s *SubscriptionManager) AddSubscription(id gethrpc.ID, encryptedSubscripti
// adjust to -1 because the subscription will increment
startAt = big.NewInt(int64(head.NumberU64() - 1))
}
s.subscriptionMutex.Lock()
defer s.subscriptionMutex.Unlock()
s.SetLastHead(id, startAt)
s.subscriptions[id] = subscription
return nil
Expand All @@ -114,6 +103,7 @@ func (s *SubscriptionManager) RemoveSubscription(id gethrpc.ID) {
s.subscriptionMutex.Lock()
defer s.subscriptionMutex.Unlock()
delete(s.subscriptions, id)
delete(s.lastHead, id)
}

// FilterLogs takes a list of logs and the hash of the rollup to use to create the state DB. It returns the logs
Expand All @@ -135,11 +125,48 @@ func (s *SubscriptionManager) FilterLogs(logs []*types.Log, rollupHash common.L2
return filteredLogs, nil
}

// EncryptLogs Encrypts each log with the appropriate viewing key.
func (s *SubscriptionManager) EncryptLogs(logsByID map[gethrpc.ID][]*types.Log) (map[gethrpc.ID][]byte, error) {
// GetSubscribedLogsForBatch - Retrieves and encrypts the logs for the batch.
func (s *SubscriptionManager) GetSubscribedLogsForBatch(batch *big.Int) (common.EncryptedSubscriptionLogs, error) {
result := map[gethrpc.ID][]*types.Log{}

// grab a write lock because the function will mutate the lastHead map
s.subscriptionMutex.Lock()
defer s.subscriptionMutex.Unlock()

// Go through each subscription and collect the logs
err := s.forEachSubscription(func(id gethrpc.ID, subscription *common.LogSubscription, previousHead *big.Int) error {
// 1. fetch the logs since the last request
from := big.NewInt(previousHead.Int64() + 1)
to := batch

if from.Cmp(to) > 0 {
s.logger.Warn(fmt.Sprintf("Skipping subscription step id=%s: [%d, %d]", id, from, to))
return nil
}

logs, err := s.storage.FilterLogs(subscription.Account, from, to, nil, subscription.Filter.Addresses, subscription.Filter.Topics)
s.logger.Info(fmt.Sprintf("Subscription id=%s: [%d, %d]. Logs %d, Err: %s", id, from, to, len(logs), err))
if err != nil {
return err
}

// 2. store the current l2Head in the Subscription
s.SetLastHead(id, to)
result[id] = logs
return nil
})
if err != nil {
s.logger.Error("Could not retrieve subscription logs", log.ErrKey, err)
return nil, err
}

// Encrypt the results
return s.encryptLogs(result)
}

// Encrypts each log with the appropriate viewing key.
func (s *SubscriptionManager) encryptLogs(logsByID map[gethrpc.ID][]*types.Log) (map[gethrpc.ID][]byte, error) {
encryptedLogsByID := map[gethrpc.ID][]byte{}
s.subscriptionMutex.RLock()
defer s.subscriptionMutex.RUnlock()

for subID, logs := range logsByID {
subscription, found := s.subscriptions[subID]
Expand All @@ -163,6 +190,16 @@ func (s *SubscriptionManager) EncryptLogs(logsByID map[gethrpc.ID][]*types.Log)
return encryptedLogsByID, nil
}

func (s *SubscriptionManager) forEachSubscription(f func(gethrpc.ID, *common.LogSubscription, *big.Int) error) error {
for id, subscription := range s.subscriptions {
err := f(id, subscription, s.lastHead[id])
if err != nil {
return err
}
}
return nil
}

// Of the log's topics, returns those that are (potentially) user addresses. A topic is considered a user address if:
// - It has 12 leading zero bytes (since addresses are 20 bytes long, while hashes are 32)
// - It has a non-zero nonce (to prevent accidental or malicious creation of the address matching a given topic,
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/nodetype/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Sequencer interface {
}

type ObsValidator interface {
// ExecuteBatches - try to execute all stored by unexecuted batches
// ExecuteStoredBatches - try to execute all stored by unexecuted batches
ExecuteStoredBatches() error

VerifySequencerSignature(*core.Batch) error
Expand Down
8 changes: 0 additions & 8 deletions go/enclave/nodetype/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"math/big"
"sort"
"sync"
"time"

"github.com/obscuronet/go-obscuro/go/common/measure"
Expand Down Expand Up @@ -53,10 +52,6 @@ type sequencer struct {
dataEncryptionService crypto.DataEncryptionService
dataCompressionService compression.DataCompressionService
settings SequencerSettings

// This is used to coordinate creating
// new batches and creating fork batches.
batchProductionMutex sync.Mutex
}

func NewSequencer(
Expand Down Expand Up @@ -96,9 +91,6 @@ func NewSequencer(
}

func (s *sequencer) CreateBatch() error {
s.batchProductionMutex.Lock()
defer s.batchProductionMutex.Unlock()

hasGenesis, err := s.batchRegistry.HasGenesisBatch()
if err != nil {
return fmt.Errorf("unknown genesis batch state. Cause: %w", err)
Expand Down
Loading
Loading