Skip to content

Commit

Permalink
Add graceful shutdown for pubsub client.
Browse files Browse the repository at this point in the history
  • Loading branch information
nickeskov committed Sep 18, 2023
1 parent 60ff7bd commit 71a8d40
Showing 1 changed file with 45 additions and 21 deletions.
66 changes: 45 additions & 21 deletions cmd/bots/internal/common/messaging/pubsub_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package messaging
import (
"context"

"github.com/pkg/errors"
"go.nanomsg.org/mangos/v3/protocol"
"go.nanomsg.org/mangos/v3/protocol/sub"
_ "go.nanomsg.org/mangos/v3/transport/all" // registers all transports
Expand All @@ -16,11 +17,9 @@ func StartSubMessagingClient(ctx context.Context, nanomsgURL string, bot Bot, lo
if sockErr != nil {
return sockErr
}
defer func(subSocket protocol.Socket) {
if closeErr := subSocket.Close(); closeErr != nil {
logger.Error("failed to closed a sub socket", zap.Error(closeErr))
}
}(subSocket)
defer func() {
_ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed
}()

bot.SetSubSocket(subSocket)

Expand All @@ -31,28 +30,53 @@ func StartSubMessagingClient(ctx context.Context, nanomsgURL string, bot Bot, lo
if err := bot.SubscribeToAllAlerts(); err != nil {
return err
}
go func() {

done := runSubLoop(ctx, subSocket, logger, bot)

<-ctx.Done()
logger.Info("stopping sub messaging service...")
<-done
logger.Info("sub messaging service finished")
return nil
}

func runSubLoop(ctx context.Context, subSocket protocol.Socket, logger *zap.Logger, bot Bot) <-chan struct{} {
sockCh := make(chan struct{})
go func() { // run socket closer goroutine
defer close(sockCh)
<-ctx.Done()
_ = subSocket.Close() // can be ignored, only possible error is protocol.ErrClosed
}()
ch := make(chan struct{})
go func(done chan<- struct{}, closedSock <-chan struct{}) {
defer func() {
<-closedSock
close(done)
}()
for {
select {
case <-ctx.Done():
if ctx.Err() != nil {
return
default:
msg, err := subSocket.Recv()
if err != nil {
logger.Error("failed to receive message", zap.Error(err))
return
}
alertMsg, err := messaging.NewAlertMessageFromBytes(msg)
if err != nil {
logger.Error("failed to parse alert message from bytes", zap.Error(err))
}
if err := recvMessage(subSocket, bot); err != nil {
if errors.Is(err, protocol.ErrClosed) { // socket is closed, this means that context is canceled
return
}
bot.SendAlertMessage(alertMsg)
logger.Error("failed to receive message", zap.Error(err))
}
}
}()
}(ch, sockCh)
return ch
}

<-ctx.Done()
logger.Info("sub messaging service finished")
func recvMessage(subSocket protocol.Socket, bot Bot) error {
msg, err := subSocket.Recv() // this operation is blocking, we have to close the socket to interrupt this block
if err != nil {
return errors.Wrap(err, "failed to receive message from sub socket")
}
alertMsg, err := messaging.NewAlertMessageFromBytes(msg)
if err != nil {
return errors.Wrap(err, "failed to parse alert message from bytes")
}
bot.SendAlertMessage(alertMsg)
return nil
}

0 comments on commit 71a8d40

Please sign in to comment.