Skip to content

Commit

Permalink
Merge pull request #46966 from ilackarms/compression-gating
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 47883, 47179, 46966, 47982, 47945)

Add feature gating to REST Compression

**What this PR does / why we need it**: Adds feature gating to opt out of REST API compression

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #46963 

**Special notes for your reviewer**: This PR is a fix / addendum to #45666

**Release note**:

```release-note
```
  • Loading branch information
Kubernetes Submit Queue authored Jun 23, 2017
2 parents 171f48a + c305f72 commit 80af10c
Show file tree
Hide file tree
Showing 10 changed files with 534 additions and 22 deletions.
2 changes: 2 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/endpoints/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/testing:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
],
)

Expand Down Expand Up @@ -84,5 +85,6 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
],
)
182 changes: 182 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package endpoints

import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -66,6 +67,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/server/filters"
)

// alwaysAdmit is an implementation of admission.Interface which always says yes to an admit request.
Expand Down Expand Up @@ -1207,6 +1209,110 @@ func TestRequestsWithInvalidQuery(t *testing.T) {
}
}

func TestListCompression(t *testing.T) {
testCases := []struct {
url string
namespace string
selfLink string
legacy bool
label string
field string
acceptEncoding string
}{
// list items in a namespace in the path
{
url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
namespace: "default",
selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
acceptEncoding: "",
},
{
url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
namespace: "default",
selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
acceptEncoding: "gzip",
},
}
for i, testCase := range testCases {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{expectedResourceNamespace: testCase.namespace}
storage["simple"] = &simpleStorage
selfLinker := &setTestSelfLinker{
t: t,
namespace: testCase.namespace,
expectedSet: testCase.selfLink,
}
var handler = handleInternal(storage, admissionControl, selfLinker, nil)

requestContextMapper = request.NewRequestContextMapper()

handler = filters.WithCompression(handler, requestContextMapper)
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper)
handler = request.WithRequestContext(handler, requestContextMapper)

server := httptest.NewServer(handler)

defer server.Close()

req, err := http.NewRequest("GET", server.URL+testCase.url, nil)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
// It's necessary to manually set Accept-Encoding here
// to prevent http.DefaultClient from automatically
// decoding responses
req.Header.Set("Accept-Encoding", testCase.acceptEncoding)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("%d: unexpected status: %d from url %s, Expected: %d, %#v", i, resp.StatusCode, testCase.url, http.StatusOK, resp)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
t.Logf("%d: body: %s", i, string(body))
continue
}
// TODO: future, restore get links
if !selfLinker.called {
t.Errorf("%d: never set self link", i)
}
if !simpleStorage.namespacePresent {
t.Errorf("%d: namespace not set", i)
} else if simpleStorage.actualNamespace != testCase.namespace {
t.Errorf("%d: %q unexpected resource namespace: %s", i, testCase.url, simpleStorage.actualNamespace)
}
if simpleStorage.requestedLabelSelector == nil || simpleStorage.requestedLabelSelector.String() != testCase.label {
t.Errorf("%d: unexpected label selector: %v", i, simpleStorage.requestedLabelSelector)
}
if simpleStorage.requestedFieldSelector == nil || simpleStorage.requestedFieldSelector.String() != testCase.field {
t.Errorf("%d: unexpected field selector: %v", i, simpleStorage.requestedFieldSelector)
}

var decoder *json.Decoder
if testCase.acceptEncoding == "gzip" {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
t.Fatalf("unexpected error creating gzip reader: %v", err)
}
decoder = json.NewDecoder(gzipReader)
} else {
decoder = json.NewDecoder(resp.Body)
}
var itemOut genericapitesting.SimpleList
err = decoder.Decode(&itemOut)
if err != nil {
t.Errorf("failed to read response body as SimpleList: %v", err)
}
}
}

func TestLogs(t *testing.T) {
handler := handle(map[string]rest.Storage{})
server := httptest.NewServer(handler)
Expand Down Expand Up @@ -1522,6 +1628,82 @@ func TestGet(t *testing.T) {
}
}

func TestGetCompression(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
item: genericapitesting.Simple{
Other: "foo",
},
}
selfLinker := &setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
name: "id",
namespace: "default",
}

requestContextMapper = request.NewRequestContextMapper()

storage["simple"] = &simpleStorage
handler := handleLinker(storage, selfLinker)
handler = filters.WithCompression(handler, requestContextMapper)
handler = genericapifilters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper)
handler = request.WithRequestContext(handler, requestContextMapper)
server := httptest.NewServer(handler)
defer server.Close()

tests := []struct {
acceptEncoding string
}{
{acceptEncoding: ""},
{acceptEncoding: "gzip"},
}

