Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Handle too big CH payloads for caching #191

Merged
merged 2 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type AsyncCache struct {
TransactionRegistry

graceTime time.Duration

MaxPayloadSize config.ByteSize
}

func (c *AsyncCache) Close() error {
Expand Down Expand Up @@ -103,9 +105,12 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach
return nil, err
}

maxPayloadSize := cfg.MaxPayloadSize

return &AsyncCache{
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
}, nil
}
6 changes: 4 additions & 2 deletions cache/async_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ func TestAsyncCache_FilesystemCache_instantiation(t *testing.T) {
Dir: asyncTestDir,
MaxSize: 8192,
},
Expire: config.Duration(time.Minute),
Expire: config.Duration(time.Minute),
MaxPayloadSize: config.ByteSize(100000000),
}
if err := os.RemoveAll(testDirAsync); err != nil {
log.Fatalf("cannot remove %q: %s", testDirAsync, err)
Expand Down Expand Up @@ -248,7 +249,8 @@ func TestAsyncCache_RedisCache_instantiation(t *testing.T) {
Redis: config.RedisCacheConfig{
Addresses: []string{s.Addr()},
},
Expire: config.Duration(cacheTTL),
Expire: config.Duration(cacheTTL),
MaxPayloadSize: config.ByteSize(100000000),
}

_, err := NewAsyncCache(redisCfg, 1*time.Second)
Expand Down
9 changes: 9 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ expire: <duration>
# By default `grace_time` is 5s. Negative value disables the protection
# from `thundering herd` problem.
grace_time: <duration>

# Maximum total size of a request payload eligible for caching. The default
# value is 1 Petabyte.
max_payload_size: <byte_size>
```

### <distributed_cache_config>
Expand Down Expand Up @@ -113,6 +117,11 @@ expire: <duration>
# By default `grace_time` is 5s. Negative value disables the protection
# from `thundering herd` problem.
grace_time: <duration>

# Maximum total size of a request payload eligible for caching. The default
# value is 1 Petabyte.
# The default is set this high so that users who do not limit response sizes
# effectively get an unlimited cache.
max_payload_size: <byte_size>
```

### <param_groups_config>
Expand Down
12 changes: 12 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
}

defaultExecutionTime = Duration(120 * time.Second)

defaultMaxPayloadSize = ByteSize(1 << 50)
)

// Config describes server configuration, access and proxy rules
Expand Down Expand Up @@ -601,6 +603,9 @@ type Cache struct {

// Catches all undefined fields
XXX map[string]interface{} `yaml:",inline"`

// Maximum total size of request payload for caching
MaxPayloadSize ByteSize `yaml:"max_payload_size,omitempty"`
}

type FileSystemCacheConfig struct {
Expand Down Expand Up @@ -812,6 +817,13 @@ func LoadFile(filename string) (*Config, error) {
}
}

for i := range cfg.Caches {
c := &cfg.Caches[i]
if c.MaxPayloadSize <= 0 {
c.MaxPayloadSize = defaultMaxPayloadSize
}
}

