Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use separate event stream per namespace #1388

Merged
merged 27 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7ad5304
Use separate event stream per namespace
nguyer Aug 10, 2023
3f04c1e
Use separate event stream per namespace in tokens plugin
nguyer Aug 29, 2023
a37304f
Implement StopNamespace in fftokens. Fix unit tests.
nguyer Aug 30, 2023
596caed
Only init bc if not nil
nguyer Aug 30, 2023
66173b2
Recreate custom contract subscriptions after evenstream migration
nguyer Sep 1, 2023
e5f1ccc
Merge branch 'main' into eventstreams
nguyer Oct 13, 2023
9655d6f
Update Tezos plugin and fix Ethereum/Fabric EventStream migration
nguyer Oct 13, 2023
91a18fc
Add namespace to token connector API calls
nguyer Nov 1, 2023
94bfc75
Fix migration issues and increase test coverage
nguyer Nov 8, 2023
0559710
Test coverage back to 100%
nguyer Nov 9, 2023
cecee27
Re-activate token pool on startup
nguyer Nov 10, 2023
aa64620
Fix unit tests
nguyer Nov 14, 2023
e0b34a5
Address PR feedback
nguyer Nov 15, 2023
3fe2b08
Merge branch 'dependencies' into eventstreams
nguyer Jan 24, 2024
6b4fd88
Add AlternateLocators
nguyer Feb 1, 2024
bf4656b
Fixes for updating pool locators
nguyer Feb 2, 2024
a52d3bc
Fixes for updating pool locators
nguyer Feb 2, 2024
3ad8416
Update copyright year
nguyer Feb 13, 2024
65283ec
Fix unit test
nguyer Feb 13, 2024
813e29b
Add unit tests for updating pool locator
nguyer Feb 13, 2024
7972561
Merge branch 'main' into eventstreams
nguyer Feb 13, 2024
ba212a9
Address PR feedback
nguyer Feb 21, 2024
99c7bc1
Merge branch 'main' into eventstreams
nguyer Feb 21, 2024
ee0770d
PR feedback
nguyer Feb 26, 2024
b9b7a29
Update fftokens unit tests
nguyer Feb 26, 2024
716efcd
Merge branch 'main' into eventstreams
nguyer Mar 4, 2024
dd281b8
Use updated token connectors
nguyer Mar 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions db/migrations/postgres/000116_tx_type_not_null.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE messages ALTER COLUMN tx_parent_type DROP NOT NULL;
COMMIT;
5 changes: 5 additions & 0 deletions db/migrations/postgres/000116_tx_type_not_null.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;
UPDATE messages SET tx_parent_type = ''
WHERE tx_parent_type IS NULL;
ALTER TABLE messages ALTER COLUMN tx_parent_type SET NOT NULL;
COMMIT;
4 changes: 4 additions & 0 deletions db/migrations/sqlite/000116_tx_type_not_null.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE messages RENAME COLUMN tx_parent_type TO tx_parent_type_temp;
ALTER TABLE messages ADD COLUMN tx_parent_type VARCHAR(64);
UPDATE messages SET tx_parent_type = tx_parent_type_temp;
ALTER TABLE messages DROP COLUMN tx_parent_type_temp;
5 changes: 5 additions & 0 deletions db/migrations/sqlite/000116_tx_type_not_null.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
UPDATE messages SET tx_parent_type = '' WHERE tx_parent_type IS NULL;
ALTER TABLE messages RENAME COLUMN tx_parent_type TO tx_parent_type_temp;
ALTER TABLE messages ADD COLUMN tx_parent_type VARCHAR(64) DEFAULT '' NOT NULL;
UPDATE messages SET tx_parent_type = tx_parent_type_temp;
ALTER TABLE messages DROP COLUMN tx_parent_type_temp;
646 changes: 75 additions & 571 deletions go.work.sum

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Manager interface {
// From operations.OperationHandler
PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error)
RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error)

// Starts the namespace on each of the configured token plugins
Start(ctx context.Context) error
}

type assetManager struct {
Expand Down Expand Up @@ -170,6 +173,28 @@ func (am *assetManager) GetTokenConnectors(ctx context.Context) []*core.TokenCon
return connectors
}

func (am *assetManager) Start(ctx context.Context) error {
f := database.TokenPoolQueryFactory.NewFilter(ctx).And()
pools, _, err := am.database.GetTokenPools(ctx, am.namespace, f)
if err != nil {
return err
}

for _, plugin := range am.tokens {
activePools := []*core.TokenPool{}
for _, pool := range pools {
if pool.Connector == plugin.ConnectorName() && pool.Active {
activePools = append(activePools, pool)
}
}
err := plugin.StartNamespace(ctx, am.namespace, activePools)
if err != nil {
return err
}
}
return nil
}

