Skip to content

Commit

Permalink
Merge pull request #19 from kumparan/AddReconnectHandler
Browse files Browse the repository at this point in the history
feature: new connection method with default config and reconnect handler
  • Loading branch information
atjhoendz authored Nov 2, 2022
2 parents dd79d60 + 66dddae commit 4a75b64
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 135 deletions.
224 changes: 126 additions & 98 deletions jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,48 +44,65 @@ type (
MessageHandler func(payload MessageParser) (err error)
)

// NewNATSConnection :nodoc:
func NewNATSConnection(url string, natsOpts ...nats.Option) (JetStream, error) {
nc, err := connect(url, natsOpts...)
if err != nil {
logrus.WithFields(logrus.Fields{
"url": url,
}).Error(err)
return nil, err
// GetNATSConnection :nodoc:
func (j *jsImpl) GetNATSConnection() *nats.Conn {
if j == nil {
return nil
}
return j.natsConn
}

js, err := nc.JetStream()
if err != nil {
logrus.Error(err)
return nil, err
// Publish publish message using JetStream
func (j *jsImpl) Publish(subject string, value []byte, opts ...nats.PubOpt) (*nats.PubAck, error) {
if !j.isValidConn() {
return nil, ErrConnectionLost
}
return j.jsCtx.Publish(subject, value, opts...)
}

impl := &jsImpl{
natsConn: nc,
jsCtx: js,
// QueueSubscribe :nodoc:
func (j *jsImpl) QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) {
if !j.isValidConn() {
return nil, ErrConnectionLost
}
return j.jsCtx.QueueSubscribe(subj, queue, cb, opts...)
}

return impl, nil
// Subscribe :nodoc:
func (j *jsImpl) Subscribe(subj string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) {
if !j.isValidConn() {
return nil, ErrConnectionLost
}
return j.jsCtx.Subscribe(subj, cb, opts...)
}

// RegisterJetStreamClient provide jetstream instance, stream, and subscription registration
func RegisterJetStreamClient(js JetStream, clients []JetStreamRegistrar) error {
for _, client := range clients {
client.RegisterNATSJetStream(js)
if streamRegistrar, ok := client.(StreamRegistrar); ok {
err := streamRegistrar.InitStream()
if err != nil {
return err
}
}
if subscriber, ok := client.(Subscriber); ok {
err := subscriber.SubscribeJetStreamEvent()
if err != nil {
return err
}
}
// AddStream add stream
func (j *jsImpl) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) {
if !j.isValidConn() {
return nil, ErrConnectionLost
}
return nil

streamInfo, _ := j.jsCtx.StreamInfo(cfg.Name)

if streamInfo == nil {
return j.jsCtx.AddStream(cfg, opts...)
}

return j.jsCtx.UpdateStream(cfg)

}

// ConsumerInfo :nodoc:
func (j *jsImpl) ConsumerInfo(streamName, consumerName string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) {
if !j.isValidConn() {
return nil, ErrConnectionLost
}

return j.jsCtx.ConsumerInfo(streamName, consumerName, opts...)
}

func (j *jsImpl) isValidConn() (b bool) {
return j.natsConn != nil && j.natsConn.IsConnected()
}

// NewNATSMessageHandler a wrapper to standardize how we handle NATS messages.
Expand All @@ -101,13 +118,13 @@ func NewNATSMessageHandler(payload MessageParser, retryAttempts int, retryInterv
}(logger)

if msg.Data == nil {
logger.Error("Message payload is nil")
logger.Error("message payload is nil")
return
}

err := payload.ParseFromBytes(msg.Data)
if err != nil {
logger.WithField("error-detail", err).Error("Unmarshal failed")
logger.WithField("error-detail", err).Error("unmarshal failed")
return
}

Expand Down Expand Up @@ -142,87 +159,98 @@ func NewNATSMessageHandler(payload MessageParser, retryAttempts int, retryInterv
}
}

// connect to nats streaming
func connect(url string, options ...nats.Option) (*nats.Conn, error) {
nc, err := nats.Connect(url, options...)
if err != nil {
return nil, err
// SafeClose :nodoc:
func SafeClose(js JetStream) {
if js == nil {
return
}
return nc, nil
}

// GetNATSConnection :nodoc:
func (j *jsImpl) GetNATSConnection() *nats.Conn {
if j == nil {
return nil
natsConn := js.GetNATSConnection()
if natsConn == nil {
return
}
return j.natsConn
}

func (j *jsImpl) checkConnIsValid() (b bool) {
return j.natsConn != nil && j.natsConn.IsConnected()
}

// Publish publish message using JetStream
func (j *jsImpl) Publish(subject string, value []byte, opts ...nats.PubOpt) (*nats.PubAck, error) {
if !j.checkConnIsValid() {
return nil, ErrConnectionLost
if !natsConn.IsConnected() {
return
}
return j.jsCtx.Publish(subject, value, opts...)
}

// QueueSubscribe :nodoc:
func (j *jsImpl) QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) {
if !j.checkConnIsValid() {
return nil, ErrConnectionLost
err := natsConn.Drain()
if err != nil {
logrus.Errorf("draining connection error. reason: %q\n", err)
logrus.Info("force closing...")
natsConn.Close()
}
return j.jsCtx.QueueSubscribe(subj, queue, cb, opts...)
}

// Subscribe :nodoc:
func (j *jsImpl) Subscribe(subj string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) {
if !j.checkConnIsValid() {
return nil, ErrConnectionLost
}
return j.jsCtx.Subscribe(subj, cb, opts...)
}
// NewNATSConnection :nodoc:
func NewNATSConnection(NATSJSHost string, clients []JetStreamRegistrar, natsOpts ...nats.Option) (JetStream, error) {
opts := []nats.Option{
nats.UseOldRequestStyle(),
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
logrus.Errorf("NATS got error! reason: %q\n", err)
}),
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
logrus.Errorf("NATS got disconnected! reason: %q\n", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
_, err := initJetStreamClients(nc, clients)
if err != nil {
logrus.Errorf("NATS failed to reconnect. reason: %q\n", err)
return
}

// AddStream add stream
func (j *jsImpl) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) {
if !j.checkConnIsValid() {
return nil, ErrConnectionLost
logrus.Infof("NATS got reconnected to %q\n", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
logrus.Errorf("NATS connection closed. reason: %q\n", nc.LastError())
}),
}

streamInfo, _ := j.jsCtx.StreamInfo(cfg.Name)
natsOpts = append(natsOpts, opts...)

if streamInfo == nil {
return j.jsCtx.AddStream(cfg, opts...)
nc, err := nats.Connect(NATSJSHost, natsOpts...)
if err != nil {
logrus.Errorf("NATS failed to connect. reason: %q\n", err)
return nil, err
}

return j.jsCtx.UpdateStream(cfg)

return initJetStreamClients(nc, clients)
}

// ConsumerInfo :nodoc:
func (j *jsImpl) ConsumerInfo(streamName, consumerName string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) {
if !j.checkConnIsValid() {
return nil, ErrConnectionLost
// registerJetStreamClient provide jetstream instance, stream, and subscription registration
func registerJetStreamClient(js JetStream, clients []JetStreamRegistrar) error {
for _, client := range clients {
client.RegisterNATSJetStream(js)
if streamRegistrar, ok := client.(StreamRegistrar); ok {
err := streamRegistrar.InitStream()
if err != nil {
return err
}
}
if subscriber, ok := client.(Subscriber); ok {
err := subscriber.SubscribeJetStreamEvent()
if err != nil {
return err
}
}
}

return j.jsCtx.ConsumerInfo(streamName, consumerName, opts...)
return nil
}

// SafeClose :nodoc:
func SafeClose(js JetStream) {
if js == nil {
return
func initJetStreamClients(nc *nats.Conn, clients []JetStreamRegistrar) (JetStream, error) {
jsCtx, err := nc.JetStream()
if err != nil {
logrus.Errorf("failed to get jetstream context. reason: %q\n", err)
return nil, err
}
natsConn := js.GetNATSConnection()
if natsConn == nil {
return

js := &jsImpl{
natsConn: nc,
jsCtx: jsCtx,
}
if !natsConn.IsConnected() {
return

err = registerJetStreamClient(js, clients)
if err != nil {
logrus.Errorf("failed to register jetstream client. reason %q\n", err)
return nil, err
}
natsConn.Close()

return js, nil
}
Loading

0 comments on commit 4a75b64

Please sign in to comment.