Skip to content

Commit

Permalink
[bug-654]: Zero quota should allow infinite usage (#211)
Browse files Browse the repository at this point in the history
* add zero quota check for powerflex and remove url policy checks

* move quota check to enforcer and cleanup powerscale handler

* fix data race in token getter and storage pool cache

* remove todo

* remove ApproveQuota

* remove powerscale volume create policy

* fix powerflex coverage

* fix linting

* remove coverage.out

* fix linting

* fix linting

* add comments to powerflex client

* quieter logs and update volume create policy to allow zero quota

* add role service to redeploy

* remove redundant request dumps

* remove url policy installation

* use separate powerflex clients

* undo minor changes

* remove powerscale volume create install
  • Loading branch information
atye committed Feb 22, 2023
1 parent 34615f6 commit 4cb1658
Show file tree
Hide file tree
Showing 17 changed files with 344 additions and 699 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ redeploy: build docker
docker save --output ./bin/storage-service-$(DOCKER_TAG).tar storage-service:$(DOCKER_TAG)
sudo /usr/local/bin/k3s ctr images import ./bin/storage-service-$(DOCKER_TAG).tar
sudo /usr/local/bin/k3s kubectl rollout restart -n karavi deploy/storage-service
# role-service
docker save --output ./bin/role-service-$(DOCKER_TAG).tar role-service:$(DOCKER_TAG)
sudo /usr/local/bin/k3s ctr images import ./bin/role-service-$(DOCKER_TAG).tar
sudo /usr/local/bin/k3s kubectl rollout restart -n karavi deploy/role-service

.PHONY: docker
docker: build
Expand Down
60 changes: 23 additions & 37 deletions internal/proxy/powerflex_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,28 @@ func buildSystem(ctx context.Context, e SystemEntry, log *logrus.Entry) (*System
if err != nil {
return nil, err
}
c, err := goscaleio.NewClientWithArgs(tgt.String(), "", true, false)

// Cannot use the same powerflex client for the storage pool cache and
// the token getter because of data races with concurrent usage so we
// create a powerflex client for each

spCacheClient, err := goscaleio.NewClientWithArgs(tgt.String(), "", true, false)
if err != nil {
return nil, err
}

spc, err := powerflex.NewStoragePoolCache(c, 100)
tgClient, err := goscaleio.NewClientWithArgs(tgt.String(), "", true, false)
if err != nil {
return nil, err
}

spc, err := powerflex.NewStoragePoolCache(spCacheClient, 100)
if err != nil {
return nil, err
}

tk := powerflex.NewTokenGetter(powerflex.Config{
PowerFlexClient: c,
PowerFlexClient: tgClient,
TokenRefreshInterval: 5 * time.Minute,
ConfigConnect: &goscaleio.ConfigConnect{
Endpoint: e.Endpoint,
Expand Down Expand Up @@ -226,39 +236,6 @@ func (h *PowerFlexHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}))
mux.Handle("/", proxyHandler)

// Request policy decision from OPA
ans, err := decision.Can(func() decision.Query {
return decision.Query{
Host: h.opaHost,
Policy: "/karavi/authz/url",
Input: map[string]interface{}{
"method": r.Method,
"url": r.URL.Path,
},
}
})
if err != nil {
h.log.WithError(err).Error("requesting policy decision from OPA")
writeError(w, "powerflex", err.Error(), http.StatusInternalServerError, h.log)
return
}
var resp struct {
Result struct {
Allow bool `json:"allow"`
} `json:"result"`
}
err = json.NewDecoder(bytes.NewReader(ans)).Decode(&resp)
if err != nil {
h.log.WithError(err).WithField("opa_policy_decision", string(ans)).Error("decoding json")
writeError(w, "powerflex", err.Error(), http.StatusInternalServerError, h.log)
return
}
if !resp.Result.Allow {
h.log.Debug("Request denied")
writeError(w, "powerflex", "request denied for path", http.StatusNotFound, h.log)
return
}

mux.ServeHTTP(w, r)
}

Expand Down Expand Up @@ -403,12 +380,15 @@ func (s *System) volumeCreateHandler(next http.Handler, enf *quota.RedisEnforcem
// this request, choose the one with the most quota.
var maxQuotaInKb int
for _, quota := range opaResp.Result.PermittedRoles {
if quota == 0 {
maxQuotaInKb = 0
break
}
if quota >= maxQuotaInKb {
maxQuotaInKb = quota
}
}

// At this point, the request has been approved.
qr := quota.Request{
SystemType: "powerflex",
SystemID: systemID,
Expand Down Expand Up @@ -659,6 +639,9 @@ func (s *System) volumeMapHandler(next http.Handler, enf *quota.RedisEnforcement
return nil, err
}
token, err := s.tk.GetToken(ctx)
if err != nil {
return nil, err
}
c.SetToken(token)

id = strings.TrimPrefix(id, "Volume::")
Expand Down Expand Up @@ -795,6 +778,9 @@ func (s *System) volumeUnmapHandler(next http.Handler, enf *quota.RedisEnforceme
return nil, err
}
token, err := s.tk.GetToken(ctx)
if err != nil {
return nil, err
}
c.SetToken(token)

id = strings.TrimPrefix(id, "Volume::")
Expand Down
153 changes: 153 additions & 0 deletions internal/proxy/powerflex_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,159 @@ func TestPowerFlex(t *testing.T) {
t.Errorf("expected status %d, got %d", http.StatusOK, w.Code)
}
})

t.Run("provisioning request with a role with infinite quota", func(t *testing.T) {
// Logging
log := logrus.New().WithContext(context.Background())
log.Logger.SetOutput(os.Stdout)

body := struct {
VolumeSize int64
VolumeSizeInKb string `json:"volumeSizeInKb"`
StoragePoolID string `json:"storagePoolId"`
}{
VolumeSize: 2000,
VolumeSizeInKb: "2000",
StoragePoolID: "3df6df7600000001",
}
data, err := json.Marshal(body)
if err != nil {
t.Fatal(err)
}
payload := bytes.NewBuffer(data)

w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/api/types/Volume/instances/", payload)

// Add a jwt token to the request context
// In production, the jwt token would have the role information for OPA to make a decision on
// Since we are faking the OPA server, the jwt token doesn't require real info for the unit test
reqCtx := context.WithValue(context.Background(), web.JWTKey, token.Token(&jwx.Token{}))
reqCtx = context.WithValue(reqCtx, web.JWTTenantName, "mygroup")
r = r.WithContext(reqCtx)

// Build a httptest server to fake OPA
fakeOPA := buildTestServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
// This path validates a supported request path, see policies/url.rego
case "/v1/data/karavi/authz/url":
w.Write([]byte(`{"result": {"allow": true}}`))
// This path returns the OPA decision to allow a create volume request
case "/v1/data/karavi/volumes/create":
w.Write([]byte(fmt.Sprintf(`{
"result": {
"allow": true,
"permitted_roles": {
"role": 0,
"role2": 100
}
}}`)))
default:
t.Fatalf("OPA path %s not supported", r.URL.Path)
}
}))

