diff --git a/daemon/mgr/volume.go b/daemon/mgr/volume.go index bc5fd6d51..e38978b3a 100644 --- a/daemon/mgr/volume.go +++ b/daemon/mgr/volume.go @@ -139,6 +139,7 @@ func (vm *VolumeManager) Remove(ctx context.Context, name string) error { if strings.Contains(err.Error(), "not found") { return errors.Wrap(errtypes.ErrNotfound, err.Error()) } + return err } return nil diff --git a/plugins/plugins.go b/plugins/plugins.go index bbe6f7667..19bb056f9 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -2,7 +2,7 @@ package plugins var manager = &pluginManager{ plugins: make(map[string]*Plugin), - pluginSockPaths: []string{"/run/pouch/plugins"}, + pluginSockPaths: []string{"/run/pouch/plugins", "/run/docker/plugins"}, pluginSpecPaths: []string{"/etc/pouch/plugins", "/var/lib/pouch/plugins"}, } diff --git a/storage/volume/core.go b/storage/volume/core.go index 946290c31..7f8548d87 100644 --- a/storage/volume/core.go +++ b/storage/volume/core.go @@ -76,34 +76,75 @@ func (c *Core) GetVolume(id types.VolumeID) (*types.Volume, error) { }, } + ctx := driver.Contexts() + // first, try to get volume from local store. obj, err := c.store.Get(id.Name) if err == nil { - return obj.(*types.Volume), nil + v, ok := obj.(*types.Volume) + if !ok { + return nil, volerr.ErrVolumeNotFound + } + + // get the volume driver. + dv, err := driver.Get(v.Spec.Backend) + if err != nil { + return nil, err + } + + // if the driver implements Getter interface. + if d, ok := dv.(driver.Getter); ok { + curV, err := d.Get(ctx, id.Name) + if err != nil { + return nil, volerr.ErrVolumeNotFound + } + + v.Status.MountPoint = curV.Status.MountPoint + } + + return v, nil } if err != metastore.ErrObjectNotFound { return nil, err } - err = volerr.ErrVolumeNotFound // then, try to get volume from central store. if c.EnableControl { - url, err := c.volumeURL(id) - if err != nil { - return nil, err + if url, err := c.volumeURL(id); err == nil { + if err := client.New().Get(url, v); err == nil { + return v, nil + } } + } + + // at last, scan all drivers + logrus.Debugf("probing all drivers for volume with name(%s)", id.Name) + drivers, err := driver.GetAll() + if err != nil { + return nil, err + } - if err = client.New().Get(url, v); err == nil { - return v, nil + for _, dv := range drivers { + d, ok := dv.(driver.Getter) + if !ok { + continue } - if ce, ok := err.(client.Error); ok && ce.IsNotFound() { - return nil, volerr.ErrVolumeNotFound + v, err := d.Get(ctx, id.Name) + if err != nil { + // not found, ignore it + continue } - return nil, err + + // store volume meta + if err := c.store.Put(v); err != nil { + return nil, err + } + + return v, nil } - return nil, err + return nil, volerr.ErrVolumeNotFound } // ExistVolume return 'true' if volume be found and not errors. @@ -128,60 +169,65 @@ func (c *Core) CreateVolume(id types.VolumeID) error { return volerr.ErrVolumeExisted } - v, err := c.newVolume(id) + v, err := types.NewVolume(id) if err != nil { return errors.Wrapf(err, "Create volume") } - dv, ok := driver.Get(v.Spec.Backend) - if !ok { - return errors.Errorf("Backend driver: %s not found", v.Spec.Backend) - } - - p, err := c.volumePath(v, dv) + dv, err := driver.Get(v.Spec.Backend) if err != nil { return err } - v.SetPath(p) - // check options, then delete invalid options. + // check options, then add some driver-specific options. if err := checkOptions(v); err != nil { return err } // Create volume's meta. ctx := driver.Contexts() + var s *types.Storage - if !dv.StoreMode(ctx).UseLocalMeta() { + if dv.StoreMode(ctx).CentralCreateDelete() { url, err := c.volumeURL() if err != nil { return err } + // before creating, we can't get the path + p, err := c.volumePath(v, dv) + if err != nil { + return err + } + v.SetPath(p) + if err := client.New().Create(url, v); err != nil { return errors.Wrap(err, "Create volume") } - } - // Create volume's store room on local. - var s *types.Storage - if !dv.StoreMode(ctx).IsLocal() { + // get the storage s, err = c.getStorage(v.StorageID()) if err != nil { return err } - } + } else { + if err := dv.Create(ctx, v, nil); err != nil { + return err + } - if !dv.StoreMode(ctx).CentralCreateDelete() { - if err := dv.Create(ctx, v, s); err != nil { + // after creating volume, we can get the path + p, err := c.volumePath(v, dv) + if err != nil { return err } + v.SetPath(p) if err := c.store.Put(v); err != nil { return err } } + // format the volume if f, ok := dv.(driver.Formator); ok { err := f.Format(ctx, v, s) if err == nil { @@ -205,10 +251,10 @@ func (c *Core) CreateVolume(id types.VolumeID) error { // ListVolumes return all volumes. // Param 'labels' use to filter the volumes, only return those you want. func (c *Core) ListVolumes(labels map[string]string) ([]*types.Volume, error) { - var ls = make([]*types.Volume, 0) + var retVolumes = make([]*types.Volume, 0) // first, list local meta store. - list, err := c.store.List() + metaList, err := c.store.List() if err != nil { return nil, err } @@ -222,20 +268,77 @@ func (c *Core) ListVolumes(labels map[string]string) ([]*types.Volume, error) { logrus.Debugf("List volume URL: %s, labels: %s", url, labels) - if err := client.New().ListKeys(url, &ls); err != nil { + if err := client.New().ListKeys(url, &retVolumes); err != nil { return nil, errors.Wrap(err, "List volume's name") } } - for _, obj := range list { + // at last, scan all drivers. + logrus.Debugf("probing all drivers for listing volume") + drivers, err := driver.GetAll() + if err != nil { + return nil, err + } + + ctx := driver.Contexts() + + var realVolumes = map[string]*types.Volume{} + var volumeDrivers = map[string]driver.Driver{} + + for _, dv := range drivers { + volumeDrivers[dv.Name(ctx)] = dv + + d, ok := dv.(driver.Lister) + if !ok { + // not Lister, ignore it. + continue + } + vList, err := d.List(ctx) + if err != nil { + logrus.Warnf("volume driver %s list error: %v", dv.Name(ctx), err) + continue + } + + for _, v := range vList { + realVolumes[v.Name] = v + } + } + + for name, obj := range metaList { v, ok := obj.(*types.Volume) if !ok { - return nil, fmt.Errorf("failed to get volumes in store") + continue + } + + d, ok := volumeDrivers[v.Spec.Backend] + if !ok { + // driver not exist, ignore it + continue + } + + // the local driver and tmpfs driver + if d.StoreMode(ctx).IsLocal() { + retVolumes = append(retVolumes, v) + continue + } + + rv, ok := realVolumes[name] + if !ok { + // real volume not exist, ignore it + continue } - ls = append(ls, v) + v.Status.MountPoint = rv.Status.MountPoint + + delete(realVolumes, name) + + retVolumes = append(retVolumes, v) + } + + for _, v := range realVolumes { + retVolumes = append(retVolumes, v) } - return ls, nil + return retVolumes, nil } // ListVolumeName return the name of all volumes only. @@ -244,7 +347,7 @@ func (c *Core) ListVolumeName(labels map[string]string) ([]string, error) { var names []string // first, list local meta store. - list, err := c.store.List() + metaList, err := c.store.List() if err != nil { return nil, err } @@ -263,7 +366,65 @@ func (c *Core) ListVolumeName(labels map[string]string) ([]string, error) { } } - for name := range list { + // at last, scan all drivers + logrus.Debugf("probing all drivers for listing volume") + drivers, err := driver.GetAll() + if err != nil { + return nil, err + } + + ctx := driver.Contexts() + + var realVolumes = map[string]*types.Volume{} + var volumeDrivers = map[string]driver.Driver{} + + for _, dv := range drivers { + volumeDrivers[dv.Name(ctx)] = dv + + d, ok := dv.(driver.Lister) + if !ok { + continue + } + vList, err := d.List(ctx) + if err != nil { + logrus.Warnf("volume driver %s list error: %v", dv.Name(ctx), err) + continue + } + + for _, v := range vList { + realVolumes[v.Name] = v + } + } + + for name, obj := range metaList { + v, ok := obj.(*types.Volume) + if !ok { + continue + } + + d, ok := volumeDrivers[v.Spec.Backend] + if !ok { + // driver not exist, ignore it + continue + } + + if d.StoreMode(ctx).IsLocal() { + names = append(names, name) + continue + } + + _, ok = realVolumes[name] + if !ok { + // real volume not exist, ignore it + continue + } + + delete(realVolumes, name) + + names = append(names, name) + } + + for name := range realVolumes { names = append(names, name) } @@ -278,11 +439,7 @@ func (c *Core) RemoveVolume(id types.VolumeID) error { } // Call interface to remove meta info. - if dv.StoreMode(driver.Contexts()).UseLocalMeta() { - if err := c.store.Remove(id.Name); err != nil { - return err - } - } else { + if dv.StoreMode(driver.Contexts()).CentralCreateDelete() { url, err := c.volumeURL(id) if err != nil { return errors.Wrap(err, "Remove volume: "+id.String()) @@ -290,19 +447,14 @@ func (c *Core) RemoveVolume(id types.VolumeID) error { if err := client.New().Delete(url, v); err != nil { return errors.Wrap(err, "Remove volume: "+id.String()) } - } - - // Call driver's Remove method to remove the volume. - if !dv.StoreMode(driver.Contexts()).CentralCreateDelete() { - var s *types.Storage - if !dv.StoreMode(driver.Contexts()).UseLocalMeta() { - s, err = c.getStorage(v.StorageID()) - if err != nil { - return err - } + } else { + // Call driver's Remove method to remove the volume. + if err := dv.Remove(driver.Contexts(), v, nil); err != nil { + return err } - if err := dv.Remove(driver.Contexts(), v, s); err != nil { + // delete the meta + if err := c.store.Remove(id.Name); err != nil { return err } } @@ -326,9 +478,9 @@ func (c *Core) GetVolumeDriver(id types.VolumeID) (*types.Volume, driver.Driver, if err != nil { return nil, nil, err } - dv, ok := driver.Get(v.Spec.Backend) - if !ok { - return nil, nil, errors.Errorf("Backend driver: %s not found", v.Spec.Backend) + dv, err := driver.Get(v.Spec.Backend) + if err != nil { + return nil, nil, errors.Errorf("failed to get backend driver %s: %v", v.Spec.Backend, err) } return v, dv, nil } @@ -341,21 +493,25 @@ func (c *Core) AttachVolume(id types.VolumeID, extra map[string]string) (*types. } ctx := driver.Contexts() - var s *types.Storage // merge extra to volume spec extra. for key, value := range extra { v.Spec.Extra[key] = value } - if a, ok := dv.(driver.AttachDetach); ok { - if !dv.StoreMode(ctx).IsLocal() { + if d, ok := dv.(driver.AttachDetach); ok { + var ( + s *types.Storage + err error + ) + + if dv.StoreMode(ctx).CentralCreateDelete() { if s, err = c.getStorage(v.StorageID()); err != nil { return nil, err } } - if err = a.Attach(ctx, v, s); err != nil { + if err = d.Attach(ctx, v, s); err != nil { return nil, err } } @@ -386,7 +542,6 @@ func (c *Core) DetachVolume(id types.VolumeID, extra map[string]string) (*types. } ctx := driver.Contexts() - var s *types.Storage // merge extra to volume spec extra. for key, value := range extra { @@ -395,14 +550,19 @@ func (c *Core) DetachVolume(id types.VolumeID, extra map[string]string) (*types. // if volume has referance, skip to detach volume. ref := v.Option(types.OptionRef) - if a, ok := dv.(driver.AttachDetach); ok && ref == "" { - if !dv.StoreMode(ctx).IsLocal() { + if d, ok := dv.(driver.AttachDetach); ok && ref == "" { + var ( + s *types.Storage + err error + ) + + if dv.StoreMode(ctx).CentralCreateDelete() { if s, err = c.getStorage(v.StorageID()); err != nil { return nil, err } } - if err = a.Detach(ctx, v, s); err != nil { + if err = d.Detach(ctx, v, s); err != nil { return nil, err } } diff --git a/storage/volume/core_util.go b/storage/volume/core_util.go index cfb767a22..318760134 100644 --- a/storage/volume/core_util.go +++ b/storage/volume/core_util.go @@ -2,23 +2,16 @@ package volume import ( "fmt" - "os" "path" - "strconv" "strings" - "time" - "github.com/alibaba/pouch/pkg/bytefmt" "github.com/alibaba/pouch/storage/controlserver/client" "github.com/alibaba/pouch/storage/volume/driver" volerr "github.com/alibaba/pouch/storage/volume/error" "github.com/alibaba/pouch/storage/volume/types" "github.com/alibaba/pouch/storage/volume/types/meta" - "github.com/pborman/uuid" "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" ) func (c *Core) volumePath(v *types.Volume, dv driver.Driver) (string, error) { @@ -110,78 +103,6 @@ func (c *Core) listVolumeNameURL(labels map[string]string) (string, error) { return url, nil } -func (c *Core) newVolume(id types.VolumeID) (*types.Volume, error) { - now := time.Now() - v := &types.Volume{ - ObjectMeta: meta.ObjectMeta{ - Name: id.Name, - Claimer: "pouch", - Namespace: "pouch", - UID: uuid.NewRandom().String(), - Generation: meta.ObjectPhasePreCreate, - Labels: labels.Set{}, - CreationTimestamp: &now, - ModifyTimestamp: &now, - }, - Spec: &types.VolumeSpec{ - Extra: map[string]string{}, - Selector: make(types.Selector, 0), - }, - Status: &types.VolumeStatus{}, - } - - conf, err := buildVolumeConfig(id.Options) - if err != nil { - return nil, err - } - v.Spec.VolumeConfig = conf - - for n, opt := range id.Options { - v.Spec.Extra[n] = opt - } - - for n, selector := range id.Selectors { - requirement := translateSelector(n, strings.ToLower(selector)) - v.Spec.Selector = append(v.Spec.Selector, requirement) - } - - v.Labels = id.Labels - - // initialize default option/label/selector - if id.Driver != "" { - v.Spec.Backend = id.Driver - v.Labels["backend"] = id.Driver - } else { - v.Spec.Backend = c.DefaultBackend - v.Labels["backend"] = c.DefaultBackend - } - - if hostname, err := os.Hostname(); err == nil { - v.Labels["hostname"] = hostname - } - - if _, ok := id.Selectors[selectNamespace]; !ok { - requirement := translateSelector("namespace", commonOptions["namespace"].Value) - v.Spec.Selector = append(v.Spec.Selector, requirement) - } - - if _, ok := v.Spec.Extra["sifter"]; !ok { - v.Spec.Extra["sifter"] = "Default" - } - - return v, nil -} - -func translateSelector(k, v string) types.SelectorRequirement { - values := strings.Split(v, ",") - - return types.SelectorRequirement{ - Key: k, - Operator: selection.In, - Values: values, - } -} - func checkVolume(v *types.Volume) error { if v.Spec.ClusterID == "" || v.Status.Phase == types.VolumePhaseFailed { err := fmt.Errorf("volume is created failed: %s", v.Name) @@ -194,116 +115,20 @@ func checkVolume(v *types.Volume) error { return nil } -func buildVolumeConfig(options map[string]string) (*types.VolumeConfig, error) { - size := "" - config := &types.VolumeConfig{ - FileSystem: defaultFileSystem, - MountOpt: defaultFileSystem, - } - - // Parse size - if s, ok := options[optionSize]; ok { - size = s - } - - if size != "" { - sizeInt, err := bytefmt.ToMegabytes(size) - if err != nil { - return nil, err - } - config.Size = strconv.Itoa(int(sizeInt)) + "M" - } - - // Parse filesystem - if fs, ok := options[optionFS]; ok { - config.FileSystem = fs - delete(options, optionFS) - } - config.MountOpt = strings.Split(config.FileSystem, " ")[0] - - // Parse IO config - if wbps, ok := options[optionWBps]; ok { - v, err := strconv.ParseInt(wbps, 10, 64) - if err != nil { - return nil, err - } - config.WriteBPS = v - - delete(options, optionWBps) - } - - if rbps, ok := options[optionRBps]; ok { - v, err := strconv.ParseInt(rbps, 10, 64) - if err != nil { - return nil, err - } - config.ReadBPS = v - - delete(options, optionRBps) - } - - if iops, ok := options[optionIOps]; ok { - v, err := strconv.ParseInt(iops, 10, 64) - if err != nil { - return nil, err - } - config.TotalIOPS = v - delete(options, optionIOps) - } - - if wiops, ok := options[optionWriteIOps]; ok { - v, err := strconv.ParseInt(wiops, 10, 64) - if err != nil { - return nil, err - } - config.WriteIOPS = v - delete(options, optionWriteIOps) - } - - if riops, ok := options[optionReadIOps]; ok { - v, err := strconv.ParseInt(riops, 10, 64) - if err != nil { - return nil, err - } - config.ReadIOPS = v - delete(options, optionReadIOps) - } - - return config, nil -} - func checkOptions(v *types.Volume) error { var ( - deleteOpts []string driverOpts map[string]types.Option ) - dv, ok := driver.Get(v.Spec.Backend) - if !ok { - return errors.Errorf("Backend driver: %s not found", v.Spec.Backend) + dv, err := driver.Get(v.Spec.Backend) + if err != nil { + return errors.Errorf("failed to get backend driver %s: %v", v.Spec.Backend, err) } if opt, ok := dv.(driver.Opt); ok { driverOpts = opt.Options() } - // check extra options is invalid or not. - for name := range v.Spec.Extra { - if _, ok := commonOptions[name]; ok { - continue - } - if driverOpts != nil { - if _, ok := driverOpts[name]; ok { - continue - } - } - deleteOpts = append(deleteOpts, name) - } - for _, d := range deleteOpts { - delete(v.Spec.Extra, d) - } - - // set driver options into extra map. if driverOpts != nil { for name, opt := range driverOpts { if _, ok := v.Spec.Extra[name]; !ok { diff --git a/storage/volume/driver/driver.go b/storage/volume/driver/driver.go index 3dbc3b335..953f3f4ee 100644 --- a/storage/volume/driver/driver.go +++ b/storage/volume/driver/driver.go @@ -1,8 +1,11 @@ package driver import ( + "fmt" "regexp" + "sync" + "github.com/alibaba/pouch/plugins" "github.com/alibaba/pouch/storage/volume/types" "github.com/pkg/errors" @@ -13,6 +16,8 @@ const ( driverNameRegexp = "^[a-zA-Z0-9].*$" // Option's name can only contain these characters, a-z 0-9 - _. optionNameRegexp = "^[a-z0-9-_].*$" + // volumePluginType is the plugin which implements volume driver + volumePluginType = "VolumeDriver" ) const ( @@ -82,44 +87,88 @@ func (m VolumeStoreMode) UseLocalMeta() bool { return (m & UseLocalMetaStore) != 0 } -// -type driverTable map[string]Driver - -// Add is used to add driver into driver table. -func (t driverTable) Add(name string, d Driver) { - t[name] = d +// driverTable contains all volume drivers +type driverTable struct { + sync.Mutex + drivers map[string]Driver } -// Del is used to delete driver from driver table. -func (t driverTable) Del(name string) { - delete(t, name) +// Get is used to get driver from driver table. +func (t *driverTable) Get(name string) (Driver, error) { + t.Lock() + v, ok := t.drivers[name] + if ok { + t.Unlock() + return v, nil + } + t.Unlock() + + plugin, err := plugins.Get(volumePluginType, name) + if err != nil { + return nil, fmt.Errorf("%s driver not found: %v", name, err) + } + + driver := NewRemoteDriverWrapper(name, plugin) + + t.Lock() + defer t.Unlock() + + v, ok = t.drivers[name] + if !ok { + v = driver + t.drivers[name] = v + } + + return v, nil } -// Get is used to get driver from driver table. -func (t driverTable) Get(name string) (Driver, bool) { - v, ok := t[name] - return v, ok +// GetAll will list all volume drivers. +func (t *driverTable) GetAll() ([]Driver, error) { + pluginList, err := plugins.GetAll(volumePluginType) + if err != nil { + return nil, fmt.Errorf("error listing plugins: %v", err) + } + + var driverList []Driver + + t.Lock() + defer t.Unlock() + + for _, d := range t.drivers { + driverList = append(driverList, d) + } + + for _, p := range pluginList { + d, ok := t.drivers[p.Name] + if ok { + // the driver has existed, ignore it. + continue + } + + d = NewRemoteDriverWrapper(p.Name, p) + + t.drivers[p.Name] = d + driverList = append(driverList, d) + } + + return driverList, nil } -var backendDrivers driverTable +var backendDrivers = &driverTable{ + drivers: make(map[string]Driver), +} // Register add a backend driver module. func Register(d Driver) error { - if backendDrivers == nil { - backendDrivers = make(driverTable) - } ctx := Contexts() + driverName := d.Name(ctx) - matched, err := regexp.MatchString(driverNameRegexp, d.Name(ctx)) + matched, err := regexp.MatchString(driverNameRegexp, driverName) if err != nil { return err } if !matched { - return errors.Errorf("Invalid driver name: %s, not match: %s", d.Name(ctx), driverNameRegexp) - } - - if _, ok := backendDrivers.Get(d.Name(ctx)); ok { - return errors.Errorf("Backend driver's name \"%s\" duplicate", d.Name(ctx)) + return errors.Errorf("Invalid driver name: %s, not match: %s", driverName, driverNameRegexp) } if !d.StoreMode(ctx).Valid() { @@ -138,43 +187,54 @@ func Register(d Driver) error { } } - backendDrivers.Add(d.Name(ctx), d) + backendDrivers.Lock() + defer backendDrivers.Unlock() + + if _, ok := backendDrivers.drivers[driverName]; ok { + return errors.Errorf("backend driver's name \"%s\" duplicate", driverName) + } + + backendDrivers.drivers[driverName] = d return nil } -// Get return one backend driver with specified name. -func Get(name string) (Driver, bool) { +// Get returns one backend driver with specified name. +func Get(name string) (Driver, error) { return backendDrivers.Get(name) } -// Exist return true if the backend driver is registered. -func Exist(name string) bool { - _, ok := Get(name) - return ok +// GetAll returns all volume drivers. +func GetAll() ([]Driver, error) { + return backendDrivers.GetAll() } -// List return all backend drivers. -func List() []Driver { - drivers := make([]Driver, 0, 5) - for _, d := range backendDrivers { - drivers = append(drivers, d) +// Exist return true if the backend driver is registered. +func Exist(name string) bool { + _, err := Get(name) + if err != nil { + return false } - return drivers + + return true } -// AllDriversName return all backend driver's name. +// AllDriversName return all registered backend driver's name. func AllDriversName() []string { - names := make([]string, 0, 5) - for n := range backendDrivers { + backendDrivers.Lock() + defer backendDrivers.Unlock() + + var names []string + for n := range backendDrivers.drivers { names = append(names, n) } + return names } // ListDriverOption return backend driver's options by name. func ListDriverOption(name string) map[string]types.Option { - dv, ok := Get(name) - if !ok { + dv, err := Get(name) + if err != nil { return nil } if opt, ok := dv.(Opt); ok { @@ -185,11 +245,6 @@ func ListDriverOption(name string) map[string]types.Option { // Alias is used to add driver name's alias into exist driver. func Alias(name, alias string) error { - d, exist := backendDrivers.Get(name) - if !exist { - return errors.Errorf("volume driver: %s is not exist", name) - } - matched, err := regexp.MatchString(driverNameRegexp, alias) if err != nil { return err @@ -198,7 +253,22 @@ func Alias(name, alias string) error { return errors.Errorf("Invalid driver name: %s, not match: %s", name, driverNameRegexp) } - backendDrivers.Add(alias, d) + backendDrivers.Lock() + defer backendDrivers.Unlock() + + // check whether the driver exists + d, ok := backendDrivers.drivers[name] + if !ok { + return errors.Errorf("volume driver: %s is not exist", name) + } + + // alias should not exist + _, ok = backendDrivers.drivers[alias] + if ok { + return errors.Errorf("Invalid volume alias: %s, duplicate", name) + } + + backendDrivers.drivers[alias] = d return nil } diff --git a/storage/volume/driver/driver_interface.go b/storage/volume/driver/driver_interface.go index fe4cc0ed5..e6b95346f 100644 --- a/storage/volume/driver/driver_interface.go +++ b/storage/volume/driver/driver_interface.go @@ -43,6 +43,18 @@ type Formator interface { Format(Context, *types.Volume, *types.Storage) error } +// Getter represents volume get interface. +type Getter interface { + // Get a volume from driver + Get(Context, string) (*types.Volume, error) +} + +// Lister represents volume list interface +type Lister interface { + // List a volume from driver + List(Context) ([]*types.Volume, error) +} + // GatewayDriver represents storage gateway interface. type GatewayDriver interface { // Report storage cluster status. diff --git a/storage/volume/driver/proxy.go b/storage/volume/driver/proxy.go new file mode 100644 index 000000000..28de52b10 --- /dev/null +++ b/storage/volume/driver/proxy.go @@ -0,0 +1,189 @@ +package driver + +import ( + "errors" + + "github.com/alibaba/pouch/plugins" +) + +// the following const variables is the protocol of remote volume driver. +const ( + // remoteVolumeCreateService is the service path of creating volume. + remoteVolumeCreateService = "/VolumeDriver.Create" + + // remoteVolumeRemoveService is the service path of removing volume. + remoteVolumeRemoveService = "/VolumeDriver.Remove" + + // remoteVolumeMountService is the service path of mounting volume. + remoteVolumeMountService = "/VolumeDriver.Mount" + + // remoteVolumeUnmountService is the service path of unmounting volume. + remoteVolumeUnmountService = "/VolumeDriver.Unmount" + + // remoteVolumePathService is the service path of getting volume's mountpath. + remoteVolumePathService = "/VolumeDriver.Path" + + // remoteVolumeGetService is the service path of getting volume. + remoteVolumeGetService = "/VolumeDriver.Get" + + // remoteVolumeListService is the service path of listing all volumes. + remoteVolumeListService = "/VolumeDriver.List" + + // remoteVolumeListService is the service path of getting the volume driver' capabilities. + remoteVolumeCapabilitiesService = "/VolumeDriver.Capabilities" +) + +// remoteDriverProxy is a remote driver proxy. +type remoteDriverProxy struct { + Name string + client *plugins.PluginClient +} + +// Create creates a volume. +func (proxy *remoteDriverProxy) Create(name string, opts map[string]string) error { + var req = remoteVolumeCreateReq{ + Name: name, + Opts: opts, + } + + var resp remoteVolumeCreateResp + + if err := proxy.client.CallService(remoteVolumeCreateService, &req, &resp, true); err != nil { + return err + } + + if resp.Err != "" { + return errors.New(resp.Err) + } + + return nil +} + +// Remove deletes a volume. +func (proxy *remoteDriverProxy) Remove(name string) error { + var req = remoteVolumeRemoveReq{ + Name: name, + } + + var resp remoteVolumeCreateResp + + if err := proxy.client.CallService(remoteVolumeRemoveService, &req, &resp, true); err != nil { + return err + } + + if resp.Err != "" { + return errors.New(resp.Err) + } + + return nil +} + +// Mount mounts a volume. +func (proxy *remoteDriverProxy) Mount(name, id string) (string, error) { + var req = remoteVolumeMountReq{ + Name: name, + ID: id, + } + + var resp remoteVolumeMountResp + + if err := proxy.client.CallService(remoteVolumeMountService, &req, &resp, true); err != nil { + return "", err + } + + if resp.Err != "" { + return "", errors.New(resp.Err) + } + + return resp.Mountpoint, nil +} + +/// Umount unmounts a volume. +func (proxy *remoteDriverProxy) Unmount(name, id string) error { + var req = remoteVolumeUnmountReq{ + Name: name, + ID: id, + } + + var resp remoteVolumeUnmountResp + + if err := proxy.client.CallService(remoteVolumeUnmountService, &req, &resp, true); err != nil { + return err + } + + if resp.Err != "" { + return errors.New(resp.Err) + } + + return nil +} + +// Path returns the mount path. +func (proxy *remoteDriverProxy) Path(name string) (string, error) { + var req = remoteVolumePathReq{ + Name: name, + } + + var resp remoteVolumePathResp + + if err := proxy.client.CallService(remoteVolumePathService, &req, &resp, true); err != nil { + return "", err + } + + if resp.Err != "" { + return "", errors.New(resp.Err) + } + + return resp.Mountpoint, nil +} + +// Get returns the remote volume. +func (proxy *remoteDriverProxy) Get(name string) (*remoteVolumeDesc, error) { + var req = remoteVolumeGetReq{ + Name: name, + } + + var resp remoteVolumeGetResp + + if err := proxy.client.CallService(remoteVolumeGetService, &req, &resp, true); err != nil { + return nil, err + } + + if resp.Err != "" { + return nil, errors.New(resp.Err) + } + + return resp.Volume, nil +} + +// List returns all remote volumes. +func (proxy *remoteDriverProxy) List() ([]*remoteVolumeDesc, error) { + var req remoteVolumeListReq + var resp remoteVolumeListResp + + if err := proxy.client.CallService(remoteVolumeListService, &req, &resp, true); err != nil { + return nil, err + } + + if resp.Err != "" { + return nil, errors.New(resp.Err) + } + + return resp.Volumes, nil +} + +// Capabilities returns the driver capabilities. +func (proxy *remoteDriverProxy) Capabilities() (*remoteVolumeCapability, error) { + var req remoteVolumeCapabilitiesReq + var resp remoteVolumeCapabilitiesResp + + if err := proxy.client.CallService(remoteVolumeCapabilitiesService, &req, &resp, true); err != nil { + return nil, err + } + + if resp.Err != "" { + return nil, errors.New(resp.Err) + } + + return resp.Capabilities, nil +} diff --git a/storage/volume/driver/remote.go b/storage/volume/driver/remote.go new file mode 100644 index 000000000..4834d67b5 --- /dev/null +++ b/storage/volume/driver/remote.go @@ -0,0 +1,138 @@ +package driver + +import ( + "github.com/alibaba/pouch/plugins" + "github.com/alibaba/pouch/storage/volume/types" +) + +// remoteDriverWrapper represents a volume driver. +type remoteDriverWrapper struct { + driverName string + proxy *remoteDriverProxy +} + +// NewRemoteDriverWrapper returns a remote driver +func NewRemoteDriverWrapper(name string, plugin *plugins.Plugin) Driver { + return &remoteDriverWrapper{ + driverName: name, + proxy: &remoteDriverProxy{ + Name: name, + client: plugin.Client(), + }, + } +} + +// Name returns the volume driver's name. +func (r *remoteDriverWrapper) Name(ctx Context) string { + return r.driverName +} + +// StoreMode returns the volume driver's store mode. +func (r *remoteDriverWrapper) StoreMode(ctx Context) VolumeStoreMode { + return RemoteStore | UseLocalMetaStore +} + +// Create a remote volume. +func (r *remoteDriverWrapper) Create(ctx Context, v *types.Volume, s *types.Storage) error { + ctx.Log.Debugf("driver wrapper [%s] creates volume: %s", r.Name(ctx), v.Name) + + options := types.ExtractOptionsFromVolume(v) + + ctx.Log.Debugf("driver wrapper gets options: %v", options) + + return r.proxy.Create(v.Name, options) +} + +// Remove a remote volume. +func (r *remoteDriverWrapper) Remove(ctx Context, v *types.Volume, s *types.Storage) error { + ctx.Log.Debugf("driver wrapper [%s] removes volume: %s", r.Name(ctx), v.Name) + + return r.proxy.Remove(v.Name) +} + +// Get a volume from remote driver. +func (r *remoteDriverWrapper) Get(ctx Context, name string) (*types.Volume, error) { + ctx.Log.Debugf("driver wrapper [%s] gets volume: %s", r.Name(ctx), name) + + rv, err := r.proxy.Get(name) + if err != nil { + return nil, err + } + + id := types.NewVolumeID(name, r.Name(ctx)) + + volume, err := types.NewVolume(id) + if err != nil { + return nil, err + } + + // set the mountpoint + volume.Status.MountPoint = rv.Mountpoint + + return volume, nil +} + +// List all volumes from remote driver. +func (r *remoteDriverWrapper) List(ctx Context) ([]*types.Volume, error) { + ctx.Log.Debugf("driver wrapper [%s] list all volumes", r.Name(ctx)) + + rvList, err := r.proxy.List() + if err != nil { + return nil, err + } + + var vList []*types.Volume + + for _, rv := range rvList { + id := types.NewVolumeID(rv.Name, r.Name(ctx)) + + volume, err := types.NewVolume(id) + if err != nil { + continue + } + + // set the mountpoint + volume.Status.MountPoint = rv.Mountpoint + + vList = append(vList, volume) + } + + return vList, nil + +} + +// Path returns remote volume mount path. +func (r *remoteDriverWrapper) Path(ctx Context, v *types.Volume) (string, error) { + ctx.Log.Debugf("driver wrapper [%s] gets volume [%s] mount path", r.Name(ctx), v.Name) + + // Get the mount path from remote plugin + mountPath, err := r.proxy.Path(v.Name) + if err != nil { + return "", err + } + return mountPath, nil +} + +// Options returns remote volume options. +func (r *remoteDriverWrapper) Options() map[string]types.Option { + return map[string]types.Option{} +} + +// Attach a remote volume. +func (r *remoteDriverWrapper) Attach(ctx Context, v *types.Volume, s *types.Storage) error { + ctx.Log.Debugf("driver wrapper [%s] attach volume: %s", r.Name(ctx), v.Name) + + _, err := r.proxy.Mount(v.Name, v.UID) + if err != nil { + return err + } + + return nil +} + +// Detach a remote volume. +func (r *remoteDriverWrapper) Detach(ctx Context, v *types.Volume, s *types.Storage) error { + ctx.Log.Debugf("driver wrapper [%s] detach volume: %s", r.Name(ctx), v.Name) + + return r.proxy.Unmount(v.Name, v.UID) +} diff --git a/storage/volume/driver/types.go b/storage/volume/driver/types.go new file mode 100644 index 000000000..04b57611c --- /dev/null +++ b/storage/volume/driver/types.go @@ -0,0 +1,81 @@ +package driver + +type remoteVolumeCreateReq struct { + Name string `json:"Name"` + Opts map[string]string `json:"Opts"` +} + +type remoteVolumeCreateResp struct { + Err string `json:"Err"` +} + +type remoteVolumeRemoveReq struct { + Name string `json:"Name"` +} + +type remoteVolumeRemoveResp struct { + Err string `json:"Err"` +} + +type remoteVolumeMountReq struct { + Name string `json:"Name"` + ID string `json:"ID"` +} + +type remoteVolumeMountResp struct { + Mountpoint string `json:"Mountpoint"` + Err string `json:"Err"` +} + +type remoteVolumePathReq struct { + Name string `json:"Name"` +} + +type remoteVolumePathResp struct { + Mountpoint string `json:"Mountpoint"` + Err string `json:"Err"` +} + +type remoteVolumeUnmountReq struct { + Name string `json:"Name"` + ID string `json:"ID"` +} + +type remoteVolumeUnmountResp struct { + Err string `json:"Err"` +} + +type remoteVolumeGetReq struct { + Name string `json:"Name"` +} + +type remoteVolumeDesc struct { + Name string `json:"Name"` + Mountpoint string `json:"Mountpoint"` + Status map[string]interface{} `json:"Status"` +} + +type remoteVolumeCapability struct { + Scope string `json:"Scope"` +} + +type remoteVolumeGetResp struct { + Volume *remoteVolumeDesc `json:"Volume"` + Err string `json:"Err"` +} + +type remoteVolumeListReq struct { +} + +type remoteVolumeListResp struct { + Volumes []*remoteVolumeDesc `json:"Volumes"` + Err string `json:"Err"` +} + +type remoteVolumeCapabilitiesReq struct { +} + +type remoteVolumeCapabilitiesResp struct { + Capabilities *remoteVolumeCapability `json:"Capabilities"` + Err string `json:"Err"` +} diff --git a/storage/volume/modules/local/local.go b/storage/volume/modules/local/local.go index 83c3dcaa8..1fe121fc7 100644 --- a/storage/volume/modules/local/local.go +++ b/storage/volume/modules/local/local.go @@ -42,7 +42,7 @@ func (p *Local) StoreMode(ctx driver.Context) driver.VolumeStoreMode { // Create a local volume. func (p *Local) Create(ctx driver.Context, v *types.Volume, s *types.Storage) error { ctx.Log.Debugf("Local create volume: %s", v.Name) - mountPath := v.Path() + mountPath, _ := p.Path(ctx, v) if st, exist := os.Stat(mountPath); exist != nil { if e := os.MkdirAll(mountPath, 0755); e != nil { diff --git a/storage/volume/vars.go b/storage/volume/types/vars.go similarity index 68% rename from storage/volume/vars.go rename to storage/volume/types/vars.go index ee976a00a..17087cd68 100644 --- a/storage/volume/vars.go +++ b/storage/volume/types/vars.go @@ -1,26 +1,22 @@ -package volume - -import ( - "github.com/alibaba/pouch/storage/volume/types" -) +package types const ( defaultSize = "100G" defaultFileSystem = "ext4" defaultMountpoint = "/mnt" optionPath = "mount" - optionSize = "size" + optionSize = "opt.size" optionSifter = "sifter" - optionFS = "fs" - optionWBps = "wbps" - optionRBps = "rbps" - optionIOps = "iops" - optionReadIOps = "riops" - optionWriteIOps = "wiops" + optionFS = "opt.fs" + optionWBps = "opt.wbps" + optionRBps = "opt.rbps" + optionIOps = "opt.iops" + optionReadIOps = "opt.riops" + optionWriteIOps = "opt.wiops" selectNamespace = "namespace" ) -var commonOptions = map[string]types.Option{ +var commonOptions = map[string]Option{ "size": {Value: "", Desc: "volume size"}, "backend": {Value: "", Desc: "volume backend"}, "sifter": {Value: "", Desc: "volume scheduler sifter"}, @@ -34,6 +30,6 @@ var commonOptions = map[string]types.Option{ } // ListCommonOptions returns common options. -func ListCommonOptions() map[string]types.Option { +func ListCommonOptions() map[string]Option { return commonOptions } diff --git a/storage/volume/types/volume.go b/storage/volume/types/volume.go index fcd95d5af..ef3147f7d 100644 --- a/storage/volume/types/volume.go +++ b/storage/volume/types/volume.go @@ -212,7 +212,13 @@ type VolumeID struct { // NewVolumeID returns VolumeID instance. func NewVolumeID(name, driver string) VolumeID { - return VolumeID{Name: name, Driver: driver} + return VolumeID{ + Name: name, + Driver: driver, + Options: map[string]string{}, + Labels: map[string]string{}, + Selectors: map[string]string{}, + } } // Equal check VolumeID is equal or not. diff --git a/storage/volume/types/volume_util.go b/storage/volume/types/volume_util.go new file mode 100644 index 000000000..0e3929979 --- /dev/null +++ b/storage/volume/types/volume_util.go @@ -0,0 +1,229 @@ +package types + +import ( + "fmt" + "os" + "strconv" + "strings" + "time" + + "github.com/alibaba/pouch/pkg/bytefmt" + "github.com/alibaba/pouch/storage/volume/types/meta" + + "github.com/pborman/uuid" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" +) + +func translateSelector(k, v string) SelectorRequirement { + values := strings.Split(v, ",") + + return SelectorRequirement{ + Key: k, + Operator: selection.In, + Values: values, + } +} + +func buildVolumeConfig(options map[string]string) (*VolumeConfig, error) { + size := "" + config := &VolumeConfig{ + FileSystem: defaultFileSystem, + MountOpt: defaultFileSystem, + } + + // Parse size + if s, ok := options[optionSize]; ok { + size = s + } + + if size != "" { + sizeInt, err := bytefmt.ToMegabytes(size) + if err != nil { + return nil, err + } + config.Size = strconv.Itoa(int(sizeInt)) + "M" + + delete(options, optionSize) + } + + // Parse filesystem + if fs, ok := options[optionFS]; ok { + config.FileSystem = fs + delete(options, optionFS) + } + config.MountOpt = strings.Split(config.FileSystem, " ")[0] + + // Parse IO config + if wbps, ok := options[optionWBps]; ok { + v, err := strconv.ParseInt(wbps, 10, 64) + if err != nil { + return nil, err + } + config.WriteBPS = v + + delete(options, optionWBps) + } + + if rbps, ok := options[optionRBps]; ok { + v, err := strconv.ParseInt(rbps, 10, 64) + if err != nil { + return nil, err + } + config.ReadBPS = v + + delete(options, optionRBps) + } + + if iops, ok := options[optionIOps]; ok { + v, err := strconv.ParseInt(iops, 10, 64) + if err != nil { + return nil, err + } + config.TotalIOPS = v + delete(options, optionIOps) + } + + if wiops, ok := options[optionWriteIOps]; ok { + v, err := strconv.ParseInt(wiops, 10, 64) + if err != nil { + return nil, err + } + config.WriteIOPS = v + delete(options, optionWriteIOps) + } + + if riops, ok := options[optionReadIOps]; ok { + v, err := strconv.ParseInt(riops, 10, 64) + if err != nil { + return nil, err + } + config.ReadIOPS = v + delete(options, optionReadIOps) + } + + return config, nil +} + +// NewVolume generates a volume based VolumeID +func NewVolume(id VolumeID) (*Volume, error) { + now := time.Now() + v := &Volume{ + ObjectMeta: meta.ObjectMeta{ + Name: id.Name, + Claimer: "pouch", + Namespace: "pouch", + UID: uuid.NewRandom().String(), + Generation: meta.ObjectPhasePreCreate, + Labels: labels.Set{}, + CreationTimestamp: &now, + ModifyTimestamp: &now, + }, + Spec: &VolumeSpec{ + Extra: map[string]string{}, + Selector: make(Selector, 0), + }, + Status: &VolumeStatus{}, + } + + conf, err := buildVolumeConfig(id.Options) + if err != nil { + return nil, err + } + v.Spec.VolumeConfig = conf + + for n, opt := range id.Options { + v.Spec.Extra[n] = opt + } + + for n, selector := range id.Selectors { + requirement := translateSelector(n, strings.ToLower(selector)) + v.Spec.Selector = append(v.Spec.Selector, requirement) + } + + v.Labels = id.Labels + + // initialize default option/label/selector + if id.Driver != "" { + v.Spec.Backend = id.Driver + v.Labels["backend"] = id.Driver + } else { + v.Spec.Backend = DefaultBackend + v.Labels["backend"] = DefaultBackend + } + + if hostname, err := os.Hostname(); err == nil { + v.Labels["hostname"] = hostname + } + + if _, ok := id.Selectors[selectNamespace]; !ok { + requirement := translateSelector("namespace", commonOptions["namespace"].Value) + v.Spec.Selector = append(v.Spec.Selector, requirement) + } + + if _, ok := v.Spec.Extra["sifter"]; !ok { + v.Spec.Extra["sifter"] = "Default" + } + + return v, nil +} + +// extractOptionsFromVolumeConfig will extract options from VolumeConfig. +func extractOptionsFromVolumeConfig(config *VolumeConfig) map[string]string { + var options = map[string]string{} + + if config == nil { + return options + } + + if config.Size != "" { + options[optionSize] = config.Size + } + + if config.FileSystem != "" { + options[optionFS] = config.FileSystem + } + + if config.WriteBPS != 0 { + options[optionWBps] = strconv.FormatInt(config.WriteBPS, 10) + } + + if config.ReadBPS != 0 { + options[optionRBps] = strconv.FormatInt(config.ReadBPS, 10) + } + + if config.TotalIOPS != 0 { + options[optionIOps] = strconv.FormatInt(config.TotalIOPS, 10) + } + + if config.WriteIOPS != 0 { + options[optionWriteIOps] = strconv.FormatInt(config.WriteIOPS, 10) + } + + if config.ReadIOPS != 0 { + options[optionReadIOps] = strconv.FormatInt(config.ReadIOPS, 10) + } + + return options +} + +// ExtractOptionsFromVolume extracts options from a volume. +func ExtractOptionsFromVolume(v *Volume) map[string]string { + var options map[string]string + + // extract options from volume config. + options = extractOptionsFromVolumeConfig(v.Spec.VolumeConfig) + + // extract options from selector. + for _, s := range v.Spec.Selector { + k := fmt.Sprintf("selector.%s", s.Key) + options[k] = strings.Join(s.Values, ",") + } + + // extract options from Extra. + for k, v := range v.Spec.Extra { + options[k] = v + } + + return options +}