Skip to content

Commit

Permalink
feat: use structured concurrency for tss processes (#160)
Browse files Browse the repository at this point in the history
Co-authored-by: mace <mak@chainsafe.io>
  • Loading branch information
mpetrun5 and MakMuftic authored May 9, 2023
1 parent d9c0860 commit 986966f
Show file tree
Hide file tree
Showing 39 changed files with 508 additions and 596 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: 1.18
go-version: 1.19

- uses: actions/checkout@v2

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
name: E2E Tests
strategy:
matrix:
go-version: [ 1.18.x ]
go-version: [ 1.19.x ]
platform: [ ubuntu-latest ]
runs-on: ${{ matrix.platform }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-binaries.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.18'
go-version: '1.19'

- name: Build
run: make build-all
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.18.x]
go-version: [1.19.x]
platform: [ ubuntu-latest ]
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
FROM alpine:3.6 as alpine
RUN apk add -U --no-cache ca-certificates

FROM golang:1.18-stretch AS builder
FROM golang:1.19 AS builder
ADD . /src
WORKDIR /src
RUN cd /src && echo $(ls -1 /src)
Expand Down
49 changes: 28 additions & 21 deletions chains/evm/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/binance-chain/tss-lib/common"
"github.com/sourcegraph/conc/pool"

ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -109,53 +110,48 @@ func (e *Executor) Execute(msgs []*message.Message) error {
}

sigChn := make(chan interface{})
statusChn := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
go e.coordinator.Execute(ctx, signing, sigChn, statusChn)
ctx := context.Background()
pool := pool.New().WithContext(ctx).WithCancelOnError()
pool.Go(func(ctx context.Context) error { return e.coordinator.Execute(ctx, signing, sigChn) })
pool.Go(func(ctx context.Context) error { return e.watchExecution(ctx, proposals, sigChn, sessionID) })
return pool.Wait()
}

func (e *Executor) watchExecution(ctx context.Context, proposals []*chains.Proposal, sigChn chan interface{}, sessionID string) error {
ticker := time.NewTicker(executionCheckPeriod)
timeout := time.NewTicker(signingTimeout)
defer ticker.Stop()
defer timeout.Stop()
defer cancel()
for {
select {
case sigResult := <-sigChn:
{
signatureData := sigResult.(*common.SignatureData)
hash, err := e.executeProposal(proposals, signatureData)
if err != nil {
go e.comm.Broadcast(e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID, nil)
_ = e.comm.Broadcast(e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID)
return err
}

log.Info().Str("SessionID", sessionID).Msgf("Sent proposals execution with hash: %s", hash)
}
case err := <-statusChn:
{
return err
}
case <-ticker.C:
{
allExecuted := true
for _, prop := range proposals {
isExecuted, err := e.bridge.IsProposalExecuted(prop)
if err != nil || !isExecuted {
allExecuted = false
continue
}

log.Info().Str("SessionID", sessionID).Msgf("Successfully executed proposal %v", prop)
if !e.areProposalsExecuted(proposals, sessionID) {
continue
}

if allExecuted {
return nil
}
log.Info().Str("SessionID", sessionID).Msgf("Successfully executed proposals")
return nil
}
case <-timeout.C:
{
return fmt.Errorf("execution timed out in %s", signingTimeout)
}
case <-ctx.Done():
{
return nil
}
}
}
}
Expand Down Expand Up @@ -183,6 +179,17 @@ func (e *Executor) executeProposal(proposals []*chains.Proposal, signatureData *
return hash, err
}

func (e *Executor) areProposalsExecuted(proposals []*chains.Proposal, sessionID string) bool {
for _, prop := range proposals {
isExecuted, err := e.bridge.IsProposalExecuted(prop)
if err != nil || !isExecuted {
return false
}
}

return true
}

func (e *Executor) sessionID(hash []byte) string {
return fmt.Sprintf("signing-%s", ethCommon.Bytes2Hex(hash))
}
8 changes: 2 additions & 6 deletions chains/evm/listener/event-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,7 @@ func (eh *KeygenEventHandler) HandleEvent(

keygenBlockNumber := big.NewInt(0).SetUint64(keygenEvents[0].BlockNumber)
keygen := keygen.NewKeygen(eh.sessionID(keygenBlockNumber), eh.threshold, eh.host, eh.communication, eh.storer)
go eh.coordinator.Execute(context.Background(), keygen, make(chan interface{}, 1), make(chan error, 1))

return nil
return eh.coordinator.Execute(context.Background(), keygen, make(chan interface{}, 1))
}

func (eh *KeygenEventHandler) sessionID(block *big.Int) string {
Expand Down Expand Up @@ -336,9 +334,7 @@ func (eh *RefreshEventHandler) HandleEvent(
resharing := resharing.NewResharing(
eh.sessionID(startBlock), topology.Threshold, eh.host, eh.communication, eh.storer,
)
go eh.coordinator.Execute(context.Background(), resharing, make(chan interface{}, 1), make(chan error, 1))

return nil
return eh.coordinator.Execute(context.Background(), resharing, make(chan interface{}, 1))
}

func (eh *RefreshEventHandler) sessionID(block *big.Int) string {
Expand Down
9 changes: 3 additions & 6 deletions chains/substrate/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (c *SubstrateClient) Transact(method string, args ...interface{}) (types.Ha
return hash, sub, nil
}

func (c *SubstrateClient) TrackExtrinsic(extHash types.Hash, sub *author.ExtrinsicStatusSubscription, errChn chan error) {
func (c *SubstrateClient) TrackExtrinsic(extHash types.Hash, sub *author.ExtrinsicStatusSubscription) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Minute*10))
defer sub.Unsubscribe()
defer cancel()
Expand All @@ -109,14 +109,11 @@ func (c *SubstrateClient) TrackExtrinsic(extHash types.Hash, sub *author.Extrins
}
if status.IsFinalized {
log.Info().Str("extrinsic", extHash.Hex()).Msgf("Extrinsic is finalized in block with hash: %#x", status.AsFinalized)
err := c.checkExtrinsicSuccess(extHash, status.AsFinalized)
errChn <- err
return
return c.checkExtrinsicSuccess(extHash, status.AsFinalized)
}
}
case <-ctx.Done():
errChn <- fmt.Errorf("extrinsic has timed out")
return
return fmt.Errorf("extrinsic has timed out")
}
}
}
Expand Down
60 changes: 31 additions & 29 deletions chains/substrate/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ChainSafe/sygma-relayer/chains"
"github.com/ChainSafe/sygma-relayer/chains/substrate/connection"
"github.com/binance-chain/tss-lib/common"
"github.com/sourcegraph/conc/pool"