// Build a httptest TLS server to fake PowerFlex
fakePowerFlex := buildTestTLSServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api/login" {
w.Write([]byte("token"))
}
if r.URL.Path == "/api/version" {
w.Write([]byte("3.5"))
}
if r.URL.Path == "/api/types/StoragePool/instances" {
data, err := ioutil.ReadFile("testdata/storage_pool_instances.json")
if err != nil {
t.Fatal(err)
}
w.Write(data)
}
if r.URL.Path == "/api/types/Volume/instances/" {
type volumeCreate struct {
VolumeSizeInKb string `json:"volumeSizeInKb"`
StoragePoolID string `json:"storagePoolId"`
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
log.Println(string(body))
var v volumeCreate
err = json.Unmarshal(body, &v)
if err != nil {
t.Fatal(err)
}
w.Write([]byte("{\"id\": \"847ce5f30000005a\"}"))
}
}))

// Add headers that the sidecar-proxy would add, in order to identify
// the request as intended for a PowerFlex with the given systemID.
r.Header.Add("Forwarded", "by=csi-vxflexos")
r.Header.Add("Forwarded", fmt.Sprintf("for=https://%s;542a2d5f5122210f", fakePowerFlex.URL))
rtr := newTestRouter()

rdb := testCreateRedisInstance(t)
if rdb == nil {
t.Fatal("expected non-nil return value for redis client")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sut := quota.NewRedisEnforcement(ctx, quota.WithRedis(rdb))
req := quota.Request{
StoragePoolID: "3df6df7600000001",
Group: "allowed",
VolumeName: "k8s-0",
Capacity: "1",
}
sut.ApproveRequest(ctx, req, 8000)
t.Run("NewRedisEnforcer", func(t *testing.T) {
if sut == nil {
t.Fatal("expected non-nil return value for redis enforcemnt")
}
})

// Create a PowerFlexHandler and update it with the fake PowerFlex
powerFlexHandler := proxy.NewPowerFlexHandler(log, sut, hostPort(t, fakeOPA.URL))
powerFlexHandler.UpdateSystems(context.Background(), strings.NewReader(fmt.Sprintf(`
{
"powerflex": {
"542a2d5f5122210f": {
"endpoint": "%s",
"user": "admin",
"pass": "Password123",
"insecure": true
}
}
}
`, fakePowerFlex.URL)), logrus.New().WithContext(context.Background()))

// Create a dispatch handler with the powerFlexHandler
systemHandlers := map[string]http.Handler{
"powerflex": web.Adapt(powerFlexHandler),
}
dh := proxy.NewDispatchHandler(log, systemHandlers)
rtr.ProxyHandler = dh
h := web.Adapt(rtr.Handler(), web.CleanMW())

// Serve the request
h.ServeHTTP(w, r)

respBody := struct {
StatusCode int `json:"httpStatusCode"`
// Message string `json:"message"`
}{}

err = json.Unmarshal(w.Body.Bytes(), &respBody)
if err != nil {
t.Fatal(err)
}

if w.Code != http.StatusOK {
t.Errorf("expected status %d, got %d", http.StatusOK, w.Code)
}
})
}

