Skip to content

Commit

Permalink
object/gluster: support multiple gluster clients (#4003)
Browse files Browse the repository at this point in the history
  • Loading branch information
SandyXSD authored Aug 16, 2023
1 parent acabf98 commit cb8622f
Showing 1 changed file with 52 additions and 24 deletions.
76 changes: 52 additions & 24 deletions pkg/object/gluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/juicedata/gogfapi/gfapi"
Expand All @@ -38,15 +40,24 @@ import (
type gluster struct {
DefaultObjectStorage
name string
vol *gfapi.Volume
indx uint64
vols []*gfapi.Volume
}

func (c *gluster) String() string {
return fmt.Sprintf("gluster://%s/", c.name)
}

func (d *gluster) vol() *gfapi.Volume {
if len(d.vols) == 1 {
return d.vols[0]
}
n := atomic.AddUint64(&d.indx, 1)
return d.vols[n%uint64(len(d.vols))]
}

func (c *gluster) Head(key string) (Object, error) {
fi, err := c.vol.Stat(key)
fi, err := c.vol().Stat(key)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -75,7 +86,7 @@ func (d *gluster) toFile(key string, fi fs.FileInfo, isSymlink bool) *file {
}

func (c *gluster) Get(key string, off, limit int64) (io.ReadCloser, error) {
f, err := c.vol.Open(key)
f, err := c.vol().Open(key)
if err != nil {
return nil, err
}
Expand All @@ -100,22 +111,23 @@ func (c *gluster) Get(key string, off, limit int64) (io.ReadCloser, error) {
}

func (c *gluster) Put(key string, in io.Reader) error {
v := c.vol()
if strings.HasSuffix(key, dirSuffix) {
return c.vol.MkdirAll(key, os.FileMode(0777))
return v.MkdirAll(key, os.FileMode(0777))
}
f, err := c.vol.OpenFile(key, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
f, err := v.OpenFile(key, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err != nil && os.IsNotExist(err) {
if err := c.vol.MkdirAll(filepath.Dir(key), os.FileMode(0777)); err != nil {
if err := v.MkdirAll(filepath.Dir(key), os.FileMode(0777)); err != nil {
return err
}
f, err = c.vol.OpenFile(key, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
f, err = v.OpenFile(key, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
}
if err != nil {
return err
}
defer func() {
if err != nil {
_ = c.vol.Unlink(key)
_ = v.Unlink(key)
}
}()

Expand All @@ -135,9 +147,10 @@ func (c *gluster) Put(key string, in io.Reader) error {
}

func (c *gluster) Delete(key string) error {
err := c.vol.Unlink(key)
v := c.vol()
err := v.Unlink(key)
if err != nil && strings.Contains(err.Error(), "is a directory") {
err = c.vol.Rmdir(key)
err = v.Rmdir(key)
}
if os.IsNotExist(err) {
err = nil
Expand All @@ -148,7 +161,8 @@ func (c *gluster) Delete(key string) error {
// readDirSorted reads the directory named by dirname and returns
// a sorted list of directory entries.
func (d *gluster) readDirSorted(dirname string, followLink bool) ([]*mEntry, error) {
f, err := d.vol.Open(dirname)
v := d.vol()
f, err := v.Open(dirname)
if err != nil {
return nil, err
}
Expand All @@ -167,7 +181,7 @@ func (d *gluster) readDirSorted(dirname string, followLink bool) ([]*mEntry, err
if e.IsDir() {
mEntries = append(mEntries, &mEntry{nil, name + dirSuffix, e, false})
} else if !e.Mode().IsRegular() && followLink {
fi, err := d.vol.Stat(filepath.Join(dirname, name))
fi, err := v.Stat(filepath.Join(dirname, name))
if err != nil {
mEntries = append(mEntries, &mEntry{nil, name, e, true})
continue
Expand Down Expand Up @@ -240,7 +254,7 @@ func (d *gluster) Chtimes(path string, mtime time.Time) error {
}

func (d *gluster) Chmod(path string, mode os.FileMode) error {
return d.vol.Chmod(path, mode)
return d.vol().Chmod(path, mode)
}

func (d *gluster) Chown(path string, owner, group string) error {
Expand All @@ -260,20 +274,34 @@ func newGluster(endpoint, ak, sk, token string) (ObjectStorage, error) {
return nil, fmt.Errorf("no volume provided")
}
name := ps[1]
v := &gfapi.Volume{}
// TODO: support port in host
err = v.Init(name, strings.Split(uri.Host, ",")...)
if err != nil {
return nil, fmt.Errorf("init %s: %s", name, err)
var size int
if ssize := os.Getenv("JFS_NUM_GLUSTER_CLIENTS"); ssize != "" {
size, _ = strconv.Atoi(ssize)
if size > 8 {
size = 8
}
}
err = v.Mount()
if err != nil {
return nil, fmt.Errorf("mount %s: %s", name, err)
if size < 1 {
size = 1
}
return &gluster{
ostore := gluster{
name: name,
vol: v,
}, nil
vols: make([]*gfapi.Volume, size),
}
for i := range ostore.vols {
v := &gfapi.Volume{}
// TODO: support port in host
err = v.Init(name, strings.Split(uri.Host, ",")...)
if err != nil {
return nil, fmt.Errorf("init %s: %s", name, err)
}
err = v.Mount()
if err != nil {
return nil, fmt.Errorf("mount %s: %s", name, err)
}
ostore.vols[i] = v
}
return &ostore, nil
}

func init() {
Expand Down

0 comments on commit cb8622f

Please sign in to comment.