Skip to content

Commit

Permalink
Network switch mangos to nats (#306)
Browse files Browse the repository at this point in the history
* Changed pubsub communication to nats

* Moved the nats server to cmd

* Added a nodemon-built-in optional messaging server, dockerfile and make command

* Fixed subscription to alerts

* Fixes.

* Add explicit connection timeout NATS for clients.

* Updated telegram and discord bot flags for NATS connection.

* Update docs for telegram and discord bots.

* Change nodemon flags.

* Update 'natsConnectionsTimeoutDefault' const.

* Update 'cmd/nodemon/README.md'.

* Refactor topics naming

- Now topic names can be created only with special functions
- Created different topics for telegram and discord bots

* Run nats embedded server before messaging services start.

* Fix 'RunNatsMessagingServer' func

Also changed default param for embedded NATS server

* Remove default 'scheme' value for 'nodemon'.

* Update 'Dockerfile-messaging-server'.

* Remove 'messaging-server' application and its 'Dockerfile-messaging-server'.

* Added scheme validation for nodemon

---------

Co-authored-by: Nikolay Eskov <mr.eskov1@yandex.ru>
  • Loading branch information
esuwu and nickeskov authored Dec 23, 2024
1 parent 3d23a09 commit da846bb
Show file tree
Hide file tree
Showing 18 changed files with 541 additions and 404 deletions.
4 changes: 2 additions & 2 deletions Dockerfile-nodemon
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ RUN apk add --no-cache bind-tools

USER $APP_USER
WORKDIR ${APP}
# Considered as a default HTTP API Port
EXPOSE 8080
# Considered as a default HTTP API Port, NATS embedded server port
EXPOSE 8080, 4222

COPY --from=builder ${APP}/build/linux-amd64/nodemon ${APP}/nodemon

Expand Down
8 changes: 3 additions & 5 deletions cmd/bots/discord/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ To do the same as environment variable form use _**UPPER_SNAKE_CASE**_ option na
- _-discord-chat-id_ (string) — discord chat ID to send alerts through a specific chat
- _-log-level_ (string) — Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level
is INFO. (default "INFO")
- _-nano-msg-pair-discord-url_ (string) — Nanomsg IPC URL for pair socket (default
"ipc:///tmp/nano-msg-nodemon-pair.ipc"). Used for communication between the monitoring and bot services.
- _-nano-msg-pubsub-url_ (string) — Nanomsg IPC URL for pubsub socket (default
"ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc"). Used by the bot to subscribe to alerts generated by
the monitoring service.
- _-nats-msg-url_ (string) — NATS server URL for messaging (default "nats://127.0.0.1:4222").
Used by the bot to subscribe to alerts generated by
the monitoring service and for communication between the monitoring and bot services.

## Build requirements

Expand Down
23 changes: 15 additions & 8 deletions cmd/bots/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"nodemon/cmd/bots/internal/common/messaging"
"nodemon/cmd/bots/internal/discord/handlers"
"nodemon/internal"
generalMessaging "nodemon/pkg/messaging"
"nodemon/pkg/messaging/pair"
"nodemon/pkg/tools"

Expand All @@ -38,21 +39,19 @@ func main() {
}

type discordBotConfig struct {
nanomsgPubSubURL string
nanomsgPairURL string
natsMessagingURL string
discordBotToken string
discordChatID string
logLevel string
development bool
bindAddress string
scheme string
}

func newDiscordBotConfigConfig() *discordBotConfig {
c := new(discordBotConfig)
tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url",
"ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc", "Nanomsg IPC URL for pubsub socket")
tools.StringVarFlagWithEnv(&c.nanomsgPairURL, "nano-msg-pair-discord-url",
"ipc:///tmp/nano-msg-nodemon-pair.ipc", "Nanomsg IPC URL for pair socket")
tools.StringVarFlagWithEnv(&c.natsMessagingURL, "nats-msg-url",
"nats://127.0.0.1:4222", "NATS server URL for messaging")
tools.StringVarFlagWithEnv(&c.discordBotToken, "discord-bot-token",
"", "The secret token used to authenticate the bot")
tools.StringVarFlagWithEnv(&c.discordChatID, "discord-chat-id",
Expand All @@ -62,6 +61,8 @@ func newDiscordBotConfigConfig() *discordBotConfig {
tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.")
tools.StringVarFlagWithEnv(&c.bindAddress, "bind", "",
"Local network address to bind the HTTP API of the service on.")
tools.StringVarFlagWithEnv(&c.scheme, "scheme", "",
"Blockchain scheme i.e. mainnet, testnet, stagenet. Used in messaging service")
return c
}

Expand All @@ -70,6 +71,10 @@ func (c *discordBotConfig) validate(zap *zap.Logger) error {
zap.Error("discord bot token is required")
return common.ErrInvalidParameters
}
if c.scheme == "" {
zap.Error("the blockchain scheme must be specified")
return common.ErrInvalidParameters
}
if c.discordChatID == "" {
zap.Error("discord chat ID is required")
return common.ErrInvalidParameters
Expand Down Expand Up @@ -111,6 +116,7 @@ func runDiscordBot() error {
logger,
requestChan,
responseChan,
cfg.scheme,
)
if initErr != nil {
return errors.Wrap(initErr, "failed to init discord bot")
Expand Down Expand Up @@ -178,15 +184,16 @@ func runMessagingClients(
responseChan chan pair.Response,
) {
go func() {
clientErr := messaging.StartSubMessagingClient(ctx, cfg.nanomsgPubSubURL, discordBotEnv, logger)
clientErr := messaging.StartSubMessagingClient(ctx, cfg.natsMessagingURL, discordBotEnv, logger)
if clientErr != nil {
logger.Fatal("failed to start sub messaging client", zap.Error(clientErr))
return
}
}()

go func() {
err := messaging.StartPairMessagingClient(ctx, cfg.nanomsgPairURL, requestChan, responseChan, logger)
topic := generalMessaging.DiscordBotRequestsTopic(cfg.scheme)
err := messaging.StartPairMessagingClient(ctx, cfg.natsMessagingURL, requestChan, responseChan, logger, topic)
if err != nil {
logger.Fatal("failed to start pair messaging client", zap.Error(err))
return
Expand Down
92 changes: 60 additions & 32 deletions cmd/bots/internal/common/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import (

"codnect.io/chrono"
"github.com/bwmarrin/discordgo"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/wavesplatform/gowaves/pkg/crypto"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol"
"go.uber.org/zap"
"gopkg.in/telebot.v3"
)
Expand All @@ -46,19 +45,25 @@ const (

var errUnknownAlertType = errors.New("received unknown alert type")

type AlertSubscription struct {
alertName entities.AlertName
subscription *nats.Subscription
}

type subscriptions struct {
mu *sync.RWMutex
subs map[entities.AlertType]entities.AlertName
subs map[entities.AlertType]AlertSubscription
}

func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName) {
func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName,
subscription *nats.Subscription) {
s.mu.Lock()
s.subs[alertType] = alertName
s.subs[alertType] = AlertSubscription{alertName, subscription}
s.mu.Unlock()
}

// Read returns alert name.
func (s *subscriptions) Read(alertType entities.AlertType) (entities.AlertName, bool) {
func (s *subscriptions) Read(alertType entities.AlertType) (AlertSubscription, bool) {
s.mu.RLock()
elem, ok := s.subs[alertType]
s.mu.RUnlock()
Expand All @@ -80,12 +85,14 @@ func (s *subscriptions) MapR(f func()) {
type DiscordBotEnvironment struct {
ChatID string
Bot *discordgo.Session
subSocket protocol.Socket
Subscriptions subscriptions
zap *zap.Logger
requestType chan<- pair.Request
responsePairType <-chan pair.Response
unhandledAlertMessages unhandledAlertMessages
scheme string
nc *nats.Conn
alertHandlerFunc func(msg *nats.Msg)
}

func NewDiscordBotEnvironment(
Expand All @@ -94,18 +101,20 @@ func NewDiscordBotEnvironment(
zap *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) *DiscordBotEnvironment {
return &DiscordBotEnvironment{
Bot: bot,
ChatID: chatID,
Subscriptions: subscriptions{
subs: make(map[entities.AlertType]entities.AlertName),
subs: make(map[entities.AlertType]AlertSubscription),
mu: new(sync.RWMutex),
},
zap: zap,
requestType: requestType,
responsePairType: responsePairType,
unhandledAlertMessages: newUnhandledAlertMessages(),
scheme: scheme,
}
}

Expand All @@ -121,10 +130,6 @@ func (dscBot *DiscordBotEnvironment) Start() error {
return nil
}

func (dscBot *DiscordBotEnvironment) SetSubSocket(subSocket protocol.Socket) {
dscBot.subSocket = subSocket
}

func (dscBot *DiscordBotEnvironment) SendMessage(msg string) {
_, err := dscBot.Bot.ChannelMessageSend(dscBot.ChatID, msg)
if err != nil {
Expand Down Expand Up @@ -189,16 +194,25 @@ func (dscBot *DiscordBotEnvironment) SendAlertMessage(msg generalMessaging.Alert
dscBot.unhandledAlertMessages.Add(alertID, messageID)
}

func (dscBot *DiscordBotEnvironment) SetNatsConnection(nc *nats.Conn) {
dscBot.nc = nc
}

func (dscBot *DiscordBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) {
dscBot.alertHandlerFunc = alertHandlerFunc
}

func (dscBot *DiscordBotEnvironment) SubscribeToAllAlerts() error {
for alertType, alertName := range entities.GetAllAlertTypesAndNames() {
if dscBot.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}
err := dscBot.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
topic := generalMessaging.PubSubMsgTopic(dscBot.scheme, alertType)
subscription, err := dscBot.nc.Subscribe(topic, dscBot.alertHandlerFunc)
if err != nil {
return err
return errors.Wrap(err, "failed to subscribe to alert")
}
dscBot.Subscriptions.Add(alertType, alertName)
dscBot.Subscriptions.Add(alertType, alertName, subscription)
dscBot.zap.Sugar().Infof("subscribed to %s", alertName)
}
return nil
Expand Down Expand Up @@ -244,13 +258,15 @@ func (m unhandledAlertMessages) Delete(alertID crypto.Digest) {
type TelegramBotEnvironment struct {
ChatID int64
Bot *telebot.Bot
Mute bool // If it used elsewhere, should be protected by mutex
subSocket protocol.Socket
Mute bool // If it used elsewhere, should be protected by mutex.
subscriptions subscriptions
zap *zap.Logger
requestType chan<- pair.Request
responsePairType <-chan pair.Response
unhandledAlertMessages unhandledAlertMessages
scheme string
nc *nats.Conn
alertHandlerFunc func(msg *nats.Msg)
}

func NewTelegramBotEnvironment(
Expand All @@ -260,19 +276,21 @@ func NewTelegramBotEnvironment(
zap *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) *TelegramBotEnvironment {
return &TelegramBotEnvironment{
Bot: bot,
ChatID: chatID,
Mute: mute,
subscriptions: subscriptions{
subs: make(map[entities.AlertType]entities.AlertName),
subs: make(map[entities.AlertType]AlertSubscription),
mu: new(sync.RWMutex),
},
zap: zap,
requestType: requestType,
responsePairType: responsePairType,
unhandledAlertMessages: newUnhandledAlertMessages(),
scheme: scheme,
}
}

Expand All @@ -289,10 +307,6 @@ func (tgEnv *TelegramBotEnvironment) Start(ctx context.Context) error {
return nil
}

func (tgEnv *TelegramBotEnvironment) SetSubSocket(subSocket protocol.Socket) {
tgEnv.subSocket = subSocket
}

func (tgEnv *TelegramBotEnvironment) SendAlertMessage(msg generalMessaging.AlertMessage) {
if tgEnv.Mute {
tgEnv.zap.Info("received an alert, but asleep now")
Expand Down Expand Up @@ -425,11 +439,12 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAllAlerts() error {
if tgEnv.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}
err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
topic := generalMessaging.PubSubMsgTopic(tgEnv.scheme, alertType)
subscription, err := tgEnv.nc.Subscribe(topic, tgEnv.alertHandlerFunc)
if err != nil {
return err
return errors.Wrap(err, "failed to subscribe to alert")
}
tgEnv.subscriptions.Add(alertType, alertName)
tgEnv.subscriptions.Add(alertType, alertName, subscription)
tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName)
}

Expand All @@ -446,12 +461,14 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAlert(alertType entities.AlertTy
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}

err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
topic := generalMessaging.PubSubMsgTopic(tgEnv.scheme, alertType)
subscription, err := tgEnv.nc.Subscribe(topic, tgEnv.alertHandlerFunc)
if err != nil {
return errors.Wrap(err, "failed to subscribe to alert")
}
tgEnv.subscriptions.Add(alertType, alertName)
tgEnv.subscriptions.Add(alertType, alertName, subscription)
tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName)

return nil
}

Expand All @@ -464,20 +481,31 @@ func (tgEnv *TelegramBotEnvironment) UnsubscribeFromAlert(alertType entities.Ale
if !tgEnv.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to unsubscribe from %s, was not subscribed to it", alertName)
}

err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)})
alertSub, ok := tgEnv.subscriptions.Read(alertType)
if !ok {
return errors.Errorf("subscription didn't exist even though I was subscribed to it")
}
err := alertSub.subscription.Unsubscribe()
if err != nil {
return errors.Wrap(err, "failed to unsubscribe from alert")
return errors.New("failed to unsubscribe from alert")
}
ok = tgEnv.IsAlreadySubscribed(alertType)
if !ok {
return errors.New("failed to unsubscribe from alert: was not subscribed to it")
return errors.New("tried to unsubscribe from alert, but still subscribed to it")
}
tgEnv.subscriptions.Delete(alertType)
tgEnv.zap.Sugar().Infof("Telegram bot unsubscribed from %s", alertName)
return nil
}

func (tgEnv *TelegramBotEnvironment) SetNatsConnection(nc *nats.Conn) {
tgEnv.nc = nc
}

func (tgEnv *TelegramBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) {
tgEnv.alertHandlerFunc = alertHandlerFunc
}

type subscribed struct {
AlertName string
}
Expand All @@ -501,7 +529,7 @@ func (tgEnv *TelegramBotEnvironment) SubscriptionsList() (string, error) {
var subscribedTo []subscribed
tgEnv.subscriptions.MapR(func() {
for _, alertName := range tgEnv.subscriptions.subs {
s := subscribed{AlertName: string(alertName) + "\n\n"}
s := subscribed{AlertName: string(alertName.alertName) + "\n\n"}
subscribedTo = append(subscribedTo, s)
}
})
Expand Down
6 changes: 4 additions & 2 deletions cmd/bots/internal/common/initial/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func InitTgBot(behavior string,
logger *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) (*common.TelegramBotEnvironment, error) {
botSettings, err := config.NewTgBotSettings(behavior, webhookLocalAddress, publicURL, botToken)
if err != nil {
Expand All @@ -31,7 +32,7 @@ func InitTgBot(behavior string,

logger.Sugar().Debugf("telegram chat id for sending alerts is %d", chatID)

tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType)
tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType, scheme)
return tgBotEnv, nil
}

Expand All @@ -41,6 +42,7 @@ func InitDiscordBot(
logger *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) (*common.DiscordBotEnvironment, error) {
bot, err := discordgo.New("Bot " + botToken)
if err != nil {
Expand All @@ -49,6 +51,6 @@ func InitDiscordBot(
logger.Sugar().Debugf("discord chat id for sending alerts is %s", chatID)

bot.Identify.Intents = discordgo.IntentsGuildMessages | discordgo.IntentsMessageContent
dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType)
dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType, scheme)
return dscBotEnv, nil
}
Loading

0 comments on commit da846bb

Please sign in to comment.