Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track blockchain callback handlers per namespace #883

Merged
merged 6 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 96 additions & 48 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,39 +68,56 @@ type Ethereum struct {
addressResolver *addressResolver
metrics metrics.Manager
ethconnectConf config.Section
subs map[string]string
subs map[string]subscriptionInfo
}

type subscriptionInfo struct {
namespace string
version int
}

type callbacks struct {
handlers []blockchain.Callbacks
handlers map[string]blockchain.Callbacks
}

func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
for _, cb := range cb.handlers {
cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput)
func (cb *callbacks) BlockchainOpUpdate(ctx context.Context, plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID)
if handler, ok := cb.handlers[namespace]; ok {
handler.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput)
return
}
log.L(ctx).Errorf("No handler found for blockchain operation '%s'", nsOpID)
}

func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error {
for _, cb := range cb.handlers {
if err := cb.BatchPinComplete(batch, signingKey); err != nil {
return err
}
func (cb *callbacks) BatchPinComplete(ctx context.Context, batch *blockchain.BatchPin, signingKey *core.VerifierRef) error {
if handler, ok := cb.handlers[batch.Namespace]; ok {
return handler.BatchPinComplete(batch, signingKey)
}
log.L(ctx).Errorf("No handler found for blockchain batch pin on namespace '%s'", batch.Namespace)
return nil
}

func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error {
for _, cb := range cb.handlers {
if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil {
return err
func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error {
if namespace == "" {
// V1 networks don't populate namespace, so deliver the event to every handler
for _, handler := range cb.handlers {
if err := handler.BlockchainNetworkAction(action, location, event, signingKey); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there should be a check for delivery of this to a V2 handler, and immediate rejection (worried about V1 pre-migrated contracts leaking actions to an unrelated V2 namespace that's new/migrated and happens to share a remote-name like default). Will look out for this later in the review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, found it, in the caller of this function:

// For V1 of the FireFly contract, action is sent to all namespaces
// For V2+, namespace is inferred from the subscription
var namespace string
if subInfo.version > 1 {
namespace = subInfo.namespace
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but you make a good point that if we can't infer the namespace from the subscription name, then V2 handlers should reject it outright. I think there should be a sanity check for that when we initialize the subscription info, to ensure that whenever we get to the point you noted above, subInfo.namespace is guaranteed to be valid for V2+ contracts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I pushed another commit to include this sanity check during init.

return err
}
}
} else {
if handler, ok := cb.handlers[namespace]; ok {
return handler.BlockchainNetworkAction(action, location, event, signingKey)
}
log.L(ctx).Errorf("No handler found for blockchain network action on namespace '%s'", namespace)
}
return nil
}

func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error {
for _, cb := range cb.handlers {
// Send the event to all handlers and let them match it to a contract listener
// TODO: can we push more listener/namespace knowledge down to this layer?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the same namespace-encoded-in-sub-name trick should work here, right? (with a migration consideration I guess)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(not suggesting the TODO needs to be addressed in this PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, thanks for the prod - I need to add that follow-up to the board. Currently our listeners for custom events have no significance to their subscription names (it's just ff-sub-<listener UUID>, but it really doesn't matter at all). We should introduce a format that can be parsed (with a fallback to "send it to everyone" for pre-existing listeners).

if err := cb.BlockchainEvent(event); err != nil {
return err
}
Expand Down Expand Up @@ -190,6 +207,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr
e.ctx = log.WithLogField(ctx, "proto", "ethereum")
e.metrics = metrics
e.capabilities = &blockchain.Capabilities{}
e.callbacks.handlers = make(map[string]blockchain.Callbacks)

if addressResolverConf.GetString(AddressResolverURLTemplate) != "" {
if e.addressResolver, err = newAddressResolver(ctx, addressResolverConf); err != nil {
Expand Down Expand Up @@ -230,7 +248,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr
return err
}
e.streamID = stream.ID
e.subs = make(map[string]string)
e.subs = make(map[string]subscriptionInfo)
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic)

e.closed = make(chan struct{})
Expand All @@ -239,8 +257,8 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr
return nil
}

func (e *Ethereum) SetHandler(handler blockchain.Callbacks) {
e.callbacks.handlers = append(e.callbacks.handlers, handler)
func (e *Ethereum) SetHandler(namespace string, handler blockchain.Callbacks) {
e.callbacks.handlers[namespace] = handler
}

func (e *Ethereum) Start() (err error) {
Expand All @@ -264,15 +282,24 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace string,
firstEvent = "latest"
}

sub, err := e.streams.ensureFireFlySubscription(ctx, namespace, ethLocation.Address, firstEvent, e.streamID, batchPinEventABI)
sub, subNS, err := e.streams.ensureFireFlySubscription(ctx, namespace, ethLocation.Address, firstEvent, e.streamID, batchPinEventABI)
if err != nil {
return "", err
}
// TODO: We will probably need to save the namespace AND network version here
// Ultimately there needs to be a logic branch in the event handling, where for "V1" we expect to receive a namespace in every
// BatchPin event, but for "V2" we infer the namespace based on which subscription ID produced it.
e.subs[sub.ID] = namespace

version, err := e.GetNetworkVersion(ctx, location)
if err != nil {
return "", err
}

if version > 1 && subNS == "" {
return "", i18n.NewError(ctx, coremsgs.MsgInvalidSubscriptionForNetwork, sub.Name, version)
}

e.subs[sub.ID] = subscriptionInfo{
namespace: subNS,
version: version,
}
return sub.ID, nil
}

Expand Down Expand Up @@ -346,7 +373,7 @@ func (e *Ethereum) parseBlockchainEvent(ctx context.Context, msgJSON fftypes.JSO
}
}

func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
func (e *Ethereum) handleBatchPinEvent(ctx context.Context, location *fftypes.JSONAny, subInfo *subscriptionInfo, msgJSON fftypes.JSONObject) (err error) {
event := e.parseBlockchainEvent(ctx, msgJSON)
if event == nil {
return nil // move on
Expand Down Expand Up @@ -377,7 +404,24 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
// Check if this is actually an operator action
if strings.HasPrefix(nsOrAction, blockchain.FireFlyActionPrefix) {
action := nsOrAction[len(blockchain.FireFlyActionPrefix):]
return e.callbacks.BlockchainNetworkAction(action, event, verifier)

// For V1 of the FireFly contract, action is sent to all namespaces
// For V2+, namespace is inferred from the subscription
var namespace string
if subInfo.version > 1 {
namespace = subInfo.namespace
}

return e.callbacks.BlockchainNetworkAction(ctx, namespace, action, location, event, verifier)
}

// For V1 of the FireFly contract, namespace is passed explicitly
// For V2+, namespace is inferred from the subscription
var namespace string
if subInfo.version == 1 {
namespace = nsOrAction
} else {
namespace = subInfo.namespace
}

hexUUIDs, err := hex.DecodeString(strings.TrimPrefix(sUUIDs, "0x"))
Expand Down Expand Up @@ -409,7 +453,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
}

batch := &blockchain.BatchPin{
Namespace: nsOrAction,
Namespace: namespace,
TransactionID: &txnID,
BatchID: &batchID,
BatchHash: &batchHash,
Expand All @@ -419,7 +463,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
}

// If there's an error dispatching the event, we must return the error and shutdown
return e.callbacks.BatchPinComplete(batch, verifier)
return e.callbacks.BatchPinComplete(ctx, batch, verifier)
}

func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
Expand Down Expand Up @@ -450,7 +494,7 @@ func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject)
updateType = core.OpStatusFailed
}
l.Infof("Ethconnect '%s' reply: request=%s tx=%s message=%s", replyType, requestID, txHash, message)
e.callbacks.BlockchainOpUpdate(e, requestID, updateType, txHash, message, reply)
e.callbacks.BlockchainOpUpdate(ctx, e, requestID, updateType, txHash, message, reply)
}

func (e *Ethereum) buildEventLocationString(msgJSON fftypes.JSONObject) string {
Expand All @@ -476,10 +520,17 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{
l1.Tracef("Message: %+v", msgJSON)

// Matches one of the active FireFly BatchPin subscriptions
if _, ok := e.subs[sub]; ok {
if subInfo, ok := e.subs[sub]; ok {
location, err := encodeContractLocation(ctx, &Location{
Address: msgJSON.GetString("address"),
})
if err != nil {
return err
}

switch signature {
case broadcastBatchEventSignature:
if err := e.handleBatchPinEvent(ctx1, msgJSON); err != nil {
if err := e.handleBatchPinEvent(ctx1, location, &subInfo, msgJSON); err != nil {
return err
}
default:
Expand Down Expand Up @@ -719,15 +770,7 @@ func (e *Ethereum) NormalizeContractLocation(ctx context.Context, location *ffty
if err != nil {
return nil, err
}
parsed.Address, err = validateEthAddress(ctx, parsed.Address)
if err != nil {
return nil, err
}
normalized, err := json.Marshal(parsed)
if err == nil {
result = fftypes.JSONAnyPtrBytes(normalized)
}
return result, err
return encodeContractLocation(ctx, parsed)
}

func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Location, error) {
Expand All @@ -741,6 +784,18 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc
return &ethLocation, nil
}

func encodeContractLocation(ctx context.Context, location *Location) (result *fftypes.JSONAny, err error) {
location.Address, err = validateEthAddress(ctx, location.Address)
if err != nil {
return nil, err
}
normalized, err := json.Marshal(location)
if err == nil {
result = fftypes.JSONAnyPtrBytes(normalized)
}
return result, err
}

func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) error {
location, err := parseContractLocation(ctx, listener.Location)
if err != nil {
Expand Down Expand Up @@ -1157,16 +1212,9 @@ func (e *Ethereum) GetAndConvertDeprecatedContractConfig(ctx context.Context) (l
} else if strings.HasPrefix(address, "/instances/") {
address = strings.Replace(address, "/instances/", "", 1)
}
address, err = validateEthAddress(ctx, address)
if err != nil {
return nil, "", err
}

ethLocation := &Location{
location, err = encodeContractLocation(ctx, &Location{
Address: address,
}
normalized, _ := json.Marshal(ethLocation)
location = fftypes.JSONAnyPtrBytes(normalized)

return location, fromBlock, nil
})
return location, fromBlock, err
}
Loading