Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory issue & improve processing speed #212

Merged
merged 12 commits into from
Aug 29, 2022
94 changes: 0 additions & 94 deletions cache/buffered_response_writer.go

This file was deleted.

2 changes: 1 addition & 1 deletion cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ContentMetadata struct {

type CachedData struct {
ContentMetadata
Data io.Reader
Data io.ReadCloser // we need a ReadCloser because the reader is used oustide the scope where it was created and need to be closed by the function using it.
Ttl time.Duration
}

Expand Down
20 changes: 7 additions & 13 deletions cache/filesystem_cache.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cache

import (
"bytes"
"fmt"
"io"
"math/rand"
Expand All @@ -17,10 +16,6 @@ import (
"github.com/contentsquare/chproxy/log"
)

// Version must be increased with each backward-incompatible change
// in the cache storage.
const Version = 3

var cachefileRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)

// fileSystemCache represents a file cache.
Expand Down Expand Up @@ -101,7 +96,8 @@ func (f *fileSystemCache) Get(key *Key) (*CachedData, error) {
return nil, ErrMissing
}

defer file.Close()
// the file will be closed once it's read as an io.ReaderCloser
// This ReaderCloser is stored in the returned CachedData
fi, err := file.Stat()
if err != nil {
return nil, fmt.Errorf("cache %q: cannot stat %q: %w", f.Name(), fp, err)
Expand All @@ -111,28 +107,26 @@ func (f *fileSystemCache) Get(key *Key) (*CachedData, error) {
if age > f.expire {
// check if file exceeded expiration time + grace time
if age > f.expire+f.grace {
file.Close()
return nil, ErrMissing
}
// Serve expired file in the hope it will be substituted
// with the fresh file during deadline.
}

b, err := io.ReadAll(file)

if err != nil {
file.Close()
return nil, fmt.Errorf("failed to read file content from %q: %w", f.Name(), err)
}

reader := bytes.NewReader(b)

metadata, err := decodeHeader(reader)
metadata, err := decodeHeader(file)
if err != nil {
return nil, err
}

value := &CachedData{
ContentMetadata: *metadata,
Data: reader,
Data: file,
Ttl: f.expire - age,
}

Expand All @@ -141,7 +135,7 @@ func (f *fileSystemCache) Get(key *Key) (*CachedData, error) {

// decodeHeader decodes header from raw byte stream. Data is encoded as follows:
// length(contentType)|contentType|length(contentEncoding)|contentEncoding|length(contentLength)|contentLength|cachedData
func decodeHeader(reader *bytes.Reader) (*ContentMetadata, error) {
func decodeHeader(reader io.Reader) (*ContentMetadata, error) {
contentType, err := readHeader(reader)
if err != nil {
return nil, fmt.Errorf("cannot read Content-Type from provided reader: %w", err)
Expand Down
55 changes: 33 additions & 22 deletions cache/filesystem_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,31 @@ func TestCacheClose(t *testing.T) {
func TestFilesystemCacheAddGet(t *testing.T) {
c := newTestCache(t)
defer c.Close()
c1 := newTestCache(t)
defer c1.Close()
cacheAddGetHelper(t, c, c1)
cacheAddGetHelper(t, c)
}

const maxStringSizeToLog = 30

// metatest used for both filesystem and redis Cache
func cacheAddGetHelper(t *testing.T, c Cache, c1 Cache) {
func cacheAddGetHelper(t *testing.T, c Cache) {

for i := 0; i < 10; i++ {
key := &Key{
Query: []byte(fmt.Sprintf("SELECT %d", i)),
}
trw := &testResponseWriter{}
crw := NewBufferedResponseWriter(trw)

ct := fmt.Sprintf("text/html; %d", i)
crw.Header().Set("Content-Type", ct)
ce := fmt.Sprintf("gzip; %d", i)
crw.Header().Set("Content-Encoding", ce)

value := fmt.Sprintf("value %d", i)
bs := bytes.NewBufferString(value)
if _, err := io.Copy(crw, bs); err != nil {
t.Fatalf("cannot send response to cache: %s", err)
//we want to test what happen we the cache handle a big value
if i == 0 {
// 4MB string
value = strings.Repeat("a", 4*1024*1024)
}

buffer := crw.Reader()

length := int64(len(value))
buffer := strings.NewReader(value)
if _, err := c.Put(buffer, ContentMetadata{Encoding: ce, Type: ct, Length: length}, key); err != nil {
t.Fatalf("failed to put it to cache: %s", err)
}
Expand All @@ -90,14 +86,14 @@ func cacheAddGetHelper(t *testing.T, c Cache, c1 Cache) {

// Verify trw contains valid headers.
if cachedData.Type != ct {
t.Fatalf("unexpected Content-Type: %q; expecting %q", cachedData.Type, ct)
t.Fatalf("unexpected Content-Type: %s; expecting %s", cachedData.Type, ct)
}
if cachedData.Encoding != ce {
t.Fatalf("unexpected Content-Encoding: %q; expecting %q", cachedData.Encoding, ce)
t.Fatalf("unexpected Content-Encoding: %s; expecting %s", cachedData.Encoding, ce)
}
cl := length
if cachedData.Length != cl {
t.Fatalf("unexpected Content-Length: %q; expecting %q", cachedData.Length, cl)
t.Fatalf("unexpected Content-Length: %d; expecting %d", cachedData.Length, cl)
}
buf := new(strings.Builder)
_, err = io.Copy(buf, cachedData.Data)
Expand All @@ -106,7 +102,11 @@ func cacheAddGetHelper(t *testing.T, c Cache, c1 Cache) {
}
// Verify trw contains the response.
if buf.String() != value {
t.Fatalf("unexpected response sent to client: %q; expecting %q", trw.b, value)
logSuffx := ""
if len(value) > maxStringSizeToLog {
logSuffx = "..."
}
t.Fatalf("unexpected response sent to client: %q; expecting %q%s", trw.b, value[:maxStringSizeToLog], logSuffx)
}
}

Expand All @@ -120,18 +120,23 @@ func cacheAddGetHelper(t *testing.T, c Cache, c1 Cache) {
t.Fatalf("failed to get data from filesystem cache: %s", err)
}
value := fmt.Sprintf("value %d", i)
//we want to test what happen we the cache handle a big value
if i == 0 {
// 4MB string
value = strings.Repeat("a", 4*1024*1024)
}
ct := fmt.Sprintf("text/html; %d", i)
ce := fmt.Sprintf("gzip; %d", i)
// Verify trw contains valid headers.
if cachedData.Type != ct {
t.Fatalf("unexpected Content-Type: %q; expecting %q", cachedData.Type, ct)
t.Fatalf("unexpected Content-Type: %s; expecting %s", cachedData.Type, ct)
}
if cachedData.Encoding != ce {
t.Fatalf("unexpected Content-Encoding: %q; expecting %q", cachedData.Encoding, ce)
t.Fatalf("unexpected Content-Encoding: %s; expecting %s", cachedData.Encoding, ce)
}
cl := int64(len(value))
if cachedData.Length != cl {
t.Fatalf("unexpected Content-Length: %q; expecting %q", cachedData.Length, cl)
t.Fatalf("unexpected Content-Length: %d; expecting %d", cachedData.Length, cl)
}
buf := new(strings.Builder)
_, err = io.Copy(buf, cachedData.Data)
Expand Down Expand Up @@ -187,15 +192,21 @@ func TestCacheClean(t *testing.T) {
Query: []byte(fmt.Sprintf("SELECT %d cache clean", i)),
}
trw := &testResponseWriter{}
crw := NewBufferedResponseWriter(trw)
crw, err := NewTmpFileResponseWriter(trw, testTmpWriterDir)
if err != nil {
t.Fatalf("create tmp cache: %s", err)
}

value := fmt.Sprintf("very big value %d", i)
bs := bytes.NewBufferString(value)
if _, err := io.Copy(crw, bs); err != nil {
t.Fatalf("cannot send response to cache: %s", err)
}

reader := crw.Reader()
reader, err := crw.Reader()
if err != nil {
t.Fatalf("failed to put it to cache: %s", err)
}
if _, err := c.Put(reader, ContentMetadata{}, key); err != nil {
t.Fatalf("failed to put it to cache: %s", err)
}
Expand Down
4 changes: 4 additions & 0 deletions cache/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"path/filepath"
)

// Version must be increased with each backward-incompatible change
// in the cache storage.
const Version = 4

// Key is the key for use in the cache.
type Key struct {
// Query must contain full request query.
Expand Down
Loading