diff --git a/cmd/bots/internal/common/messaging/pubsub_client.go b/cmd/bots/internal/common/messaging/pubsub_client.go index ac5b2d51..40af251b 100644 --- a/cmd/bots/internal/common/messaging/pubsub_client.go +++ b/cmd/bots/internal/common/messaging/pubsub_client.go @@ -3,6 +3,7 @@ package messaging import ( "context" + "github.com/pkg/errors" "go.nanomsg.org/mangos/v3/protocol" "go.nanomsg.org/mangos/v3/protocol/sub" _ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports @@ -16,11 +17,9 @@ func StartSubMessagingClient(ctx context.Context, nanomsgURL string, bot Bot, lo if sockErr != nil { return sockErr } - defer func(subSocket protocol.Socket) { - if closeErr := subSocket.Close(); closeErr != nil { - logger.Error("failed to closed a sub socket", zap.Error(closeErr)) - } - }(subSocket) + defer func() { + _ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed + }() bot.SetSubSocket(subSocket) @@ -31,28 +30,53 @@ func StartSubMessagingClient(ctx context.Context, nanomsgURL string, bot Bot, lo if err := bot.SubscribeToAllAlerts(); err != nil { return err } - go func() { + + done := runSubLoop(ctx, subSocket, logger, bot) + + <-ctx.Done() + logger.Info("stopping sub messaging service...") + <-done + logger.Info("sub messaging service finished") + return nil +} + +func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logger, bot Bot) <-chan struct{} { + sockCh := make(chan struct{}) + go func() { // run socket closer goroutine + defer close(sockCh) + <-ctx.Done() + _ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed + }() + ch := make(chan struct{}) + go func(done chan<- struct{}, closedSock <-chan struct{}) { + defer func() { + <-closedSock + close(done) + }() for { - select { - case <-ctx.Done(): + if ctx.Err() != nil { return - default: - msg, err := subSocket.Recv() - if err != nil { - logger.Error("failed to receive message", zap.Error(err)) - return - } - alertMsg, err := messaging.NewAlertMessageFromBytes(msg) - if err != nil { - logger.Error("failed to parse alert message from bytes", zap.Error(err)) + } + if err := recvMessage(subSocket, bot); err != nil { + if errors.Is(err, protocol.ErrClosed) { // socket is closed, this means that context is canceled return } - bot.SendAlertMessage(alertMsg) + logger.Error("failed to receive message", zap.Error(err)) } } - }() + }(ch, sockCh) + return ch +} - <-ctx.Done() - logger.Info("sub messaging service finished") +func recvMessage(subSocket protocol.Socket, bot Bot) error { + msg, err := subSocket.Recv() // this operation is blocking, we have to close the socket to interrupt this block + if err != nil { + return errors.Wrap(err, "failed to receive message from sub socket") + } + alertMsg, err := messaging.NewAlertMessageFromBytes(msg) + if err != nil { + return errors.Wrap(err, "failed to parse alert message from bytes") + } + bot.SendAlertMessage(alertMsg) return nil }