Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple copies of things on different namespaces #915

Merged
merged 9 commits into from
Aug 1, 2022
1 change: 0 additions & 1 deletion cmd/firefly.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func run() error {
// Setup logging after reading config (even if failed), to output header correctly
ctx, cancelCtx := context.WithCancel(context.Background())
ctx = log.WithLogger(ctx, logrus.WithField("pid", fmt.Sprintf("%d", os.Getpid())))
ctx = log.WithLogger(ctx, logrus.WithField("prefix", config.GetString(coreconfig.NodeName)))

config.SetupLogging(ctx)
log.L(ctx).Infof("Hyperledger FireFly")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
BEGIN;
DROP INDEX identities_id;
CREATE UNIQUE INDEX identities_id ON identities(id);

DROP INDEX verifiers_hash;
CREATE UNIQUE INDEX verifiers_hash on verifiers(hash);

DROP INDEX verifiers_identity;
CREATE UNIQUE INDEX verifiers_identity on verifiers(identity);

DROP INDEX tokenpool_locator;
CREATE UNIQUE INDEX tokenpool_locator ON tokenpool(connector, locator);

DROP INDEX ffi_id;
CREATE UNIQUE INDEX ffi_id ON ffi(id);

DROP INDEX messages_id;
CREATE UNIQUE INDEX messages_id ON messages(id);

DROP INDEX data_id;
CREATE UNIQUE INDEX data_id ON data(id);

ALTER TABLE messages_data RENAME TO messages_data_old;
CREATE TABLE messages_data (
seq SERIAL PRIMARY KEY,
message_id UUID NOT NULL,
data_id UUID NOT NULL,
data_hash CHAR(64) NOT NULL,
data_idx INT NOT NULL
);
INSERT INTO messages_data(message_id, data_id, data_hash, data_idx)
SELECT message_id, data_id, data_hash, data_idx FROM messages_data_old;
DROP TABLE messages_data_old;

CREATE INDEX messages_data_message ON messages_data(message_id);
CREATE INDEX messages_data_data ON messages_data(data_id);
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
BEGIN;
DROP INDEX identities_id;
CREATE UNIQUE INDEX identities_id ON identities(namespace, id);

DROP INDEX verifiers_hash;
CREATE UNIQUE INDEX verifiers_hash on verifiers(namespace, hash);

DROP INDEX verifiers_identity;
CREATE UNIQUE INDEX verifiers_identity on verifiers(namespace, identity);

DROP INDEX tokenpool_locator;
CREATE UNIQUE INDEX tokenpool_locator ON tokenpool(namespace, connector, locator);

DROP INDEX ffi_id;
CREATE UNIQUE INDEX ffi_id ON ffi(namespace, id);

DROP INDEX messages_id;
CREATE UNIQUE INDEX messages_id ON messages(namespace_local, id);

DROP INDEX data_id;
CREATE UNIQUE INDEX data_id ON data(namespace, id);

ALTER TABLE messages_data RENAME TO messages_data_old;
CREATE TABLE messages_data (
seq SERIAL PRIMARY KEY,
message_id UUID NOT NULL,
data_id UUID NOT NULL,
data_hash CHAR(64) NOT NULL,
data_idx INT NOT NULL,
namespace VARCHAR(64)
);
INSERT INTO messages_data(message_id, data_id, data_hash, data_idx)
SELECT message_id, data_id, data_hash, data_idx FROM messages_data_old;
DROP TABLE messages_data_old;
UPDATE messages_data SET namespace = msg.namespace
FROM (SELECT namespace, id FROM messages) AS msg
WHERE messages_data.message_id = msg.id;

CREATE INDEX messages_data_message ON messages_data(namespace, message_id);
CREATE INDEX messages_data_data ON messages_data(namespace, data_id);
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
DROP INDEX identities_id;
CREATE UNIQUE INDEX identities_id ON identities(id);

DROP INDEX verifiers_hash;
CREATE UNIQUE INDEX verifiers_hash on verifiers(hash);

DROP INDEX verifiers_identity;
CREATE UNIQUE INDEX verifiers_identity on verifiers(identity);

DROP INDEX tokenpool_locator;
CREATE UNIQUE INDEX tokenpool_locator ON tokenpool(connector, locator);

DROP INDEX ffi_id;
CREATE UNIQUE INDEX ffi_id ON ffi(id);

DROP INDEX messages_id;
CREATE UNIQUE INDEX messages_id ON messages(id);

DROP INDEX data_id;
CREATE UNIQUE INDEX data_id ON data(id);

ALTER TABLE messages_data RENAME TO messages_data_old;
CREATE TABLE messages_data (
seq SERIAL PRIMARY KEY,
message_id UUID NOT NULL,
data_id UUID NOT NULL,
data_hash CHAR(64) NOT NULL,
data_idx INT NOT NULL
);
INSERT INTO messages_data(message_id, data_id, data_hash, data_idx)
SELECT message_id, data_id, data_hash, data_idx FROM messages_data_old;
DROP TABLE messages_data_old;

CREATE INDEX messages_data_message ON messages_data(message_id);
CREATE INDEX messages_data_data ON messages_data(data_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
DROP INDEX identities_id;
CREATE UNIQUE INDEX identities_id ON identities(namespace, id);

DROP INDEX verifiers_hash;
CREATE UNIQUE INDEX verifiers_hash on verifiers(namespace, hash);

DROP INDEX verifiers_identity;
CREATE UNIQUE INDEX verifiers_identity on verifiers(namespace, identity);

DROP INDEX tokenpool_locator;
CREATE UNIQUE INDEX tokenpool_locator ON tokenpool(namespace, connector, locator);

DROP INDEX ffi_id;
CREATE UNIQUE INDEX ffi_id ON ffi(namespace, id);

DROP INDEX messages_id;
CREATE UNIQUE INDEX messages_id ON messages(namespace_local, id);

DROP INDEX data_id;
CREATE UNIQUE INDEX data_id ON data(namespace, id);

ALTER TABLE messages_data RENAME TO messages_data_old;
CREATE TABLE messages_data (
seq INTEGER PRIMARY KEY AUTOINCREMENT,
message_id UUID NOT NULL,
data_id UUID NOT NULL,
data_hash CHAR(64) NOT NULL,
data_idx INT NOT NULL,
namespace VARCHAR(64)
);
INSERT INTO messages_data(message_id, data_id, data_hash, data_idx)
SELECT message_id, data_id, data_hash, data_idx FROM messages_data_old;
DROP TABLE messages_data_old;
UPDATE messages_data SET namespace = msg.namespace
FROM (SELECT namespace, id FROM messages) AS msg
WHERE messages_data.message_id = msg.id;

CREATE INDEX messages_data_message ON messages_data(namespace, message_id);
CREATE INDEX messages_data_data ON messages_data(namespace, data_id);
7 changes: 7 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,13 @@ nav_order: 2
|firstEvent|The first event the contract should process. Valid options are `oldest` or `newest`|`string`|`<nil>`
|location|A blockchain-specific contract location. For example, an Ethereum contract address, or a Fabric chaincode name and channe|`string`|`<nil>`

## namespaces.predefined[].multiparty.node

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|description|A description for the node in this namespace|`string`|`<nil>`
|name|The node name for this namespace|`string`|`<nil>`

## namespaces.predefined[].multiparty.org

|Key|Description|Type|Default Value|
Expand Down
3 changes: 1 addition & 2 deletions internal/assets/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/operations"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/core"
)
Expand Down Expand Up @@ -146,7 +145,7 @@ func (am *assetManager) RunOperation(ctx context.Context, op *core.PreparedOpera
}
}