if maxResponseTime < 0 {
maxResponseTime = 0
}
Expand Down
15 changes: 12 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ var fullConfig = Config{
Dir: "/path/to/longterm/cachedir",
MaxSize: ByteSize(100 << 30),
},
Expire: Duration(time.Hour),
GraceTime: Duration(20 * time.Second),
Expire: Duration(time.Hour),
GraceTime: Duration(20 * time.Second),
MaxPayloadSize: ByteSize(100 << 30),
},
{
Name: "shortterm",
Expand All @@ -31,7 +32,8 @@ var fullConfig = Config{
Dir: "/path/to/shortterm/cachedir",
MaxSize: ByteSize(100 << 20),
},
Expire: Duration(10 * time.Second),
Expire: Duration(10 * time.Second),
MaxPayloadSize: ByteSize(100 << 20),
},
},
HackMePlease: true,
Expand Down Expand Up @@ -441,6 +443,11 @@ func TestBadConfig(t *testing.T) {
"testdata/bad.heartbeat_section.empty.yml",
"`cluster.heartbeat` cannot be unset for \"cluster\"",
},
{
"max payload size to cache",
"testdata/bad.max_payload_size.yml",
"cannot parse byte size \"-10B\": it must be positive float followed by optional units. For example, 1.5Gb, 3T",
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -815,12 +822,14 @@ caches:
file_system:
dir: /path/to/longterm/cachedir
max_size: 107374182400
max_payload_size: 107374182400
- mode: file_system
name: shortterm
expire: 10s
file_system:
dir: /path/to/shortterm/cachedir
max_size: 104857600
max_payload_size: 104857600
param_groups:
- name: cron-job
params:
Expand Down
21 changes: 21 additions & 0 deletions config/testdata/bad.max_payload_size.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
caches:
- name: "longterm"
mode: "file_system"
max_payload_size: "-10B"
file_system:
dir: "cache_dir"
max_size: 100Gb

server:
http:
listen_addr: ":8080"

users:
- name: "dummy"
allowed_networks: ["1.2.3.4"]
to_cluster: "cluster"
to_user: "default"

clusters:
- name: "cluster"
nodes: ["127.0.1.1:8123"]
3 changes: 3 additions & 0 deletions config/testdata/full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ caches:
# Path to directory where cached responses will be stored.
dir: "/path/to/longterm/cachedir"

max_payload_size: 100Gb

# Expiration time for cached responses.
expire: 1h

Expand All @@ -44,6 +46,7 @@ caches:
file_system:
max_size: 100Mb
dir: "/path/to/shortterm/cachedir"
max_payload_size: 100Mb
expire: 10s

# Optional network lists, might be used as values for `allowed_networks`.
Expand Down
5 changes: 4 additions & 1 deletion docs/content/en/configuration/caching.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ Distributed cache relies on external database to share cache across multiple rep
multiple replicas deployments. Currently only [Redis](https://redis.io/) key value store is supported.
Configuration template for distributed cache can be found [here](https://github.com/ContentSquare/chproxy/blob/master/config/#distributed_cache_config).


#### Response limitations for caching
Before caching a ClickHouse response, chproxy verifies that the response size
is not greater than the configured maximum. This limit can be specified in the cache's config section via `max_payload_size`. The default value
is 1 Petabyte, so by default this safety mechanism is effectively disabled.
87 changes: 87 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,85 @@ func TestServe(t *testing.T) {
},
startTLS,
},
{
"https cache max payload size",
"testdata/https.cache.max-payload-size.yml",
func(t *testing.T) {
q := "SELECT MaxPayloadSize"
req, err := http.NewRequest("GET", "https://127.0.0.1:8443?query="+url.QueryEscape(q), nil)
checkErr(t, err)
req.SetBasicAuth("default", "qwerty")
req.Close = true

resp, err := tlsClient.Do(req)
checkErr(t, err)
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK)
}

checkResponse(t, resp.Body, expectedOkResp)

key := &cache.Key{
Query: []byte(q),
AcceptEncoding: "gzip",
Version: cache.Version,
}

cc := proxy.caches["https_cache_max_payload_size"]
cachedData, err := cc.Get(key)

if cachedData != nil || err == nil {
t.Fatal("response bigger than maxPayloadSize should not be cached")
}

resp.Body.Close()
},
startTLS,
},
{
"https cache max payload size not reached",
"testdata/https.cache.max-payload-size-not-reached.yml",
func(t *testing.T) {
q := "SELECT MaxPayloadSize"
req, err := http.NewRequest("GET", "https://127.0.0.1:8443?query="+url.QueryEscape(q), nil)
checkErr(t, err)
req.SetBasicAuth("default", "qwerty")
req.Close = true

resp, err := tlsClient.Do(req)
checkErr(t, err)
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK)
}

checkResponse(t, resp.Body, expectedOkResp)

key := &cache.Key{
Query: []byte(q),
AcceptEncoding: "gzip",
Version: cache.Version,
}

rw := httptest.NewRecorder()

cc := proxy.caches["https_cache_max_payload_size"]
cachedData, err := cc.Get(key)

if err != nil {
t.Fatalf("unexpected error while getting response from cache: %s", err)
}

err = RespondWithData(rw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, 200)
if err != nil {
t.Fatalf("unexpected error while getting response from cache: %s", err)
}
checkResponse(t, rw.Body, expectedOkResp)

