From b0b292872d29aa793d5a2aab644503d0128ebff0 Mon Sep 17 00:00:00 2001 From: Eric Li Date: Tue, 29 May 2018 16:20:47 +0800 Subject: [PATCH] refactor: remove ceph volume plugin As the ceph volume plugin depends on alibaba storage service, ceph volume plugin can't work outside alibaba. So we will remove the ceph volume. If you need ceph storage, you can use contiv volume plugin (https://github.com/contiv-experimental/volplugin) Signed-off-by: Eric Li --- Makefile | 1 - apis/server/volume_bridge.go | 6 +- daemon/mgr/container.go | 2 +- daemon/mgr/volume.go | 27 +- storage/controlserver/client/client.go | 115 --- storage/controlserver/client/errors.go | 28 - storage/controlserver/client/httpcli.go | 276 ------ storage/controlserver/client/httpcli_test.go | 848 ------------------- storage/controlserver/client/url.go | 19 - storage/controlserver/client/url_test.go | 30 - storage/volume/README.md | 12 +- storage/volume/config.go | 11 +- storage/volume/core.go | 303 +------ storage/volume/core_util.go | 81 -- storage/volume/driver/driver_interface.go | 16 +- storage/volume/driver/remote.go | 43 +- storage/volume/examples/demo/demo.go | 4 +- storage/volume/modules/ceph/ceph.go | 305 ------- storage/volume/modules/ceph/map.go | 165 ---- storage/volume/modules/ceph/types.go | 127 --- storage/volume/modules/local/local.go | 76 +- storage/volume/modules/tmpfs/tmpfs.go | 15 +- storage/volume/types/storage.go | 82 -- storage/volume/types/volume.go | 10 - storage/volume/types/volume_util.go | 35 + test/api_system_test.go | 9 +- 26 files changed, 166 insertions(+), 2480 deletions(-) delete mode 100644 storage/controlserver/client/client.go delete mode 100644 storage/controlserver/client/errors.go delete mode 100644 storage/controlserver/client/httpcli.go delete mode 100644 storage/controlserver/client/httpcli_test.go delete mode 100644 storage/controlserver/client/url.go delete mode 100644 storage/controlserver/client/url_test.go delete mode 100644 storage/volume/modules/ceph/ceph.go delete mode 100644 storage/volume/modules/ceph/map.go delete mode 100644 storage/volume/modules/ceph/types.go delete mode 100644 storage/volume/types/storage.go diff --git a/Makefile b/Makefile index 1d6da8cb7..25dc6cd56 100644 --- a/Makefile +++ b/Makefile @@ -76,7 +76,6 @@ validate-swagger: ## run swagger validate .PHONY: modules modules: @./hack/module --clean - @./hack/module --add-volume=github.com/alibaba/pouch/storage/volume/modules/ceph @./hack/module --add-volume=github.com/alibaba/pouch/storage/volume/modules/tmpfs @./hack/module --add-volume=github.com/alibaba/pouch/storage/volume/modules/local diff --git a/apis/server/volume_bridge.go b/apis/server/volume_bridge.go index f78399e70..64c1de077 100644 --- a/apis/server/volume_bridge.go +++ b/apis/server/volume_bridge.go @@ -38,11 +38,7 @@ func (s *Server) createVolume(ctx context.Context, rw http.ResponseWriter, req * driver = volumetypes.DefaultBackend } - if err := s.VolumeMgr.Create(ctx, name, driver, options, labels); err != nil { - return err - } - - volume, err := s.VolumeMgr.Get(ctx, name) + volume, err := s.VolumeMgr.Create(ctx, name, driver, options, labels) if err != nil { return err } diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go index 9433a375d..086700e72 100644 --- a/daemon/mgr/container.go +++ b/daemon/mgr/container.go @@ -1771,7 +1771,7 @@ func (mgr *ContainerManager) attachVolume(ctx context.Context, name string, meta opts := map[string]string{ "backend": driver, } - if err := mgr.VolumeMgr.Create(ctx, name, meta.HostConfig.VolumeDriver, opts, nil); err != nil { + if _, err := mgr.VolumeMgr.Create(ctx, name, meta.HostConfig.VolumeDriver, opts, nil); err != nil { logrus.Errorf("failed to create volume: %s, err: %v", name, err) return "", "", errors.Wrap(err, "failed to create volume") } diff --git a/daemon/mgr/volume.go b/daemon/mgr/volume.go index e38978b3a..cc618e46f 100644 --- a/daemon/mgr/volume.go +++ b/daemon/mgr/volume.go @@ -3,7 +3,6 @@ package mgr import ( "context" "os" - "path" "strings" "github.com/alibaba/pouch/pkg/errtypes" @@ -16,7 +15,7 @@ import ( // VolumeMgr defines interface to manage container volume. type VolumeMgr interface { // Create is used to create volume. - Create(ctx context.Context, name, driver string, options, labels map[string]string) error + Create(ctx context.Context, name, driver string, options, labels map[string]string) (*types.Volume, error) // Get returns the information of volume that specified name/id. Get(ctx context.Context, name string) (*types.Volume, error) @@ -59,33 +58,25 @@ func NewVolumeManager(cfg volume.Config) (*VolumeManager, error) { } // Create is used to create volume. -func (vm *VolumeManager) Create(ctx context.Context, name, driver string, options, labels map[string]string) error { +func (vm *VolumeManager) Create(ctx context.Context, name, driver string, options, labels map[string]string) (*types.Volume, error) { + if driver == "" { + driver = types.DefaultBackend + } + id := types.VolumeID{ Name: name, Driver: driver, Options: map[string]string{}, Selectors: map[string]string{}, + Labels: map[string]string{}, } if labels != nil { id.Labels = labels - } else { - id.Labels = map[string]string{} - } - - for key, opt := range options { - if strings.HasPrefix(key, "selector.") { - key = strings.TrimPrefix(key, "selector.") - id.Selectors[key] = opt - continue - } - - id.Options[key] = opt } - // set default volume mount path - if mount, ok := id.Options["mount"]; !ok || mount == "" { - id.Options["mount"] = path.Dir(vm.core.VolumeMetaPath) + if options != nil { + id.Options = options } return vm.core.CreateVolume(id) diff --git a/storage/controlserver/client/client.go b/storage/controlserver/client/client.go deleted file mode 100644 index 83633ca98..000000000 --- a/storage/controlserver/client/client.go +++ /dev/null @@ -1,115 +0,0 @@ -package client - -import ( - "context" - "crypto/tls" - "net/http" - "time" - - "github.com/alibaba/pouch/pkg/serializer" -) - -var ( - // VolumePath is volume path url prefix - VolumePath = "/volume" - - // StoragePath is storage path url prefix - StoragePath = "/storage" -) - -// Client represents a client connect to control server. -type Client struct { - ttl time.Duration - errChan chan error - cli *HTTPClient - tlsc *tls.Config -} - -// New is used to initialize client class object. -func New() *Client { - return &Client{ - ttl: time.Second * 90, - cli: HTTPClientNew(), - errChan: make(chan error, 1), - } -} - -// NewWithTLS is used to initialize client class object with tls. -func NewWithTLS(tlsc *tls.Config) *Client { - c := New() - c.cli.TLSConfig(tlsc) - return c -} - -func (c *Client) context() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.TODO(), c.ttl) -} - -func (c *Client) wait(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-c.errChan: - if err != nil { - if code := c.cli.StatusCode(); code >= http.StatusBadRequest { - return newError(code, err.Error()) - } - } - return err - } -} - -// Create is used to create "obj" with url on control server. -func (c *Client) Create(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func(obj serializer.Object) { - c.errChan <- c.cli.POST().URL(url).JSONBody(obj).Do().Into(obj) - }(obj) - - return c.wait(ctx) -} - -// Update is used to update "obj" with url on control server. -func (c *Client) Update(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func(obj serializer.Object) { - c.errChan <- c.cli.PUT().URL(url).JSONBody(obj).Do().Into(obj) - }(obj) - - return c.wait(ctx) -} - -// Get returns "obj" content with url on control server. -func (c *Client) Get(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func(obj serializer.Object) { - c.errChan <- c.cli.GET().URL(url).Do().Into(obj) - }(obj) - - return c.wait(ctx) -} - -// Delete is used to delete "obj" with url on control server. -func (c *Client) Delete(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func() { - c.errChan <- c.cli.DELETE().URL(url).JSONBody(obj).Do().Err() - }() - - return c.wait(ctx) -} - -// ListKeys returns "obj" with url that contains labels or keys. -func (c *Client) ListKeys(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func(obj serializer.Object) { - c.errChan <- c.cli.GET().URL(url).Do().Into(obj) - }(obj) - - return c.wait(ctx) -} diff --git a/storage/controlserver/client/errors.go b/storage/controlserver/client/errors.go deleted file mode 100644 index 71d7ab3f0..000000000 --- a/storage/controlserver/client/errors.go +++ /dev/null @@ -1,28 +0,0 @@ -package client - -import ( - "net/http" -) - -// Error is defined as client error. -type Error struct { - status int - err string -} - -func newError(status int, err string) Error { - return Error{ - status: status, - err: err, - } -} - -// Error returns client Error's error message. -func (e Error) Error() string { - return e.err -} - -// IsNotFound will check the error is "StatusNotFound" or not. -func (e Error) IsNotFound() bool { - return e.status == http.StatusNotFound -} diff --git a/storage/controlserver/client/httpcli.go b/storage/controlserver/client/httpcli.go deleted file mode 100644 index 8a0f3f352..000000000 --- a/storage/controlserver/client/httpcli.go +++ /dev/null @@ -1,276 +0,0 @@ -package client - -import ( - "crypto/tls" - "crypto/x509" - "fmt" - "net/http" - "net/url" - "os" - "reflect" - - "github.com/alibaba/pouch/pkg/serializer" - - "github.com/go-resty/resty" -) - -// HTTPClient represents a client connect to http server. -type HTTPClient struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int -} - -var restyClient = resty.New().SetCloseConnection(true) - -func isPtr(i interface{}) bool { - return reflect.ValueOf(i).Kind() == reflect.Ptr -} - -func parseTLSConfig(ca, cert, key []byte) (*tls.Config, error) { - tlsConfig := &tls.Config{ - // Prefer TLS1.2 as the client minimum - MinVersion: tls.VersionTLS12, - InsecureSkipVerify: os.Getenv("DOCKER_TLS_VERIFY") == "", - } - - //parse root ca - cas, err := certpool(ca) - if err != nil { - return nil, err - } - - tlsConfig.RootCAs = cas - - //parse cert and key pem - tlsCert, err := tls.X509KeyPair(cert, key) - if err != nil { - return nil, err - } - - tlsConfig.Certificates = []tls.Certificate{tlsCert} - return tlsConfig, nil -} - -func certpool(pem []byte) (*x509.CertPool, error) { - certPool := x509.NewCertPool() - if !certPool.AppendCertsFromPEM(pem) { - return nil, fmt.Errorf("failed to append certificates from PEM") - } - - return certPool, nil -} - -// HTTPClientNew returns HTTPClient class object. -func HTTPClientNew() *HTTPClient { - return &HTTPClient{ - cli: restyClient, - req: restyClient.R(), - resp: &resty.Response{}, - } -} - -func (c *HTTPClient) verb(method string) *HTTPClient { - c.method = method - return c -} - -// TLSConfig is used to set tls config with specified config. -func (c *HTTPClient) TLSConfig(tlsc *tls.Config) *HTTPClient { - c.cli.SetTLSClientConfig(tlsc) - return c -} - -// TLS is used to set tls config with ca, cert and key. -func (c *HTTPClient) TLS(ca, cert, key []byte) (*HTTPClient, error) { - tlsc, err := parseTLSConfig(ca, cert, key) - if err != nil { - return nil, err - } - c.cli.SetTLSClientConfig(tlsc) - return c, nil -} - -// PUT used to implement http PUT method. -func PUT() *HTTPClient { - c := HTTPClientNew() - return c.verb("PUT") -} - -// GET used to implement HTTPClient GET method. -func GET() *HTTPClient { - c := HTTPClientNew() - return c.verb("GET") -} - -// POST used to implement HTTPClient POST method. -func POST() *HTTPClient { - c := HTTPClientNew() - return c.verb("POST") -} - -// DELETE used to implement HTTPClient DELETE method. -func DELETE() *HTTPClient { - c := HTTPClientNew() - return c.verb("DELETE") -} - -// PUT used to implement HTTPClient PUT method and then returns HTTPClient -func (c *HTTPClient) PUT() *HTTPClient { - c.verb("PUT") - return c -} - -// GET used to implement HTTPClient GET method and then returns HTTPClient -func (c *HTTPClient) GET() *HTTPClient { - c.verb("GET") - return c -} - -// POST used to implement HTTPClient POST method and then returns HTTPClient -func (c *HTTPClient) POST() *HTTPClient { - c.verb("POST") - return c -} - -// DELETE used to implement HTTPClient DELETE method and then returns HTTPClient -func (c *HTTPClient) DELETE() *HTTPClient { - c.verb("DELETE") - return c -} - -// Err returns HTTPClient's error. -func (c *HTTPClient) Err() error { - return c.err -} - -// Method used to implement HTTPClient method that specified by caller, -// and then returns HTTPClient object. -func (c *HTTPClient) Method(method string) *HTTPClient { - c.verb(method) - return c -} - -// URL is used to set url and then returns HTTPClient object. -func (c *HTTPClient) URL(rawurl string) *HTTPClient { - if _, err := url.Parse(rawurl); err != nil { - c.err = err - return c - } - c.urls = rawurl - return c -} - -// SetHeader is used to set header and then returns HTTPClient object. -func (c *HTTPClient) SetHeader(key, value string) *HTTPClient { - c.req.SetHeader(key, value) - return c -} - -// Body is used to set body with map[string]string type, -// and then returns HTTPClient object. -func (c *HTTPClient) Body(obj serializer.Object) *HTTPClient { - if c.err != nil { - return c - } - switch t := obj.(type) { - case map[string]string: - c.req.SetFormData(t) - default: - c.err = fmt.Errorf("post body need a map[string]string, get: %v", t) - } - return c -} - -// JSONBody is used to set body with json format and then returns HTTPClient object. -func (c *HTTPClient) JSONBody(obj serializer.Object) *HTTPClient { - c.req.SetHeader("Content-Type", "application/json") - c.req.SetBody(obj) - return c -} - -// Query is used to set query and then returns HTTPClient object. -func (c *HTTPClient) Query(obj serializer.Object) *HTTPClient { - if c.err != nil { - return c - } - switch t := obj.(type) { - case string: - c.req.SetQueryString(t) - case map[string][]string: - query := url.Values(t) - c.req.SetQueryString(query.Encode()) - case map[string]string: - c.req.SetQueryParams(t) - case url.Values: - c.req.SetQueryString(t.Encode()) - default: - c.err = fmt.Errorf("invalid query %#v", obj) - } - - return c -} - -// Do used to deal http request and then returns HTTPClient object. -func (c *HTTPClient) Do() *HTTPClient { - if c.err != nil { - return c - } - - switch c.method { - case "POST": - c.resp, c.err = c.req.Post(c.urls) - case "GET": - c.resp, c.err = c.req.Get(c.urls) - case "DELETE": - c.resp, c.err = c.req.Delete(c.urls) - case "PUT": - c.resp, c.err = c.req.Put(c.urls) - default: - c.err = fmt.Errorf("unsupport method: %s", c.method) - return c - } - - c.code = c.resp.StatusCode() - if c.code >= http.StatusBadRequest { - c.err = fmt.Errorf("Bad response code: %d, %s", c.code, string(c.resp.Body())) - } - return c -} - -// RespCodeEqual check HTTPClient's code is equal with "code" or not. -func (c *HTTPClient) RespCodeEqual(code int) bool { - return c.code == code -} - -// StatusCode returns HTTPClient code. -func (c *HTTPClient) StatusCode() int { - return c.code -} - -// Into used to set obj into body. -func (c *HTTPClient) Into(obj serializer.Object) error { - if c.err != nil { - return c.err - } - if !isPtr(obj) { - return fmt.Errorf("expect ptr, get: %v", obj) - } - body := c.resp.Body() - if reflect.TypeOf(obj).Elem().Kind() == reflect.String { - reflect.Indirect(reflect.ValueOf(obj)).Set(reflect.Indirect(reflect.ValueOf(string(body)))) - return nil - } - - newObj := reflect.New(reflect.TypeOf(obj).Elem()).Interface() - - if err := serializer.Codec.Decode(body, newObj); err != nil { - return err - } - reflect.Indirect(reflect.ValueOf(obj)).Set(reflect.Indirect(reflect.ValueOf(newObj))) - return nil -} diff --git a/storage/controlserver/client/httpcli_test.go b/storage/controlserver/client/httpcli_test.go deleted file mode 100644 index bedb1b088..000000000 --- a/storage/controlserver/client/httpcli_test.go +++ /dev/null @@ -1,848 +0,0 @@ -package client - -import ( - "crypto/tls" - "crypto/x509" - "reflect" - "testing" - - "github.com/alibaba/pouch/pkg/serializer" - "github.com/go-resty/resty" -) - -func Test_isPtr(t *testing.T) { - type args struct { - i interface{} - } - tests := []struct { - name string - args args - want bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := isPtr(tt.args.i); got != tt.want { - t.Errorf("isPtr() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_parseTLSConfig(t *testing.T) { - type args struct { - ca []byte - cert []byte - key []byte - } - tests := []struct { - name string - args args - want *tls.Config - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := parseTLSConfig(tt.args.ca, tt.args.cert, tt.args.key) - if (err != nil) != tt.wantErr { - t.Errorf("parseTLSConfig() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("parseTLSConfig() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_certpool(t *testing.T) { - type args struct { - pem []byte - } - tests := []struct { - name string - args args - want *x509.CertPool - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := certpool(tt.args.pem) - if (err != nil) != tt.wantErr { - t.Errorf("certpool() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("certpool() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClientNew(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := HTTPClientNew(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClientNew() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_verb(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - method string - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.verb(tt.args.method); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.verb() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_TLSConfig(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - tlsc *tls.Config - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.TLSConfig(tt.args.tlsc); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.TLSConfig() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_TLS(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - ca []byte - cert []byte - key []byte - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - got, err := c.TLS(tt.args.ca, tt.args.cert, tt.args.key) - if (err != nil) != tt.wantErr { - t.Errorf("HTTPClient.TLS() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.TLS() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestPUT(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := PUT(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("PUT() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestGET(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := GET(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("GET() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestPOST(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := POST(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("POST() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestDELETE(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := DELETE(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("DELETE() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_PUT(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.PUT(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.PUT() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_GET(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.GET(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.GET() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_POST(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.POST(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.POST() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_DELETE(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.DELETE(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.DELETE() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Err(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if err := c.Err(); (err != nil) != tt.wantErr { - t.Errorf("HTTPClient.Err() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestHTTPClient_Method(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - method string - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.Method(tt.args.method); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.Method() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_URL(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - rawurl string - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.URL(tt.args.rawurl); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.URL() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_SetHeader(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - key string - value string - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.SetHeader(tt.args.key, tt.args.value); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.SetHeader() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Body(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - obj serializer.Object - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.Body(tt.args.obj); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.Body() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_JSONBody(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - obj serializer.Object - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.JSONBody(tt.args.obj); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.JSONBody() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Query(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - obj serializer.Object - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.Query(tt.args.obj); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.Query() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Do(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.Do(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.Do() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_RespCodeEqual(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - code int - } - tests := []struct { - name string - fields fields - args args - want bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.RespCodeEqual(tt.args.code); got != tt.want { - t.Errorf("HTTPClient.RespCodeEqual() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_StatusCode(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want int - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.StatusCode(); got != tt.want { - t.Errorf("HTTPClient.StatusCode() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Into(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - obj serializer.Object - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if err := c.Into(tt.args.obj); (err != nil) != tt.wantErr { - t.Errorf("HTTPClient.Into() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/storage/controlserver/client/url.go b/storage/controlserver/client/url.go deleted file mode 100644 index b8d4fd2df..000000000 --- a/storage/controlserver/client/url.go +++ /dev/null @@ -1,19 +0,0 @@ -package client - -import ( - "net/url" - "path" -) - -// JoinURL is used to check "api" and join url. -func JoinURL(api string, s ...string) (string, error) { - url, err := url.Parse(api) - if err != nil { - return "", err - } - p := []string{url.Path} - p = append(p, s...) - url.Path = path.Join(p...) - - return url.String(), nil -} diff --git a/storage/controlserver/client/url_test.go b/storage/controlserver/client/url_test.go deleted file mode 100644 index b53ac1cb1..000000000 --- a/storage/controlserver/client/url_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package client - -import "testing" - -func TestJoinURL(t *testing.T) { - type args struct { - api string - s []string - } - tests := []struct { - name string - args args - want string - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := JoinURL(tt.args.api, tt.args.s...) - if (err != nil) != tt.wantErr { - t.Errorf("JoinURL() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("JoinURL() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/storage/volume/README.md b/storage/volume/README.md index 1d1cd5ae8..38d26f1b3 100644 --- a/storage/volume/README.md +++ b/storage/volume/README.md @@ -24,7 +24,7 @@ It provides interface is as following: ```go type VolumeMgr interface { // Create is used to create volume. - Create(ctx context.Context, name, driver string, options, labels map[string]string) error + Create(ctx context.Context, name, driver string, options, labels map[string]string) (*types.Volume, error) // Remove is used to remove an existing volume. Remove(ctx context.Context, name string) error @@ -95,10 +95,10 @@ type Driver interface { StoreMode(Context) VolumeStoreMode // Create a volume. - Create(Context, *types.Volume, *types.Storage) error + Create(Context, *types.Volume) error // Remove a volume. - Remove(Context, *types.Volume, *types.Storage) error + Remove(Context, *types.Volume) error // Path returns volume's path. Path(Context, *types.Volume) (string, error) @@ -117,16 +117,16 @@ type Opt interface { // AttachDetach represents volume attach/detach interface. type AttachDetach interface { // Attach a Volume to host, enable the volume. - Attach(Context, *types.Volume, *types.Storage) error + Attach(Context, *types.Volume) error // Detach a volume with host, disable the volume. - Detach(Context, *types.Volume, *types.Storage) error + Detach(Context, *types.Volume) error } // Formator represents volume format interface. type Formator interface { // Format a volume. - Format(Context, *types.Volume, *types.Storage) error + Format(Context, *types.Volume) error } ``` diff --git a/storage/volume/config.go b/storage/volume/config.go index 79acc64ef..1f606ae94 100644 --- a/storage/volume/config.go +++ b/storage/volume/config.go @@ -6,10 +6,9 @@ import ( // Config represents volume config struct. type Config struct { - ControlServerAddress string `json:"control-server-address"` // control server address in csi. - Timeout time.Duration `json:"volume-timeout"` // operation timeout. - RemoveVolume bool `json:"remove-volume"` // remove volume add data or volume's metadata when remove pouch volume. - DefaultBackend string `json:"volume-default-driver"` // default volume backend. - VolumeMetaPath string `json:"volume-meta-dir"` // volume metadata store path. - DriverAlias string `json:"volume-driver-alias"` // driver alias configure. + Timeout time.Duration `json:"volume-timeout"` // operation timeout. + RemoveVolume bool `json:"remove-volume"` // remove volume add data or volume's metadata when remove pouch volume. + DefaultBackend string `json:"volume-default-driver"` // default volume backend. + VolumeMetaPath string `json:"volume-meta-dir"` // volume metadata store path. + DriverAlias string `json:"volume-driver-alias"` // driver alias configure. } diff --git a/storage/volume/core.go b/storage/volume/core.go index 7f8548d87..48ea365bb 100644 --- a/storage/volume/core.go +++ b/storage/volume/core.go @@ -6,11 +6,9 @@ import ( "strings" metastore "github.com/alibaba/pouch/pkg/meta" - "github.com/alibaba/pouch/storage/controlserver/client" "github.com/alibaba/pouch/storage/volume/driver" volerr "github.com/alibaba/pouch/storage/volume/error" "github.com/alibaba/pouch/storage/volume/types" - "github.com/alibaba/pouch/storage/volume/types/meta" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -19,20 +17,12 @@ import ( // Core represents volume core struct. type Core struct { Config - BaseURL string - EnableControl bool - store *metastore.Store + store *metastore.Store } // NewCore returns Core struct instance with volume config. func NewCore(cfg Config) (*Core, error) { c := &Core{Config: cfg} - if cfg.ControlServerAddress != "" { - c.EnableControl = true - c.BaseURL = cfg.ControlServerAddress - } else { - c.EnableControl = false - } if cfg.DriverAlias != "" { parts := strings.Split(cfg.DriverAlias, ";") @@ -70,12 +60,6 @@ func NewCore(cfg Config) (*Core, error) { // GetVolume return a volume's info with specified name, If not errors. func (c *Core) GetVolume(id types.VolumeID) (*types.Volume, error) { - v := &types.Volume{ - ObjectMeta: meta.ObjectMeta{ - Name: id.Name, - }, - } - ctx := driver.Contexts() // first, try to get volume from local store. @@ -109,16 +93,7 @@ func (c *Core) GetVolume(id types.VolumeID) (*types.Volume, error) { return nil, err } - // then, try to get volume from central store. - if c.EnableControl { - if url, err := c.volumeURL(id); err == nil { - if err := client.New().Get(url, v); err == nil { - return v, nil - } - } - } - - // at last, scan all drivers + // scan all drivers logrus.Debugf("probing all drivers for volume with name(%s)", id.Name) drivers, err := driver.GetAll() if err != nil { @@ -130,6 +105,7 @@ func (c *Core) GetVolume(id types.VolumeID) (*types.Volume, error) { if !ok { continue } + v, err := d.Get(ctx, id.Name) if err != nil { // not found, ignore it @@ -160,92 +136,30 @@ func (c *Core) ExistVolume(id types.VolumeID) (bool, error) { } // CreateVolume use to create a volume, if failed, will return error info. -func (c *Core) CreateVolume(id types.VolumeID) error { +func (c *Core) CreateVolume(id types.VolumeID) (*types.Volume, error) { exist, err := c.ExistVolume(id) if err != nil { - return err - } - if exist { - return volerr.ErrVolumeExisted + return nil, err + } else if exist { + return nil, volerr.ErrVolumeExisted } - v, err := types.NewVolume(id) + dv, err := driver.Get(id.Driver) if err != nil { - return errors.Wrapf(err, "Create volume") + return nil, err } - dv, err := driver.Get(v.Spec.Backend) + volume, err := dv.Create(driver.Contexts(), id) if err != nil { - return err - } - - // check options, then add some driver-specific options. - if err := checkOptions(v); err != nil { - return err - } - - // Create volume's meta. - ctx := driver.Contexts() - var s *types.Storage - - if dv.StoreMode(ctx).CentralCreateDelete() { - url, err := c.volumeURL() - if err != nil { - return err - } - - // before creating, we can't get the path - p, err := c.volumePath(v, dv) - if err != nil { - return err - } - v.SetPath(p) - - if err := client.New().Create(url, v); err != nil { - return errors.Wrap(err, "Create volume") - } - - // get the storage - s, err = c.getStorage(v.StorageID()) - if err != nil { - return err - } - } else { - if err := dv.Create(ctx, v, nil); err != nil { - return err - } - - // after creating volume, we can get the path - p, err := c.volumePath(v, dv) - if err != nil { - return err - } - v.SetPath(p) - - if err := c.store.Put(v); err != nil { - return err - } + return nil, err } - // format the volume - if f, ok := dv.(driver.Formator); ok { - err := f.Format(ctx, v, s) - if err == nil { - return nil - } - - logrus.Errorf("failed to format new volume: %s, err: %v", v.Name, err) - logrus.Warnf("rollback create volume, start to cleanup new volume: %s", v.Name) - if err := c.RemoveVolume(id); err != nil { - logrus.Errorf("failed to rollback create volume, cleanup new volume: %s, err: %v", v.Name, err) - return err - } - - // return format error. - return err + // create the meta + if err := c.store.Put(volume); err != nil { + return nil, err } - return nil + return volume, nil } // ListVolumes return all volumes. @@ -253,27 +167,13 @@ func (c *Core) CreateVolume(id types.VolumeID) error { func (c *Core) ListVolumes(labels map[string]string) ([]*types.Volume, error) { var retVolumes = make([]*types.Volume, 0) - // first, list local meta store. + // list local meta store. metaList, err := c.store.List() if err != nil { return nil, err } - // then, list central store. - if c.EnableControl { - url, err := c.listVolumeURL(labels) - if err != nil { - return nil, errors.Wrap(err, "List volume's name") - } - - logrus.Debugf("List volume URL: %s, labels: %s", url, labels) - - if err := client.New().ListKeys(url, &retVolumes); err != nil { - return nil, errors.Wrap(err, "List volume's name") - } - } - - // at last, scan all drivers. + // scan all drivers. logrus.Debugf("probing all drivers for listing volume") drivers, err := driver.GetAll() if err != nil { @@ -335,7 +235,12 @@ func (c *Core) ListVolumes(labels map[string]string) ([]*types.Volume, error) { } for _, v := range realVolumes { + // found new volumes, store the meta + logrus.Warningf("found new volume %s", v.Name) + c.store.Put(v) + retVolumes = append(retVolumes, v) + } return retVolumes, nil @@ -346,86 +251,13 @@ func (c *Core) ListVolumes(labels map[string]string) ([]*types.Volume, error) { func (c *Core) ListVolumeName(labels map[string]string) ([]string, error) { var names []string - // first, list local meta store. - metaList, err := c.store.List() + volumes, err := c.ListVolumes(labels) if err != nil { - return nil, err - } - - // then, list central store. - if c.EnableControl { - url, err := c.listVolumeNameURL(labels) - if err != nil { - return nil, errors.Wrap(err, "List volume's name") - } - - logrus.Debugf("List volume name URL: %s, labels: %s", url, labels) - - if err := client.New().ListKeys(url, &names); err != nil { - return nil, errors.Wrap(err, "List volume's name") - } + return names, err } - // at last, scan all drivers - logrus.Debugf("probing all drivers for listing volume") - drivers, err := driver.GetAll() - if err != nil { - return nil, err - } - - ctx := driver.Contexts() - - var realVolumes = map[string]*types.Volume{} - var volumeDrivers = map[string]driver.Driver{} - - for _, dv := range drivers { - volumeDrivers[dv.Name(ctx)] = dv - - d, ok := dv.(driver.Lister) - if !ok { - continue - } - vList, err := d.List(ctx) - if err != nil { - logrus.Warnf("volume driver %s list error: %v", dv.Name(ctx), err) - continue - } - - for _, v := range vList { - realVolumes[v.Name] = v - } - } - - for name, obj := range metaList { - v, ok := obj.(*types.Volume) - if !ok { - continue - } - - d, ok := volumeDrivers[v.Spec.Backend] - if !ok { - // driver not exist, ignore it - continue - } - - if d.StoreMode(ctx).IsLocal() { - names = append(names, name) - continue - } - - _, ok = realVolumes[name] - if !ok { - // real volume not exist, ignore it - continue - } - - delete(realVolumes, name) - - names = append(names, name) - } - - for name := range realVolumes { - names = append(names, name) + for _, v := range volumes { + names = append(names, v.Name) } return names, nil @@ -438,25 +270,14 @@ func (c *Core) RemoveVolume(id types.VolumeID) error { return errors.Wrap(err, "Remove volume: "+id.String()) } - // Call interface to remove meta info. - if dv.StoreMode(driver.Contexts()).CentralCreateDelete() { - url, err := c.volumeURL(id) - if err != nil { - return errors.Wrap(err, "Remove volume: "+id.String()) - } - if err := client.New().Delete(url, v); err != nil { - return errors.Wrap(err, "Remove volume: "+id.String()) - } - } else { - // Call driver's Remove method to remove the volume. - if err := dv.Remove(driver.Contexts(), v, nil); err != nil { - return err - } + // Call driver's Remove method to remove the volume. + if err := dv.Remove(driver.Contexts(), v); err != nil { + return err + } - // delete the meta - if err := c.store.Remove(id.Name); err != nil { - return err - } + // delete the meta + if err := c.store.Remove(id.Name); err != nil { + return err } return nil @@ -500,35 +321,14 @@ func (c *Core) AttachVolume(id types.VolumeID, extra map[string]string) (*types. } if d, ok := dv.(driver.AttachDetach); ok { - var ( - s *types.Storage - err error - ) - - if dv.StoreMode(ctx).CentralCreateDelete() { - if s, err = c.getStorage(v.StorageID()); err != nil { - return nil, err - } - } - - if err = d.Attach(ctx, v, s); err != nil { + if err := d.Attach(ctx, v); err != nil { return nil, err } } - // Call interface to update meta info. - if dv.StoreMode(driver.Contexts()).UseLocalMeta() { - if err := c.store.Put(v); err != nil { - return nil, err - } - } else { - url, err := c.volumeURL(id) - if err != nil { - return nil, errors.Wrap(err, "Update volume: "+id.String()) - } - if err := client.New().Update(url, v); err != nil { - return nil, errors.Wrap(err, "Update volume: "+id.String()) - } + // update meta info. + if err := c.store.Put(v); err != nil { + return nil, err } return v, nil @@ -551,35 +351,14 @@ func (c *Core) DetachVolume(id types.VolumeID, extra map[string]string) (*types. // if volume has referance, skip to detach volume. ref := v.Option(types.OptionRef) if d, ok := dv.(driver.AttachDetach); ok && ref == "" { - var ( - s *types.Storage - err error - ) - - if dv.StoreMode(ctx).CentralCreateDelete() { - if s, err = c.getStorage(v.StorageID()); err != nil { - return nil, err - } - } - - if err = d.Detach(ctx, v, s); err != nil { + if err := d.Detach(ctx, v); err != nil { return nil, err } } - // Call interface to update meta info. - if dv.StoreMode(driver.Contexts()).UseLocalMeta() { - if err := c.store.Put(v); err != nil { - return nil, err - } - } else { - url, err := c.volumeURL(id) - if err != nil { - return nil, errors.Wrap(err, "Update volume: "+id.String()) - } - if err := client.New().Update(url, v); err != nil { - return nil, errors.Wrap(err, "Update volume: "+id.String()) - } + // update meta info. + if err := c.store.Put(v); err != nil { + return nil, err } return v, nil diff --git a/storage/volume/core_util.go b/storage/volume/core_util.go index 318760134..acb5a91ef 100644 --- a/storage/volume/core_util.go +++ b/storage/volume/core_util.go @@ -3,13 +3,9 @@ package volume import ( "fmt" "path" - "strings" - "github.com/alibaba/pouch/storage/controlserver/client" "github.com/alibaba/pouch/storage/volume/driver" - volerr "github.com/alibaba/pouch/storage/volume/error" "github.com/alibaba/pouch/storage/volume/types" - "github.com/alibaba/pouch/storage/volume/types/meta" "github.com/pkg/errors" ) @@ -26,83 +22,6 @@ func (c *Core) volumePath(v *types.Volume, dv driver.Driver) (string, error) { return p, nil } -func (c *Core) getStorage(id types.StorageID) (*types.Storage, error) { - if !c.EnableControl { - return nil, fmt.Errorf("disable control server") - } - - s := &types.Storage{ - ObjectMeta: meta.ObjectMeta{ - UID: id.UID, - }, - } - - url, err := c.storageURL(id) - if err != nil { - return nil, err - } - if err := client.New().Get(url, s); err != nil { - return nil, err - } - return s, nil -} - -func (c *Core) storageURL(id ...types.StorageID) (string, error) { - if c.BaseURL == "" { - return "", volerr.ErrDisableControl - } - if len(id) == 0 { - return client.JoinURL(c.BaseURL, types.APIVersion, client.StoragePath) - } - return client.JoinURL(c.BaseURL, types.APIVersion, client.StoragePath, id[0].UID) -} - -func (c *Core) volumeURL(id ...types.VolumeID) (string, error) { - if c.BaseURL == "" { - return "", volerr.ErrDisableControl - } - if len(id) == 0 { - return client.JoinURL(c.BaseURL, types.APIVersion, client.VolumePath) - } - return client.JoinURL(c.BaseURL, types.APIVersion, client.VolumePath, id[0].Name) -} - -func (c *Core) listVolumeURL(labels map[string]string) (string, error) { - if c.BaseURL == "" { - return "", volerr.ErrDisableControl - } - url, err := client.JoinURL(c.BaseURL, types.APIVersion, client.VolumePath) - if err != nil { - return "", err - } - - querys := make([]string, 0, len(labels)) - for k, v := range labels { - querys = append(querys, fmt.Sprintf("labels=%s=%s", k, v)) - } - - url = url + "?" + strings.Join(querys, "&") - return url, nil -} - -func (c *Core) listVolumeNameURL(labels map[string]string) (string, error) { - if c.BaseURL == "" { - return "", volerr.ErrDisableControl - } - url, err := client.JoinURL(c.BaseURL, types.APIVersion, "/listkeys", client.VolumePath) - if err != nil { - return "", err - } - - querys := make([]string, 0, len(labels)) - for k, v := range labels { - querys = append(querys, fmt.Sprintf("labels=%s=%s", k, v)) - } - - url = url + "?" + strings.Join(querys, "&") - return url, nil -} - func checkVolume(v *types.Volume) error { if v.Spec.ClusterID == "" || v.Status.Phase == types.VolumePhaseFailed { err := fmt.Errorf("volume is created failed: %s", v.Name) diff --git a/storage/volume/driver/driver_interface.go b/storage/volume/driver/driver_interface.go index e6b95346f..b7fbb75b4 100644 --- a/storage/volume/driver/driver_interface.go +++ b/storage/volume/driver/driver_interface.go @@ -13,10 +13,10 @@ type Driver interface { StoreMode(Context) VolumeStoreMode // Create a volume. - Create(Context, *types.Volume, *types.Storage) error + Create(Context, types.VolumeID) (*types.Volume, error) // Remove a volume. - Remove(Context, *types.Volume, *types.Storage) error + Remove(Context, *types.Volume) error // Path returns volume's path. Path(Context, *types.Volume) (string, error) @@ -31,16 +31,16 @@ type Opt interface { // AttachDetach represents volume attach/detach interface. type AttachDetach interface { // Attach a Volume to host, enable the volume. - Attach(Context, *types.Volume, *types.Storage) error + Attach(Context, *types.Volume) error // Detach a volume with host, disable the volume. - Detach(Context, *types.Volume, *types.Storage) error + Detach(Context, *types.Volume) error } // Formator represents volume format interface. type Formator interface { // Format a volume. - Format(Context, *types.Volume, *types.Storage) error + Format(Context, *types.Volume) error } // Getter represents volume get interface. @@ -54,9 +54,3 @@ type Lister interface { // List a volume from driver List(Context) ([]*types.Volume, error) } - -// GatewayDriver represents storage gateway interface. -type GatewayDriver interface { - // Report storage cluster status. - Report(Context) ([]*types.Storage, error) -} diff --git a/storage/volume/driver/remote.go b/storage/volume/driver/remote.go index 4834d67b5..8c4c969e1 100644 --- a/storage/volume/driver/remote.go +++ b/storage/volume/driver/remote.go @@ -33,18 +33,25 @@ func (r *remoteDriverWrapper) StoreMode(ctx Context) VolumeStoreMode { } // Create a remote volume. -func (r *remoteDriverWrapper) Create(ctx Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("driver wrapper [%s] creates volume: %s", r.Name(ctx), v.Name) +func (r *remoteDriverWrapper) Create(ctx Context, id types.VolumeID) (*types.Volume, error) { + ctx.Log.Debugf("driver wrapper [%s] creates volume: %s", r.Name(ctx), id.Name) - options := types.ExtractOptionsFromVolume(v) + ctx.Log.Debugf("driver wrapper gets options: %v", id.Options) - ctx.Log.Debugf("driver wrapper gets options: %v", options) + if err := r.proxy.Create(id.Name, id.Options); err != nil { + return nil, err + } - return r.proxy.Create(v.Name, options) + mountPath, err := r.proxy.Path(id.Name) + if err != nil { + mountPath = "" + } + + return types.NewVolumeFromID(mountPath, "", id), nil } // Remove a remote volume. -func (r *remoteDriverWrapper) Remove(ctx Context, v *types.Volume, s *types.Storage) error { +func (r *remoteDriverWrapper) Remove(ctx Context, v *types.Volume) error { ctx.Log.Debugf("driver wrapper [%s] removes volume: %s", r.Name(ctx), v.Name) return r.proxy.Remove(v.Name) @@ -61,15 +68,7 @@ func (r *remoteDriverWrapper) Get(ctx Context, name string) (*types.Volume, erro id := types.NewVolumeID(name, r.Name(ctx)) - volume, err := types.NewVolume(id) - if err != nil { - return nil, err - } - - // set the mountpoint - volume.Status.MountPoint = rv.Mountpoint - - return volume, nil + return types.NewVolumeFromID(rv.Mountpoint, "", id), nil } // List all volumes from remote driver. @@ -85,15 +84,7 @@ func (r *remoteDriverWrapper) List(ctx Context) ([]*types.Volume, error) { for _, rv := range rvList { id := types.NewVolumeID(rv.Name, r.Name(ctx)) - - volume, err := types.NewVolume(id) - if err != nil { - continue - } - - // set the mountpoint - volume.Status.MountPoint = rv.Mountpoint - + volume := types.NewVolumeFromID(rv.Mountpoint, "", id) vList = append(vList, volume) } @@ -119,7 +110,7 @@ func (r *remoteDriverWrapper) Options() map[string]types.Option { } // Attach a remote volume. -func (r *remoteDriverWrapper) Attach(ctx Context, v *types.Volume, s *types.Storage) error { +func (r *remoteDriverWrapper) Attach(ctx Context, v *types.Volume) error { ctx.Log.Debugf("driver wrapper [%s] attach volume: %s", r.Name(ctx), v.Name) _, err := r.proxy.Mount(v.Name, v.UID) @@ -131,7 +122,7 @@ func (r *remoteDriverWrapper) Attach(ctx Context, v *types.Volume, s *types.Stor } // Detach a remote volume. -func (r *remoteDriverWrapper) Detach(ctx Context, v *types.Volume, s *types.Storage) error { +func (r *remoteDriverWrapper) Detach(ctx Context, v *types.Volume) error { ctx.Log.Debugf("driver wrapper [%s] detach volume: %s", r.Name(ctx), v.Name) return r.proxy.Unmount(v.Name, v.UID) diff --git a/storage/volume/examples/demo/demo.go b/storage/volume/examples/demo/demo.go index 2fe3bf29f..331f7c185 100644 --- a/storage/volume/examples/demo/demo.go +++ b/storage/volume/examples/demo/demo.go @@ -28,13 +28,13 @@ func (d *Demo) StoreMode(ctx driver.Context) driver.VolumeStoreMode { } // Create a demo volume. -func (d *Demo) Create(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (d *Demo) Create(ctx driver.Context, v *types.Volume) error { ctx.Log.Infof("Demo create volume: %s", v.Name) return nil } // Remove a demo volume. -func (d *Demo) Remove(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (d *Demo) Remove(ctx driver.Context, v *types.Volume) error { ctx.Log.Infof("Demo Remove volume: %s", v.Name) return nil } diff --git a/storage/volume/modules/ceph/ceph.go b/storage/volume/modules/ceph/ceph.go deleted file mode 100644 index 6c107f053..000000000 --- a/storage/volume/modules/ceph/ceph.go +++ /dev/null @@ -1,305 +0,0 @@ -// +build linux - -package ceph - -import ( - "encoding/base64" - "fmt" - "os" - "path" - "strings" - "syscall" - "time" - - "github.com/alibaba/pouch/pkg/exec" - "github.com/alibaba/pouch/pkg/utils" - "github.com/alibaba/pouch/storage/volume/driver" - "github.com/alibaba/pouch/storage/volume/types" - "github.com/alibaba/pouch/storage/volume/types/meta" - - "github.com/go-ini/ini" -) - -func init() { - if err := driver.Register(&Ceph{}); err != nil { - panic(err) - } -} - -// Ceph represents ceph volume driver struct. -type Ceph struct{} - -// Name returns volume driver's name. -func (c *Ceph) Name(ctx driver.Context) string { - return "ceph" -} - -// StoreMode returns volume driver's store mode. -func (c *Ceph) StoreMode(ctx driver.Context) driver.VolumeStoreMode { - return driver.RemoteStore | driver.CreateDeleteInCentral -} - -// Create a ceph volume. -func (c *Ceph) Create(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph create volume: %s", v.Name) - - poolName := getPool(v) - objSize := getObjectSize(v) - feature := getImageFeature(v) - name := v.Name - conf, ok := ctx.GetString("conf") - if !ok { - return fmt.Errorf("get config file fail") - } - - var cmdList []string - base := v.Option("base") - if base != "" { - cmdList = []string{"clone", fmt.Sprintf("%s/%s@snap", poolName, base), - fmt.Sprintf("%s/%s", poolName, name)} - } else { - cmdList = []string{"create", name, "--pool", poolName, "--size", v.Size(), - "--object-size", objSize, "--image-feature", feature} - } - - ctx.Log.Infof("Ceph create volume, command: %v", cmdList) - - cmd := NewCephCommand("rbd", conf) - err := cmd.RunCommand(nil, cmdList...) - if err != nil { - ctx.Log.Errorf("Ceph rbd create failed, err: %v", err) - return err - } - - return nil -} - -// Remove a ceph volume. -func (c *Ceph) Remove(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph Remove volume: %s", v.Name) - - poolName := getPool(v) - name := v.Name - conf, ok := ctx.GetString("conf") - if !ok { - return fmt.Errorf("get config file fail") - } - - cmdList := []string{"snap", "purge", mkpool(poolName, name)} - - ctx.Log.Infof("Ceph remove volume, command: %v", cmdList) - - cmd := NewCephCommand("rbd", conf) - err := cmd.RunCommand(nil, cmdList...) - if err != nil { - return err - } - - cmdList = []string{"rm", mkpool(poolName, name)} - err = cmd.RunCommand(nil, cmdList...) - if err != nil { - return err - } - - return nil -} - -// Path returns ceph volume mount path. -func (c *Ceph) Path(ctx driver.Context, v *types.Volume) (string, error) { - ctx.Log.Debugf("Ceph volume mount path: %s", v.Name) - return path.Join("/mnt", getPool(v), v.Name), nil -} - -// Options returns ceph volume options. -func (c *Ceph) Options() map[string]types.Option { - return map[string]types.Option{ - "conf": {Value: "/etc/ceph/ceph.conf", Desc: "set ceph config file"}, - "key": {Value: "", Desc: "set ceph keyring file"}, - "pool": {Value: "rbd", Desc: "set rbd image pool"}, - "image-feature": {Value: "", Desc: "set rbd image feature"}, - "object-size": {Value: "4M", Desc: "set rbd image object size"}, - "base": {Value: "", Desc: "create image by base image"}, - } -} - -// Format is used to format ceph volume. -func (c *Ceph) Format(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph format volume: %s", v.Name) - - device, err := rbdMap(ctx, v, s.Spec.Address, s.Spec.Keyring) - if err != nil { - ctx.Log.Errorf("Ceph map volume: %s failed: %v", v.Name, err) - return err - } - - fs := v.FileSystem() - ctx.Log.Infof("Ceph make file system, volume: %s, device: %s, fs: %v", v.Name, device, fs) - - if err := utils.MakeFSVolume(fs, device, defaultFormatTimeout); err != nil { - ctx.Log.Errorf("Ceph mkfs error: %v", err) - if err := rbdUnmap(ctx, v); err != nil { - ctx.Log.Errorf("Ceph error while trying to unmap after failed filesystem creation: %v", err) - } - return err - } - - ctx.Log.Infof("Ceph start to unmap %s", v.Name) - if err := rbdUnmap(ctx, v); err != nil { - ctx.Log.Errorf("Ceph unmap v :%s err: %v", v.Name, err) - return err - } - - return nil -} - -// Attach is used to attach ceph volume. -func (c *Ceph) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph attach volume: %s", v.Name) - var err error - volumePath := v.Path() - - // Map rbd device - devName, err := rbdMap(ctx, v, s.Spec.Address, s.Spec.Keyring) - if err != nil { - ctx.Log.Errorf("Ceph volume: %s map error: %v", v.Name, err) - return err - } - - // Create directory to mount - err = os.MkdirAll(volumePath, 0755) - if err != nil && !os.IsExist(err) { - return fmt.Errorf("error creating %q directory: %v", volumePath, err) - } - - // Mount the RBD - mountOpt := v.MountOption() - ctx.Log.Infof("Ceph mount volume: %s, device: %s, mountpath: %s, opt: %v", - v.Name, devName, volumePath, mountOpt) - - err = utils.MountVolume(mountOpt, devName, volumePath, time.Second*60) - if err != nil { - ctx.Log.Errorf("Ceph volume: %s mount failed, err: %v", v.Name, err) - return fmt.Errorf("failed to mount rbd device %q: %v", devName, err) - } - - return nil -} - -// Detach is used to detach ceph volume. -func (c *Ceph) Detach(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph detach volume: %s", v.Name) - mountPath := v.Path() - - err := exec.Retry(3, 3*time.Second, func() error { - if err := syscall.Unmount(mountPath, 0); err != nil { - ctx.Log.Errorf("Ceph volume: %s unmount %q failed, err: %v", v.Name, mountPath, err) - return err - } - return nil - }) - if err != nil { - return err - } - - // Remove the mounted directory - // FIXME remove all, but only after the FIXME above. - err = exec.Retry(3, 3*time.Second, func() error { - if err := os.Remove(mountPath); err != nil && !os.IsNotExist(err) { - ctx.Log.Errorf("Ceph removing %q directory failed, err: %v", mountPath, err) - return err - } - return nil - }) - if err != nil { - return fmt.Errorf("failed to remove mount path: %s after 3 retries", mountPath) - } - - // Unmap rbd device - if err := rbdUnmap(ctx, v); err != os.ErrNotExist { - return err - } - - return nil -} - -// Report is used to collect ceph storage information and report to control server. -func (c *Ceph) Report(ctx driver.Context) ([]*types.Storage, error) { - var err error - var storages []*types.Storage - conf, ok := ctx.GetString("conf") - if !ok { - return nil, fmt.Errorf("get config file fail") - } - keyFile, ok := ctx.GetString("keyring") - if !ok { - return nil, fmt.Errorf("get keyring file fail") - } - - s := &types.Storage{ - ObjectMeta: meta.ObjectMeta{ - Labels: map[string]string{}, - Annotations: map[string]string{}, - }, - Spec: &types.StorageSpec{}, - Status: &types.StorageStatus{}, - } - - // analyze keyfile - cfg, err := ini.Load(keyFile) - if err != nil { - ctx.Log.Errorf("Ceph load keyring error : %v", err) - return nil, err - } - key, err := cfg.Section("client.admin").GetKey("key") - if err != nil { - ctx.Log.Errorf("Ceph get client.admin section error: %v", err) - return nil, err - } - keyring := fmt.Sprintf("%s:%s", "admin", key.Value()) - encoded := base64.StdEncoding.EncodeToString([]byte(keyring)) - s.Spec.Keyring = encoded - - // get ceph quorum status - cmd := NewCephCommand("ceph", conf) - status := &QuorumStatus{} - if err = cmd.RunCommand(status, QuorumCommand...); err != nil { - ctx.Log.Errorf("Ceph run quorum error: %v", err) - return nil, err - } - monAddrs := []string{} - for _, mon := range status.MonMap.Mons { - monAddrs = append(monAddrs, strings.Split(mon.Addr, "/")[0]) - } - s.Spec.Address = strings.Join(monAddrs, ",") - - ctx.Log.Debugf("Ceph report storage: %s", s.Spec.Address) - - // get ceph status - stats := &Stats{} - if err = cmd.RunCommand(stats, CephStatusCommand...); err != nil { - ctx.Log.Errorf("Ceph run cephstatus command error: %v", err) - return nil, err - } - s.Status.Schedulable = stats.Health.OverallStatus != HealthErr - s.Status.HealthyStatus = stats.Health.OverallStatus - - // get ceph usage - usage := &PoolStats{} - if err = cmd.RunCommand(usage, CephUsageCommand...); err != nil { - return nil, err - } - poolSpec := make(map[string]types.PoolSpec) - for _, p := range usage.Pools { - poolSpec[p.Name] = types.PoolSpec{ - Capacity: p.Stats.BytesUsed + p.Stats.MaxAvail, - Available: p.Stats.MaxAvail, - } - } - s.Spec.PoolSpec = poolSpec - - s.UID = stats.ID - - storages = append(storages, s) - - return storages, nil -} diff --git a/storage/volume/modules/ceph/map.go b/storage/volume/modules/ceph/map.go deleted file mode 100644 index d4499aca5..000000000 --- a/storage/volume/modules/ceph/map.go +++ /dev/null @@ -1,165 +0,0 @@ -// +build linux - -package ceph - -import ( - "encoding/json" - "fmt" - "os" - "strings" - "time" - - "github.com/alibaba/pouch/pkg/exec" - "github.com/alibaba/pouch/storage/volume/driver" - "github.com/alibaba/pouch/storage/volume/types" -) - -var ( - bin = "/usr/bin/rbd-nbd" - poolName = "rbd" - objectSize = "4M" - imageFeature = "layering,exclusive-lock" -) - -func getImageFeature(v *types.Volume) string { - if s, ok := v.Spec.Extra["image-feature"]; ok { - return s - } - return imageFeature -} - -func getObjectSize(v *types.Volume) string { - if s, ok := v.Spec.Extra["object-size"]; ok { - return s - } - return objectSize -} - -func getPool(v *types.Volume) string { - if p, ok := v.Spec.Extra["pool"]; ok { - return p - } - return poolName -} - -func mkpool(poolName, volumeName string) string { - return fmt.Sprintf("%s/%s", poolName, volumeName) -} - -func rbdMap(ctx driver.Context, v *types.Volume, address, keyring string) (string, error) { - poolName := getPool(v) - name := v.Name - - args := []string{"map", mkpool(poolName, name), "--mon_host", address} - exit, _, stderr, err := exec.RunWithRetry(3, 100*time.Microsecond, defaultTimeout, bin, args...) - if err != nil || exit != 0 { - err = fmt.Errorf("could not map %q: %d (%v) (%v)", name, exit, err, stderr) - ctx.Log.Errorf("Ceph %s", err.Error()) - return "", err - } - - var device string - rbdmap, err := showMapped(ctx) - if err != nil { - return "", err - } - - for _, rbd := range rbdmap.MapDevice { - if rbd.Image == name && rbd.Pool == poolName { - device = rbd.Device - break - } - } - if device == "" { - return "", fmt.Errorf("volume %s in pool %s not found in RBD showmapped output", name, poolName) - } - - ctx.Log.Infof("Ceph mapped volume %q as %q", name, device) - - return device, nil -} - -func rbdUnmap(ctx driver.Context, v *types.Volume) error { - err := exec.Retry(10, 100*time.Millisecond, func() error { - rbdmap, err := showMapped(ctx) - if err != nil { - return err - } - - err = doUnmap(ctx, v, rbdmap) - if err != nil { - return err - } - return nil - }) - if err != nil { - return fmt.Errorf("could not unmap volume %q after 10 retries", v.Name) - } - - return nil -} - -func showMapped(ctx driver.Context) (RBDMap, error) { - rbdmap := RBDMap{} - - exit, stdout, stderr, err := exec.RunWithRetry(3, 100*time.Millisecond, defaultTimeout, - bin, "--dump-json", "list-mapped") - if err != nil || exit != 0 { - ctx.Log.Warnf("Ceph could not show mapped. Retrying: err:%v exit: %v, stderr: %v, stdout:%v", - err, exit, stderr, stdout) - } else if stdout != "" { - err = json.Unmarshal([]byte(stdout), &rbdmap) - if err != nil { - ctx.Log.Errorf("Ceph could not parse rbd list-mapped output: %s", stdout) - } - } - - return rbdmap, err -} - -func doUnmap(ctx driver.Context, v *types.Volume, rbdmap RBDMap) error { - poolName := getPool(v) - name := v.Name - - for _, rbd := range rbdmap.MapDevice { - if rbd.Image != name || rbd.Pool != poolName { - continue - } - ctx.Log.Debugf("Ceph unmapping volume %s/%s at device %q", poolName, name, strings.TrimSpace(rbd.Device)) - - // Check device is exist or not. - if _, err := os.Stat(rbd.Device); err != nil { - ctx.Log.Errorf("Ceph trying to unmap device %q for %s/%s that does not exist, continuing", - poolName, name, rbd.Device) - continue - } - - // Unmap device. - exit, _, stderr, err := exec.Run(defaultTimeout, bin, "unmap", rbd.Device) - if err != nil { - ctx.Log.Errorf("Ceph could not unmap volume %q (device %q): %d (%v) (%s)", - name, rbd.Device, exit, err, stderr) - return err - } - if exit != 0 { - err = fmt.Errorf("Ceph could not unmap volume %q (device %q): %d (%s)", - name, rbd.Device, exit, stderr) - ctx.Log.Error(err) - return err - } - - // Check have unmapped or not. - rbdmap2, err := showMapped(ctx) - if err != nil { - return err - } - for _, rbd2 := range rbdmap2.MapDevice { - if rbd.Image == rbd2.Image && rbd.Pool == rbd2.Pool { - return fmt.Errorf("could not unmap volume %q, device: %q is exist", - name, rbd.Image) - } - } - break - } - return nil -} diff --git a/storage/volume/modules/ceph/types.go b/storage/volume/modules/ceph/types.go deleted file mode 100644 index 9f895f5b8..000000000 --- a/storage/volume/modules/ceph/types.go +++ /dev/null @@ -1,127 +0,0 @@ -// +build linux - -package ceph - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/alibaba/pouch/pkg/exec" -) - -const ( - // HealthOK denotes the status of ceph cluster when healthy. - HealthOK = "HEALTH_OK" - - // HealthWarn denotes the status of ceph cluster when unhealthy but recovering. - HealthWarn = "HEALTH_WARN" - - // HealthErr denotes the status of ceph cluster when unhealthy but usually needs - // manual intervention. - HealthErr = "HEALTH_ERR" -) - -var ( - // QuorumCommand represents ceph quorum_status command. - QuorumCommand = []string{"quorum_status", "-f", "json"} - - // CephUsageCommand represents ceph df command. - CephUsageCommand = []string{"df", "detail", "-f", "json"} - - // CephStatusCommand represents ceph status command. - CephStatusCommand = []string{"status", "-f", "json"} - - // OsdDFCommand represents ceph osd df command. - OsdDFCommand = []string{"osd", "df", "-f", "json"} - - // OsdPerfCommand represents ceph osd perf command. - OsdPerfCommand = []string{"osd", "perf", "-f", "json"} - - // OsdDumpCommand represents ceph osd dump command. - OsdDumpCommand = []string{"osd", "dump", "-f", "json"} -) - -var ( - defaultTimeout = time.Second * 10 - defaultFormatTimeout = time.Second * 120 -) - -type mon struct { - Addr string `json:"addr"` - Name string `json:"name"` - Rank int `json:"rank"` -} - -type monMap struct { - Created string `json:"created"` - Epoch int `json:"epoch"` - FsID string `json:"fsid"` - Modified string `json:"modified"` - Mons []mon `json:"mons"` -} - -// QuorumStatus represents ceph quorum_status result struct. -type QuorumStatus struct { - MonMap monMap `json:"monmap"` - QuorumLeaderName string `json:"quorum_leader_name"` -} - -// Stats represents ceph status result struct. -type Stats struct { - ID string `json:"fsid"` - Health struct { - OverallStatus string `json:"overall_status"` - } `json:"health"` -} - -// PoolStats represents ceph pool status struct. -type PoolStats struct { - Pools []struct { - Name string `json:"name"` - Stats struct { - BytesUsed int64 `json:"bytes_used"` - MaxAvail int64 `json:"max_avail"` - } `json:"stats"` - } `json:"pools"` -} - -// RBDMap represents "rbd listmapped" command result struct. -type RBDMap struct { - MapDevice []struct { - ID int `json:"id"` - Device string `json:"name"` - Image string `json:"image"` - Pool string `json:"pool"` - } `json:"Devices"` -} - -// Command represents ceph command. -type Command struct { - bin string - conf string -} - -// NewCephCommand returns ceph command instance. -func NewCephCommand(bin, conf string) *Command { - return &Command{bin, conf} -} - -// RunCommand is used to execute ceph command. -func (cc *Command) RunCommand(obj interface{}, args ...string) error { - args = append(args, "--conf") - args = append(args, cc.conf) - - exit, stdout, stderr, err := exec.Run(defaultTimeout, cc.bin, args...) - if err != nil || exit != 0 { - return fmt.Errorf("run command failed, err: %v, exit: %d, output: %s", err, exit, stderr) - } - - if obj != nil { - if err := json.Unmarshal([]byte(stdout), obj); err != nil { - return err - } - } - - return nil -} diff --git a/storage/volume/modules/local/local.go b/storage/volume/modules/local/local.go index dcf91d9ea..edfff203b 100644 --- a/storage/volume/modules/local/local.go +++ b/storage/volume/modules/local/local.go @@ -7,16 +7,15 @@ import ( "os" "path" "strconv" - "strings" - "time" + "github.com/alibaba/pouch/pkg/bytefmt" "github.com/alibaba/pouch/storage/quota" "github.com/alibaba/pouch/storage/volume/driver" "github.com/alibaba/pouch/storage/volume/types" ) var ( - dataDir = "/mnt/local" + dataDir = "/var/lib/pouch/volume" ) func init() { @@ -40,23 +39,42 @@ func (p *Local) StoreMode(ctx driver.Context) driver.VolumeStoreMode { } // Create a local volume. -func (p *Local) Create(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Local create volume: %s", v.Name) - mountPath, _ := p.Path(ctx, v) +func (p *Local) Create(ctx driver.Context, id types.VolumeID) (*types.Volume, error) { + ctx.Log.Debugf("Local create volume: %s", id.Name) + var ( + mountPath = path.Join(dataDir, id.Name) + size string + ) + + // parse the mount path. + if dir, ok := id.Options["mount"]; ok { + mountPath = path.Join(dir, id.Name) + } + + // parse the size. + if value, ok := id.Options["opt.size"]; ok { + sizeInt, err := bytefmt.ToMegabytes(value) + if err != nil { + return nil, err + } + size = strconv.Itoa(int(sizeInt)) + "M" + } + + // create the volume path if st, exist := os.Stat(mountPath); exist != nil { if e := os.MkdirAll(mountPath, 0755); e != nil { - return e + return nil, e } } else if !st.IsDir() { - return fmt.Errorf("mount path is not a dir %s", mountPath) + return nil, fmt.Errorf("mount path is not a dir %s", mountPath) } - return nil + return types.NewVolumeFromID(mountPath, size, id), nil } // Remove a local volume. -func (p *Local) Remove(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Local) Remove(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Local remove volume: %s", v.Name) mountPath := v.Path() @@ -82,28 +100,15 @@ func (p *Local) Path(ctx driver.Context, v *types.Volume) (string, error) { // Options returns local volume's options. func (p *Local) Options() map[string]types.Option { return map[string]types.Option{ - "ids": {Value: "", Desc: "local volume user's ids"}, - "reqID": {Value: "", Desc: "create local volume request id"}, - "freeTime": {Value: "", Desc: "local volume free time"}, - "mount": {Value: "", Desc: "local directory"}, + "mount": {Value: "", Desc: "local directory"}, } } // Attach a local volume. -func (p *Local) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Local) Attach(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Local attach volume: %s", v.Name) mountPath := v.Path() size := v.Size() - reqID := v.Option("reqID") - ids := v.Option("ids") - - if ids != "" { - if !strings.Contains(ids, reqID) { - ids = ids + "," + reqID - } - } else { - ids = reqID - } if st, exist := os.Stat(mountPath); exist != nil { if e := os.MkdirAll(mountPath, 0777); e != nil { @@ -120,31 +125,12 @@ func (p *Local) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) er } } - v.SetOption("ids", ids) - v.SetOption("freeTime", "") - return nil } // Detach a local volume. -func (p *Local) Detach(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Local) Detach(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Local detach volume: %s", v.Name) - reqID := v.Option("reqID") - ids := v.Option("ids") - - arr := strings.Split(ids, ",") - newIDs := []string{} - for _, id := range arr { - if id != reqID { - newIDs = append(newIDs, reqID) - } - } - - if len(newIDs) == 0 { - v.SetOption("freeTime", strconv.FormatInt(time.Now().Unix(), 10)) - } - - v.SetOption("ids", strings.Join(newIDs, ",")) return nil } diff --git a/storage/volume/modules/tmpfs/tmpfs.go b/storage/volume/modules/tmpfs/tmpfs.go index 4ed1a9be2..eecbece1b 100644 --- a/storage/volume/modules/tmpfs/tmpfs.go +++ b/storage/volume/modules/tmpfs/tmpfs.go @@ -41,14 +41,17 @@ func (p *Tmpfs) StoreMode(ctx driver.Context) driver.VolumeStoreMode { } // Create a tmpfs volume. -func (p *Tmpfs) Create(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Tmpfs create volume: %s", v.Name) +func (p *Tmpfs) Create(ctx driver.Context, id types.VolumeID) (*types.Volume, error) { + ctx.Log.Debugf("Tmpfs create volume: %s", id.Name) - return nil + // parse the mount path + mountPath := path.Join(dataDir, id.Name) + + return types.NewVolumeFromID(mountPath, "", id), nil } // Remove a tmpfs volume. -func (p *Tmpfs) Remove(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Tmpfs) Remove(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Tmpfs remove volume: %s", v.Name) return nil @@ -70,7 +73,7 @@ func (p *Tmpfs) Options() map[string]types.Option { } // Attach a tmpfs volume. -func (p *Tmpfs) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Tmpfs) Attach(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Tmpfs attach volume: %s", v.Name) mountPath := v.Path() size := v.Size() @@ -105,7 +108,7 @@ func (p *Tmpfs) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) er } // Detach a tmpfs volume. -func (p *Tmpfs) Detach(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Tmpfs) Detach(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Tmpfs detach volume: %s", v.Name) mountPath := v.Path() reqID := v.Option("reqID") diff --git a/storage/volume/types/storage.go b/storage/volume/types/storage.go deleted file mode 100644 index e1425e930..000000000 --- a/storage/volume/types/storage.go +++ /dev/null @@ -1,82 +0,0 @@ -package types - -import ( - "time" - - "github.com/alibaba/pouch/storage/volume/types/meta" -) - -// StorageSpec represents storage spec. -type StorageSpec struct { - Type string `json:"type"` // storage type - ID string `json:"id,omitempty"` // storage uid or unique name - Address string `json:"address"` // storage address, such as ceph it's monitor ip:port,ip:port - PoolSpec map[string]PoolSpec `json:"poolspec"` // storage pool spec - - ClusterName string `json:"clustername,omitempty"` // storage cluster name - Keyring string `json:"keyring,omitempty"` // storage access key, such as ceph's keyring user:secret -} - -// PoolSpec represents storage pool spec. -type PoolSpec struct { - WriteBps int64 `json:"iowbps"` // storage write bytes per second - ReadBps int64 `json:"iorbps"` // storage read bytes per second - ReadIOps int64 `json:"ioriops"` // storage read io count per second - WriteIOps int64 `json:"iowiops"` // storage write io count per second - IOps int64 `json:"ioiops"` // storage total io count per second - Capacity int64 `json:"capacity"` // storage capacity like: MB/GB/TB/PB - Available int64 `json:"available"` // storage available like: MB/GB/TB/PB - MaxVolumes int64 `json:"maxvolume"` // max disks in this storage cluster - NicSendBps int64 `json:"networkSendBps"` // nic card send bytes per second - NicRecvBps int64 `json:"networkRecvBps"` // nic card recv bytes per second -} - -// StorageStatus represents storage status. -type StorageStatus struct { - Schedulable bool `json:"schedulable,omitempty"` - LastUpdateTime *time.Time `json:"lastUpdateTime,omitempty"` - HealthyStatus string `json:"message"` -} - -// Storage represents storage struct. -type Storage struct { - meta.ObjectMeta `json:",inline"` - Spec *StorageSpec `json:"spec,omitempty"` - Status *StorageStatus `json:"status,omitempty"` -} - -// Address return storage operate address. -func (s *Storage) Address() string { - return s.Spec.Address -} - -// SetAddress set storage operate address. -func (s *Storage) SetAddress(address string) { - s.Spec.Address = address -} - -// StorageList represents storage list type. -type StorageList struct { - meta.ListMeta `json:",inline,omitempty"` - Items []Storage `json:"items,omitempty"` -} - -// StorageID returns storage's uid. -func (s *Storage) StorageID() StorageID { - return NewStorageID(s.GetUID()) -} - -// StorageID represents storage uid. -type StorageID struct { - UID string -} - -// NewStorageID is used to set uid. -func NewStorageID(uid string) StorageID { - return StorageID{UID: uid} -} - -// String returns storage uid. -func (si StorageID) String() string { - return si.UID -} diff --git a/storage/volume/types/volume.go b/storage/volume/types/volume.go index ef3147f7d..9b20819c5 100644 --- a/storage/volume/types/volume.go +++ b/storage/volume/types/volume.go @@ -147,16 +147,6 @@ func (v *Volume) Driver() string { return v.Spec.Backend } -// StorageID return driver's storage identity. -func (v *Volume) StorageID() StorageID { - return NewStorageID(v.Spec.ClusterID) -} - -// SetStorageUID save storage uid into volume. -func (v *Volume) SetStorageUID(uid string) { - v.Spec.ClusterID = uid -} - // VolumeID return volume's identity. func (v *Volume) VolumeID() VolumeID { return NewVolumeID(v.Name, v.Driver()) diff --git a/storage/volume/types/volume_util.go b/storage/volume/types/volume_util.go index 0e3929979..d744ccd12 100644 --- a/storage/volume/types/volume_util.go +++ b/storage/volume/types/volume_util.go @@ -105,6 +105,41 @@ func buildVolumeConfig(options map[string]string) (*VolumeConfig, error) { return config, nil } +// NewVolumeFromID will create an Volume using mountPath, size and VolumeID. +func NewVolumeFromID(mountPath, size string, id VolumeID) *Volume { + now := time.Now() + v := &Volume{ + ObjectMeta: meta.ObjectMeta{ + Name: id.Name, + Claimer: "pouch", + Namespace: "pouch", + UID: uuid.NewRandom().String(), + Generation: meta.ObjectPhasePreCreate, + Labels: id.Labels, + CreationTimestamp: &now, + ModifyTimestamp: &now, + }, + Spec: &VolumeSpec{ + Backend: id.Driver, + Extra: id.Options, + Selector: make(Selector, 0), + VolumeConfig: &VolumeConfig{ + Size: size, + }, + }, + Status: &VolumeStatus{ + MountPoint: mountPath, + }, + } + + for n, selector := range id.Selectors { + requirement := translateSelector(n, strings.ToLower(selector)) + v.Spec.Selector = append(v.Spec.Selector, requirement) + } + + return v +} + // NewVolume generates a volume based VolumeID func NewVolume(id VolumeID) (*Volume, error) { now := time.Now() diff --git a/test/api_system_test.go b/test/api_system_test.go index 3e3de65f9..d0e4c363c 100644 --- a/test/api_system_test.go +++ b/test/api_system_test.go @@ -53,11 +53,10 @@ func (suite *APISystemSuite) TestInfo(c *check.C) { c.Assert(got.NCPU, check.Equals, int64(runtime.NumCPU())) // Check the volume drivers - c.Assert(len(got.VolumeDrivers), check.Equals, 4) - c.Assert(got.VolumeDrivers[0], check.Equals, "ceph") - c.Assert(got.VolumeDrivers[1], check.Equals, "local") - c.Assert(got.VolumeDrivers[2], check.Equals, "local-persist") - c.Assert(got.VolumeDrivers[3], check.Equals, "tmpfs") + c.Assert(len(got.VolumeDrivers), check.Equals, 3) + c.Assert(got.VolumeDrivers[0], check.Equals, "local") + c.Assert(got.VolumeDrivers[1], check.Equals, "local-persist") + c.Assert(got.VolumeDrivers[2], check.Equals, "tmpfs") } // TestVersion tests /version API.