Skip to content

Commit

Permalink
Keep the http transport struct on a per EOS instance base, and make a…
Browse files Browse the repository at this point in the history
…ll of its parameters configurable
  • Loading branch information
ffurano committed Jun 14, 2021
1 parent eba9d4d commit 338598b
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 56 deletions.
85 changes: 47 additions & 38 deletions pkg/eosclient/eosgrpc/eos_http/eoshttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/cs3org/reva/pkg/appctx"
Expand All @@ -53,6 +52,18 @@ type Options struct {
// Timeout in seconds for performing an operation. Includes every redirection, retry, etc
OpTimeout int

// Max idle conns per Transport
MaxIdleConns int

// Max conns per transport per destination host
MaxConnsPerHost int

// Max idle conns per transport per destination host
MaxIdleConnsPerHost int

// TTL for an idle conn per transport
IdleConnTimeout int

// If the URL is https, then we need to configure this client
// with the usual TLS stuff
// Defaults are /etc/grid-security/hostcert.pem and /etc/grid-security/hostkey.pem
Expand All @@ -66,14 +77,8 @@ type Options struct {
ClientCAFiles string
}

// We want just one instance of these options in the whole app, as we don't want to
// instantiate more than once the http client internals. For example to have
// http keepalive, pools, etc...
var httpTransport *http.Transport
var httpTransportMtx sync.Mutex

// Init fills the basic fields
func (opt *Options) Init() error {
func (opt *Options) Init() (*http.Transport, error) {

if opt.BaseURL == "" {
opt.BaseURL = "https://eos-example.org"
Expand All @@ -88,6 +93,18 @@ func (opt *Options) Init() error {
if opt.OpTimeout == 0 {
opt.OpTimeout = 360
}
if opt.MaxIdleConns == 0 {
opt.MaxIdleConns = 100
}
if opt.MaxConnsPerHost == 0 {
opt.MaxConnsPerHost = 64
}
if opt.MaxIdleConnsPerHost == 0 {
opt.MaxIdleConnsPerHost = 8
}
if opt.IdleConnTimeout == 0 {
opt.IdleConnTimeout = 30
}

if opt.ClientCertFile == "" {
opt.ClientCertFile = "/etc/grid-security/hostcert.pem"
Expand All @@ -107,33 +124,25 @@ func (opt *Options) Init() error {

cert, err := tls.LoadX509KeyPair(opt.ClientCertFile, opt.ClientKeyFile)
if err != nil {
return err
return nil, err
}

httpTransportMtx.Lock()
// Lock so only one goroutine at a time can access the var
// Note that we assume that the variable will stay constant,
// hence we don't need to lock it when used
defer httpTransportMtx.Unlock()

if httpTransport == nil {

// TODO: the error reporting of http.transport is insufficient
// must check manually at least the existence of the certfiles
// The point is that also the error reporting of the context that calls this function
// is weak
httpTransport = &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
},
MaxIdleConns: 10,
MaxConnsPerHost: 8,
MaxIdleConnsPerHost: 8,
IdleConnTimeout: 30 * time.Second,
DisableCompression: true,
}
// TODO: the error reporting of http.transport is insufficient
// we may want to check manually at least the existence of the certfiles
// The point is that also the error reporting of the context that calls this function
// is weak
t := &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
},
MaxIdleConns: opt.MaxIdleConns,
MaxConnsPerHost: opt.MaxConnsPerHost,
MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost,
IdleConnTimeout: time.Duration(opt.IdleConnTimeout) * time.Second,
DisableCompression: true,
}
return nil

return t, nil
}

// Client performs HTTP-based tasks (e.g. upload, download)
Expand All @@ -147,7 +156,7 @@ type Client struct {
}

// New creates a new client with the given options.
func New(opt *Options) *Client {
func New(opt *Options, t *http.Transport) *Client {
log := logger.New().With().Int("pid", os.Getpid()).Logger()
log.Debug().Str("func", "New").Str("Creating new eoshttp client. opt: ", "'"+fmt.Sprintf("%#v", opt)+"' ").Msg("")

Expand All @@ -164,7 +173,7 @@ func New(opt *Options) *Client {
log.Debug().Str("func", "newhttp").Str("Connecting to ", "'"+opt.BaseURL+"'").Msg("")

c.cl = &http.Client{
Transport: httpTransport}
Transport: t}

c.cl.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
Expand Down Expand Up @@ -263,7 +272,7 @@ func (c *Client) buildFullURL(urlpath, uid, gid string) (string, error) {
}

// GETFile does an entire GET to download a full file. Returns a stream to read the content from
func (c *Client) GETFile(ctx context.Context, remoteuser, uid, gid, urlpath string, stream io.WriteCloser) (io.ReadCloser, error) {
func (c *Client) GETFile(ctx context.Context, httptransport *http.Transport, remoteuser, uid, gid, urlpath string, stream io.WriteCloser) (io.ReadCloser, error) {

log := appctx.GetLogger(ctx)
log.Info().Str("func", "GETFile").Str("remoteuser", remoteuser).Str("uid,gid", uid+","+gid).Str("path", urlpath).Msg("")
Expand Down Expand Up @@ -311,7 +320,7 @@ func (c *Client) GETFile(ctx context.Context, remoteuser, uid, gid, urlpath stri
}

c.cl = &http.Client{
Transport: httpTransport}
Transport: httptransport}

req, err = http.NewRequestWithContext(ctx, "GET", loc.String(), nil)
if err != nil {
Expand Down Expand Up @@ -358,7 +367,7 @@ func (c *Client) GETFile(ctx context.Context, remoteuser, uid, gid, urlpath stri
}

// PUTFile does an entire PUT to upload a full file, taking the data from a stream
func (c *Client) PUTFile(ctx context.Context, remoteuser, uid, gid, urlpath string, stream io.ReadCloser, length int64) error {
func (c *Client) PUTFile(ctx context.Context, httptransport *http.Transport, remoteuser, uid, gid, urlpath string, stream io.ReadCloser, length int64) error {

log := appctx.GetLogger(ctx)
log.Info().Str("func", "PUTFile").Str("remoteuser", remoteuser).Str("uid,gid", uid+","+gid).Str("path", urlpath).Int64("length", length).Msg("")
Expand Down Expand Up @@ -408,7 +417,7 @@ func (c *Client) PUTFile(ctx context.Context, remoteuser, uid, gid, urlpath stri
}

c.cl = &http.Client{
Transport: httpTransport}
Transport: httptransport}

req, err = http.NewRequestWithContext(ctx, "PUT", loc.String(), stream)
if err != nil {
Expand Down
38 changes: 31 additions & 7 deletions pkg/eosclient/eosgrpc/eosgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path"
Expand Down Expand Up @@ -102,6 +103,18 @@ type Options struct {
// SecProtocol is the comma separated list of security protocols used by xrootd.
// For example: "sss, unix"
SecProtocol string

// HTTP connections to EOS: max number of idle conns
MaxIdleConns int

// HTTP connections to EOS: max number of conns per host
MaxConnsPerHost int

// HTTP connections to EOS: max number of idle conns per host
MaxIdleConnsPerHost int

// HTTP connections to EOS: idle conections TTL
IdleConnTimeout int
}

func (opt *Options) init() {
Expand All @@ -123,20 +136,22 @@ func (opt *Options) init() {
// Client performs actions against a EOS management node (MGM)
// using the EOS GRPC interface.
type Client struct {
opt *Options
cl erpc.EosClient
opt *Options
httptransport *http.Transport
cl erpc.EosClient
}

// GetHTTPCl creates an http client for immediate usage, using the already instantiated resources
func (c *Client) GetHTTPCl() *ehttp.Client {
var htopts ehttp.Options

if htopts.Init() != nil {
t, err := htopts.Init()
if err != nil {
panic("Cant't init the EOS http client options")
}
htopts.BaseURL = c.opt.URL

return ehttp.New(&htopts)
return ehttp.New(&htopts, t)
}

// Create and connect a grpc eos Client
Expand Down Expand Up @@ -177,6 +192,15 @@ func New(opt *Options) *Client {
c := new(Client)
c.opt = opt

var htopts ehttp.Options

t, err := htopts.Init()
if err != nil {
panic("Cant't init the EOS http client options")
}
c.httptransport = t
htopts.BaseURL = c.opt.URL

tctx := appctx.WithLogger(context.Background(), &tlog)
ccl, err := newgrpc(tctx, opt)
if err != nil {
Expand Down Expand Up @@ -1176,7 +1200,7 @@ func (c *Client) Read(ctx context.Context, uid, gid, path string) (io.ReadCloser
}
}

bodystream, err := c.GetHTTPCl().GETFile(ctx, "", uid, gid, path, localfile)
bodystream, err := c.GetHTTPCl().GETFile(ctx, c.httptransport, "", uid, gid, path, localfile)
if err != nil {
log.Error().Str("func", "Read").Str("path", path).Str("uid,gid", uid+","+gid).Str("err", err.Error()).Msg("")
return nil, errtypes.InternalError(fmt.Sprintf("can't GET local cache file '%s'", localTarget))
Expand Down Expand Up @@ -1216,10 +1240,10 @@ func (c *Client) Write(ctx context.Context, uid, gid, path string, stream io.Rea
defer wfd.Close()
defer os.RemoveAll(fd.Name())

return c.GetHTTPCl().PUTFile(ctx, "", uid, gid, path, wfd, length)
return c.GetHTTPCl().PUTFile(ctx, c.httptransport, "", uid, gid, path, wfd, length)
}

return c.GetHTTPCl().PUTFile(ctx, "", uid, gid, path, stream, length)
return c.GetHTTPCl().PUTFile(ctx, c.httptransport, "", uid, gid, path, stream, length)

// return c.GetHttpCl().PUTFile(ctx, remoteuser, uid, gid, urlpathng, stream)
// return c.WriteFile(ctx, uid, gid, path, fd.Name())
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/utils/eosfs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,16 @@ type Config struct {
// Beware: in pure streaming mode the FST must support
// the HTTP chunked encoding
WriteUsesLocalTemp bool `mapstructure:"write_uses_local_temp"`

// HTTP connections to EOS: max number of idle conns
MaxIdleConns int `mapstructure:"max_idle_conns"`

// HTTP connections to EOS: max number of conns per host
MaxConnsPerHost int `mapstructure:"max_conns_per_host"`

// HTTP connections to EOS: max number of idle conns per host
MaxIdleConnsPerHost int `mapstructure:"max_idle_conns_per_host"`

// HTTP connections to EOS: idle conections TTL
IdleConnTimeout int `mapstructure:"idle_conn_timeout"`
}
26 changes: 15 additions & 11 deletions pkg/storage/utils/eosfs/eosfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,17 +153,21 @@ func NewEOSFS(c *Config) (storage.FS, error) {
var eosClient eosclient.EOSClient
if c.UseGRPC {
eosClientOpts := &eosgrpc.Options{
XrdcopyBinary: c.XrdcopyBinary,
URL: c.MasterURL,
GrpcURI: c.GrpcURI,
CacheDirectory: c.CacheDirectory,
UseKeytab: c.UseKeytab,
Keytab: c.Keytab,
Authkey: c.GRPCAuthkey,
SecProtocol: c.SecProtocol,
VersionInvariant: c.VersionInvariant,
ReadUsesLocalTemp: c.ReadUsesLocalTemp,
WriteUsesLocalTemp: c.WriteUsesLocalTemp,
XrdcopyBinary: c.XrdcopyBinary,
URL: c.MasterURL,
GrpcURI: c.GrpcURI,
CacheDirectory: c.CacheDirectory,
UseKeytab: c.UseKeytab,
Keytab: c.Keytab,
Authkey: c.GRPCAuthkey,
SecProtocol: c.SecProtocol,
VersionInvariant: c.VersionInvariant,
ReadUsesLocalTemp: c.ReadUsesLocalTemp,
WriteUsesLocalTemp: c.WriteUsesLocalTemp,
MaxIdleConns: c.MaxIdleConns,
MaxConnsPerHost: c.MaxConnsPerHost,
MaxIdleConnsPerHost: c.MaxIdleConnsPerHost,
IdleConnTimeout: c.IdleConnTimeout,
}
eosClient = eosgrpc.New(eosClientOpts)
} else {
Expand Down

0 comments on commit 338598b

Please sign in to comment.