Skip to content

Commit

Permalink
Consolidate namespace database operations into namespace manager
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Jul 12, 2022
1 parent b838313 commit a5ddc24
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 275 deletions.
17 changes: 1 addition & 16 deletions internal/events/network_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,14 @@
package events

import (
"context"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/pkg/blockchain"
"github.com/hyperledger/firefly/pkg/core"
)

func (em *eventManager) actionTerminate(location *fftypes.JSONAny, event *blockchain.Event) error {
namespace, err := em.database.GetNamespace(em.ctx, em.namespace.LocalName)
if err != nil {
return err
}
if err := em.multiparty.TerminateContract(em.ctx, &namespace.Contracts, location, event); err != nil {
return err
}
// Currently, a termination event is implied to apply to ALL namespaces
return em.database.RunAsGroup(em.ctx, func(ctx context.Context) error {
if err := em.database.UpsertNamespace(em.ctx, namespace, true); err != nil {
return err
}
return nil
})
return em.multiparty.TerminateContract(em.ctx, location, event)
}

func (em *eventManager) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error {
Expand Down
43 changes: 2 additions & 41 deletions internal/events/network_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ func TestNetworkAction(t *testing.T) {
return be.ProtocolID == "0001"
})).Return(nil)
mdi.On("InsertEvent", em.ctx, mock.Anything).Return(nil)
mdi.On("GetNamespace", em.ctx, "ns1").Return(&core.Namespace{}, nil)
mdi.On("UpsertNamespace", em.ctx, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.MultipartyContracts"), location, mock.AnythingOfType("*blockchain.Event")).Return(nil)
mmp.On("TerminateContract", em.ctx, location, mock.AnythingOfType("*blockchain.Event")).Return(nil)

err := em.BlockchainNetworkAction("terminate", location, event, verifier)
assert.NoError(t, err)
Expand Down Expand Up @@ -145,22 +143,6 @@ func TestNetworkActionUnknown(t *testing.T) {
mii.AssertExpectations(t)
}

func TestActionTerminateQueryFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()

location := fftypes.JSONAnyPtr("{}")

mdi := em.database.(*databasemocks.Plugin)

mdi.On("GetNamespace", em.ctx, "ns1").Return(nil, fmt.Errorf("pop"))

err := em.actionTerminate(location, &blockchain.Event{})
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
}

func TestActionTerminateFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()
Expand All @@ -170,28 +152,7 @@ func TestActionTerminateFail(t *testing.T) {
mmp := em.multiparty.(*multipartymocks.Manager)
mdi := em.database.(*databasemocks.Plugin)

mdi.On("GetNamespace", em.ctx, "ns1").Return(&core.Namespace{}, nil)
mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.MultipartyContracts"), location, mock.AnythingOfType("*blockchain.Event")).Return(fmt.Errorf("pop"))

err := em.actionTerminate(location, &blockchain.Event{})
assert.EqualError(t, err, "pop")

mmp.AssertExpectations(t)
mdi.AssertExpectations(t)
}

func TestActionTerminateUpsertFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()

location := fftypes.JSONAnyPtr("{}")

mmp := em.multiparty.(*multipartymocks.Manager)
mdi := em.database.(*databasemocks.Plugin)

mdi.On("GetNamespace", em.ctx, "ns1").Return(&core.Namespace{}, nil)
mdi.On("UpsertNamespace", em.ctx, mock.AnythingOfType("*core.Namespace"), true).Return(fmt.Errorf("pop"))
mmp.On("TerminateContract", em.ctx, mock.AnythingOfType("*core.MultipartyContracts"), location, mock.AnythingOfType("*blockchain.Event")).Return(nil)
mmp.On("TerminateContract", em.ctx, location, mock.AnythingOfType("*blockchain.Event")).Return(fmt.Errorf("pop"))