cachedData.Data.Close()
resp.Body.Close()
},
startTLS,
},
{
"https cache with mix query source",
"testdata/https.cache.yml",
Expand Down Expand Up @@ -794,6 +873,14 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) {
case q == "SELECT 1 FORMAT TabSeparatedWithNamesAndTypes":
w.WriteHeader(http.StatusOK)
w.Write(bytesWithInvalidUTFPairs)
case q == "SELECT MaxPayloadSize":
w.WriteHeader(http.StatusOK)

// generate 10M payload size
b := make([]byte, 10485760)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprint(w, b)
fmt.Fprint(w, "Ok.\n")
default:
if strings.Contains(string(query), killQueryPattern) {
fakeCHState.kill()
Expand Down
9 changes: 8 additions & 1 deletion metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ var (
},
[]string{"cache"},
)
cacheSkipped = prometheus.NewCounterVec(
sigua-cs marked this conversation as resolved.
Show resolved Hide resolved
prometheus.CounterOpts{
Name: "cache_payloadsize_too_big_total",
Help: "The amount of too big payloads to be cached",
},
[]string{"cache", "user", "cluster", "cluster_user"},
)
requestDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "request_duration_seconds",
Expand Down Expand Up @@ -194,7 +201,7 @@ func registerMetrics() {
limitExcess, hostPenalties, hostHealth, concurrentQueries,
requestQueueSize, userQueueOverflow, clusterUserQueueOverflow,
requestBodyBytes, responseBodyBytes,
cacheHit, cacheMiss, cacheSize, cacheItems,
cacheHit, cacheMiss, cacheSize, cacheItems, cacheSkipped,
requestDuration, proxiedResponseDuration, cachedResponseDuration,
canceledRequest, timeoutRequest,
configSuccess, configSuccessTime, badRequest)
Expand Down
15 changes: 14 additions & 1 deletion proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,22 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
if err != nil {
err = fmt.Errorf("%s: %w; query: %q", s, err, q)
respondWith(srw, err, http.StatusInternalServerError)
return
}
} else {
// Do not cache responses greater than max payload size.
if contentLength > int64(s.user.cache.MaxPayloadSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should complete ongoing transaction even if cache is being skipped

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

cacheSkipped.With(labels).Inc()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a test that will check that the maxpayloadSize works for a payload bigger and doesn't work for a payload smaller. One way to check it in the test would be to look at the cacheSkipped counter

log.Infof("%s: Request will not be cached. Content length (%d) is greater than max payload size (%d)", s, contentLength, s.user.cache.MaxPayloadSize)

rp.completeTransaction(s, statusCode, userCache, key, q)

err = RespondWithData(srw, reader, contentMetadata, 0*time.Second, tmpFileRespWriter.StatusCode())
if err != nil {
err = fmt.Errorf("%s: %w; query: %q", s, err, q)
respondWith(srw, err, http.StatusInternalServerError)
}
return
}
cacheMiss.With(labels).Inc()
log.Debugf("%s: cache miss", s)
expiration, err := userCache.Put(reader, contentMetadata, key)
Expand Down
29 changes: 29 additions & 0 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
"fmt"
"github.com/contentsquare/chproxy/cache"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -249,6 +250,19 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
return makeRequest(p)
},
},
{
cfg: goodCfg,
name: "max payload size limit",
expResponse: okResponse,
expStatusCode: http.StatusOK,
f: func(p *reverseProxy) *http.Response {
p.caches["max_payload_size"] = &cache.AsyncCache{
MaxPayloadSize: 8 * 1024 * 1024,
}
p.users["default"].cache = p.caches["max_payload_size"]
return makeRequest(p)
},
},
{
cfg: goodCfg,
name: "queue overflow for user",
Expand Down Expand Up @@ -397,6 +411,21 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
return makeCustomRequest(p, req)
},
},
{
cfg: authCfg,
name: "post request max payload size",
expResponse: okResponse,
expStatusCode: http.StatusOK,
f: func(p *reverseProxy) *http.Response {
uri := fmt.Sprintf("%s?user=foo&password=bar", fakeServer.URL)
req := httptest.NewRequest("POST", uri, nil)
p.caches["max_payload_size"] = &cache.AsyncCache{
MaxPayloadSize: 8 * 1024 * 1024,
}
p.users["foo"].cache = p.caches["max_payload_size"]
return makeCustomRequest(p, req)
},
},
}

for _, tc := range testCases {
Expand Down
Loading