Skip to content

Commit

Permalink
use tls for nats connections (#3334)
Browse files Browse the repository at this point in the history
  • Loading branch information
David Christofas authored Oct 12, 2022
1 parent e92ac5d commit ae7c58b
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 8 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/nats-tls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Secure the nats connectin with TLS

Encyrpted the connection to the event broker using TLS.

https://github.com/cs3org/reva/pull/3334
33 changes: 32 additions & 1 deletion internal/grpc/interceptors/eventsmiddleware/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
package eventsmiddleware

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"os"

"go-micro.dev/v4/util/log"
"google.golang.org/grpc"
Expand Down Expand Up @@ -209,6 +214,32 @@ func publisherFromConfig(m map[string]interface{}) (events.Publisher, error) {
case "nats":
address := m["address"].(string)
cid := m["clusterID"].(string)
return server.NewNatsStream(natsjs.Address(address), natsjs.ClusterID(cid))

skipVerify := m["tls-insecure"].(bool)
var rootCAPool *x509.CertPool
if val, ok := m["tls-root-ca-cert"]; ok {
rootCACertPath := val.(string)
if rootCACertPath != "" {
f, err := os.Open(rootCACertPath)
if err != nil {
return nil, err
}

var certBytes bytes.Buffer
if _, err := io.Copy(&certBytes, f); err != nil {
return nil, err
}

rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
skipVerify = false
}
}

tlsConf := &tls.Config{
InsecureSkipVerify: skipVerify,
RootCAs: rootCAPool,
}
return server.NewNatsStream(natsjs.TLSConfig(tlsConf), natsjs.Address(address), natsjs.ClusterID(cid))
}
}
41 changes: 34 additions & 7 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
package dataprovider

import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"os"

"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/events"
Expand All @@ -40,12 +45,14 @@ func init() {
}

type config struct {
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
DataTXs map[string]map[string]interface{} `mapstructure:"data_txs" docs:"url:pkg/rhttp/datatx/manager/simple/simple.go;The configuration for the data tx protocols"`
NatsAddress string `mapstructure:"nats_address"`
NatsClusterID string `mapstructure:"nats_clusterID"`
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
DataTXs map[string]map[string]interface{} `mapstructure:"data_txs" docs:"url:pkg/rhttp/datatx/manager/simple/simple.go;The configuration for the data tx protocols"`
NatsAddress string `mapstructure:"nats_address"`
NatsClusterID string `mapstructure:"nats_clusterID"`
NatsTLSInsecure bool `mapstructure:"nats_tls_insecure"`
NatsRootCACertPath string `mapstructure:"nats_root_ca_cert_path"`
}

func (c *config) init() {
Expand Down Expand Up @@ -83,7 +90,27 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
if conf.NatsAddress == "" || conf.NatsClusterID == "" {
log.Warn().Msg("missing or incomplete nats configuration. Events will not be published.")
} else {
publisher, err = server.NewNatsStream(natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID))
var rootCAPool *x509.CertPool
if conf.NatsRootCACertPath != "" {
f, err := os.Open(conf.NatsRootCACertPath)
if err != nil {
return nil, err
}

var certBytes bytes.Buffer
if _, err := io.Copy(&certBytes, f); err != nil {
return nil, err
}

rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
conf.NatsTLSInsecure = false
}
tlsConf := &tls.Config{
InsecureSkipVerify: conf.NatsTLSInsecure,
RootCAs: rootCAPool,
}
publisher, err = server.NewNatsStream(natsjs.TLSConfig(tlsConf), natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit ae7c58b

Please sign in to comment.