Skip to content

Commit bcc66bb

Browse files
authored
Merge pull request #883 from kaleido-io/blockchain
Track blockchain callback handlers per namespace
2 parents 3b19da1 + 51d6f63 commit bcc66bb

26 files changed

+1246
-318
lines changed

internal/blockchain/ethereum/ethereum.go

Lines changed: 96 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -68,39 +68,56 @@ type Ethereum struct {
6868
addressResolver *addressResolver
6969
metrics metrics.Manager
7070
ethconnectConf config.Section
71-
subs map[string]string
71+
subs map[string]subscriptionInfo
72+
}
73+
74+
type subscriptionInfo struct {
75+
namespace string
76+
version int
7277
}
7378

7479
type callbacks struct {
75-
handlers []blockchain.Callbacks
80+
handlers map[string]blockchain.Callbacks
7681
}
7782

78-
func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
79-
for _, cb := range cb.handlers {
80-
cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput)
83+
func (cb *callbacks) BlockchainOpUpdate(ctx context.Context, plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
84+
namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID)
85+
if handler, ok := cb.handlers[namespace]; ok {
86+
handler.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput)
87+
return
8188
}
89+
log.L(ctx).Errorf("No handler found for blockchain operation '%s'", nsOpID)
8290
}
8391

84-
func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error {
85-
for _, cb := range cb.handlers {
86-
if err := cb.BatchPinComplete(batch, signingKey); err != nil {
87-
return err
88-
}
92+
func (cb *callbacks) BatchPinComplete(ctx context.Context, batch *blockchain.BatchPin, signingKey *core.VerifierRef) error {
93+
if handler, ok := cb.handlers[batch.Namespace]; ok {
94+
return handler.BatchPinComplete(batch, signingKey)
8995
}
96+
log.L(ctx).Errorf("No handler found for blockchain batch pin on namespace '%s'", batch.Namespace)
9097
return nil
9198
}
9299

93-
func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error {
94-
for _, cb := range cb.handlers {
95-
if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil {
96-
return err
100+
func (cb *callbacks) BlockchainNetworkAction(ctx context.Context, namespace, action string, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef) error {
101+
if namespace == "" {
102+
// V1 networks don't populate namespace, so deliver the event to every handler
103+
for _, handler := range cb.handlers {
104+
if err := handler.BlockchainNetworkAction(action, location, event, signingKey); err != nil {
105+
return err
106+
}
107+
}
108+
} else {
109+
if handler, ok := cb.handlers[namespace]; ok {
110+
return handler.BlockchainNetworkAction(action, location, event, signingKey)
97111
}
112+
log.L(ctx).Errorf("No handler found for blockchain network action on namespace '%s'", namespace)
98113
}
99114
return nil
100115
}
101116

102117
func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error {
103118
for _, cb := range cb.handlers {
119+
// Send the event to all handlers and let them match it to a contract listener
120+
// TODO: can we push more listener/namespace knowledge down to this layer?
104121
if err := cb.BlockchainEvent(event); err != nil {
105122
return err
106123
}
@@ -190,6 +207,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr
190207
e.ctx = log.WithLogField(ctx, "proto", "ethereum")
191208
e.metrics = metrics
192209
e.capabilities = &blockchain.Capabilities{}
210+
e.callbacks.handlers = make(map[string]blockchain.Callbacks)
193211

194212
if addressResolverConf.GetString(AddressResolverURLTemplate) != "" {
195213
if e.addressResolver, err = newAddressResolver(ctx, addressResolverConf); err != nil {
@@ -230,7 +248,7 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr
230248
return err
231249
}
232250
e.streamID = stream.ID
233-
e.subs = make(map[string]string)
251+
e.subs = make(map[string]subscriptionInfo)
234252
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic)
235253

236254
e.closed = make(chan struct{})
@@ -239,8 +257,8 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr
239257
return nil
240258
}
241259

242-
func (e *Ethereum) SetHandler(handler blockchain.Callbacks) {
243-
e.callbacks.handlers = append(e.callbacks.handlers, handler)
260+
func (e *Ethereum) SetHandler(namespace string, handler blockchain.Callbacks) {
261+
e.callbacks.handlers[namespace] = handler
244262
}
245263

246264
func (e *Ethereum) Start() (err error) {
@@ -264,15 +282,24 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace string,
264282
firstEvent = "latest"
265283
}
266284

267-
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace, ethLocation.Address, firstEvent, e.streamID, batchPinEventABI)
285+
sub, subNS, err := e.streams.ensureFireFlySubscription(ctx, namespace, ethLocation.Address, firstEvent, e.streamID, batchPinEventABI)
268286
if err != nil {
269287
return "", err
270288
}
271-
// TODO: We will probably need to save the namespace AND network version here
272-
// Ultimately there needs to be a logic branch in the event handling, where for "V1" we expect to receive a namespace in every
273-
// BatchPin event, but for "V2" we infer the namespace based on which subscription ID produced it.
274-
e.subs[sub.ID] = namespace
275289

290+
version, err := e.GetNetworkVersion(ctx, location)
291+
if err != nil {
292+
return "", err
293+
}
294+
295+
if version > 1 && subNS == "" {
296+
return "", i18n.NewError(ctx, coremsgs.MsgInvalidSubscriptionForNetwork, sub.Name, version)
297+
}
298+
299+
e.subs[sub.ID] = subscriptionInfo{
300+
namespace: subNS,
301+
version: version,
302+
}
276303
return sub.ID, nil
277304
}
278305

@@ -346,7 +373,7 @@ func (e *Ethereum) parseBlockchainEvent(ctx context.Context, msgJSON fftypes.JSO
346373
}
347374
}
348375

349-
func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
376+
func (e *Ethereum) handleBatchPinEvent(ctx context.Context, location *fftypes.JSONAny, subInfo *subscriptionInfo, msgJSON fftypes.JSONObject) (err error) {
350377
event := e.parseBlockchainEvent(ctx, msgJSON)
351378
if event == nil {
352379
return nil // move on
@@ -377,7 +404,24 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
377404
// Check if this is actually an operator action
378405
if strings.HasPrefix(nsOrAction, blockchain.FireFlyActionPrefix) {
379406
action := nsOrAction[len(blockchain.FireFlyActionPrefix):]
380-
return e.callbacks.BlockchainNetworkAction(action, event, verifier)
407+
408+
// For V1 of the FireFly contract, action is sent to all namespaces
409+
// For V2+, namespace is inferred from the subscription
410+
var namespace string
411+
if subInfo.version > 1 {
412+
namespace = subInfo.namespace
413+
}
414+
415+
return e.callbacks.BlockchainNetworkAction(ctx, namespace, action, location, event, verifier)
416+
}
417+
418+
// For V1 of the FireFly contract, namespace is passed explicitly
419+
// For V2+, namespace is inferred from the subscription
420+
var namespace string
421+
if subInfo.version == 1 {
422+
namespace = nsOrAction
423+
} else {
424+
namespace = subInfo.namespace
381425
}
382426

383427
hexUUIDs, err := hex.DecodeString(strings.TrimPrefix(sUUIDs, "0x"))
@@ -409,7 +453,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
409453
}
410454

411455
batch := &blockchain.BatchPin{
412-
Namespace: nsOrAction,
456+
Namespace: namespace,
413457
TransactionID: &txnID,
414458
BatchID: &batchID,
415459
BatchHash: &batchHash,
@@ -419,7 +463,7 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
419463
}
420464

421465
// If there's an error dispatching the event, we must return the error and shutdown
422-
return e.callbacks.BatchPinComplete(batch, verifier)
466+
return e.callbacks.BatchPinComplete(ctx, batch, verifier)
423467
}
424468

425469
func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
@@ -450,7 +494,7 @@ func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject)
450494
updateType = core.OpStatusFailed
451495
}
452496
l.Infof("Ethconnect '%s' reply: request=%s tx=%s message=%s", replyType, requestID, txHash, message)
453-
e.callbacks.BlockchainOpUpdate(e, requestID, updateType, txHash, message, reply)
497+
e.callbacks.BlockchainOpUpdate(ctx, e, requestID, updateType, txHash, message, reply)
454498
}
455499

456500
func (e *Ethereum) buildEventLocationString(msgJSON fftypes.JSONObject) string {
@@ -476,10 +520,17 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{
476520
l1.Tracef("Message: %+v", msgJSON)
477521

478522
// Matches one of the active FireFly BatchPin subscriptions
479-
if _, ok := e.subs[sub]; ok {
523+
if subInfo, ok := e.subs[sub]; ok {
524+
location, err := encodeContractLocation(ctx, &Location{
525+
Address: msgJSON.GetString("address"),
526+
})
527+
if err != nil {
528+
return err
529+
}
530+
480531
switch signature {
481532
case broadcastBatchEventSignature:
482-
if err := e.handleBatchPinEvent(ctx1, msgJSON); err != nil {
533+
if err := e.handleBatchPinEvent(ctx1, location, &subInfo, msgJSON); err != nil {
483534
return err
484535
}
485536
default:
@@ -719,15 +770,7 @@ func (e *Ethereum) NormalizeContractLocation(ctx context.Context, location *ffty
719770
if err != nil {
720771
return nil, err
721772
}
722-
parsed.Address, err = validateEthAddress(ctx, parsed.Address)
723-
if err != nil {
724-
return nil, err
725-
}
726-
normalized, err := json.Marshal(parsed)
727-
if err == nil {
728-
result = fftypes.JSONAnyPtrBytes(normalized)
729-
}
730-
return result, err
773+
return encodeContractLocation(ctx, parsed)
731774
}
732775

733776
func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Location, error) {
@@ -741,6 +784,18 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc
741784
return &ethLocation, nil
742785
}
743786

787+
func encodeContractLocation(ctx context.Context, location *Location) (result *fftypes.JSONAny, err error) {
788+
location.Address, err = validateEthAddress(ctx, location.Address)
789+
if err != nil {
790+
return nil, err
791+
}
792+
normalized, err := json.Marshal(location)
793+
if err == nil {
794+
result = fftypes.JSONAnyPtrBytes(normalized)
795+
}
796+
return result, err
797+
}
798+
744799
func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) error {
745800
location, err := parseContractLocation(ctx, listener.Location)
746801
if err != nil {
@@ -1157,16 +1212,9 @@ func (e *Ethereum) GetAndConvertDeprecatedContractConfig(ctx context.Context) (l
11571212
} else if strings.HasPrefix(address, "/instances/") {
11581213
address = strings.Replace(address, "/instances/", "", 1)
11591214
}
1160-
address, err = validateEthAddress(ctx, address)
1161-
if err != nil {
1162-
return nil, "", err
1163-
}
11641215

1165-
ethLocation := &Location{
1216+
location, err = encodeContractLocation(ctx, &Location{
11661217
Address: address,
1167-
}
1168-
normalized, _ := json.Marshal(ethLocation)
1169-
location = fftypes.JSONAnyPtrBytes(normalized)
1170-
1171-
return location, fromBlock, nil
1218+
})
1219+
return location, fromBlock, err
11721220
}

0 commit comments

Comments
 (0)