err := em.actionTerminate(location, &blockchain.Event{})
assert.EqualError(t, err, "pop")
Expand Down
45 changes: 22 additions & 23 deletions internal/multiparty/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ type Manager interface {
RootOrg() RootOrg

// ConfigureContract initializes the subscription to the FireFly contract
// - Checks the provided contract info against the plugin's configuration, and updates it as needed
// - Checks the namespace contract info against the plugin's configuration, and updates it as needed
// - Initializes the contract info for performing BatchPin transactions, and initializes subscriptions for BatchPin events
ConfigureContract(ctx context.Context, contracts *core.MultipartyContracts) (err error)
ConfigureContract(ctx context.Context) (err error)

// TerminateContract marks the given event as the last one to be parsed on the current FireFly contract
// - Validates that the event came from the currently active FireFly contract
// - Re-initializes the plugin against the next configured FireFly contract
// - Updates the provided contract info to record the point of termination and the newly active contract
TerminateContract(ctx context.Context, contracts *core.MultipartyContracts, location *fftypes.JSONAny, termination *blockchain.Event) (err error)
// - Updates the namespace contract info to record the point of termination and the newly active contract
TerminateContract(ctx context.Context, location *fftypes.JSONAny, termination *blockchain.Event) (err error)

// GetNetworkVersion returns the network version of the active FireFly contract
GetNetworkVersion() int
Expand Down Expand Up @@ -82,22 +82,17 @@ type Contract struct {

type multipartyManager struct {
ctx context.Context
namespace core.NamespaceRef
namespace *core.Namespace
database database.Plugin
blockchain blockchain.Plugin
operations operations.Manager
metrics metrics.Manager
txHelper txcommon.Helper
config Config
activeContract struct {
location *fftypes.JSONAny
firstEvent string
networkVersion int
subscription string
}
networkVersion int
}

func NewMultipartyManager(ctx context.Context, ns core.NamespaceRef, config Config, di database.Plugin, bi blockchain.Plugin, om operations.Manager, mm metrics.Manager, th txcommon.Helper) (Manager, error) {
func NewMultipartyManager(ctx context.Context, ns *core.Namespace, config Config, di database.Plugin, bi blockchain.Plugin, om operations.Manager, mm metrics.Manager, th txcommon.Helper) (Manager, error) {
if di == nil || bi == nil || mm == nil || om == nil || th == nil {
return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "MultipartyManager")
}
Expand Down Expand Up @@ -125,7 +120,8 @@ func (mm *multipartyManager) RootOrg() RootOrg {
return mm.config.Org
}

func (mm *multipartyManager) ConfigureContract(ctx context.Context, contracts *core.MultipartyContracts) (err error) {
func (mm *multipartyManager) ConfigureContract(ctx context.Context) (err error) {
contracts := &mm.namespace.Contracts
log.L(ctx).Infof("Resolving FireFly contract at index %d", contracts.Active.Index)
location, firstEvent, err := mm.resolveFireFlyContract(ctx, contracts.Active.Index)
if err != nil {
Expand All @@ -137,27 +133,30 @@ func (mm *multipartyManager) ConfigureContract(ctx context.Context, contracts *c
return err
}

if !contracts.Active.Location.IsNil() && contracts.Active.Location.String() != location.String() {
log.L(ctx).Warnf("FireFly contract location changed from %s to %s", contracts.Active.Location, location)
}

subID, err := mm.blockchain.AddFireflySubscription(ctx, mm.namespace.RemoteName, location, firstEvent)
if err == nil {
mm.activeContract.location = location
mm.activeContract.firstEvent = firstEvent
mm.activeContract.networkVersion = version
mm.activeContract.subscription = subID
mm.networkVersion = version
contracts.Active = core.MultipartyContract{
Location: location,
FirstEvent: firstEvent,
Info: core.MultipartyContractInfo{
Subscription: subID,
},
}
err = mm.database.UpsertNamespace(ctx, mm.namespace, true)
}
return err
}

func (mm *multipartyManager) resolveFireFlyContract(ctx context.Context, contractIndex int) (location *fftypes.JSONAny, firstEvent string, err error) {
if len(mm.config.Contracts) > 0 {
if contractIndex >= len(mm.config.Contracts) {
return nil, "", i18n.NewError(ctx, coremsgs.MsgInvalidFireFlyContractIndex, fmt.Sprintf("%s.multiparty.contracts[%d]", mm.namespace, contractIndex))
return nil, "", i18n.NewError(ctx, coremsgs.MsgInvalidFireFlyContractIndex,
fmt.Sprintf("%s.multiparty.contracts[%d]", mm.namespace.LocalName, contractIndex))
}
active := mm.config.Contracts[contractIndex]
location = active.Location
Expand All @@ -173,8 +172,8 @@ func (mm *multipartyManager) resolveFireFlyContract(ctx context.Context, contrac
return location, firstEvent, err
}

func (mm *multipartyManager) TerminateContract(ctx context.Context, contracts *core.MultipartyContracts, location *fftypes.JSONAny, termination *blockchain.Event) (err error) {
// TODO: Investigate if it better to consolidate DB termination here
func (mm *multipartyManager) TerminateContract(ctx context.Context, location *fftypes.JSONAny, termination *blockchain.Event) (err error) {
contracts := &mm.namespace.Contracts
if contracts.Active.Location.String() != location.String() {
log.L(ctx).Warnf("Ignoring termination event from contract at '%s', which does not match active '%s'", location, contracts.Active.Location)
return nil
Expand All @@ -183,15 +182,15 @@ func (mm *multipartyManager) TerminateContract(ctx context.Context, contracts *c
contracts.Active.Info.FinalEvent = termination.ProtocolID
contracts.Terminated = append(contracts.Terminated, contracts.Active)
contracts.Active = core.MultipartyContract{Index: contracts.Active.Index + 1}
err = mm.blockchain.RemoveFireflySubscription(ctx, mm.activeContract.subscription)
err = mm.blockchain.RemoveFireflySubscription(ctx, contracts.Active.Info.Subscription)
if err != nil {
return err
}
return mm.ConfigureContract(ctx, contracts)
return mm.ConfigureContract(ctx)
}

func (mm *multipartyManager) GetNetworkVersion() int {
return mm.activeContract.networkVersion
return mm.networkVersion
}

func (mm *multipartyManager) SubmitNetworkAction(ctx context.Context, signingKey string, action *core.NetworkAction) error {
Expand Down
Loading

0 comments on commit a5ddc24

Please sign in to comment.