Skip to content

Commit

Permalink
Additional cleanup around ff_system
Browse files Browse the repository at this point in the history
Do not attempt to stop the ff_system orchestrator - it's actually possible that
there may be unaggregated pins prior to the "terminate" event. Therefore, we
must continue listening indefinitely even if the handler is likely to be idle.

Also added some TODOs for next steps.

Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Jul 20, 2022
1 parent ee981ad commit c9b1f45
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 103 deletions.
10 changes: 1 addition & 9 deletions internal/events/network_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package events

import (
"context"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/pkg/blockchain"
Expand All @@ -33,13 +31,7 @@ func (em *eventManager) actionTerminate(location *fftypes.JSONAny, event *blockc
if err := em.multiparty.TerminateContract(em.ctx, &namespace.Contracts, location, event); err != nil {
return err
}
// Currently, a termination event is implied to apply to ALL namespaces
return em.database.RunAsGroup(em.ctx, func(ctx context.Context) error {
if err := em.database.UpsertNamespace(em.ctx, namespace, true); err != nil {
return err
}
return nil
})
return em.database.UpsertNamespace(em.ctx, namespace, true)
}

func (em *eventManager) BlockchainNetworkAction(action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error {
Expand Down
12 changes: 1 addition & 11 deletions internal/multiparty/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ type Contract struct {
}

type multipartyManager struct {
ctx context.Context
stop func()
namespace core.NamespaceRef
database database.Plugin
blockchain blockchain.Plugin
Expand All @@ -98,13 +96,11 @@ type multipartyManager struct {
}
}

func NewMultipartyManager(ctx context.Context, stop func(), ns core.NamespaceRef, config Config, di database.Plugin, bi blockchain.Plugin, om operations.Manager, mm metrics.Manager, th txcommon.Helper) (Manager, error) {
func NewMultipartyManager(ctx context.Context, ns core.NamespaceRef, config Config, di database.Plugin, bi blockchain.Plugin, om operations.Manager, mm metrics.Manager, th txcommon.Helper) (Manager, error) {
if di == nil || bi == nil || mm == nil || om == nil || th == nil {
return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "MultipartyManager")
}
mp := &multipartyManager{
ctx: ctx,
stop: stop,
namespace: ns,
config: config,
database: di,
Expand Down Expand Up @@ -140,12 +136,6 @@ func (mm *multipartyManager) ConfigureContract(ctx context.Context, contracts *c
return err
}

if mm.namespace.LocalName == core.LegacySystemNamespace && version > 1 {
// ff_system namespace should stop its orchestrator for network V2+
mm.stop()
return nil
}

subID, err := mm.blockchain.AddFireflySubscription(ctx, mm.namespace.LocalName, location, firstEvent)
if err == nil {
mm.activeContract.location = location
Expand Down
34 changes: 2 additions & 32 deletions internal/multiparty/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func newTestMultipartyManager() *testMultipartyManager {
mmi: &metricsmocks.Manager{},
mth: &txcommonmocks.Helper{},
multipartyManager: multipartyManager{
ctx: context.Background(),
namespace: core.NamespaceRef{LocalName: "ns1", RemoteName: "ns1"},
},
}
Expand All @@ -82,7 +81,7 @@ func TestNewMultipartyManager(t *testing.T) {
core.OpTypeBlockchainNetworkAction,
}).Return()
ns := core.NamespaceRef{LocalName: "ns1", RemoteName: "ns1"}
nm, err := NewMultipartyManager(context.Background(), func() {}, ns, config, mdi, mbi, mom, mmi, mth)
nm, err := NewMultipartyManager(context.Background(), ns, config, mdi, mbi, mom, mmi, mth)
assert.NotNil(t, nm)
assert.NoError(t, err)
assert.Equal(t, "MultipartyManager", nm.Name())
Expand All @@ -91,7 +90,7 @@ func TestNewMultipartyManager(t *testing.T) {

func TestInitFail(t *testing.T) {
config := Config{Contracts: []Contract{}}
_, err := NewMultipartyManager(context.Background(), func() {}, core.NamespaceRef{}, config, nil, nil, nil, nil, nil)
_, err := NewMultipartyManager(context.Background(), core.NamespaceRef{}, config, nil, nil, nil, nil, nil)
assert.Regexp(t, "FF10128", err)
}

Expand Down Expand Up @@ -121,35 +120,6 @@ func TestConfigureContract(t *testing.T) {
assert.NoError(t, err)
}

func TestConfigureContractTerminateSystem(t *testing.T) {
contracts := make([]Contract, 1)
location := fftypes.JSONAnyPtr(fftypes.JSONObject{
"address": "0x123",
}.String())
contract := Contract{
FirstEvent: "0",
Location: location,
}

contracts[0] = contract
mp := newTestMultipartyManager()
defer mp.cleanup(t)
stopped := false
mp.stop = func() { stopped = true }
mp.namespace.LocalName = core.LegacySystemNamespace

mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(2, nil)
mp.multipartyManager.config.Contracts = contracts

cf := &core.FireFlyContracts{
Active: core.FireFlyContractInfo{Index: 0},
}

err := mp.ConfigureContract(context.Background(), cf)
assert.NoError(t, err)
assert.True(t, stopped)
}

func TestResolveContractDeprecatedConfig(t *testing.T) {
mp := newTestMultipartyManager()
defer mp.cleanup(t)
Expand Down
74 changes: 50 additions & 24 deletions internal/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type namespace struct {
description string
orchestrator orchestrator.Orchestrator
config orchestrator.Config
plugins orchestrator.Plugins
plugins []string
}

type namespaceManager struct {
Expand Down Expand Up @@ -156,6 +156,18 @@ type authPlugin struct {
plugin auth.Plugin
}

func stringSlicesEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}

func NewNamespaceManager(withDefaults bool) Manager {
nm := &namespaceManager{
namespaces: make(map[string]*namespace),
Expand Down Expand Up @@ -201,10 +213,7 @@ func (nm *namespaceManager) Init(ctx context.Context, cancelCtx context.CancelFu
metrics.Registry()
}

defaultName := config.GetString(coreconfig.NamespacesDefault)
var v1Namespace *namespace

// Start an orchestrator per namespace
for _, ns := range nm.namespaces {
if err := nm.initNamespace(ns); err != nil {
return err
Expand All @@ -216,16 +225,29 @@ func (nm *namespaceManager) Init(ctx context.Context, cancelCtx context.CancelFu
}
log.L(ctx).Infof("Initialized namespace '%s' multiparty=%s version=%s", ns.name, strconv.FormatBool(multiparty), version)
if multiparty && ns.orchestrator.MultiParty().GetNetworkVersion() == 1 {
if v1Namespace == nil || ns.name == defaultName {
// TODO:
// We should check the contract address too, AND should check previously-terminated contracts
// in addition to the active contract. That implies:
// - we must cache the address and version of each contract on the namespace in the database
// - when the orchestrator loads that info from the database (triggered from Orchestrator.Init),
// it needs to somehow be available here
//
// In short, if any namespace was EVER pointed at a V1 contract, that contract and that namespace's plugins
// become the de facto configuration for ff_system as well. There can only be one V1 contract in the history
// of a given FireFly node, because it's impossible to re-create ff_system against a different contract
// or different set of plugins.
if v1Namespace == nil {
v1Namespace = ns
} else if !stringSlicesEqual(v1Namespace.plugins, ns.plugins) {
// TODO: localize error
return fmt.Errorf("could not initialize legacy '%s' namespace - found conflicting V1 multi-party config in %s and %s",
core.LegacySystemNamespace, v1Namespace.name, ns.name)
}
}
}

// If any namespace is a multiparty V1 namespace, insert the legacy ff_system namespace.
// The plugin and contract config for ff_system will be copied from:
// - the default namespace (if it is a multiparty V1 namespace) OR
// - the first multiparty V1 namespace found in the config
// Note that the contract address and plugin list must match for ALL V1 namespaces.
if v1Namespace != nil {
systemNS := *v1Namespace
systemNS.name = core.LegacySystemNamespace
Expand All @@ -237,11 +259,20 @@ func (nm *namespaceManager) Init(ctx context.Context, cancelCtx context.CancelFu
return err
}

func (nm *namespaceManager) initNamespace(ns *namespace) error {
func (nm *namespaceManager) initNamespace(ns *namespace) (err error) {
or := nm.utOrchestrator
if or == nil {
names := core.NamespaceRef{LocalName: ns.name, RemoteName: ns.remoteName}
or = orchestrator.NewOrchestrator(names, ns.config, ns.plugins, nm.metrics)
var plugins *orchestrator.Plugins
if ns.config.Multiparty.Enabled {
plugins, err = nm.validateMultiPartyConfig(nm.ctx, ns.name, ns.plugins)
} else {
plugins, err = nm.validateNonMultipartyConfig(nm.ctx, ns.name, ns.plugins)
}
if err != nil {
return err
}
or = orchestrator.NewOrchestrator(names, ns.config, plugins, nm.metrics)
}
orCtx, orCancel := context.WithCancel(nm.ctx)
if err := or.Init(orCtx, orCancel); err != nil {
Expand Down Expand Up @@ -784,8 +815,6 @@ func (nm *namespaceManager) loadNamespace(ctx context.Context, name string, inde
DefaultKey: conf.GetString(coreconfig.NamespaceDefaultKey),
TokenRemoteNames: nm.tokenRemoteNames,
}
var p *orchestrator.Plugins
var err error
if multipartyEnabled.(bool) {
contractsConf := multipartyConf.SubArray(coreconfig.NamespaceMultipartyContract)
contractConfArraySize := contractsConf.ArraySize()
Expand All @@ -811,25 +840,14 @@ func (nm *namespaceManager) loadNamespace(ctx context.Context, name string, inde
config.Multiparty.Org.Key = orgKey
config.Multiparty.Org.Description = orgDesc
config.Multiparty.Contracts = contracts
p, err = nm.validateMultiPartyConfig(ctx, name, plugins)
} else {
p, err = nm.validateNonMultipartyConfig(ctx, name, plugins)
}
if err != nil {
return nil, err
}

p.Events = make(map[string]events.Plugin, len(nm.plugins.events))
for name, entry := range nm.plugins.events {
p.Events[name] = entry.plugin
}

return &namespace{
name: name,
remoteName: remoteName,
description: conf.GetString(coreconfig.NamespaceDescription),
config: config,
plugins: *p,
plugins: plugins,
}, nil
}

Expand Down Expand Up @@ -908,6 +926,10 @@ func (nm *namespaceManager) validateMultiPartyConfig(ctx context.Context, name s
return nil, i18n.NewError(ctx, coremsgs.MsgNamespaceWrongPluginsMultiparty, name)
}

result.Events = make(map[string]events.Plugin, len(nm.plugins.events))
for name, entry := range nm.plugins.events {
result.Events[name] = entry.plugin
}
return &result, nil
}

Expand Down Expand Up @@ -962,6 +984,10 @@ func (nm *namespaceManager) validateNonMultipartyConfig(ctx context.Context, nam
return nil, i18n.NewError(ctx, coremsgs.MsgNamespaceNoDatabase, name)
}

result.Events = make(map[string]events.Plugin, len(nm.plugins.events))
for name, entry := range nm.plugins.events {
result.Events[name] = entry.plugin
}
return &result, nil
}

Expand Down
1 change: 1 addition & 0 deletions internal/namespace/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func TestNewNamespaceManager(t *testing.T) {
func TestInit(t *testing.T) {
nm := newTestNamespaceManager(true)
defer nm.cleanup(t)
nm.metricsEnabled = true

mo := &orchestratormocks.Orchestrator{}
mmp := &multipartymocks.Manager{}
Expand Down
6 changes: 3 additions & 3 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ type orchestrator struct {
started bool
namespace core.NamespaceRef
config Config
plugins Plugins
plugins *Plugins
multiparty multiparty.Manager // only for multiparty
batch batch.Manager // only for multiparty
broadcast broadcast.Manager // only for multiparty
Expand All @@ -202,7 +202,7 @@ type orchestrator struct {
txHelper txcommon.Helper
}

func NewOrchestrator(ns core.NamespaceRef, config Config, plugins Plugins, metrics metrics.Manager) Orchestrator {
func NewOrchestrator(ns core.NamespaceRef, config Config, plugins *Plugins, metrics metrics.Manager) Orchestrator {
or := &orchestrator{
namespace: ns,
config: config,
Expand Down Expand Up @@ -399,7 +399,7 @@ func (or *orchestrator) initManagers(ctx context.Context) (err error) {

if or.config.Multiparty.Enabled {
if or.multiparty == nil {
or.multiparty, err = multiparty.NewMultipartyManager(or.ctx, or.cancelCtx, or.namespace, or.config.Multiparty, or.database(), or.blockchain(), or.operations, or.metrics, or.txHelper)
or.multiparty, err = multiparty.NewMultipartyManager(or.ctx, or.namespace, or.config.Multiparty, or.database(), or.blockchain(), or.operations, or.metrics, or.txHelper)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/core/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type FireFlyContractInfo struct {
Location *fftypes.JSONAny `ffstruct:"FireFlyContractInfo" json:"location,omitempty"`
FirstEvent string `ffstruct:"FireFlyContractInfo" json:"firstEvent,omitempty"`
Subscription string `ffstruct:"FireFlyContractInfo" json:"subscription,omitempty"`
// TODO: add Version
}

type NamespaceRef struct {
Expand Down
17 changes: 15 additions & 2 deletions test/e2e/client/restclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,14 @@ func (client *FireFlyClient) GetBlob(t *testing.T, data *core.Data, expectedStat
return blob
}

func (client *FireFlyClient) GetOrgs(t *testing.T, expectedStatus int) (orgs []*core.Identity) {
func (client *FireFlyClient) GetOrgs(t *testing.T) (orgs []*core.Identity) {
path := client.namespaced(urlGetOrganizations)
resp, err := client.Client.R().
SetQueryParam("sort", "created").
SetResult(&orgs).
Get(path)
require.NoError(t, err)
require.Equal(t, expectedStatus, resp.StatusCode(), "GET %s [%d]: %s", path, resp.StatusCode(), resp.String())
require.Equal(t, 200, resp.StatusCode(), "GET %s [%d]: %s", path, resp.StatusCode(), resp.String())
return orgs
}

Expand Down Expand Up @@ -345,6 +345,19 @@ func (client *FireFlyClient) GetIdentity(t *testing.T, id *fftypes.UUID) *core.I
return &identity
}

func (client *FireFlyClient) GetOrganization(t *testing.T, idOrName string) *core.Identity {
var identity core.Identity
res, err := client.Client.R().
SetResult(&identity).
Get(client.namespaced(fmt.Sprintf("%s/%s", urlGetOrganizations, idOrName)))
assert.NoError(t, err)
if res.StatusCode() == 404 {
return nil
}
assert.True(t, res.IsSuccess())
return &identity
}

func (client *FireFlyClient) GetVerifiers(t *testing.T) []*core.Verifier {
var verifiers []*core.Verifier
res, err := client.Client.R().
Expand Down
Loading

0 comments on commit c9b1f45

Please sign in to comment.