Skip to content

Commit

Permalink
Close out on converage
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Oct 5, 2023
1 parent 6cbf2ec commit 3a56ca3
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 100 deletions.
31 changes: 17 additions & 14 deletions pkg/rpcbackend/wsbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-common/pkg/wsclient"
"github.com/hyperledger/firefly-signer/internal/signermsgs"
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -200,16 +199,14 @@ func (s *sub) Notifications() chan *RPCSubscriptionNotification {
}

func (s *sub) Unsubscribe(ctx context.Context) *RPCError {
var unsubscribed bool
s.cancelCtx() // we unblock the receiver if it was previously trying to dispatch to this subscription, before invoking unsubscribe
s.rc.removeSubscription(s)
rpcErr := s.rc.CallRPC(ctx, &unsubscribed, "eth_unsubscribe", s.subscriptionID)
var resultBool bool
rpcErr := s.rc.CallRPC(ctx, &resultBool, "eth_unsubscribe", s.subscriptionID)
if rpcErr != nil {
return rpcErr
}
if !unsubscribed {
log.L(ctx).Warnf("Server returned false for unsubscribe on '%s'", s.subscriptionID)
}
log.L(ctx).Infof("Unsubscribed '%s' (result=%t)", s.subscriptionID, resultBool)
close(s.notifications)
return nil
}
Expand Down Expand Up @@ -251,7 +248,10 @@ func (rc *wsRPCClient) CallRPC(ctx context.Context, result interface{}, method s
if rpcErr = rc.sendRPC(ctx, reqID, rpcReq); rpcErr != nil {
return rpcErr
}
return rc.waitResponse(ctx, result, reqID, rpcReq, rpcStartTime, resChannel)
}

func (rc *wsRPCClient) waitResponse(ctx context.Context, result interface{}, reqID string, rpcReq *RPCRequest, rpcStartTime time.Time, resChannel chan *RPCResponse) *RPCError {
var rpcRes *RPCResponse
select {
case rpcRes = <-resChannel:
Expand All @@ -269,17 +269,19 @@ func (rc *wsRPCClient) CallRPC(ctx context.Context, result interface{}, method s
// We don't want a result for errors, but a null success response needs to go in there
rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString)
}
if err := json.Unmarshal(rpcRes.Result.Bytes(), &result); err != nil {
err = i18n.NewError(ctx, signermsgs.MsgResultParseFailed, result, err)
return &RPCError{Code: int64(RPCCodeParseError), Message: err.Error()}
if result != nil {
if err := json.Unmarshal(rpcRes.Result.Bytes(), &result); err != nil {
err = i18n.NewError(ctx, signermsgs.MsgResultParseFailed, result, err)
return &RPCError{Code: int64(RPCCodeParseError), Message: err.Error()}
}
}
return nil
}

func (rc *wsRPCClient) handleSubscriptionNotification(ctx context.Context, rpcRes *RPCResponse) {
type rpcSubscriptionParams struct {
Subscription ethtypes.HexBytes0xPrefix `json:"subscription"`
Result *fftypes.JSONAny `json:"result,omitempty"`
Subscription string `json:"subscription"` // probably hex, but not protocol assured
Result *fftypes.JSONAny `json:"result,omitempty"`
}
var subParams rpcSubscriptionParams
if rpcRes.Params != nil {
Expand All @@ -290,7 +292,7 @@ func (rc *wsRPCClient) handleSubscriptionNotification(ctx context.Context, rpcRe
return
}

sub := rc.getSubscription(subParams.Subscription.String())
sub := rc.getSubscription(subParams.Subscription)
if sub == nil {
log.L(ctx).Warnf("RPC[%s] <-- Notification for unknown subscription '%s'", rpcRes.ID.AsString(), subParams.Subscription)
return
Expand All @@ -315,7 +317,7 @@ func (rc *wsRPCClient) handleSubscriptionConfirm(ctx context.Context, rpcRes *RP
}
return
}
var subscriptionID ethtypes.HexBytes0xPrefix
var subscriptionID string // we know it's probably hex, but we cannot rely on that being guaranteed
if rpcRes.Result != nil {
_ = json.Unmarshal(rpcRes.Result.Bytes(), &subscriptionID)
}
Expand All @@ -328,10 +330,11 @@ func (rc *wsRPCClient) handleSubscriptionConfirm(ctx context.Context, rpcRes *RP
}
s := &sub{
rc: rc,
subscriptionID: subscriptionID.String(),
subscriptionID: subscriptionID,
notifications: make(chan *RPCSubscriptionNotification), // blocking channel for these, but Unsubscribe will unblock by cancelling ctx
}
s.ctx, s.cancelCtx = context.WithCancel(ctx)
log.L(ctx).Infof("Subscribed '%s'", s.subscriptionID)
rc.addSubscription(s)
inflightSubscribe <- &newSubResponse{
s: s,
Expand Down
Loading

0 comments on commit 3a56ca3

Please sign in to comment.