func newTestRouter() *web.Router {
Expand Down
36 changes: 4 additions & 32 deletions internal/proxy/powermax_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,38 +166,6 @@ func (h *PowerMaxHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
router.MethodNotAllowed = proxyHandler
router.RedirectTrailingSlash = false

// Request policy decision from OPA
ans, err := decision.Can(func() decision.Query {
return decision.Query{
Host: h.opaHost,
Policy: "/karavi/authz/powermax/url",
Input: map[string]interface{}{
"method": r.Method,
"url": r.URL.Path,
},
}
})
if err != nil {
h.log.WithError(err).Error("requesting policy decision from OPA")
writeError(w, "powermax", err.Error(), http.StatusInternalServerError, h.log)
return
}
var resp struct {
Result struct {
Allow bool `json:"allow"`
} `json:"result"`
}
err = json.NewDecoder(bytes.NewReader(ans)).Decode(&resp)
if err != nil {
h.log.WithError(err).WithField("opa_policy_decision", string(ans)).Error("decoding json")
writeError(w, "powermax", err.Error(), http.StatusInternalServerError, h.log)
return
}
if !resp.Result.Allow {
h.log.Debug("Request denied")
writeError(w, "powermax", "request denied for path", http.StatusNotFound, h.log)
return
}
router.ServeHTTP(w, r)
}

Expand Down Expand Up @@ -451,6 +419,10 @@ func (s *PowerMaxSystem) volumeCreateHandler(next http.Handler, enf *quota.Redis
// this request, choose the one with the most quota.
var maxQuotaInKb int
for _, quota := range opaResp.Result.PermittedRoles {
if quota == 0 {
maxQuotaInKb = 0
break
}
if quota >= maxQuotaInKb {
maxQuotaInKb = quota
}
Expand Down
Loading

0 comments on commit 4cb1658

Please sign in to comment.