func (am *assetManager) OnOperationUpdate(ctx context.Context, op *core.Operation, update *operations.OperationUpdate) error {
func (am *assetManager) OnOperationUpdate(ctx context.Context, op *core.Operation, update *core.OperationUpdate) error {
// Write an event for failed pool operations
if op.Type == core.OpTypeTokenCreatePool && update.Status == core.OpStatusFailed {
tokenPool, err := txcommon.RetrieveTokenPoolCreateInputs(ctx, op)
Expand Down
19 changes: 9 additions & 10 deletions internal/assets/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly/internal/operations"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/tokenmocks"
Expand Down Expand Up @@ -551,7 +550,7 @@ func TestOperationUpdatePool(t *testing.T) {
err := txcommon.AddTokenPoolCreateInputs(op, pool)
assert.NoError(t, err)

update := &operations.OperationUpdate{
update := &core.OperationUpdate{
Status: core.OpStatusFailed,
}

Expand All @@ -575,7 +574,7 @@ func TestOperationUpdatePoolBadInput(t *testing.T) {
ID: fftypes.NewUUID(),
Type: core.OpTypeTokenCreatePool,
}
update := &operations.OperationUpdate{
update := &core.OperationUpdate{
Status: core.OpStatusFailed,
}

Expand All @@ -599,7 +598,7 @@ func TestOperationUpdatePoolEventFail(t *testing.T) {
ID: fftypes.NewUUID(),
Type: core.OpTypeTokenCreatePool,
}
update := &operations.OperationUpdate{
update := &core.OperationUpdate{
Status: core.OpStatusFailed,
}

Expand Down Expand Up @@ -629,7 +628,7 @@ func TestOperationUpdateTransfer(t *testing.T) {
err := txcommon.AddTokenTransferInputs(op, transfer)
assert.NoError(t, err)

update := &operations.OperationUpdate{
update := &core.OperationUpdate{
Status: core.OpStatusFailed,
}

Expand All @@ -653,7 +652,7 @@ func TestOperationUpdateTransferBadInput(t *testing.T) {
ID: fftypes.NewUUID(),
Type: core.OpTypeTokenTransfer,
}
update := &operations.OperationUpdate{
update := &core.OperationUpdate{
Status: core.OpStatusFailed,
}

Expand All @@ -677,7 +676,7 @@ func TestOperationUpdateTransferEventFail(t *testing.T) {
ID: fftypes.NewUUID(),
Type: core.OpTypeTokenTransfer,
}
update := &operations.OperationUpdate{
update := &core.OperationUpdate{
Status: core.OpStatusFailed,
}

Expand Down Expand Up @@ -706,7 +705,7 @@ func TestOperationUpdateApproval(t *testing.T) {
err := txcommon.AddTokenApprovalInputs(op, approval)
assert.NoError(t, err)

update := &operations.OperationUpdate{
update := &core.OperationUpdate{
Status: core.OpStatusFailed,
}

Expand All @@ -730,7 +729,7 @@ func TestOperationUpdateApprovalBadInput(t *testing.T) {
ID: fftypes.NewUUID(),
Type: core.OpTypeTokenApproval,
}
update := &operations.OperationUpdate{
update := &core.OperationUpdate{
Status: core.OpStatusFailed,
}

Expand All @@ -754,7 +753,7 @@ func TestOperationUpdateApprovalEventFail(t *testing.T) {
ID: fftypes.NewUUID(),
Type: core.OpTypeTokenApproval,
}
update := &operations.OperationUpdate{
update := &core.OperationUpdate{
Status: core.OpStatusFailed,
}

Expand Down
8 changes: 7 additions & 1 deletion internal/blockchain/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ func (cb *callbacks) SetOperationalHandler(namespace string, handler core.Operat
func (cb *callbacks) OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID)
if handler, ok := cb.opHandlers[namespace]; ok {
handler.OperationUpdate(plugin, nsOpID, status, blockchainTXID, errorMessage, opOutput)
handler.OperationUpdate(plugin, &core.OperationUpdate{
NamespacedOpID: nsOpID,
Status: status,
BlockchainTXID: blockchainTXID,
ErrorMessage: errorMessage,
Output: opOutput,
})
return
}
log.L(ctx).Errorf("No handler found for blockchain operation '%s'", nsOpID)
Expand Down
7 changes: 6 additions & 1 deletion internal/blockchain/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ func TestCallbackOperationUpdate(t *testing.T) {
cb := NewBlockchainCallbacks()
cb.SetOperationalHandler("ns1", mcb)

mcb.On("OperationUpdate", mbi, nsOpID, core.OpStatusSucceeded, "tx1", "err", mock.Anything).Return().Once()
mcb.On("OperationUpdate", mbi, mock.MatchedBy(func(update *core.OperationUpdate) bool {
return update.NamespacedOpID == nsOpID &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "tx1" &&
update.ErrorMessage == "err"
})).Return().Once()
cb.OperationUpdate(context.Background(), mbi, nsOpID, core.OpStatusSucceeded, "tx1", "err", fftypes.JSONObject{})

nsOpID = "ns2:" + fftypes.NewUUID().String()
Expand Down
24 changes: 10 additions & 14 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,13 +1339,11 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
"transactionIndex": "0"
}`)

em.On("OperationUpdate",
e,
"ns1:"+operationID.String(),
core.OpStatusSucceeded,
"0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8",
"",
mock.Anything).Return(nil)
em.On("OperationUpdate", e, mock.MatchedBy(func(update *core.OperationUpdate) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8"
})).Return(nil)

err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)
Expand Down Expand Up @@ -1381,13 +1379,11 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {

em := &coremocks.OperationCallbacks{}
e.SetOperationHandler("ns1", em)
txsu := em.On("OperationUpdate",
e,
"ns1:"+operationID.String(),
core.OpStatusFailed,
"",
"Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument",
mock.Anything).Return(fmt.Errorf("Shutdown"))
txsu := em.On("OperationUpdate", e, mock.MatchedBy(func(update *core.OperationUpdate) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusFailed &&
update.ErrorMessage == "Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument"
})).Return(fmt.Errorf("Shutdown"))
done := make(chan struct{})
txsu.RunFn = func(a mock.Arguments) {
close(done)
Expand Down
24 changes: 10 additions & 14 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,13 +1256,11 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
"receivedAt": 1630033474675
}`)

em.On("OperationUpdate",
e,
"ns1:"+operationID.String(),
core.OpStatusSucceeded,
"ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2",
"",
mock.Anything).Return(nil)
em.On("OperationUpdate", e, mock.MatchedBy(func(update *core.OperationUpdate) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2"
})).Return(nil)

err := json.Unmarshal(data, &reply)
assert.NoError(t, err)
Expand Down Expand Up @@ -1318,13 +1316,11 @@ func TestHandleReceiptFailedTx(t *testing.T) {
"transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2"
}`)

em.On("OperationUpdate",
e,
"ns1:"+operationID.String(),
core.OpStatusFailed,
"ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2",
"",
mock.Anything).Return(nil)
em.On("OperationUpdate", e, mock.MatchedBy(func(update *core.OperationUpdate) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusFailed &&
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2"
})).Return(nil)

err := json.Unmarshal(data, &reply)
assert.NoError(t, err)
Expand Down
Loading