-
Notifications
You must be signed in to change notification settings - Fork 209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Track blockchain callback handlers per namespace #883
Changes from 4 commits
3060445
f30fba1
91a7c8c
5d31178
6b89dd0
51d6f63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,39 +68,56 @@ type Ethereum struct { | |
addressResolver *addressResolver | ||
metrics metrics.Manager | ||
ethconnectConf config.Section | ||
subs map[string]string | ||
subs map[string]subscriptionInfo | ||
} | ||
|
||
type subscriptionInfo struct { | ||
namespace string | ||
version int | ||
} | ||
|
||
type callbacks struct { | ||
handlers []blockchain.Callbacks | ||
handlers map[string]blockchain.Callbacks | ||
} | ||
|
||
func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { | ||
for _, cb := range cb.handlers { | ||
cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) | ||
func (cb *callbacks) BlockchainOpUpdate(ctx context.Context, plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { | ||
namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID) | ||
if handler, ok := cb.handlers[namespace]; ok { | ||
handler.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) | ||
return | ||
} | ||
log.L(ctx).Errorf("No handler found for blockchain operation '%s'", nsOpID) | ||
} | ||
|
||
func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { | ||
for _, cb := range cb.handlers { | ||
if err := cb.BatchPinComplete(batch, signingKey); err != nil { | ||
return err | ||
} | ||
func (cb *callbacks) BatchPinComplete(ctx context.Context, batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { | ||
if handler, ok := cb.handlers[batch.Namespace]; ok { | ||
return handler.BatchPinComplete(batch, signingKey) | ||
} | ||
log.L(ctx).Errorf("No handler found for blockchain batch pin on namespace '%s'", batch.Namespace) | ||
return nil | ||
} | ||
|
||
func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { | ||
for _, cb := range cb.handlers { | ||
if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil { | ||
return err | ||
func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error { | ||
if namespace == "" { | ||
// V1 networks don't populate namespace, so deliver the event to every handler | ||
for _, handler := range cb.handlers { | ||
if err := handler.BlockchainNetworkAction(action, location, event, signingKey); err != nil { | ||
return err | ||
} | ||
} | ||
} else { | ||
if handler, ok := cb.handlers[namespace]; ok { | ||
return handler.BlockchainNetworkAction(action, location, event, signingKey) | ||
} | ||
log.L(ctx).Errorf("No handler found for blockchain network action on namespace '%s'", namespace) | ||
} | ||
return nil | ||
} | ||
|
||
func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { | ||
for _, cb := range cb.handlers { | ||
// Send the event to all handlers and let them match it to a contract listener | ||
// TODO: can we push more listener/namespace knowledge down to this layer? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like the same namespace-encoded-in-sub-name trick should work here, right? (with a migration consideration I guess) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (not suggesting the TODO needs to be addressed in this PR) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, thanks for the prod - I need to add that follow-up to the board. Currently our listeners for custom events have no significance to their subscription names (it's just |
||
if err := cb.BlockchainEvent(event); err != nil { | ||
return err | ||
} | ||
|
@@ -190,6 +207,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr | |
e.ctx = log.WithLogField(ctx, "proto", "ethereum") | ||
e.metrics = metrics | ||
e.capabilities = &blockchain.Capabilities{} | ||
e.callbacks.handlers = make(map[string]blockchain.Callbacks) | ||
|
||
if addressResolverConf.GetString(AddressResolverURLTemplate) != "" { | ||
if e.addressResolver, err = newAddressResolver(ctx, addressResolverConf); err != nil { | ||
|
@@ -230,7 +248,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr | |
return err | ||
} | ||
e.streamID = stream.ID | ||
e.subs = make(map[string]string) | ||
e.subs = make(map[string]subscriptionInfo) | ||
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic) | ||
|
||
e.closed = make(chan struct{}) | ||
|
@@ -239,8 +257,8 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr | |
return nil | ||
} | ||
|
||
func (e *Ethereum) SetHandler(handler blockchain.Callbacks) { | ||
e.callbacks.handlers = append(e.callbacks.handlers, handler) | ||
func (e *Ethereum) SetHandler(namespace string, handler blockchain.Callbacks) { | ||
e.callbacks.handlers[namespace] = handler | ||
} | ||
|
||
func (e *Ethereum) Start() (err error) { | ||
|
@@ -268,11 +286,16 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace string, | |
if err != nil { | ||
return "", err | ||
} | ||
// TODO: We will probably need to save the namespace AND network version here | ||
// Ultimately there needs to be a logic branch in the event handling, where for "V1" we expect to receive a namespace in every | ||
// BatchPin event, but for "V2" we infer the namespace based on which subscription ID produced it. | ||
e.subs[sub.ID] = namespace | ||
|
||
version, err := e.GetNetworkVersion(ctx, location) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
e.subs[sub.ID] = subscriptionInfo{ | ||
namespace: namespace, | ||
version: version, | ||
} | ||
return sub.ID, nil | ||
} | ||
|
||
|
@@ -346,7 +369,7 @@ func (e *Ethereum) parseBlockchainEvent(ctx context.Context, msgJSON fftypes.JSO | |
} | ||
} | ||
|
||
func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { | ||
func (e *Ethereum) handleBatchPinEvent(ctx context.Context, location *fftypes.JSONAny, subInfo *subscriptionInfo, msgJSON fftypes.JSONObject) (err error) { | ||
event := e.parseBlockchainEvent(ctx, msgJSON) | ||
if event == nil { | ||
return nil // move on | ||
|
@@ -377,7 +400,24 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON | |
// Check if this is actually an operator action | ||
if strings.HasPrefix(nsOrAction, blockchain.FireFlyActionPrefix) { | ||
action := nsOrAction[len(blockchain.FireFlyActionPrefix):] | ||
return e.callbacks.BlockchainNetworkAction(action, event, verifier) | ||
|
||
// For V1 of the FireFly contract, action is sent to all namespaces | ||
// For V2+, namespace is inferred from the subscription | ||
var namespace string | ||
if subInfo.version > 1 { | ||
namespace = subInfo.namespace | ||
} | ||
|
||
return e.callbacks.BlockchainNetworkAction(ctx, namespace, action, location, event, verifier) | ||
} | ||
|
||
// For V1 of the FireFly contract, namespace is passed explicitly | ||
// For V2+, namespace is inferred from the subscription | ||
var namespace string | ||
if subInfo.version == 1 { | ||
namespace = nsOrAction | ||
} else { | ||
namespace = subInfo.namespace | ||
} | ||
|
||
hexUUIDs, err := hex.DecodeString(strings.TrimPrefix(sUUIDs, "0x")) | ||
|
@@ -409,7 +449,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON | |
} | ||
|
||
batch := &blockchain.BatchPin{ | ||
Namespace: nsOrAction, | ||
Namespace: namespace, | ||
TransactionID: &txnID, | ||
BatchID: &batchID, | ||
BatchHash: &batchHash, | ||
|
@@ -419,7 +459,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON | |
} | ||
|
||
// If there's an error dispatching the event, we must return the error and shutdown | ||
return e.callbacks.BatchPinComplete(batch, verifier) | ||
return e.callbacks.BatchPinComplete(ctx, batch, verifier) | ||
} | ||
|
||
func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) { | ||
|
@@ -450,7 +490,7 @@ func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject) | |
updateType = core.OpStatusFailed | ||
} | ||
l.Infof("Ethconnect '%s' reply: request=%s tx=%s message=%s", replyType, requestID, txHash, message) | ||
e.callbacks.BlockchainOpUpdate(e, requestID, updateType, txHash, message, reply) | ||
e.callbacks.BlockchainOpUpdate(ctx, e, requestID, updateType, txHash, message, reply) | ||
} | ||
|
||
func (e *Ethereum) buildEventLocationString(msgJSON fftypes.JSONObject) string { | ||
|
@@ -476,10 +516,17 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{ | |
l1.Tracef("Message: %+v", msgJSON) | ||
|
||
// Matches one of the active FireFly BatchPin subscriptions | ||
if _, ok := e.subs[sub]; ok { | ||
if subInfo, ok := e.subs[sub]; ok { | ||
location, err := encodeContractLocation(ctx, &Location{ | ||
Address: msgJSON.GetString("address"), | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
switch signature { | ||
case broadcastBatchEventSignature: | ||
if err := e.handleBatchPinEvent(ctx1, msgJSON); err != nil { | ||
if err := e.handleBatchPinEvent(ctx1, location, &subInfo, msgJSON); err != nil { | ||
return err | ||
} | ||
default: | ||
|
@@ -719,15 +766,7 @@ func (e *Ethereum) NormalizeContractLocation(ctx context.Context, location *ffty | |
if err != nil { | ||
return nil, err | ||
} | ||
parsed.Address, err = validateEthAddress(ctx, parsed.Address) | ||
if err != nil { | ||
return nil, err | ||
} | ||
normalized, err := json.Marshal(parsed) | ||
if err == nil { | ||
result = fftypes.JSONAnyPtrBytes(normalized) | ||
} | ||
return result, err | ||
return encodeContractLocation(ctx, parsed) | ||
} | ||
|
||
func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Location, error) { | ||
|
@@ -741,6 +780,18 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc | |
return ðLocation, nil | ||
} | ||
|
||
func encodeContractLocation(ctx context.Context, location *Location) (result *fftypes.JSONAny, err error) { | ||
location.Address, err = validateEthAddress(ctx, location.Address) | ||
if err != nil { | ||
return nil, err | ||
} | ||
normalized, err := json.Marshal(location) | ||
if err == nil { | ||
result = fftypes.JSONAnyPtrBytes(normalized) | ||
} | ||
return result, err | ||
} | ||
|
||
func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) error { | ||
location, err := parseContractLocation(ctx, listener.Location) | ||
if err != nil { | ||
|
@@ -1157,16 +1208,9 @@ func (e *Ethereum) GetAndConvertDeprecatedContractConfig(ctx context.Context) (l | |
} else if strings.HasPrefix(address, "/instances/") { | ||
address = strings.Replace(address, "/instances/", "", 1) | ||
} | ||
address, err = validateEthAddress(ctx, address) | ||
if err != nil { | ||
return nil, "", err | ||
} | ||
|
||
ethLocation := &Location{ | ||
location, err = encodeContractLocation(ctx, &Location{ | ||
Address: address, | ||
} | ||
normalized, _ := json.Marshal(ethLocation) | ||
location = fftypes.JSONAnyPtrBytes(normalized) | ||
|
||
return location, fromBlock, nil | ||
}) | ||
return location, fromBlock, err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like there should be a check for delivery of this to a V2 handler, and immediate rejection (worried about V1 pre-migrated contracts leaking actions to an unrelated V2 namespace that's new/migrated and happens to share a remote-name like
default
). Will look out for this later in the reviewThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, found it, in the caller of this function:
firefly/internal/blockchain/ethereum/ethereum.go
Lines 404 to 409 in 5d31178
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but you make a good point that if we can't infer the namespace from the subscription name, then V2 handlers should reject it outright. I think there should be a sanity check for that when we initialize the subscription info, to ensure that whenever we get to the point you noted above,
subInfo.namespace
is guaranteed to be valid for V2+ contracts.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK I pushed another commit to include this sanity check during init.