func (am *assetManager) getDefaultTokenConnector(ctx context.Context) (string, error) {
tokenConnectors := am.GetTokenConnectors(ctx)
if len(tokenConnectors) != 1 {
Expand Down
84 changes: 84 additions & 0 deletions internal/assets/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package assets
import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -160,3 +161,86 @@ func TestGetTokenConnectors(t *testing.T) {
assert.Equal(t, 1, len(connectors))
assert.Equal(t, "magic-tokens", connectors[0].Name)
}

func TestStart(t *testing.T) {
coreconfig.Reset()
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mim := &identitymanagermocks.Manager{}
msa := &syncasyncmocks.Bridge{}
mbm := &broadcastmocks.Manager{}
mpm := &privatemessagingmocks.Manager{}
mti := &tokenmocks.Plugin{}
mm := &metricsmocks.Manager{}
mom := &operationmocks.Manager{}
mcm := &contractmocks.Manager{}
cmi := &cachemocks.Manager{}
cmi.On("GetCache", mock.Anything).Return(nil, nil)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
mdi.On("GetTokenPools", mock.Anything, mock.Anything, mock.Anything).Return([]*core.TokenPool{
{
Connector: "hot_tokens",
Active: true,
},
}, nil, nil)
mti.On("StartNamespace", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mti.On("ConnectorName").Return("hot_tokens")
txHelper, _ := txcommon.NewTransactionHelper(context.Background(), "ns1", mdi, mdm, cmi)
am, err := NewAssetManager(context.Background(), "ns1", "blockchain_plugin", mdi, map[string]tokens.Plugin{"magic-tokens": mti}, mim, msa, mbm, mpm, mm, mom, mcm, txHelper, cmi)
assert.NoError(t, err)
err = am.Start(context.Background())
assert.NoError(t, err)
}

func TestStartDBError(t *testing.T) {
coreconfig.Reset()
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mim := &identitymanagermocks.Manager{}
msa := &syncasyncmocks.Bridge{}
mbm := &broadcastmocks.Manager{}
mpm := &privatemessagingmocks.Manager{}
mti := &tokenmocks.Plugin{}
mm := &metricsmocks.Manager{}
mom := &operationmocks.Manager{}
mcm := &contractmocks.Manager{}
cmi := &cachemocks.Manager{}
cmi.On("GetCache", mock.Anything).Return(nil, nil)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
mdi.On("GetTokenPools", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop"))
txHelper, _ := txcommon.NewTransactionHelper(context.Background(), "ns1", mdi, mdm, cmi)
am, err := NewAssetManager(context.Background(), "ns1", "blockchain_plugin", mdi, map[string]tokens.Plugin{"magic-tokens": mti}, mim, msa, mbm, mpm, mm, mom, mcm, txHelper, cmi)
assert.NoError(t, err)
err = am.Start(context.Background())
assert.Regexp(t, "pop", err)
}

func TestStartError(t *testing.T) {
coreconfig.Reset()
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mim := &identitymanagermocks.Manager{}
msa := &syncasyncmocks.Bridge{}
mbm := &broadcastmocks.Manager{}
mpm := &privatemessagingmocks.Manager{}
mti := &tokenmocks.Plugin{}
mm := &metricsmocks.Manager{}
mom := &operationmocks.Manager{}
mcm := &contractmocks.Manager{}
cmi := &cachemocks.Manager{}
cmi.On("GetCache", mock.Anything).Return(nil, nil)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
mdi.On("GetTokenPools", mock.Anything, mock.Anything, mock.Anything).Return([]*core.TokenPool{
{
Connector: "hot_tokens",
Active: true,
},
}, nil, nil)
mti.On("StartNamespace", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
mti.On("ConnectorName").Return("hot_tokens")
txHelper, _ := txcommon.NewTransactionHelper(context.Background(), "ns1", mdi, mdm, cmi)
am, err := NewAssetManager(context.Background(), "ns1", "blockchain_plugin", mdi, map[string]tokens.Plugin{"magic-tokens": mti}, mim, msa, mbm, mpm, mm, mom, mcm, txHelper, cmi)
assert.NoError(t, err)
err = am.Start(context.Background())
assert.Regexp(t, "pop", err)
}
3 changes: 3 additions & 0 deletions internal/assets/token_approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func (am *assetManager) NewApproval(approval *core.TokenApprovalInput) syncasync
approval: approval,
idempotentSubmit: approval.IdempotencyKey != "",
}
if approval.Namespace == "" {
approval.Namespace = am.namespace
}
sender.setDefaults()
return sender
}
Expand Down
9 changes: 9 additions & 0 deletions internal/assets/token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ func (am *assetManager) validateTransfer(ctx context.Context, transfer *core.Tok

func (am *assetManager) MintTokens(ctx context.Context, transfer *core.TokenTransferInput, waitConfirm bool) (out *core.TokenTransfer, err error) {
transfer.Type = core.TokenTransferTypeMint
if transfer.Namespace == "" {
transfer.Namespace = am.namespace
}

sender := am.NewTransfer(transfer)
if am.metrics.IsMetricsEnabled() {
Expand All @@ -135,6 +138,9 @@ func (am *assetManager) MintTokens(ctx context.Context, transfer *core.TokenTran

func (am *assetManager) BurnTokens(ctx context.Context, transfer *core.TokenTransferInput, waitConfirm bool) (out *core.TokenTransfer, err error) {
transfer.Type = core.TokenTransferTypeBurn
if transfer.Namespace == "" {
transfer.Namespace = am.namespace
}

sender := am.NewTransfer(transfer)
if am.metrics.IsMetricsEnabled() {
Expand All @@ -150,6 +156,9 @@ func (am *assetManager) BurnTokens(ctx context.Context, transfer *core.TokenTran

func (am *assetManager) TransferTokens(ctx context.Context, transfer *core.TokenTransferInput, waitConfirm bool) (out *core.TokenTransfer, err error) {
transfer.Type = core.TokenTransferTypeTransfer
if transfer.Namespace == "" {
transfer.Namespace = am.namespace
}

sender := am.NewTransfer(transfer)
if am.metrics.IsMetricsEnabled() {
Expand Down
Loading
Loading