for _, test := range tests {
req, err := http.NewRequest("GET", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/id", nil)
if err != nil {
t.Fatalf("unexpected error cretaing request: %v", err)
}
// It's necessary to manually set Accept-Encoding here
// to prevent http.DefaultClient from automatically
// decoding responses
req.Header.Set("Accept-Encoding", test.acceptEncoding)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected response: %#v", resp)
}
var decoder *json.Decoder
if test.acceptEncoding == "gzip" {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
t.Fatalf("unexpected error creating gzip reader: %v", err)
}
decoder = json.NewDecoder(gzipReader)
} else {
decoder = json.NewDecoder(resp.Body)
}
var itemOut genericapitesting.Simple
err = decoder.Decode(&itemOut)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("unexpected error reading body: %v", err)
}

if itemOut.Name != simpleStorage.item.Name {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
}
if !selfLinker.called {
t.Errorf("Never set self link")
}
}
}

func TestGetUninitialized(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
Expand Down
11 changes: 8 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type APIGroupVersion struct {
// ResourceLister is an interface that knows how to list resources
// for this API Group.
ResourceLister discovery.APIResourceLister

// EnableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
EnableAPIResponseCompression bool
}

// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
Expand Down Expand Up @@ -138,9 +142,10 @@ func (g *APIGroupVersion) UpdateREST(container *restful.Container) error {
func (g *APIGroupVersion) newInstaller() *APIInstaller {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
enableAPIResponseCompression: g.EnableAPIResponseCompression,
}
return installer
}
Expand Down
14 changes: 11 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
genericfilters "k8s.io/apiserver/pkg/server/filters"
)

const (
Expand All @@ -48,9 +49,10 @@ const (
)

type APIInstaller struct {
group *APIGroupVersion
prefix string // Path prefix where API resources are to be registered.
minRequestTimeout time.Duration
group *APIGroupVersion
prefix string // Path prefix where API resources are to be registered.
minRequestTimeout time.Duration
enableAPIResponseCompression bool
}

// Struct capturing information about an action ("GET", "POST", "WATCH", "PROXY", etc).
Expand Down Expand Up @@ -584,6 +586,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
handler = restfulGetResource(getter, exporter, reqScope)
}
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler)
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
}
doc := "read the specified " + kind
if hasSubresource {
doc = "read " + subresource + " of the specified " + kind
Expand Down Expand Up @@ -613,6 +618,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "list " + subresource + " of objects of kind " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
}
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Expand Down
7 changes: 7 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ const (
// pluggable output backends and an audit policy specifying how different requests should be
// audited.
AdvancedAuditing utilfeature.Feature = "AdvancedAuditing"

// owner: @ilackams
// alpha: v1.7
//
// Enables compression of REST responses (GET and LIST only)
APIResponseCompression utilfeature.Feature = "APIResponseCompression"
)

func init() {
Expand All @@ -53,4 +59,5 @@ func init() {
var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
StreamingProxyRedirects: {Default: true, PreRelease: utilfeature.Beta},
AdvancedAuditing: {Default: false, PreRelease: utilfeature.Alpha},
APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha},
}
33 changes: 20 additions & 13 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ type Config struct {
// Predicate which is true for paths of long-running http requests
LongRunningFunc apirequest.LongRunningRequestCheck

// EnableAPIResponseCompression indicates whether API Responses should support compression
// if the client requests it via Accept-Encoding
EnableAPIResponseCompression bool

//===========================================================================
// values below here are targets for removal
//===========================================================================
Expand Down Expand Up @@ -206,19 +210,20 @@ type SecureServingInfo struct {
// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
return &Config{
Serializer: codecs,
ReadWritePort: 443,
RequestContextMapper: apirequest.NewRequestContextMapper(),
BuildHandlerChainFunc: DefaultBuildHandlerChain,
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
EnableIndex: true,
EnableDiscovery: true,
EnableProfiling: true,
MaxRequestsInFlight: 400,
MaxMutatingRequestsInFlight: 200,
MinRequestTimeout: 1800,
Serializer: codecs,
ReadWritePort: 443,
RequestContextMapper: apirequest.NewRequestContextMapper(),
BuildHandlerChainFunc: DefaultBuildHandlerChain,
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
EnableIndex: true,
EnableDiscovery: true,
EnableProfiling: true,
MaxRequestsInFlight: 400,
MaxMutatingRequestsInFlight: 200,
MinRequestTimeout: 1800,
EnableAPIResponseCompression: utilfeature.DefaultFeatureGate.Enabled(features.APIResponseCompression),

// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
Expand Down Expand Up @@ -412,6 +417,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
healthzChecks: c.HealthzChecks,

DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer, c.RequestContextMapper),

enableAPIResponseCompression: c.EnableAPIResponseCompression,
}

for k, v := range delegationTarget.PostStartHooks() {
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ load(
go_test(
name = "go_default_test",
srcs = [
"compression_test.go",
"cors_test.go",
"maxinflight_test.go",
"timeout_test.go",
Expand All @@ -31,6 +32,7 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"compression.go",
"cors.go",
"doc.go",
"longrunning.go",
Expand All @@ -40,6 +42,7 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
Expand Down
Loading

0 comments on commit 80af10c

Please sign in to comment.