Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network switch mangos to nats #306

Merged
merged 22 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
df0d89f
Changed pubsub communication to nats
esuwu Nov 13, 2024
d1cbca6
Moved the nats server to cmd
esuwu Nov 22, 2024
59ea89a
merged from main
esuwu Nov 22, 2024
9cd6bf6
Added a nodemon-built-in optional messaging server, dockerfile and ma…
esuwu Dec 10, 2024
ea9f9c0
Fixed subscription to alerts
esuwu Dec 10, 2024
0d19672
Merge branch 'main' into network-switch-mangos-to-nats
nickeskov Dec 20, 2024
601cba9
Fixes.
nickeskov Dec 20, 2024
9090946
Add explicit connection timeout NATS for clients.
nickeskov Dec 20, 2024
89716e1
Updated telegram and discord bot flags for NATS connection.
nickeskov Dec 20, 2024
afbcd4c
Update docs for telegram and discord bots.
nickeskov Dec 20, 2024
e1bd826
Change nodemon flags.
nickeskov Dec 20, 2024
ae246f8
Update 'natsConnectionsTimeoutDefault' const.
nickeskov Dec 20, 2024
a1ecbf3
Update 'cmd/nodemon/README.md'.
nickeskov Dec 20, 2024
2f4a856
Refactor topics naming
nickeskov Dec 20, 2024
a9a2b2b
Run nats embedded server before messaging services start.
nickeskov Dec 20, 2024
ce5dfa0
Fix 'RunNatsMessagingServer' func
nickeskov Dec 20, 2024
fbdf1c7
Remove default 'scheme' value for 'nodemon'.
nickeskov Dec 20, 2024
f0bf9cd
Merge branch 'main' into network-switch-mangos-to-nats
nickeskov Dec 20, 2024
9c8a29f
Update 'Dockerfile-messaging-server'.
nickeskov Dec 20, 2024
b1310e0
Remove 'messaging-server' application and its 'Dockerfile-messaging-s…
nickeskov Dec 20, 2024
25c999c
Merge branch 'main' into network-switch-mangos-to-nats
nickeskov Dec 20, 2024
604b27a
Added scheme validation for nodemon
esuwu Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile-nodemon
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ RUN apk add --no-cache bind-tools

USER $APP_USER
WORKDIR ${APP}
# Considered as a default HTTP API Port
EXPOSE 8080
# Considered as a default HTTP API Port, NATS embedded server port
EXPOSE 8080, 4222

COPY --from=builder ${APP}/build/linux-amd64/nodemon ${APP}/nodemon

Expand Down
8 changes: 3 additions & 5 deletions cmd/bots/discord/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ To do the same as environment variable form use _**UPPER_SNAKE_CASE**_ option na
- _-discord-chat-id_ (string) — discord chat ID to send alerts through a specific chat
- _-log-level_ (string) — Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level
is INFO. (default "INFO")
- _-nano-msg-pair-discord-url_ (string) — Nanomsg IPC URL for pair socket (default
"ipc:///tmp/nano-msg-nodemon-pair.ipc"). Used for communication between the monitoring and bot services.
- _-nano-msg-pubsub-url_ (string) — Nanomsg IPC URL for pubsub socket (default
"ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc"). Used by the bot to subscribe to alerts generated by
the monitoring service.
- _-nats-msg-url_ (string) — NATS server URL for messaging (default "nats://127.0.0.1:4222").
Used by the bot to subscribe to alerts generated by
the monitoring service and for communication between the monitoring and bot services.

## Build requirements

Expand Down
23 changes: 15 additions & 8 deletions cmd/bots/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"nodemon/cmd/bots/internal/common/messaging"
"nodemon/cmd/bots/internal/discord/handlers"
"nodemon/internal"
generalMessaging "nodemon/pkg/messaging"
"nodemon/pkg/messaging/pair"
"nodemon/pkg/tools"

Expand All @@ -38,21 +39,19 @@ func main() {
}

type discordBotConfig struct {
nanomsgPubSubURL string
nanomsgPairURL string
natsMessagingURL string
discordBotToken string
discordChatID string
logLevel string
development bool
bindAddress string
scheme string
}