"github.com/centrifuge/go-substrate-rpc-client/v4/rpc/author"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
Expand Down Expand Up @@ -40,7 +41,7 @@ type BridgePallet interface {
IsProposalExecuted(p *chains.Proposal) (bool, error)
ExecuteProposals(proposals []*chains.Proposal, signature []byte) (types.Hash, *author.ExtrinsicStatusSubscription, error)
ProposalsHash(proposals []*chains.Proposal) ([]byte, error)
TrackExtrinsic(extHash types.Hash, sub *author.ExtrinsicStatusSubscription, errChn chan error)
TrackExtrinsic(extHash types.Hash, sub *author.ExtrinsicStatusSubscription) error
}

type Executor struct {
Expand Down Expand Up @@ -115,58 +116,48 @@ func (e *Executor) Execute(msgs []*message.Message) error {
}

sigChn := make(chan interface{})
statusChn := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
go e.coordinator.Execute(ctx, signing, sigChn, statusChn)
ctx := context.Background()
pool := pool.New().WithContext(ctx).WithCancelOnError()
pool.Go(func(ctx context.Context) error { return e.coordinator.Execute(ctx, signing, sigChn) })
pool.Go(func(ctx context.Context) error { return e.watchExecution(ctx, proposals, sigChn, sessionID) })
return pool.Wait()
}

func (e *Executor) watchExecution(ctx context.Context, proposals []*chains.Proposal, sigChn chan interface{}, sessionID string) error {
ticker := time.NewTicker(executionCheckPeriod)
timeout := time.NewTicker(signingTimeout)
defer ticker.Stop()
defer timeout.Stop()
defer cancel()
for {
select {
case sigResult := <-sigChn:
{
signatureData := sigResult.(*common.SignatureData)
hash, sub, err := e.executeProposal(proposals, signatureData)
if err != nil {
go e.comm.Broadcast(e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID, nil)
_ = e.comm.Broadcast(e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID)
return err
}
errChn := make(chan error)
go e.bridge.TrackExtrinsic(hash, sub, errChn)
err = <-errChn
return err
}
case err := <-statusChn:
{
return err

return e.bridge.TrackExtrinsic(hash, sub)
}
case <-ticker.C:
{
allExecuted := true
for _, prop := range proposals {
isExecuted, err := e.bridge.IsProposalExecuted(prop)
if err != nil {
return err
}
if !isExecuted {
allExecuted = false
continue
}

log.Info().Msgf("Successfully executed proposal %v", prop)
if !e.areProposalsExecuted(proposals, sessionID) {
continue
}

if allExecuted {
return nil
}
log.Info().Str("SessionID", sessionID).Msgf("Successfully executed proposals")
return nil
}
case <-timeout.C:
{
return fmt.Errorf("execution timed out in %s", signingTimeout)
}
case <-ctx.Done():
{
return nil
}
}
}
}
Expand All @@ -186,6 +177,17 @@ func (e *Executor) executeProposal(proposals []*chains.Proposal, signatureData *
return hash, sub, err
}

func (e *Executor) areProposalsExecuted(proposals []*chains.Proposal, sessionID string) bool {
for _, prop := range proposals {
isExecuted, err := e.bridge.IsProposalExecuted(prop)
if err != nil || !isExecuted {
return false
}
}

return true
}

func (e *Executor) sessionID(hash []byte) string {
return fmt.Sprintf("signing-%s", ethCommon.Bytes2Hex(hash))
}
4 changes: 2 additions & 2 deletions cli/topology/encryptTopology.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"os"

"github.com/ChainSafe/sygma-relayer/topology"
Expand Down Expand Up @@ -47,7 +47,7 @@ func encryptTopology(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
byteValue, err := ioutil.ReadAll(topologyFile)
byteValue, err := io.ReadAll(topologyFile)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion comm/communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Communication interface {
CloseSession(sessionID string)
// Broadcast sends message to provided peers
// If error has occurred on sending any message, broadcast will be aborted and error will be sent to errChan
Broadcast(peers peer.IDSlice, msg []byte, msgType MessageType, sessionID string, errChan chan error)
Broadcast(peers peer.IDSlice, msg []byte, msgType MessageType, sessionID string) error
// Subscribe subscribes provided channel to a specific message type for a provided session
// Returns SubscriptionID - unique identifier of created subscription that is used to unsubscribe from subscription
Subscribe(sessionID string, msgType MessageType, channel chan *WrappedMessage) SubscriptionID
Expand Down
Loading

0 comments on commit 986966f

Please sign in to comment.