Skip to content

Commit

Permalink
create a separate func implementation for sending blockchain messages
Browse files Browse the repository at this point in the history
  • Loading branch information
zakhar-petukhov committed Sep 19, 2024
1 parent df65491 commit fbbfd94
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {

msgSender, err := blockchain.NewMsgSender(log, cfg.App.LiteServers, map[string]chan<- blockchain.ExtInMsgCopy{
"mempool": mempoolCh,
})
}, nil)
if err != nil {
log.Fatal("failed to create msg sender", zap.Error(err))
}
Expand Down
28 changes: 12 additions & 16 deletions pkg/blockchain/msg_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sourcegraph/conc/iter"

"github.com/tonkeeper/tongo"
"github.com/tonkeeper/tongo/config"
"github.com/tonkeeper/tongo/liteapi"
Expand All @@ -31,6 +28,8 @@ type MsgSender struct {
mu sync.Mutex
// batches is used as a cache for boc multi-sending.
batches []batchOfMessages

send func(ctx context.Context, payload []byte, clients []*liteapi.Client) error
}

type batchOfMessages struct {
Expand All @@ -50,15 +49,11 @@ type ExtInMsgCopy struct {
Accounts map[tongo.AccountID]struct{}
}

var liteserverMessageSendMc = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "liteserver_message_send",
}, []string{"server", "result", "iteration"})

func (m *ExtInMsgCopy) IsEmulation() bool {
return len(m.Accounts) > 0
}

func NewMsgSender(logger *zap.Logger, servers []config.LiteServer, receivers map[string]chan<- ExtInMsgCopy) (*MsgSender, error) {
func NewMsgSender(logger *zap.Logger, servers []config.LiteServer, receivers map[string]chan<- ExtInMsgCopy, send func(ctx context.Context, payload []byte, clients []*liteapi.Client) error) (*MsgSender, error) {
var (
client *liteapi.Client
clients []*liteapi.Client
Expand Down Expand Up @@ -91,11 +86,14 @@ func NewMsgSender(logger *zap.Logger, servers []config.LiteServer, receivers map
if len(clients) == 0 {
return nil, fmt.Errorf("no lite clients available")
}

if send == nil {
send = simpleSend
}
msgSender := &MsgSender{
sendingClients: clients,
logger: logger,
receivers: receivers,
send: send,
}
go func() {
for {
Expand Down Expand Up @@ -151,20 +149,18 @@ func (ms *MsgSender) SendMessage(ctx context.Context, msgCopy ExtInMsgCopy) erro
ms.logger.Warn("receiver is too slow", zap.String("name", name))
}
}
return ms.send(ctx, msgCopy.Payload)
return ms.send(ctx, msgCopy.Payload, ms.sendingClients)
}

func (ms *MsgSender) send(ctx context.Context, payload []byte) error {
func simpleSend(ctx context.Context, payload []byte, clients []*liteapi.Client) error {
var err error
for i := 0; i < 3; i++ {
serverNumber := rand.Intn(len(ms.sendingClients))
c := ms.sendingClients[serverNumber]
serverNumber := rand.Intn(len(clients))
c := clients[serverNumber]
_, err = c.SendMessage(ctx, payload)
if err == nil {
liteserverMessageSendMc.WithLabelValues(fmt.Sprintf("%d", serverNumber), "success", fmt.Sprintf("%d", i)).Inc()
return nil
}
liteserverMessageSendMc.WithLabelValues(fmt.Sprintf("%d", serverNumber), "error", fmt.Sprintf("%d", i)).Inc()
}
return err
}
Expand All @@ -178,7 +174,7 @@ func (ms *MsgSender) sendMessageFromBatch(msgCopy ExtInMsgCopy) error {
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
return ms.send(ctx, msgCopy.Payload)
return ms.send(ctx, msgCopy.Payload, ms.sendingClients)
}

func (ms *MsgSender) SendMultipleMessages(ctx context.Context, copies []ExtInMsgCopy) {
Expand Down

0 comments on commit fbbfd94

Please sign in to comment.