diff --git a/Makefile b/Makefile index 1d6da8cb7f..25dc6cd566 100644 --- a/Makefile +++ b/Makefile @@ -76,7 +76,6 @@ validate-swagger: ## run swagger validate .PHONY: modules modules: @./hack/module --clean - @./hack/module --add-volume=github.com/alibaba/pouch/storage/volume/modules/ceph @./hack/module --add-volume=github.com/alibaba/pouch/storage/volume/modules/tmpfs @./hack/module --add-volume=github.com/alibaba/pouch/storage/volume/modules/local diff --git a/apis/server/volume_bridge.go b/apis/server/volume_bridge.go index f78399e707..64c1de077b 100644 --- a/apis/server/volume_bridge.go +++ b/apis/server/volume_bridge.go @@ -38,11 +38,7 @@ func (s *Server) createVolume(ctx context.Context, rw http.ResponseWriter, req * driver = volumetypes.DefaultBackend } - if err := s.VolumeMgr.Create(ctx, name, driver, options, labels); err != nil { - return err - } - - volume, err := s.VolumeMgr.Get(ctx, name) + volume, err := s.VolumeMgr.Create(ctx, name, driver, options, labels) if err != nil { return err } diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go index 9433a375d6..086700e729 100644 --- a/daemon/mgr/container.go +++ b/daemon/mgr/container.go @@ -1771,7 +1771,7 @@ func (mgr *ContainerManager) attachVolume(ctx context.Context, name string, meta opts := map[string]string{ "backend": driver, } - if err := mgr.VolumeMgr.Create(ctx, name, meta.HostConfig.VolumeDriver, opts, nil); err != nil { + if _, err := mgr.VolumeMgr.Create(ctx, name, meta.HostConfig.VolumeDriver, opts, nil); err != nil { logrus.Errorf("failed to create volume: %s, err: %v", name, err) return "", "", errors.Wrap(err, "failed to create volume") } diff --git a/daemon/mgr/volume.go b/daemon/mgr/volume.go index e38978b3ab..cc618e46fc 100644 --- a/daemon/mgr/volume.go +++ b/daemon/mgr/volume.go @@ -3,7 +3,6 @@ package mgr import ( "context" "os" - "path" "strings" "github.com/alibaba/pouch/pkg/errtypes" @@ -16,7 +15,7 @@ import ( // VolumeMgr defines interface to manage container volume. type VolumeMgr interface { // Create is used to create volume. - Create(ctx context.Context, name, driver string, options, labels map[string]string) error + Create(ctx context.Context, name, driver string, options, labels map[string]string) (*types.Volume, error) // Get returns the information of volume that specified name/id. Get(ctx context.Context, name string) (*types.Volume, error) @@ -59,33 +58,25 @@ func NewVolumeManager(cfg volume.Config) (*VolumeManager, error) { } // Create is used to create volume. -func (vm *VolumeManager) Create(ctx context.Context, name, driver string, options, labels map[string]string) error { +func (vm *VolumeManager) Create(ctx context.Context, name, driver string, options, labels map[string]string) (*types.Volume, error) { + if driver == "" { + driver = types.DefaultBackend + } + id := types.VolumeID{ Name: name, Driver: driver, Options: map[string]string{}, Selectors: map[string]string{}, + Labels: map[string]string{}, } if labels != nil { id.Labels = labels - } else { - id.Labels = map[string]string{} - } - - for key, opt := range options { - if strings.HasPrefix(key, "selector.") { - key = strings.TrimPrefix(key, "selector.") - id.Selectors[key] = opt - continue - } - - id.Options[key] = opt } - // set default volume mount path - if mount, ok := id.Options["mount"]; !ok || mount == "" { - id.Options["mount"] = path.Dir(vm.core.VolumeMetaPath) + if options != nil { + id.Options = options } return vm.core.CreateVolume(id) diff --git a/storage/controlserver/client/client.go b/storage/controlserver/client/client.go deleted file mode 100644 index 83633ca980..0000000000 --- a/storage/controlserver/client/client.go +++ /dev/null @@ -1,115 +0,0 @@ -package client - -import ( - "context" - "crypto/tls" - "net/http" - "time" - - "github.com/alibaba/pouch/pkg/serializer" -) - -var ( - // VolumePath is volume path url prefix - VolumePath = "/volume" - - // StoragePath is storage path url prefix - StoragePath = "/storage" -) - -// Client represents a client connect to control server. -type Client struct { - ttl time.Duration - errChan chan error - cli *HTTPClient - tlsc *tls.Config -} - -// New is used to initialize client class object. -func New() *Client { - return &Client{ - ttl: time.Second * 90, - cli: HTTPClientNew(), - errChan: make(chan error, 1), - } -} - -// NewWithTLS is used to initialize client class object with tls. -func NewWithTLS(tlsc *tls.Config) *Client { - c := New() - c.cli.TLSConfig(tlsc) - return c -} - -func (c *Client) context() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.TODO(), c.ttl) -} - -func (c *Client) wait(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-c.errChan: - if err != nil { - if code := c.cli.StatusCode(); code >= http.StatusBadRequest { - return newError(code, err.Error()) - } - } - return err - } -} - -// Create is used to create "obj" with url on control server. -func (c *Client) Create(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func(obj serializer.Object) { - c.errChan <- c.cli.POST().URL(url).JSONBody(obj).Do().Into(obj) - }(obj) - - return c.wait(ctx) -} - -// Update is used to update "obj" with url on control server. -func (c *Client) Update(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func(obj serializer.Object) { - c.errChan <- c.cli.PUT().URL(url).JSONBody(obj).Do().Into(obj) - }(obj) - - return c.wait(ctx) -} - -// Get returns "obj" content with url on control server. -func (c *Client) Get(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func(obj serializer.Object) { - c.errChan <- c.cli.GET().URL(url).Do().Into(obj) - }(obj) - - return c.wait(ctx) -} - -// Delete is used to delete "obj" with url on control server. -func (c *Client) Delete(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func() { - c.errChan <- c.cli.DELETE().URL(url).JSONBody(obj).Do().Err() - }() - - return c.wait(ctx) -} - -// ListKeys returns "obj" with url that contains labels or keys. -func (c *Client) ListKeys(url string, obj serializer.Object) error { - ctx, cancel := c.context() - defer cancel() - go func(obj serializer.Object) { - c.errChan <- c.cli.GET().URL(url).Do().Into(obj) - }(obj) - - return c.wait(ctx) -} diff --git a/storage/controlserver/client/errors.go b/storage/controlserver/client/errors.go deleted file mode 100644 index 71d7ab3f09..0000000000 --- a/storage/controlserver/client/errors.go +++ /dev/null @@ -1,28 +0,0 @@ -package client - -import ( - "net/http" -) - -// Error is defined as client error. -type Error struct { - status int - err string -} - -func newError(status int, err string) Error { - return Error{ - status: status, - err: err, - } -} - -// Error returns client Error's error message. -func (e Error) Error() string { - return e.err -} - -// IsNotFound will check the error is "StatusNotFound" or not. -func (e Error) IsNotFound() bool { - return e.status == http.StatusNotFound -} diff --git a/storage/controlserver/client/httpcli.go b/storage/controlserver/client/httpcli.go deleted file mode 100644 index 8a0f3f3523..0000000000 --- a/storage/controlserver/client/httpcli.go +++ /dev/null @@ -1,276 +0,0 @@ -package client - -import ( - "crypto/tls" - "crypto/x509" - "fmt" - "net/http" - "net/url" - "os" - "reflect" - - "github.com/alibaba/pouch/pkg/serializer" - - "github.com/go-resty/resty" -) - -// HTTPClient represents a client connect to http server. -type HTTPClient struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int -} - -var restyClient = resty.New().SetCloseConnection(true) - -func isPtr(i interface{}) bool { - return reflect.ValueOf(i).Kind() == reflect.Ptr -} - -func parseTLSConfig(ca, cert, key []byte) (*tls.Config, error) { - tlsConfig := &tls.Config{ - // Prefer TLS1.2 as the client minimum - MinVersion: tls.VersionTLS12, - InsecureSkipVerify: os.Getenv("DOCKER_TLS_VERIFY") == "", - } - - //parse root ca - cas, err := certpool(ca) - if err != nil { - return nil, err - } - - tlsConfig.RootCAs = cas - - //parse cert and key pem - tlsCert, err := tls.X509KeyPair(cert, key) - if err != nil { - return nil, err - } - - tlsConfig.Certificates = []tls.Certificate{tlsCert} - return tlsConfig, nil -} - -func certpool(pem []byte) (*x509.CertPool, error) { - certPool := x509.NewCertPool() - if !certPool.AppendCertsFromPEM(pem) { - return nil, fmt.Errorf("failed to append certificates from PEM") - } - - return certPool, nil -} - -// HTTPClientNew returns HTTPClient class object. -func HTTPClientNew() *HTTPClient { - return &HTTPClient{ - cli: restyClient, - req: restyClient.R(), - resp: &resty.Response{}, - } -} - -func (c *HTTPClient) verb(method string) *HTTPClient { - c.method = method - return c -} - -// TLSConfig is used to set tls config with specified config. -func (c *HTTPClient) TLSConfig(tlsc *tls.Config) *HTTPClient { - c.cli.SetTLSClientConfig(tlsc) - return c -} - -// TLS is used to set tls config with ca, cert and key. -func (c *HTTPClient) TLS(ca, cert, key []byte) (*HTTPClient, error) { - tlsc, err := parseTLSConfig(ca, cert, key) - if err != nil { - return nil, err - } - c.cli.SetTLSClientConfig(tlsc) - return c, nil -} - -// PUT used to implement http PUT method. -func PUT() *HTTPClient { - c := HTTPClientNew() - return c.verb("PUT") -} - -// GET used to implement HTTPClient GET method. -func GET() *HTTPClient { - c := HTTPClientNew() - return c.verb("GET") -} - -// POST used to implement HTTPClient POST method. -func POST() *HTTPClient { - c := HTTPClientNew() - return c.verb("POST") -} - -// DELETE used to implement HTTPClient DELETE method. -func DELETE() *HTTPClient { - c := HTTPClientNew() - return c.verb("DELETE") -} - -// PUT used to implement HTTPClient PUT method and then returns HTTPClient -func (c *HTTPClient) PUT() *HTTPClient { - c.verb("PUT") - return c -} - -// GET used to implement HTTPClient GET method and then returns HTTPClient -func (c *HTTPClient) GET() *HTTPClient { - c.verb("GET") - return c -} - -// POST used to implement HTTPClient POST method and then returns HTTPClient -func (c *HTTPClient) POST() *HTTPClient { - c.verb("POST") - return c -} - -// DELETE used to implement HTTPClient DELETE method and then returns HTTPClient -func (c *HTTPClient) DELETE() *HTTPClient { - c.verb("DELETE") - return c -} - -// Err returns HTTPClient's error. -func (c *HTTPClient) Err() error { - return c.err -} - -// Method used to implement HTTPClient method that specified by caller, -// and then returns HTTPClient object. -func (c *HTTPClient) Method(method string) *HTTPClient { - c.verb(method) - return c -} - -// URL is used to set url and then returns HTTPClient object. -func (c *HTTPClient) URL(rawurl string) *HTTPClient { - if _, err := url.Parse(rawurl); err != nil { - c.err = err - return c - } - c.urls = rawurl - return c -} - -// SetHeader is used to set header and then returns HTTPClient object. -func (c *HTTPClient) SetHeader(key, value string) *HTTPClient { - c.req.SetHeader(key, value) - return c -} - -// Body is used to set body with map[string]string type, -// and then returns HTTPClient object. -func (c *HTTPClient) Body(obj serializer.Object) *HTTPClient { - if c.err != nil { - return c - } - switch t := obj.(type) { - case map[string]string: - c.req.SetFormData(t) - default: - c.err = fmt.Errorf("post body need a map[string]string, get: %v", t) - } - return c -} - -// JSONBody is used to set body with json format and then returns HTTPClient object. -func (c *HTTPClient) JSONBody(obj serializer.Object) *HTTPClient { - c.req.SetHeader("Content-Type", "application/json") - c.req.SetBody(obj) - return c -} - -// Query is used to set query and then returns HTTPClient object. -func (c *HTTPClient) Query(obj serializer.Object) *HTTPClient { - if c.err != nil { - return c - } - switch t := obj.(type) { - case string: - c.req.SetQueryString(t) - case map[string][]string: - query := url.Values(t) - c.req.SetQueryString(query.Encode()) - case map[string]string: - c.req.SetQueryParams(t) - case url.Values: - c.req.SetQueryString(t.Encode()) - default: - c.err = fmt.Errorf("invalid query %#v", obj) - } - - return c -} - -// Do used to deal http request and then returns HTTPClient object. -func (c *HTTPClient) Do() *HTTPClient { - if c.err != nil { - return c - } - - switch c.method { - case "POST": - c.resp, c.err = c.req.Post(c.urls) - case "GET": - c.resp, c.err = c.req.Get(c.urls) - case "DELETE": - c.resp, c.err = c.req.Delete(c.urls) - case "PUT": - c.resp, c.err = c.req.Put(c.urls) - default: - c.err = fmt.Errorf("unsupport method: %s", c.method) - return c - } - - c.code = c.resp.StatusCode() - if c.code >= http.StatusBadRequest { - c.err = fmt.Errorf("Bad response code: %d, %s", c.code, string(c.resp.Body())) - } - return c -} - -// RespCodeEqual check HTTPClient's code is equal with "code" or not. -func (c *HTTPClient) RespCodeEqual(code int) bool { - return c.code == code -} - -// StatusCode returns HTTPClient code. -func (c *HTTPClient) StatusCode() int { - return c.code -} - -// Into used to set obj into body. -func (c *HTTPClient) Into(obj serializer.Object) error { - if c.err != nil { - return c.err - } - if !isPtr(obj) { - return fmt.Errorf("expect ptr, get: %v", obj) - } - body := c.resp.Body() - if reflect.TypeOf(obj).Elem().Kind() == reflect.String { - reflect.Indirect(reflect.ValueOf(obj)).Set(reflect.Indirect(reflect.ValueOf(string(body)))) - return nil - } - - newObj := reflect.New(reflect.TypeOf(obj).Elem()).Interface() - - if err := serializer.Codec.Decode(body, newObj); err != nil { - return err - } - reflect.Indirect(reflect.ValueOf(obj)).Set(reflect.Indirect(reflect.ValueOf(newObj))) - return nil -} diff --git a/storage/controlserver/client/httpcli_test.go b/storage/controlserver/client/httpcli_test.go deleted file mode 100644 index bedb1b088b..0000000000 --- a/storage/controlserver/client/httpcli_test.go +++ /dev/null @@ -1,848 +0,0 @@ -package client - -import ( - "crypto/tls" - "crypto/x509" - "reflect" - "testing" - - "github.com/alibaba/pouch/pkg/serializer" - "github.com/go-resty/resty" -) - -func Test_isPtr(t *testing.T) { - type args struct { - i interface{} - } - tests := []struct { - name string - args args - want bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := isPtr(tt.args.i); got != tt.want { - t.Errorf("isPtr() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_parseTLSConfig(t *testing.T) { - type args struct { - ca []byte - cert []byte - key []byte - } - tests := []struct { - name string - args args - want *tls.Config - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := parseTLSConfig(tt.args.ca, tt.args.cert, tt.args.key) - if (err != nil) != tt.wantErr { - t.Errorf("parseTLSConfig() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("parseTLSConfig() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_certpool(t *testing.T) { - type args struct { - pem []byte - } - tests := []struct { - name string - args args - want *x509.CertPool - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := certpool(tt.args.pem) - if (err != nil) != tt.wantErr { - t.Errorf("certpool() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("certpool() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClientNew(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := HTTPClientNew(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClientNew() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_verb(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - method string - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.verb(tt.args.method); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.verb() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_TLSConfig(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - tlsc *tls.Config - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.TLSConfig(tt.args.tlsc); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.TLSConfig() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_TLS(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - ca []byte - cert []byte - key []byte - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - got, err := c.TLS(tt.args.ca, tt.args.cert, tt.args.key) - if (err != nil) != tt.wantErr { - t.Errorf("HTTPClient.TLS() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.TLS() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestPUT(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := PUT(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("PUT() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestGET(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := GET(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("GET() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestPOST(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := POST(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("POST() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestDELETE(t *testing.T) { - tests := []struct { - name string - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := DELETE(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("DELETE() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_PUT(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.PUT(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.PUT() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_GET(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.GET(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.GET() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_POST(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.POST(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.POST() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_DELETE(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.DELETE(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.DELETE() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Err(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if err := c.Err(); (err != nil) != tt.wantErr { - t.Errorf("HTTPClient.Err() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestHTTPClient_Method(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - method string - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.Method(tt.args.method); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.Method() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_URL(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - rawurl string - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.URL(tt.args.rawurl); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.URL() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_SetHeader(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - key string - value string - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.SetHeader(tt.args.key, tt.args.value); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.SetHeader() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Body(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - obj serializer.Object - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.Body(tt.args.obj); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.Body() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_JSONBody(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - obj serializer.Object - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.JSONBody(tt.args.obj); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.JSONBody() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Query(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - obj serializer.Object - } - tests := []struct { - name string - fields fields - args args - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.Query(tt.args.obj); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.Query() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Do(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want *HTTPClient - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.Do(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("HTTPClient.Do() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_RespCodeEqual(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - code int - } - tests := []struct { - name string - fields fields - args args - want bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.RespCodeEqual(tt.args.code); got != tt.want { - t.Errorf("HTTPClient.RespCodeEqual() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_StatusCode(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - tests := []struct { - name string - fields fields - want int - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if got := c.StatusCode(); got != tt.want { - t.Errorf("HTTPClient.StatusCode() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestHTTPClient_Into(t *testing.T) { - type fields struct { - cli *resty.Client - req *resty.Request - err error - resp *resty.Response - method string - urls string - code int - } - type args struct { - obj serializer.Object - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &HTTPClient{ - cli: tt.fields.cli, - req: tt.fields.req, - err: tt.fields.err, - resp: tt.fields.resp, - method: tt.fields.method, - urls: tt.fields.urls, - code: tt.fields.code, - } - if err := c.Into(tt.args.obj); (err != nil) != tt.wantErr { - t.Errorf("HTTPClient.Into() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/storage/controlserver/client/url.go b/storage/controlserver/client/url.go deleted file mode 100644 index b8d4fd2df9..0000000000 --- a/storage/controlserver/client/url.go +++ /dev/null @@ -1,19 +0,0 @@ -package client - -import ( - "net/url" - "path" -) - -// JoinURL is used to check "api" and join url. -func JoinURL(api string, s ...string) (string, error) { - url, err := url.Parse(api) - if err != nil { - return "", err - } - p := []string{url.Path} - p = append(p, s...) - url.Path = path.Join(p...) - - return url.String(), nil -} diff --git a/storage/controlserver/client/url_test.go b/storage/controlserver/client/url_test.go deleted file mode 100644 index b53ac1cb1a..0000000000 --- a/storage/controlserver/client/url_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package client - -import "testing" - -func TestJoinURL(t *testing.T) { - type args struct { - api string - s []string - } - tests := []struct { - name string - args args - want string - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := JoinURL(tt.args.api, tt.args.s...) - if (err != nil) != tt.wantErr { - t.Errorf("JoinURL() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("JoinURL() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/storage/volume/README.md b/storage/volume/README.md index 1d1cd5ae87..38d26f1b3a 100644 --- a/storage/volume/README.md +++ b/storage/volume/README.md @@ -24,7 +24,7 @@ It provides interface is as following: ```go type VolumeMgr interface { // Create is used to create volume. - Create(ctx context.Context, name, driver string, options, labels map[string]string) error + Create(ctx context.Context, name, driver string, options, labels map[string]string) (*types.Volume, error) // Remove is used to remove an existing volume. Remove(ctx context.Context, name string) error @@ -95,10 +95,10 @@ type Driver interface { StoreMode(Context) VolumeStoreMode // Create a volume. - Create(Context, *types.Volume, *types.Storage) error + Create(Context, *types.Volume) error // Remove a volume. - Remove(Context, *types.Volume, *types.Storage) error + Remove(Context, *types.Volume) error // Path returns volume's path. Path(Context, *types.Volume) (string, error) @@ -117,16 +117,16 @@ type Opt interface { // AttachDetach represents volume attach/detach interface. type AttachDetach interface { // Attach a Volume to host, enable the volume. - Attach(Context, *types.Volume, *types.Storage) error + Attach(Context, *types.Volume) error // Detach a volume with host, disable the volume. - Detach(Context, *types.Volume, *types.Storage) error + Detach(Context, *types.Volume) error } // Formator represents volume format interface. type Formator interface { // Format a volume. - Format(Context, *types.Volume, *types.Storage) error + Format(Context, *types.Volume) error } ``` diff --git a/storage/volume/config.go b/storage/volume/config.go index 79acc64efe..1f606ae94e 100644 --- a/storage/volume/config.go +++ b/storage/volume/config.go @@ -6,10 +6,9 @@ import ( // Config represents volume config struct. type Config struct { - ControlServerAddress string `json:"control-server-address"` // control server address in csi. - Timeout time.Duration `json:"volume-timeout"` // operation timeout. - RemoveVolume bool `json:"remove-volume"` // remove volume add data or volume's metadata when remove pouch volume. - DefaultBackend string `json:"volume-default-driver"` // default volume backend. - VolumeMetaPath string `json:"volume-meta-dir"` // volume metadata store path. - DriverAlias string `json:"volume-driver-alias"` // driver alias configure. + Timeout time.Duration `json:"volume-timeout"` // operation timeout. + RemoveVolume bool `json:"remove-volume"` // remove volume add data or volume's metadata when remove pouch volume. + DefaultBackend string `json:"volume-default-driver"` // default volume backend. + VolumeMetaPath string `json:"volume-meta-dir"` // volume metadata store path. + DriverAlias string `json:"volume-driver-alias"` // driver alias configure. } diff --git a/storage/volume/core.go b/storage/volume/core.go index 7f8548d878..48ea365bb3 100644 --- a/storage/volume/core.go +++ b/storage/volume/core.go @@ -6,11 +6,9 @@ import ( "strings" metastore "github.com/alibaba/pouch/pkg/meta" - "github.com/alibaba/pouch/storage/controlserver/client" "github.com/alibaba/pouch/storage/volume/driver" volerr "github.com/alibaba/pouch/storage/volume/error" "github.com/alibaba/pouch/storage/volume/types" - "github.com/alibaba/pouch/storage/volume/types/meta" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -19,20 +17,12 @@ import ( // Core represents volume core struct. type Core struct { Config - BaseURL string - EnableControl bool - store *metastore.Store + store *metastore.Store } // NewCore returns Core struct instance with volume config. func NewCore(cfg Config) (*Core, error) { c := &Core{Config: cfg} - if cfg.ControlServerAddress != "" { - c.EnableControl = true - c.BaseURL = cfg.ControlServerAddress - } else { - c.EnableControl = false - } if cfg.DriverAlias != "" { parts := strings.Split(cfg.DriverAlias, ";") @@ -70,12 +60,6 @@ func NewCore(cfg Config) (*Core, error) { // GetVolume return a volume's info with specified name, If not errors. func (c *Core) GetVolume(id types.VolumeID) (*types.Volume, error) { - v := &types.Volume{ - ObjectMeta: meta.ObjectMeta{ - Name: id.Name, - }, - } - ctx := driver.Contexts() // first, try to get volume from local store. @@ -109,16 +93,7 @@ func (c *Core) GetVolume(id types.VolumeID) (*types.Volume, error) { return nil, err } - // then, try to get volume from central store. - if c.EnableControl { - if url, err := c.volumeURL(id); err == nil { - if err := client.New().Get(url, v); err == nil { - return v, nil - } - } - } - - // at last, scan all drivers + // scan all drivers logrus.Debugf("probing all drivers for volume with name(%s)", id.Name) drivers, err := driver.GetAll() if err != nil { @@ -130,6 +105,7 @@ func (c *Core) GetVolume(id types.VolumeID) (*types.Volume, error) { if !ok { continue } + v, err := d.Get(ctx, id.Name) if err != nil { // not found, ignore it @@ -160,92 +136,30 @@ func (c *Core) ExistVolume(id types.VolumeID) (bool, error) { } // CreateVolume use to create a volume, if failed, will return error info. -func (c *Core) CreateVolume(id types.VolumeID) error { +func (c *Core) CreateVolume(id types.VolumeID) (*types.Volume, error) { exist, err := c.ExistVolume(id) if err != nil { - return err - } - if exist { - return volerr.ErrVolumeExisted + return nil, err + } else if exist { + return nil, volerr.ErrVolumeExisted } - v, err := types.NewVolume(id) + dv, err := driver.Get(id.Driver) if err != nil { - return errors.Wrapf(err, "Create volume") + return nil, err } - dv, err := driver.Get(v.Spec.Backend) + volume, err := dv.Create(driver.Contexts(), id) if err != nil { - return err - } - - // check options, then add some driver-specific options. - if err := checkOptions(v); err != nil { - return err - } - - // Create volume's meta. - ctx := driver.Contexts() - var s *types.Storage - - if dv.StoreMode(ctx).CentralCreateDelete() { - url, err := c.volumeURL() - if err != nil { - return err - } - - // before creating, we can't get the path - p, err := c.volumePath(v, dv) - if err != nil { - return err - } - v.SetPath(p) - - if err := client.New().Create(url, v); err != nil { - return errors.Wrap(err, "Create volume") - } - - // get the storage - s, err = c.getStorage(v.StorageID()) - if err != nil { - return err - } - } else { - if err := dv.Create(ctx, v, nil); err != nil { - return err - } - - // after creating volume, we can get the path - p, err := c.volumePath(v, dv) - if err != nil { - return err - } - v.SetPath(p) - - if err := c.store.Put(v); err != nil { - return err - } + return nil, err } - // format the volume - if f, ok := dv.(driver.Formator); ok { - err := f.Format(ctx, v, s) - if err == nil { - return nil - } - - logrus.Errorf("failed to format new volume: %s, err: %v", v.Name, err) - logrus.Warnf("rollback create volume, start to cleanup new volume: %s", v.Name) - if err := c.RemoveVolume(id); err != nil { - logrus.Errorf("failed to rollback create volume, cleanup new volume: %s, err: %v", v.Name, err) - return err - } - - // return format error. - return err + // create the meta + if err := c.store.Put(volume); err != nil { + return nil, err } - return nil + return volume, nil } // ListVolumes return all volumes. @@ -253,27 +167,13 @@ func (c *Core) CreateVolume(id types.VolumeID) error { func (c *Core) ListVolumes(labels map[string]string) ([]*types.Volume, error) { var retVolumes = make([]*types.Volume, 0) - // first, list local meta store. + // list local meta store. metaList, err := c.store.List() if err != nil { return nil, err } - // then, list central store. - if c.EnableControl { - url, err := c.listVolumeURL(labels) - if err != nil { - return nil, errors.Wrap(err, "List volume's name") - } - - logrus.Debugf("List volume URL: %s, labels: %s", url, labels) - - if err := client.New().ListKeys(url, &retVolumes); err != nil { - return nil, errors.Wrap(err, "List volume's name") - } - } - - // at last, scan all drivers. + // scan all drivers. logrus.Debugf("probing all drivers for listing volume") drivers, err := driver.GetAll() if err != nil { @@ -335,7 +235,12 @@ func (c *Core) ListVolumes(labels map[string]string) ([]*types.Volume, error) { } for _, v := range realVolumes { + // found new volumes, store the meta + logrus.Warningf("found new volume %s", v.Name) + c.store.Put(v) + retVolumes = append(retVolumes, v) + } return retVolumes, nil @@ -346,86 +251,13 @@ func (c *Core) ListVolumes(labels map[string]string) ([]*types.Volume, error) { func (c *Core) ListVolumeName(labels map[string]string) ([]string, error) { var names []string - // first, list local meta store. - metaList, err := c.store.List() + volumes, err := c.ListVolumes(labels) if err != nil { - return nil, err - } - - // then, list central store. - if c.EnableControl { - url, err := c.listVolumeNameURL(labels) - if err != nil { - return nil, errors.Wrap(err, "List volume's name") - } - - logrus.Debugf("List volume name URL: %s, labels: %s", url, labels) - - if err := client.New().ListKeys(url, &names); err != nil { - return nil, errors.Wrap(err, "List volume's name") - } + return names, err } - // at last, scan all drivers - logrus.Debugf("probing all drivers for listing volume") - drivers, err := driver.GetAll() - if err != nil { - return nil, err - } - - ctx := driver.Contexts() - - var realVolumes = map[string]*types.Volume{} - var volumeDrivers = map[string]driver.Driver{} - - for _, dv := range drivers { - volumeDrivers[dv.Name(ctx)] = dv - - d, ok := dv.(driver.Lister) - if !ok { - continue - } - vList, err := d.List(ctx) - if err != nil { - logrus.Warnf("volume driver %s list error: %v", dv.Name(ctx), err) - continue - } - - for _, v := range vList { - realVolumes[v.Name] = v - } - } - - for name, obj := range metaList { - v, ok := obj.(*types.Volume) - if !ok { - continue - } - - d, ok := volumeDrivers[v.Spec.Backend] - if !ok { - // driver not exist, ignore it - continue - } - - if d.StoreMode(ctx).IsLocal() { - names = append(names, name) - continue - } - - _, ok = realVolumes[name] - if !ok { - // real volume not exist, ignore it - continue - } - - delete(realVolumes, name) - - names = append(names, name) - } - - for name := range realVolumes { - names = append(names, name) + for _, v := range volumes { + names = append(names, v.Name) } return names, nil @@ -438,25 +270,14 @@ func (c *Core) RemoveVolume(id types.VolumeID) error { return errors.Wrap(err, "Remove volume: "+id.String()) } - // Call interface to remove meta info. - if dv.StoreMode(driver.Contexts()).CentralCreateDelete() { - url, err := c.volumeURL(id) - if err != nil { - return errors.Wrap(err, "Remove volume: "+id.String()) - } - if err := client.New().Delete(url, v); err != nil { - return errors.Wrap(err, "Remove volume: "+id.String()) - } - } else { - // Call driver's Remove method to remove the volume. - if err := dv.Remove(driver.Contexts(), v, nil); err != nil { - return err - } + // Call driver's Remove method to remove the volume. + if err := dv.Remove(driver.Contexts(), v); err != nil { + return err + } - // delete the meta - if err := c.store.Remove(id.Name); err != nil { - return err - } + // delete the meta + if err := c.store.Remove(id.Name); err != nil { + return err } return nil @@ -500,35 +321,14 @@ func (c *Core) AttachVolume(id types.VolumeID, extra map[string]string) (*types. } if d, ok := dv.(driver.AttachDetach); ok { - var ( - s *types.Storage - err error - ) - - if dv.StoreMode(ctx).CentralCreateDelete() { - if s, err = c.getStorage(v.StorageID()); err != nil { - return nil, err - } - } - - if err = d.Attach(ctx, v, s); err != nil { + if err := d.Attach(ctx, v); err != nil { return nil, err } } - // Call interface to update meta info. - if dv.StoreMode(driver.Contexts()).UseLocalMeta() { - if err := c.store.Put(v); err != nil { - return nil, err - } - } else { - url, err := c.volumeURL(id) - if err != nil { - return nil, errors.Wrap(err, "Update volume: "+id.String()) - } - if err := client.New().Update(url, v); err != nil { - return nil, errors.Wrap(err, "Update volume: "+id.String()) - } + // update meta info. + if err := c.store.Put(v); err != nil { + return nil, err } return v, nil @@ -551,35 +351,14 @@ func (c *Core) DetachVolume(id types.VolumeID, extra map[string]string) (*types. // if volume has referance, skip to detach volume. ref := v.Option(types.OptionRef) if d, ok := dv.(driver.AttachDetach); ok && ref == "" { - var ( - s *types.Storage - err error - ) - - if dv.StoreMode(ctx).CentralCreateDelete() { - if s, err = c.getStorage(v.StorageID()); err != nil { - return nil, err - } - } - - if err = d.Detach(ctx, v, s); err != nil { + if err := d.Detach(ctx, v); err != nil { return nil, err } } - // Call interface to update meta info. - if dv.StoreMode(driver.Contexts()).UseLocalMeta() { - if err := c.store.Put(v); err != nil { - return nil, err - } - } else { - url, err := c.volumeURL(id) - if err != nil { - return nil, errors.Wrap(err, "Update volume: "+id.String()) - } - if err := client.New().Update(url, v); err != nil { - return nil, errors.Wrap(err, "Update volume: "+id.String()) - } + // update meta info. + if err := c.store.Put(v); err != nil { + return nil, err } return v, nil diff --git a/storage/volume/core_util.go b/storage/volume/core_util.go index 3187601349..acb5a91eff 100644 --- a/storage/volume/core_util.go +++ b/storage/volume/core_util.go @@ -3,13 +3,9 @@ package volume import ( "fmt" "path" - "strings" - "github.com/alibaba/pouch/storage/controlserver/client" "github.com/alibaba/pouch/storage/volume/driver" - volerr "github.com/alibaba/pouch/storage/volume/error" "github.com/alibaba/pouch/storage/volume/types" - "github.com/alibaba/pouch/storage/volume/types/meta" "github.com/pkg/errors" ) @@ -26,83 +22,6 @@ func (c *Core) volumePath(v *types.Volume, dv driver.Driver) (string, error) { return p, nil } -func (c *Core) getStorage(id types.StorageID) (*types.Storage, error) { - if !c.EnableControl { - return nil, fmt.Errorf("disable control server") - } - - s := &types.Storage{ - ObjectMeta: meta.ObjectMeta{ - UID: id.UID, - }, - } - - url, err := c.storageURL(id) - if err != nil { - return nil, err - } - if err := client.New().Get(url, s); err != nil { - return nil, err - } - return s, nil -} - -func (c *Core) storageURL(id ...types.StorageID) (string, error) { - if c.BaseURL == "" { - return "", volerr.ErrDisableControl - } - if len(id) == 0 { - return client.JoinURL(c.BaseURL, types.APIVersion, client.StoragePath) - } - return client.JoinURL(c.BaseURL, types.APIVersion, client.StoragePath, id[0].UID) -} - -func (c *Core) volumeURL(id ...types.VolumeID) (string, error) { - if c.BaseURL == "" { - return "", volerr.ErrDisableControl - } - if len(id) == 0 { - return client.JoinURL(c.BaseURL, types.APIVersion, client.VolumePath) - } - return client.JoinURL(c.BaseURL, types.APIVersion, client.VolumePath, id[0].Name) -} - -func (c *Core) listVolumeURL(labels map[string]string) (string, error) { - if c.BaseURL == "" { - return "", volerr.ErrDisableControl - } - url, err := client.JoinURL(c.BaseURL, types.APIVersion, client.VolumePath) - if err != nil { - return "", err - } - - querys := make([]string, 0, len(labels)) - for k, v := range labels { - querys = append(querys, fmt.Sprintf("labels=%s=%s", k, v)) - } - - url = url + "?" + strings.Join(querys, "&") - return url, nil -} - -func (c *Core) listVolumeNameURL(labels map[string]string) (string, error) { - if c.BaseURL == "" { - return "", volerr.ErrDisableControl - } - url, err := client.JoinURL(c.BaseURL, types.APIVersion, "/listkeys", client.VolumePath) - if err != nil { - return "", err - } - - querys := make([]string, 0, len(labels)) - for k, v := range labels { - querys = append(querys, fmt.Sprintf("labels=%s=%s", k, v)) - } - - url = url + "?" + strings.Join(querys, "&") - return url, nil -} - func checkVolume(v *types.Volume) error { if v.Spec.ClusterID == "" || v.Status.Phase == types.VolumePhaseFailed { err := fmt.Errorf("volume is created failed: %s", v.Name) diff --git a/storage/volume/driver/driver_interface.go b/storage/volume/driver/driver_interface.go index e6b95346fa..b7fbb75b4c 100644 --- a/storage/volume/driver/driver_interface.go +++ b/storage/volume/driver/driver_interface.go @@ -13,10 +13,10 @@ type Driver interface { StoreMode(Context) VolumeStoreMode // Create a volume. - Create(Context, *types.Volume, *types.Storage) error + Create(Context, types.VolumeID) (*types.Volume, error) // Remove a volume. - Remove(Context, *types.Volume, *types.Storage) error + Remove(Context, *types.Volume) error // Path returns volume's path. Path(Context, *types.Volume) (string, error) @@ -31,16 +31,16 @@ type Opt interface { // AttachDetach represents volume attach/detach interface. type AttachDetach interface { // Attach a Volume to host, enable the volume. - Attach(Context, *types.Volume, *types.Storage) error + Attach(Context, *types.Volume) error // Detach a volume with host, disable the volume. - Detach(Context, *types.Volume, *types.Storage) error + Detach(Context, *types.Volume) error } // Formator represents volume format interface. type Formator interface { // Format a volume. - Format(Context, *types.Volume, *types.Storage) error + Format(Context, *types.Volume) error } // Getter represents volume get interface. @@ -54,9 +54,3 @@ type Lister interface { // List a volume from driver List(Context) ([]*types.Volume, error) } - -// GatewayDriver represents storage gateway interface. -type GatewayDriver interface { - // Report storage cluster status. - Report(Context) ([]*types.Storage, error) -} diff --git a/storage/volume/driver/remote.go b/storage/volume/driver/remote.go index 4834d67b5d..8a7b9480e7 100644 --- a/storage/volume/driver/remote.go +++ b/storage/volume/driver/remote.go @@ -33,18 +33,25 @@ func (r *remoteDriverWrapper) StoreMode(ctx Context) VolumeStoreMode { } // Create a remote volume. -func (r *remoteDriverWrapper) Create(ctx Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("driver wrapper [%s] creates volume: %s", r.Name(ctx), v.Name) +func (r *remoteDriverWrapper) Create(ctx Context, id types.VolumeID) (*types.Volume, error) { + ctx.Log.Debugf("driver wrapper [%s] creates volume: %s", r.Name(ctx), id.Name) - options := types.ExtractOptionsFromVolume(v) + ctx.Log.Debugf("driver wrapper gets options: %v", id.Options) - ctx.Log.Debugf("driver wrapper gets options: %v", options) + if err := r.proxy.Create(id.Name, id.Options); err != nil { + return nil, err + } - return r.proxy.Create(v.Name, options) + mountPath, err := r.proxy.Path(id.Name) + if err != nil { + mountPath = "" + } + + return types.NewVolumeFromVolumeID(mountPath, "", id), nil } // Remove a remote volume. -func (r *remoteDriverWrapper) Remove(ctx Context, v *types.Volume, s *types.Storage) error { +func (r *remoteDriverWrapper) Remove(ctx Context, v *types.Volume) error { ctx.Log.Debugf("driver wrapper [%s] removes volume: %s", r.Name(ctx), v.Name) return r.proxy.Remove(v.Name) @@ -61,15 +68,7 @@ func (r *remoteDriverWrapper) Get(ctx Context, name string) (*types.Volume, erro id := types.NewVolumeID(name, r.Name(ctx)) - volume, err := types.NewVolume(id) - if err != nil { - return nil, err - } - - // set the mountpoint - volume.Status.MountPoint = rv.Mountpoint - - return volume, nil + return types.NewVolumeFromVolumeID(rv.Mountpoint, "", id), nil } // List all volumes from remote driver. @@ -85,15 +84,7 @@ func (r *remoteDriverWrapper) List(ctx Context) ([]*types.Volume, error) { for _, rv := range rvList { id := types.NewVolumeID(rv.Name, r.Name(ctx)) - - volume, err := types.NewVolume(id) - if err != nil { - continue - } - - // set the mountpoint - volume.Status.MountPoint = rv.Mountpoint - + volume := types.NewVolumeFromVolumeID(rv.Mountpoint, "", id) vList = append(vList, volume) } @@ -119,7 +110,7 @@ func (r *remoteDriverWrapper) Options() map[string]types.Option { } // Attach a remote volume. -func (r *remoteDriverWrapper) Attach(ctx Context, v *types.Volume, s *types.Storage) error { +func (r *remoteDriverWrapper) Attach(ctx Context, v *types.Volume) error { ctx.Log.Debugf("driver wrapper [%s] attach volume: %s", r.Name(ctx), v.Name) _, err := r.proxy.Mount(v.Name, v.UID) @@ -131,7 +122,7 @@ func (r *remoteDriverWrapper) Attach(ctx Context, v *types.Volume, s *types.Stor } // Detach a remote volume. -func (r *remoteDriverWrapper) Detach(ctx Context, v *types.Volume, s *types.Storage) error { +func (r *remoteDriverWrapper) Detach(ctx Context, v *types.Volume) error { ctx.Log.Debugf("driver wrapper [%s] detach volume: %s", r.Name(ctx), v.Name) return r.proxy.Unmount(v.Name, v.UID) diff --git a/storage/volume/examples/demo/demo.go b/storage/volume/examples/demo/demo.go index 2fe3bf29f7..331f7c185d 100644 --- a/storage/volume/examples/demo/demo.go +++ b/storage/volume/examples/demo/demo.go @@ -28,13 +28,13 @@ func (d *Demo) StoreMode(ctx driver.Context) driver.VolumeStoreMode { } // Create a demo volume. -func (d *Demo) Create(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (d *Demo) Create(ctx driver.Context, v *types.Volume) error { ctx.Log.Infof("Demo create volume: %s", v.Name) return nil } // Remove a demo volume. -func (d *Demo) Remove(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (d *Demo) Remove(ctx driver.Context, v *types.Volume) error { ctx.Log.Infof("Demo Remove volume: %s", v.Name) return nil } diff --git a/storage/volume/modules/ceph/ceph.go b/storage/volume/modules/ceph/ceph.go deleted file mode 100644 index 6c107f0531..0000000000 --- a/storage/volume/modules/ceph/ceph.go +++ /dev/null @@ -1,305 +0,0 @@ -// +build linux - -package ceph - -import ( - "encoding/base64" - "fmt" - "os" - "path" - "strings" - "syscall" - "time" - - "github.com/alibaba/pouch/pkg/exec" - "github.com/alibaba/pouch/pkg/utils" - "github.com/alibaba/pouch/storage/volume/driver" - "github.com/alibaba/pouch/storage/volume/types" - "github.com/alibaba/pouch/storage/volume/types/meta" - - "github.com/go-ini/ini" -) - -func init() { - if err := driver.Register(&Ceph{}); err != nil { - panic(err) - } -} - -// Ceph represents ceph volume driver struct. -type Ceph struct{} - -// Name returns volume driver's name. -func (c *Ceph) Name(ctx driver.Context) string { - return "ceph" -} - -// StoreMode returns volume driver's store mode. -func (c *Ceph) StoreMode(ctx driver.Context) driver.VolumeStoreMode { - return driver.RemoteStore | driver.CreateDeleteInCentral -} - -// Create a ceph volume. -func (c *Ceph) Create(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph create volume: %s", v.Name) - - poolName := getPool(v) - objSize := getObjectSize(v) - feature := getImageFeature(v) - name := v.Name - conf, ok := ctx.GetString("conf") - if !ok { - return fmt.Errorf("get config file fail") - } - - var cmdList []string - base := v.Option("base") - if base != "" { - cmdList = []string{"clone", fmt.Sprintf("%s/%s@snap", poolName, base), - fmt.Sprintf("%s/%s", poolName, name)} - } else { - cmdList = []string{"create", name, "--pool", poolName, "--size", v.Size(), - "--object-size", objSize, "--image-feature", feature} - } - - ctx.Log.Infof("Ceph create volume, command: %v", cmdList) - - cmd := NewCephCommand("rbd", conf) - err := cmd.RunCommand(nil, cmdList...) - if err != nil { - ctx.Log.Errorf("Ceph rbd create failed, err: %v", err) - return err - } - - return nil -} - -// Remove a ceph volume. -func (c *Ceph) Remove(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph Remove volume: %s", v.Name) - - poolName := getPool(v) - name := v.Name - conf, ok := ctx.GetString("conf") - if !ok { - return fmt.Errorf("get config file fail") - } - - cmdList := []string{"snap", "purge", mkpool(poolName, name)} - - ctx.Log.Infof("Ceph remove volume, command: %v", cmdList) - - cmd := NewCephCommand("rbd", conf) - err := cmd.RunCommand(nil, cmdList...) - if err != nil { - return err - } - - cmdList = []string{"rm", mkpool(poolName, name)} - err = cmd.RunCommand(nil, cmdList...) - if err != nil { - return err - } - - return nil -} - -// Path returns ceph volume mount path. -func (c *Ceph) Path(ctx driver.Context, v *types.Volume) (string, error) { - ctx.Log.Debugf("Ceph volume mount path: %s", v.Name) - return path.Join("/mnt", getPool(v), v.Name), nil -} - -// Options returns ceph volume options. -func (c *Ceph) Options() map[string]types.Option { - return map[string]types.Option{ - "conf": {Value: "/etc/ceph/ceph.conf", Desc: "set ceph config file"}, - "key": {Value: "", Desc: "set ceph keyring file"}, - "pool": {Value: "rbd", Desc: "set rbd image pool"}, - "image-feature": {Value: "", Desc: "set rbd image feature"}, - "object-size": {Value: "4M", Desc: "set rbd image object size"}, - "base": {Value: "", Desc: "create image by base image"}, - } -} - -// Format is used to format ceph volume. -func (c *Ceph) Format(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph format volume: %s", v.Name) - - device, err := rbdMap(ctx, v, s.Spec.Address, s.Spec.Keyring) - if err != nil { - ctx.Log.Errorf("Ceph map volume: %s failed: %v", v.Name, err) - return err - } - - fs := v.FileSystem() - ctx.Log.Infof("Ceph make file system, volume: %s, device: %s, fs: %v", v.Name, device, fs) - - if err := utils.MakeFSVolume(fs, device, defaultFormatTimeout); err != nil { - ctx.Log.Errorf("Ceph mkfs error: %v", err) - if err := rbdUnmap(ctx, v); err != nil { - ctx.Log.Errorf("Ceph error while trying to unmap after failed filesystem creation: %v", err) - } - return err - } - - ctx.Log.Infof("Ceph start to unmap %s", v.Name) - if err := rbdUnmap(ctx, v); err != nil { - ctx.Log.Errorf("Ceph unmap v :%s err: %v", v.Name, err) - return err - } - - return nil -} - -// Attach is used to attach ceph volume. -func (c *Ceph) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph attach volume: %s", v.Name) - var err error - volumePath := v.Path() - - // Map rbd device - devName, err := rbdMap(ctx, v, s.Spec.Address, s.Spec.Keyring) - if err != nil { - ctx.Log.Errorf("Ceph volume: %s map error: %v", v.Name, err) - return err - } - - // Create directory to mount - err = os.MkdirAll(volumePath, 0755) - if err != nil && !os.IsExist(err) { - return fmt.Errorf("error creating %q directory: %v", volumePath, err) - } - - // Mount the RBD - mountOpt := v.MountOption() - ctx.Log.Infof("Ceph mount volume: %s, device: %s, mountpath: %s, opt: %v", - v.Name, devName, volumePath, mountOpt) - - err = utils.MountVolume(mountOpt, devName, volumePath, time.Second*60) - if err != nil { - ctx.Log.Errorf("Ceph volume: %s mount failed, err: %v", v.Name, err) - return fmt.Errorf("failed to mount rbd device %q: %v", devName, err) - } - - return nil -} - -// Detach is used to detach ceph volume. -func (c *Ceph) Detach(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Ceph detach volume: %s", v.Name) - mountPath := v.Path() - - err := exec.Retry(3, 3*time.Second, func() error { - if err := syscall.Unmount(mountPath, 0); err != nil { - ctx.Log.Errorf("Ceph volume: %s unmount %q failed, err: %v", v.Name, mountPath, err) - return err - } - return nil - }) - if err != nil { - return err - } - - // Remove the mounted directory - // FIXME remove all, but only after the FIXME above. - err = exec.Retry(3, 3*time.Second, func() error { - if err := os.Remove(mountPath); err != nil && !os.IsNotExist(err) { - ctx.Log.Errorf("Ceph removing %q directory failed, err: %v", mountPath, err) - return err - } - return nil - }) - if err != nil { - return fmt.Errorf("failed to remove mount path: %s after 3 retries", mountPath) - } - - // Unmap rbd device - if err := rbdUnmap(ctx, v); err != os.ErrNotExist { - return err - } - - return nil -} - -// Report is used to collect ceph storage information and report to control server. -func (c *Ceph) Report(ctx driver.Context) ([]*types.Storage, error) { - var err error - var storages []*types.Storage - conf, ok := ctx.GetString("conf") - if !ok { - return nil, fmt.Errorf("get config file fail") - } - keyFile, ok := ctx.GetString("keyring") - if !ok { - return nil, fmt.Errorf("get keyring file fail") - } - - s := &types.Storage{ - ObjectMeta: meta.ObjectMeta{ - Labels: map[string]string{}, - Annotations: map[string]string{}, - }, - Spec: &types.StorageSpec{}, - Status: &types.StorageStatus{}, - } - - // analyze keyfile - cfg, err := ini.Load(keyFile) - if err != nil { - ctx.Log.Errorf("Ceph load keyring error : %v", err) - return nil, err - } - key, err := cfg.Section("client.admin").GetKey("key") - if err != nil { - ctx.Log.Errorf("Ceph get client.admin section error: %v", err) - return nil, err - } - keyring := fmt.Sprintf("%s:%s", "admin", key.Value()) - encoded := base64.StdEncoding.EncodeToString([]byte(keyring)) - s.Spec.Keyring = encoded - - // get ceph quorum status - cmd := NewCephCommand("ceph", conf) - status := &QuorumStatus{} - if err = cmd.RunCommand(status, QuorumCommand...); err != nil { - ctx.Log.Errorf("Ceph run quorum error: %v", err) - return nil, err - } - monAddrs := []string{} - for _, mon := range status.MonMap.Mons { - monAddrs = append(monAddrs, strings.Split(mon.Addr, "/")[0]) - } - s.Spec.Address = strings.Join(monAddrs, ",") - - ctx.Log.Debugf("Ceph report storage: %s", s.Spec.Address) - - // get ceph status - stats := &Stats{} - if err = cmd.RunCommand(stats, CephStatusCommand...); err != nil { - ctx.Log.Errorf("Ceph run cephstatus command error: %v", err) - return nil, err - } - s.Status.Schedulable = stats.Health.OverallStatus != HealthErr - s.Status.HealthyStatus = stats.Health.OverallStatus - - // get ceph usage - usage := &PoolStats{} - if err = cmd.RunCommand(usage, CephUsageCommand...); err != nil { - return nil, err - } - poolSpec := make(map[string]types.PoolSpec) - for _, p := range usage.Pools { - poolSpec[p.Name] = types.PoolSpec{ - Capacity: p.Stats.BytesUsed + p.Stats.MaxAvail, - Available: p.Stats.MaxAvail, - } - } - s.Spec.PoolSpec = poolSpec - - s.UID = stats.ID - - storages = append(storages, s) - - return storages, nil -} diff --git a/storage/volume/modules/ceph/map.go b/storage/volume/modules/ceph/map.go deleted file mode 100644 index d4499aca52..0000000000 --- a/storage/volume/modules/ceph/map.go +++ /dev/null @@ -1,165 +0,0 @@ -// +build linux - -package ceph - -import ( - "encoding/json" - "fmt" - "os" - "strings" - "time" - - "github.com/alibaba/pouch/pkg/exec" - "github.com/alibaba/pouch/storage/volume/driver" - "github.com/alibaba/pouch/storage/volume/types" -) - -var ( - bin = "/usr/bin/rbd-nbd" - poolName = "rbd" - objectSize = "4M" - imageFeature = "layering,exclusive-lock" -) - -func getImageFeature(v *types.Volume) string { - if s, ok := v.Spec.Extra["image-feature"]; ok { - return s - } - return imageFeature -} - -func getObjectSize(v *types.Volume) string { - if s, ok := v.Spec.Extra["object-size"]; ok { - return s - } - return objectSize -} - -func getPool(v *types.Volume) string { - if p, ok := v.Spec.Extra["pool"]; ok { - return p - } - return poolName -} - -func mkpool(poolName, volumeName string) string { - return fmt.Sprintf("%s/%s", poolName, volumeName) -} - -func rbdMap(ctx driver.Context, v *types.Volume, address, keyring string) (string, error) { - poolName := getPool(v) - name := v.Name - - args := []string{"map", mkpool(poolName, name), "--mon_host", address} - exit, _, stderr, err := exec.RunWithRetry(3, 100*time.Microsecond, defaultTimeout, bin, args...) - if err != nil || exit != 0 { - err = fmt.Errorf("could not map %q: %d (%v) (%v)", name, exit, err, stderr) - ctx.Log.Errorf("Ceph %s", err.Error()) - return "", err - } - - var device string - rbdmap, err := showMapped(ctx) - if err != nil { - return "", err - } - - for _, rbd := range rbdmap.MapDevice { - if rbd.Image == name && rbd.Pool == poolName { - device = rbd.Device - break - } - } - if device == "" { - return "", fmt.Errorf("volume %s in pool %s not found in RBD showmapped output", name, poolName) - } - - ctx.Log.Infof("Ceph mapped volume %q as %q", name, device) - - return device, nil -} - -func rbdUnmap(ctx driver.Context, v *types.Volume) error { - err := exec.Retry(10, 100*time.Millisecond, func() error { - rbdmap, err := showMapped(ctx) - if err != nil { - return err - } - - err = doUnmap(ctx, v, rbdmap) - if err != nil { - return err - } - return nil - }) - if err != nil { - return fmt.Errorf("could not unmap volume %q after 10 retries", v.Name) - } - - return nil -} - -func showMapped(ctx driver.Context) (RBDMap, error) { - rbdmap := RBDMap{} - - exit, stdout, stderr, err := exec.RunWithRetry(3, 100*time.Millisecond, defaultTimeout, - bin, "--dump-json", "list-mapped") - if err != nil || exit != 0 { - ctx.Log.Warnf("Ceph could not show mapped. Retrying: err:%v exit: %v, stderr: %v, stdout:%v", - err, exit, stderr, stdout) - } else if stdout != "" { - err = json.Unmarshal([]byte(stdout), &rbdmap) - if err != nil { - ctx.Log.Errorf("Ceph could not parse rbd list-mapped output: %s", stdout) - } - } - - return rbdmap, err -} - -func doUnmap(ctx driver.Context, v *types.Volume, rbdmap RBDMap) error { - poolName := getPool(v) - name := v.Name - - for _, rbd := range rbdmap.MapDevice { - if rbd.Image != name || rbd.Pool != poolName { - continue - } - ctx.Log.Debugf("Ceph unmapping volume %s/%s at device %q", poolName, name, strings.TrimSpace(rbd.Device)) - - // Check device is exist or not. - if _, err := os.Stat(rbd.Device); err != nil { - ctx.Log.Errorf("Ceph trying to unmap device %q for %s/%s that does not exist, continuing", - poolName, name, rbd.Device) - continue - } - - // Unmap device. - exit, _, stderr, err := exec.Run(defaultTimeout, bin, "unmap", rbd.Device) - if err != nil { - ctx.Log.Errorf("Ceph could not unmap volume %q (device %q): %d (%v) (%s)", - name, rbd.Device, exit, err, stderr) - return err - } - if exit != 0 { - err = fmt.Errorf("Ceph could not unmap volume %q (device %q): %d (%s)", - name, rbd.Device, exit, stderr) - ctx.Log.Error(err) - return err - } - - // Check have unmapped or not. - rbdmap2, err := showMapped(ctx) - if err != nil { - return err - } - for _, rbd2 := range rbdmap2.MapDevice { - if rbd.Image == rbd2.Image && rbd.Pool == rbd2.Pool { - return fmt.Errorf("could not unmap volume %q, device: %q is exist", - name, rbd.Image) - } - } - break - } - return nil -} diff --git a/storage/volume/modules/ceph/types.go b/storage/volume/modules/ceph/types.go deleted file mode 100644 index 9f895f5b8c..0000000000 --- a/storage/volume/modules/ceph/types.go +++ /dev/null @@ -1,127 +0,0 @@ -// +build linux - -package ceph - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/alibaba/pouch/pkg/exec" -) - -const ( - // HealthOK denotes the status of ceph cluster when healthy. - HealthOK = "HEALTH_OK" - - // HealthWarn denotes the status of ceph cluster when unhealthy but recovering. - HealthWarn = "HEALTH_WARN" - - // HealthErr denotes the status of ceph cluster when unhealthy but usually needs - // manual intervention. - HealthErr = "HEALTH_ERR" -) - -var ( - // QuorumCommand represents ceph quorum_status command. - QuorumCommand = []string{"quorum_status", "-f", "json"} - - // CephUsageCommand represents ceph df command. - CephUsageCommand = []string{"df", "detail", "-f", "json"} - - // CephStatusCommand represents ceph status command. - CephStatusCommand = []string{"status", "-f", "json"} - - // OsdDFCommand represents ceph osd df command. - OsdDFCommand = []string{"osd", "df", "-f", "json"} - - // OsdPerfCommand represents ceph osd perf command. - OsdPerfCommand = []string{"osd", "perf", "-f", "json"} - - // OsdDumpCommand represents ceph osd dump command. - OsdDumpCommand = []string{"osd", "dump", "-f", "json"} -) - -var ( - defaultTimeout = time.Second * 10 - defaultFormatTimeout = time.Second * 120 -) - -type mon struct { - Addr string `json:"addr"` - Name string `json:"name"` - Rank int `json:"rank"` -} - -type monMap struct { - Created string `json:"created"` - Epoch int `json:"epoch"` - FsID string `json:"fsid"` - Modified string `json:"modified"` - Mons []mon `json:"mons"` -} - -// QuorumStatus represents ceph quorum_status result struct. -type QuorumStatus struct { - MonMap monMap `json:"monmap"` - QuorumLeaderName string `json:"quorum_leader_name"` -} - -// Stats represents ceph status result struct. -type Stats struct { - ID string `json:"fsid"` - Health struct { - OverallStatus string `json:"overall_status"` - } `json:"health"` -} - -// PoolStats represents ceph pool status struct. -type PoolStats struct { - Pools []struct { - Name string `json:"name"` - Stats struct { - BytesUsed int64 `json:"bytes_used"` - MaxAvail int64 `json:"max_avail"` - } `json:"stats"` - } `json:"pools"` -} - -// RBDMap represents "rbd listmapped" command result struct. -type RBDMap struct { - MapDevice []struct { - ID int `json:"id"` - Device string `json:"name"` - Image string `json:"image"` - Pool string `json:"pool"` - } `json:"Devices"` -} - -// Command represents ceph command. -type Command struct { - bin string - conf string -} - -// NewCephCommand returns ceph command instance. -func NewCephCommand(bin, conf string) *Command { - return &Command{bin, conf} -} - -// RunCommand is used to execute ceph command. -func (cc *Command) RunCommand(obj interface{}, args ...string) error { - args = append(args, "--conf") - args = append(args, cc.conf) - - exit, stdout, stderr, err := exec.Run(defaultTimeout, cc.bin, args...) - if err != nil || exit != 0 { - return fmt.Errorf("run command failed, err: %v, exit: %d, output: %s", err, exit, stderr) - } - - if obj != nil { - if err := json.Unmarshal([]byte(stdout), obj); err != nil { - return err - } - } - - return nil -} diff --git a/storage/volume/modules/local/local.go b/storage/volume/modules/local/local.go index dcf91d9ea1..66bc7aaabc 100644 --- a/storage/volume/modules/local/local.go +++ b/storage/volume/modules/local/local.go @@ -7,16 +7,15 @@ import ( "os" "path" "strconv" - "strings" - "time" + "github.com/alibaba/pouch/pkg/bytefmt" "github.com/alibaba/pouch/storage/quota" "github.com/alibaba/pouch/storage/volume/driver" "github.com/alibaba/pouch/storage/volume/types" ) var ( - dataDir = "/mnt/local" + dataDir = "/var/lib/pouch/volume" ) func init() { @@ -40,23 +39,42 @@ func (p *Local) StoreMode(ctx driver.Context) driver.VolumeStoreMode { } // Create a local volume. -func (p *Local) Create(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Local create volume: %s", v.Name) - mountPath, _ := p.Path(ctx, v) +func (p *Local) Create(ctx driver.Context, id types.VolumeID) (*types.Volume, error) { + ctx.Log.Debugf("Local create volume: %s", id.Name) + var ( + mountPath = path.Join(dataDir, id.Name) + size string + ) + + // parse the mount path. + if value, ok := id.Options["mount"]; ok { + mountPath = value + } + + // parse the size. + if value, ok := id.Options["opt.size"]; ok { + sizeInt, err := bytefmt.ToMegabytes(value) + if err != nil { + return nil, err + } + size = strconv.Itoa(int(sizeInt)) + "M" + } + + // create the volume path if st, exist := os.Stat(mountPath); exist != nil { if e := os.MkdirAll(mountPath, 0755); e != nil { - return e + return nil, e } } else if !st.IsDir() { - return fmt.Errorf("mount path is not a dir %s", mountPath) + return nil, fmt.Errorf("mount path is not a dir %s", mountPath) } - return nil + return types.NewVolumeFromVolumeID(mountPath, size, id), nil } // Remove a local volume. -func (p *Local) Remove(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Local) Remove(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Local remove volume: %s", v.Name) mountPath := v.Path() @@ -82,28 +100,15 @@ func (p *Local) Path(ctx driver.Context, v *types.Volume) (string, error) { // Options returns local volume's options. func (p *Local) Options() map[string]types.Option { return map[string]types.Option{ - "ids": {Value: "", Desc: "local volume user's ids"}, - "reqID": {Value: "", Desc: "create local volume request id"}, - "freeTime": {Value: "", Desc: "local volume free time"}, - "mount": {Value: "", Desc: "local directory"}, + "mount": {Value: "", Desc: "local directory"}, } } // Attach a local volume. -func (p *Local) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Local) Attach(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Local attach volume: %s", v.Name) mountPath := v.Path() size := v.Size() - reqID := v.Option("reqID") - ids := v.Option("ids") - - if ids != "" { - if !strings.Contains(ids, reqID) { - ids = ids + "," + reqID - } - } else { - ids = reqID - } if st, exist := os.Stat(mountPath); exist != nil { if e := os.MkdirAll(mountPath, 0777); e != nil { @@ -120,31 +125,12 @@ func (p *Local) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) er } } - v.SetOption("ids", ids) - v.SetOption("freeTime", "") - return nil } // Detach a local volume. -func (p *Local) Detach(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Local) Detach(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Local detach volume: %s", v.Name) - reqID := v.Option("reqID") - ids := v.Option("ids") - - arr := strings.Split(ids, ",") - newIDs := []string{} - for _, id := range arr { - if id != reqID { - newIDs = append(newIDs, reqID) - } - } - - if len(newIDs) == 0 { - v.SetOption("freeTime", strconv.FormatInt(time.Now().Unix(), 10)) - } - - v.SetOption("ids", strings.Join(newIDs, ",")) return nil } diff --git a/storage/volume/modules/tmpfs/tmpfs.go b/storage/volume/modules/tmpfs/tmpfs.go index 4ed1a9be21..c549f3d9d4 100644 --- a/storage/volume/modules/tmpfs/tmpfs.go +++ b/storage/volume/modules/tmpfs/tmpfs.go @@ -41,14 +41,17 @@ func (p *Tmpfs) StoreMode(ctx driver.Context) driver.VolumeStoreMode { } // Create a tmpfs volume. -func (p *Tmpfs) Create(ctx driver.Context, v *types.Volume, s *types.Storage) error { - ctx.Log.Debugf("Tmpfs create volume: %s", v.Name) +func (p *Tmpfs) Create(ctx driver.Context, id types.VolumeID) (*types.Volume, error) { + ctx.Log.Debugf("Tmpfs create volume: %s", id.Name) - return nil + // parse the mount path + mountPath := path.Join(dataDir, id.Name) + + return types.NewVolumeFromVolumeID(mountPath, "", id), nil } // Remove a tmpfs volume. -func (p *Tmpfs) Remove(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Tmpfs) Remove(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Tmpfs remove volume: %s", v.Name) return nil @@ -70,7 +73,7 @@ func (p *Tmpfs) Options() map[string]types.Option { } // Attach a tmpfs volume. -func (p *Tmpfs) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Tmpfs) Attach(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Tmpfs attach volume: %s", v.Name) mountPath := v.Path() size := v.Size() @@ -105,7 +108,7 @@ func (p *Tmpfs) Attach(ctx driver.Context, v *types.Volume, s *types.Storage) er } // Detach a tmpfs volume. -func (p *Tmpfs) Detach(ctx driver.Context, v *types.Volume, s *types.Storage) error { +func (p *Tmpfs) Detach(ctx driver.Context, v *types.Volume) error { ctx.Log.Debugf("Tmpfs detach volume: %s", v.Name) mountPath := v.Path() reqID := v.Option("reqID") diff --git a/storage/volume/types/storage.go b/storage/volume/types/storage.go deleted file mode 100644 index e1425e930f..0000000000 --- a/storage/volume/types/storage.go +++ /dev/null @@ -1,82 +0,0 @@ -package types - -import ( - "time" - - "github.com/alibaba/pouch/storage/volume/types/meta" -) - -// StorageSpec represents storage spec. -type StorageSpec struct { - Type string `json:"type"` // storage type - ID string `json:"id,omitempty"` // storage uid or unique name - Address string `json:"address"` // storage address, such as ceph it's monitor ip:port,ip:port - PoolSpec map[string]PoolSpec `json:"poolspec"` // storage pool spec - - ClusterName string `json:"clustername,omitempty"` // storage cluster name - Keyring string `json:"keyring,omitempty"` // storage access key, such as ceph's keyring user:secret -} - -// PoolSpec represents storage pool spec. -type PoolSpec struct { - WriteBps int64 `json:"iowbps"` // storage write bytes per second - ReadBps int64 `json:"iorbps"` // storage read bytes per second - ReadIOps int64 `json:"ioriops"` // storage read io count per second - WriteIOps int64 `json:"iowiops"` // storage write io count per second - IOps int64 `json:"ioiops"` // storage total io count per second - Capacity int64 `json:"capacity"` // storage capacity like: MB/GB/TB/PB - Available int64 `json:"available"` // storage available like: MB/GB/TB/PB - MaxVolumes int64 `json:"maxvolume"` // max disks in this storage cluster - NicSendBps int64 `json:"networkSendBps"` // nic card send bytes per second - NicRecvBps int64 `json:"networkRecvBps"` // nic card recv bytes per second -} - -// StorageStatus represents storage status. -type StorageStatus struct { - Schedulable bool `json:"schedulable,omitempty"` - LastUpdateTime *time.Time `json:"lastUpdateTime,omitempty"` - HealthyStatus string `json:"message"` -} - -// Storage represents storage struct. -type Storage struct { - meta.ObjectMeta `json:",inline"` - Spec *StorageSpec `json:"spec,omitempty"` - Status *StorageStatus `json:"status,omitempty"` -} - -// Address return storage operate address. -func (s *Storage) Address() string { - return s.Spec.Address -} - -// SetAddress set storage operate address. -func (s *Storage) SetAddress(address string) { - s.Spec.Address = address -} - -// StorageList represents storage list type. -type StorageList struct { - meta.ListMeta `json:",inline,omitempty"` - Items []Storage `json:"items,omitempty"` -} - -// StorageID returns storage's uid. -func (s *Storage) StorageID() StorageID { - return NewStorageID(s.GetUID()) -} - -// StorageID represents storage uid. -type StorageID struct { - UID string -} - -// NewStorageID is used to set uid. -func NewStorageID(uid string) StorageID { - return StorageID{UID: uid} -} - -// String returns storage uid. -func (si StorageID) String() string { - return si.UID -} diff --git a/storage/volume/types/volume.go b/storage/volume/types/volume.go index ef3147f7df..9b20819c5f 100644 --- a/storage/volume/types/volume.go +++ b/storage/volume/types/volume.go @@ -147,16 +147,6 @@ func (v *Volume) Driver() string { return v.Spec.Backend } -// StorageID return driver's storage identity. -func (v *Volume) StorageID() StorageID { - return NewStorageID(v.Spec.ClusterID) -} - -// SetStorageUID save storage uid into volume. -func (v *Volume) SetStorageUID(uid string) { - v.Spec.ClusterID = uid -} - // VolumeID return volume's identity. func (v *Volume) VolumeID() VolumeID { return NewVolumeID(v.Name, v.Driver()) diff --git a/storage/volume/types/volume_util.go b/storage/volume/types/volume_util.go index 0e3929979e..3e77226759 100644 --- a/storage/volume/types/volume_util.go +++ b/storage/volume/types/volume_util.go @@ -105,6 +105,41 @@ func buildVolumeConfig(options map[string]string) (*VolumeConfig, error) { return config, nil } +// NewVolumeFromVolumeID will create an Volume using mountPath, size and VolumeID. +func NewVolumeFromVolumeID(mountPath, size string, id VolumeID) *Volume { + now := time.Now() + v := &Volume{ + ObjectMeta: meta.ObjectMeta{ + Name: id.Name, + Claimer: "pouch", + Namespace: "pouch", + UID: uuid.NewRandom().String(), + Generation: meta.ObjectPhasePreCreate, + Labels: id.Labels, + CreationTimestamp: &now, + ModifyTimestamp: &now, + }, + Spec: &VolumeSpec{ + Backend: id.Driver, + Extra: id.Options, + Selector: make(Selector, 0), + VolumeConfig: &VolumeConfig{ + Size: size, + }, + }, + Status: &VolumeStatus{ + MountPoint: mountPath, + }, + } + + for n, selector := range id.Selectors { + requirement := translateSelector(n, strings.ToLower(selector)) + v.Spec.Selector = append(v.Spec.Selector, requirement) + } + + return v +} + // NewVolume generates a volume based VolumeID func NewVolume(id VolumeID) (*Volume, error) { now := time.Now()