func newDiscordBotConfigConfig() *discordBotConfig {
c := new(discordBotConfig)
tools.StringVarFlagWithEnv(&c.nanomsgPubSubURL, "nano-msg-pubsub-url",
"ipc:///tmp/discord/nano-msg-nodemon-pubsub.ipc", "Nanomsg IPC URL for pubsub socket")
tools.StringVarFlagWithEnv(&c.nanomsgPairURL, "nano-msg-pair-discord-url",
"ipc:///tmp/nano-msg-nodemon-pair.ipc", "Nanomsg IPC URL for pair socket")
tools.StringVarFlagWithEnv(&c.natsMessagingURL, "nats-msg-url",
"nats://127.0.0.1:4222", "NATS server URL for messaging")
tools.StringVarFlagWithEnv(&c.discordBotToken, "discord-bot-token",
"", "The secret token used to authenticate the bot")
tools.StringVarFlagWithEnv(&c.discordChatID, "discord-chat-id",
Expand All @@ -62,6 +61,8 @@ func newDiscordBotConfigConfig() *discordBotConfig {
tools.BoolVarFlagWithEnv(&c.development, "development", false, "Development mode.")
tools.StringVarFlagWithEnv(&c.bindAddress, "bind", "",
"Local network address to bind the HTTP API of the service on.")
tools.StringVarFlagWithEnv(&c.scheme, "scheme", "",
"Blockchain scheme i.e. mainnet, testnet, stagenet. Used in messaging service")
return c
}

Expand All @@ -70,6 +71,10 @@ func (c *discordBotConfig) validate(zap *zap.Logger) error {
zap.Error("discord bot token is required")
return common.ErrInvalidParameters
}
if c.scheme == "" {
zap.Error("the blockchain scheme must be specified")
return common.ErrInvalidParameters
}
if c.discordChatID == "" {
zap.Error("discord chat ID is required")
return common.ErrInvalidParameters
Expand Down Expand Up @@ -111,6 +116,7 @@ func runDiscordBot() error {
logger,
requestChan,
responseChan,
cfg.scheme,
)
if initErr != nil {
return errors.Wrap(initErr, "failed to init discord bot")
Expand Down Expand Up @@ -178,15 +184,16 @@ func runMessagingClients(
responseChan chan pair.Response,
) {
go func() {
clientErr := messaging.StartSubMessagingClient(ctx, cfg.nanomsgPubSubURL, discordBotEnv, logger)
clientErr := messaging.StartSubMessagingClient(ctx, cfg.natsMessagingURL, discordBotEnv, logger)
if clientErr != nil {
logger.Fatal("failed to start sub messaging client", zap.Error(clientErr))
return
}
}()

go func() {
err := messaging.StartPairMessagingClient(ctx, cfg.nanomsgPairURL, requestChan, responseChan, logger)
topic := generalMessaging.DiscordBotRequestsTopic(cfg.scheme)
err := messaging.StartPairMessagingClient(ctx, cfg.natsMessagingURL, requestChan, responseChan, logger, topic)
if err != nil {
logger.Fatal("failed to start pair messaging client", zap.Error(err))
return
Expand Down
92 changes: 60 additions & 32 deletions cmd/bots/internal/common/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import (

"codnect.io/chrono"
"github.com/bwmarrin/discordgo"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/wavesplatform/gowaves/pkg/crypto"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol"
"go.uber.org/zap"
"gopkg.in/telebot.v3"
)
Expand All @@ -46,19 +45,25 @@ const (

var errUnknownAlertType = errors.New("received unknown alert type")

type AlertSubscription struct {
alertName entities.AlertName
subscription *nats.Subscription
}

type subscriptions struct {
mu *sync.RWMutex
subs map[entities.AlertType]entities.AlertName
subs map[entities.AlertType]AlertSubscription
}

func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName) {
func (s *subscriptions) Add(alertType entities.AlertType, alertName entities.AlertName,
subscription *nats.Subscription) {
s.mu.Lock()
s.subs[alertType] = alertName
s.subs[alertType] = AlertSubscription{alertName, subscription}
s.mu.Unlock()
}

// Read returns alert name.
func (s *subscriptions) Read(alertType entities.AlertType) (entities.AlertName, bool) {
func (s *subscriptions) Read(alertType entities.AlertType) (AlertSubscription, bool) {
s.mu.RLock()
elem, ok := s.subs[alertType]
s.mu.RUnlock()
Expand All @@ -80,12 +85,14 @@ func (s *subscriptions) MapR(f func()) {
type DiscordBotEnvironment struct {
ChatID string
Bot *discordgo.Session
subSocket protocol.Socket
Subscriptions subscriptions
zap *zap.Logger
requestType chan<- pair.Request
responsePairType <-chan pair.Response
unhandledAlertMessages unhandledAlertMessages
scheme string
nc *nats.Conn
alertHandlerFunc func(msg *nats.Msg)
}

func NewDiscordBotEnvironment(
Expand All @@ -94,18 +101,20 @@ func NewDiscordBotEnvironment(
zap *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) *DiscordBotEnvironment {
return &DiscordBotEnvironment{
Bot: bot,
ChatID: chatID,
Subscriptions: subscriptions{
subs: make(map[entities.AlertType]entities.AlertName),
subs: make(map[entities.AlertType]AlertSubscription),
mu: new(sync.RWMutex),
},
zap: zap,
requestType: requestType,
responsePairType: responsePairType,
unhandledAlertMessages: newUnhandledAlertMessages(),
scheme: scheme,
}
}

Expand All @@ -121,10 +130,6 @@ func (dscBot *DiscordBotEnvironment) Start() error {
return nil
}

func (dscBot *DiscordBotEnvironment) SetSubSocket(subSocket protocol.Socket) {
dscBot.subSocket = subSocket
}

func (dscBot *DiscordBotEnvironment) SendMessage(msg string) {
_, err := dscBot.Bot.ChannelMessageSend(dscBot.ChatID, msg)
if err != nil {
Expand Down Expand Up @@ -189,16 +194,25 @@ func (dscBot *DiscordBotEnvironment) SendAlertMessage(msg generalMessaging.Alert
dscBot.unhandledAlertMessages.Add(alertID, messageID)
}

func (dscBot *DiscordBotEnvironment) SetNatsConnection(nc *nats.Conn) {
dscBot.nc = nc
}

func (dscBot *DiscordBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) {
dscBot.alertHandlerFunc = alertHandlerFunc
}

func (dscBot *DiscordBotEnvironment) SubscribeToAllAlerts() error {
for alertType, alertName := range entities.GetAllAlertTypesAndNames() {
if dscBot.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}
err := dscBot.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
topic := generalMessaging.PubSubMsgTopic(dscBot.scheme, alertType)
subscription, err := dscBot.nc.Subscribe(topic, dscBot.alertHandlerFunc)
if err != nil {
return err
return errors.Wrap(err, "failed to subscribe to alert")
}
dscBot.Subscriptions.Add(alertType, alertName)
dscBot.Subscriptions.Add(alertType, alertName, subscription)
dscBot.zap.Sugar().Infof("subscribed to %s", alertName)
}
return nil
Expand Down Expand Up @@ -244,13 +258,15 @@ func (m unhandledAlertMessages) Delete(alertID crypto.Digest) {
type TelegramBotEnvironment struct {
ChatID int64
Bot *telebot.Bot
Mute bool // If it used elsewhere, should be protected by mutex
subSocket protocol.Socket
Mute bool // If it used elsewhere, should be protected by mutex.
subscriptions subscriptions
zap *zap.Logger
requestType chan<- pair.Request
responsePairType <-chan pair.Response
unhandledAlertMessages unhandledAlertMessages
scheme string
nc *nats.Conn
alertHandlerFunc func(msg *nats.Msg)
}

func NewTelegramBotEnvironment(
Expand All @@ -260,19 +276,21 @@ func NewTelegramBotEnvironment(
zap *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) *TelegramBotEnvironment {
return &TelegramBotEnvironment{
Bot: bot,
ChatID: chatID,
Mute: mute,
subscriptions: subscriptions{
subs: make(map[entities.AlertType]entities.AlertName),
subs: make(map[entities.AlertType]AlertSubscription),
mu: new(sync.RWMutex),
},
zap: zap,
requestType: requestType,
responsePairType: responsePairType,
unhandledAlertMessages: newUnhandledAlertMessages(),
scheme: scheme,
}
}

Expand All @@ -289,10 +307,6 @@ func (tgEnv *TelegramBotEnvironment) Start(ctx context.Context) error {
return nil
}

func (tgEnv *TelegramBotEnvironment) SetSubSocket(subSocket protocol.Socket) {
tgEnv.subSocket = subSocket
}

func (tgEnv *TelegramBotEnvironment) SendAlertMessage(msg generalMessaging.AlertMessage) {
if tgEnv.Mute {
tgEnv.zap.Info("received an alert, but asleep now")
Expand Down Expand Up @@ -425,11 +439,12 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAllAlerts() error {
if tgEnv.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}
err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
topic := generalMessaging.PubSubMsgTopic(tgEnv.scheme, alertType)
subscription, err := tgEnv.nc.Subscribe(topic, tgEnv.alertHandlerFunc)
if err != nil {
return err
return errors.Wrap(err, "failed to subscribe to alert")
}
tgEnv.subscriptions.Add(alertType, alertName)
tgEnv.subscriptions.Add(alertType, alertName, subscription)
tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName)
}

Expand All @@ -446,12 +461,14 @@ func (tgEnv *TelegramBotEnvironment) SubscribeToAlert(alertType entities.AlertTy
return errors.Errorf("failed to subscribe to %s, already subscribed to it", alertName)
}

err := tgEnv.subSocket.SetOption(mangos.OptionSubscribe, []byte{byte(alertType)})
topic := generalMessaging.PubSubMsgTopic(tgEnv.scheme, alertType)
subscription, err := tgEnv.nc.Subscribe(topic, tgEnv.alertHandlerFunc)
if err != nil {
return errors.Wrap(err, "failed to subscribe to alert")
}
tgEnv.subscriptions.Add(alertType, alertName)
tgEnv.subscriptions.Add(alertType, alertName, subscription)
tgEnv.zap.Sugar().Infof("Telegram bot subscribed to %s", alertName)

return nil
}

Expand All @@ -464,20 +481,31 @@ func (tgEnv *TelegramBotEnvironment) UnsubscribeFromAlert(alertType entities.Ale
if !tgEnv.IsAlreadySubscribed(alertType) {
return errors.Errorf("failed to unsubscribe from %s, was not subscribed to it", alertName)
}

err := tgEnv.subSocket.SetOption(mangos.OptionUnsubscribe, []byte{byte(alertType)})
alertSub, ok := tgEnv.subscriptions.Read(alertType)
if !ok {
return errors.Errorf("subscription didn't exist even though I was subscribed to it")
}
err := alertSub.subscription.Unsubscribe()
if err != nil {
return errors.Wrap(err, "failed to unsubscribe from alert")
return errors.New("failed to unsubscribe from alert")
}
ok = tgEnv.IsAlreadySubscribed(alertType)
if !ok {
return errors.New("failed to unsubscribe from alert: was not subscribed to it")
return errors.New("tried to unsubscribe from alert, but still subscribed to it")
}
tgEnv.subscriptions.Delete(alertType)
tgEnv.zap.Sugar().Infof("Telegram bot unsubscribed from %s", alertName)
return nil
}

func (tgEnv *TelegramBotEnvironment) SetNatsConnection(nc *nats.Conn) {
tgEnv.nc = nc
}

func (tgEnv *TelegramBotEnvironment) SetAlertHandlerFunc(alertHandlerFunc func(msg *nats.Msg)) {
tgEnv.alertHandlerFunc = alertHandlerFunc
}

type subscribed struct {
AlertName string
}
Expand All @@ -501,7 +529,7 @@ func (tgEnv *TelegramBotEnvironment) SubscriptionsList() (string, error) {
var subscribedTo []subscribed
tgEnv.subscriptions.MapR(func() {
for _, alertName := range tgEnv.subscriptions.subs {
s := subscribed{AlertName: string(alertName) + "\n\n"}
s := subscribed{AlertName: string(alertName.alertName) + "\n\n"}
subscribedTo = append(subscribedTo, s)
}
})
Expand Down
6 changes: 4 additions & 2 deletions cmd/bots/internal/common/initial/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func InitTgBot(behavior string,
logger *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) (*common.TelegramBotEnvironment, error) {
botSettings, err := config.NewTgBotSettings(behavior, webhookLocalAddress, publicURL, botToken)
if err != nil {
Expand All @@ -31,7 +32,7 @@ func InitTgBot(behavior string,

logger.Sugar().Debugf("telegram chat id for sending alerts is %d", chatID)

tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType)
tgBotEnv := common.NewTelegramBotEnvironment(bot, chatID, false, logger, requestType, responsePairType, scheme)
return tgBotEnv, nil
}

Expand All @@ -41,6 +42,7 @@ func InitDiscordBot(
logger *zap.Logger,
requestType chan<- pair.Request,
responsePairType <-chan pair.Response,
scheme string,
) (*common.DiscordBotEnvironment, error) {
bot, err := discordgo.New("Bot " + botToken)
if err != nil {
Expand All @@ -49,6 +51,6 @@ func InitDiscordBot(
logger.Sugar().Debugf("discord chat id for sending alerts is %s", chatID)

bot.Identify.Intents = discordgo.IntentsGuildMessages | discordgo.IntentsMessageContent
dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType)
dscBotEnv := common.NewDiscordBotEnvironment(bot, chatID, logger, requestType, responsePairType, scheme)
return dscBotEnv, nil
}
Loading
Loading