From 10b9aa3773eeca2c5684da6bf980baccb10af5f3 Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Wed, 24 Aug 2022 16:28:19 +0200 Subject: [PATCH 01/12] add filebased response writer --- cache/tmp_file_response_writer.go | 149 ++++++++++++++++++++ cache/tmp_file_response_writer_test.go | 186 +++++++++++++++++++++++++ 2 files changed, 335 insertions(+) create mode 100644 cache/tmp_file_response_writer.go create mode 100644 cache/tmp_file_response_writer_test.go diff --git a/cache/tmp_file_response_writer.go b/cache/tmp_file_response_writer.go new file mode 100644 index 00000000..828c975f --- /dev/null +++ b/cache/tmp_file_response_writer.go @@ -0,0 +1,149 @@ +package cache + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" +) + +// TmpFileResponseWriter caches Clickhouse response. +// the http header are kept in memory +type TmpFileResponseWriter struct { + http.ResponseWriter // the original response writer + + contentLength int64 + contentType string + contentEncoding string + headersCaptured bool + statusCode int + + tmpFile *os.File // temporary file for response streaming + bw *bufio.Writer // buffered writer for the temporary file +} + +func NewTmpFileResponseWriter(rw http.ResponseWriter, dir string) (*TmpFileResponseWriter, error) { + f, err := ioutil.TempFile(dir, "tmp") + if err != nil { + return nil, fmt.Errorf("cannot create temporary file in %q: %s", dir, err) + } + return &TmpFileResponseWriter{ + ResponseWriter: rw, + + tmpFile: f, + bw: bufio.NewWriter(f), + }, nil +} + +func (rw *TmpFileResponseWriter) Close() error { + rw.tmpFile.Close() + return os.Remove(rw.tmpFile.Name()) +} + +func (rw *TmpFileResponseWriter) GetFile() (*os.File, error) { + if err := rw.bw.Flush(); err != nil { + fn := rw.tmpFile.Name() + err = rw.tmpFile.Close() + err = os.Remove(fn) + return nil, fmt.Errorf("cannot flush data into %q: %s", fn, err) + } + + return rw.tmpFile, nil +} + +func (rw *TmpFileResponseWriter) Reader() (io.Reader, error) { + f, err := rw.GetFile() + if err != nil { + return nil, fmt.Errorf("cannot open tmp file: %s", err) + } + return f, nil +} + +func (rw *TmpFileResponseWriter) ResetFileOffset() error { + data, err := rw.GetFile() + if err != nil { + return err + } + if _, err := data.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("cannot reset offset in: %s", err) + } + return nil +} + +func (rw *TmpFileResponseWriter) captureHeaders() error { + if rw.headersCaptured { + return nil + } + + rw.headersCaptured = true + h := rw.Header() + + ct := h.Get("Content-Type") + + ce := h.Get("Content-Encoding") + + rw.contentEncoding = ce + rw.contentType = ct + //nb the Content-Lenght http header is not set by CH so we can't get it + return nil +} + +func (rw *TmpFileResponseWriter) GetCapturedContentType() string { + return rw.contentType +} + +func (rw *TmpFileResponseWriter) GetCapturedContentLength() (int64, error) { + if rw.contentLength == 0 { + // Determine Content-Length looking at the file + data, err := rw.GetFile() + if err != nil { + return 0, fmt.Errorf("GetCapturedContentLength: cannot open tmp file: %s", err) + } + + end, err := data.Seek(0, io.SeekEnd) + if err != nil { + return 0, fmt.Errorf("GetCapturedContentLength: cannot determine the last position in: %s", err) + + } + if err := rw.ResetFileOffset(); err != nil { + return 0, err + } + return end - 0, nil + } + return rw.contentLength, nil +} + +func (rw *TmpFileResponseWriter) GetCapturedContentEncoding() string { + return rw.contentEncoding +} + +// CloseNotify implements http.CloseNotifier +func (rw *TmpFileResponseWriter) CloseNotify() <-chan bool { + // The rw.FSResponseWriter must implement http.CloseNotifier. + return rw.ResponseWriter.(http.CloseNotifier).CloseNotify() +} + +// WriteHeader captures response status code. +func (rw *TmpFileResponseWriter) WriteHeader(statusCode int) { + rw.statusCode = statusCode + // Do not call rw.ClickhouseResponseWriter.WriteHeader here + // It will be called explicitly in Finalize / Unregister. +} + +// StatusCode returns captured status code from WriteHeader. +func (rw *TmpFileResponseWriter) StatusCode() int { + if rw.statusCode == 0 { + return http.StatusOK + } + return rw.statusCode +} + +// Write writes b into rw. +func (rw *TmpFileResponseWriter) Write(b []byte) (int, error) { + if err := rw.captureHeaders(); err != nil { + return 0, err + } + return rw.bw.Write(b) +} diff --git a/cache/tmp_file_response_writer_test.go b/cache/tmp_file_response_writer_test.go new file mode 100644 index 00000000..898f502a --- /dev/null +++ b/cache/tmp_file_response_writer_test.go @@ -0,0 +1,186 @@ +package cache + +import ( + "io/ioutil" + "log" + "net/http" + "os" + "testing" +) + +type FakeResponse struct { + t *testing.T + headers http.Header + body []byte + status int +} + +func newFakeResponse() *FakeResponse { + resp := &FakeResponse{ + headers: make(http.Header), + } + resp.headers.Set("Content-Type", "content-type-1") + resp.headers.Set("Content-Encoding", "content-encoding-1") + + return resp +} + +func (r *FakeResponse) Header() http.Header { + return r.headers +} + +func (r *FakeResponse) Write(body []byte) (int, error) { + r.body = body + return len(body), nil +} + +func (r *FakeResponse) WriteHeader(status int) { + r.status = status +} + +const testTmpWriterDir = "./test-tmp-data" + +func init() { + if err := os.RemoveAll(testTmpWriterDir); err != nil { + log.Fatalf("cannot remove %q: %s", testTmpWriterDir, err) + + } + err := os.Mkdir(testTmpWriterDir, 0777) + if err != nil && !os.IsExist(err) { + log.Fatal(err) + } +} + +func TestFileCreation(t *testing.T) { + srw := newFakeResponse() + files, _ := ioutil.ReadDir(testTmpWriterDir) + nbFileBefore := len(files) + + tmpFileRespWriter, err := NewTmpFileResponseWriter(srw, testTmpWriterDir) + defer tmpFileRespWriter.Close() + if err != nil { + t.Fatalf("could not initate TmpFileResponseWriter error:%s", err) + return + } + + files, _ = ioutil.ReadDir(testTmpWriterDir) + nbFileAfter := len(files) + if nbFileAfter == nbFileBefore { + t.Fatalf("Error while creating tmp file") + return + } +} + +func TestFileRemoval(t *testing.T) { + srw := newFakeResponse() + files, _ := ioutil.ReadDir(testTmpWriterDir) + nbFileBefore := len(files) + + tmpFileRespWriter, err := NewTmpFileResponseWriter(srw, testTmpWriterDir) + if err != nil { + t.Fatalf("could not initate TmpFileResponseWriter error:%s", err) + return + } + tmpFileRespWriter.Close() + + files, _ = ioutil.ReadDir(testTmpWriterDir) + nbFileAfter := len(files) + if nbFileAfter != nbFileBefore { + t.Fatalf("Error while deleting tmp file") + return + } +} + +func TestWriteThenReadHeader(t *testing.T) { + srw := newFakeResponse() + + tmpFileRespWriter, err := NewTmpFileResponseWriter(srw, testTmpWriterDir) + defer tmpFileRespWriter.Close() + if err != nil { + t.Fatalf("could not initate TmpFileResponseWriter error:%s", err) + return + } + tmpFileRespWriter.Write([]byte("this is a test1 of length 28")) + + cType := tmpFileRespWriter.GetCapturedContentType() + cEncoding := tmpFileRespWriter.GetCapturedContentEncoding() + cLength, _ := tmpFileRespWriter.GetCapturedContentLength() + if err != nil { + t.Fatalf("could not get ContentLength error:%s", err) + return + } + + if cType != "content-type-1" { + t.Fatalf("wrong value for contentType, got %s, expected %s", cType, "content-type-1") + } + if cEncoding != "content-encoding-1" { + t.Fatalf("wrong value for contentEncoding, got %s, expected %s", cEncoding, "content-encoding-1") + } + if cLength != 28 { + t.Fatalf("wrong value for contentLength, got %d, expected %d", cLength, 28) + } + +} + +func TestWriteThenReadContent(t *testing.T) { + srw := newFakeResponse() + + tmpFileRespWriter, err := NewTmpFileResponseWriter(srw, testTmpWriterDir) + defer tmpFileRespWriter.Close() + if err != nil { + t.Fatalf("could not initate TmpFileResponseWriter error:%s", err) + return + } + expectedContent := "test content" + _, err = tmpFileRespWriter.Write([]byte(expectedContent)) + if err != nil { + t.Fatalf("could not write into tmp file:%s", err) + return + } + reader, err := tmpFileRespWriter.Reader() + if err != nil { + t.Fatalf("could not read tmp file:%s", err) + return + } + tmpFileRespWriter.ResetFileOffset() + buffer, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatalf("could not read tmp file:%s", err) + return + } + if len(buffer) == 0 { + t.Fatalf("read 0 bytes file:") + return + } + s := string(buffer) + + if s != expectedContent { + t.Fatalf("wrong value for contentLength, got %s, expected %s", s, expectedContent) + } +} + +func TestWriteThenReadStatusCode(t *testing.T) { + srw := newFakeResponse() + expectStatusCode1 := http.StatusOK + expectStatusCode2 := 444 + tmpFileRespWriter, err := NewTmpFileResponseWriter(srw, testTmpWriterDir) + defer tmpFileRespWriter.Close() + if err != nil { + t.Fatalf("could not initate TmpFileResponseWriter error:%s", err) + return + } + statusCode := tmpFileRespWriter.StatusCode() + + if expectStatusCode1 != statusCode { + t.Fatalf("wrong value for statusCode, got %d, expected %d", statusCode, expectStatusCode1) + + } + + tmpFileRespWriter.WriteHeader(expectStatusCode2) + statusCode = tmpFileRespWriter.StatusCode() + if expectStatusCode2 != statusCode { + t.Fatalf("wrong value for statusCode, got %d, expected %d", statusCode, expectStatusCode2) + + } + +} From a22ece009cf1c1c28af7f5bd633e7feb3671eaaf Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Wed, 24 Aug 2022 22:30:37 +0200 Subject: [PATCH 02/12] switch from memory to file tmp buff + remove tee --- cache/buffered_response_writer.go | 94 ------------------------------- cache/filesystem_cache_test.go | 27 ++++----- cache/redis_cache_test.go | 9 +-- proxy.go | 62 +++++++++++++------- 4 files changed, 55 insertions(+), 137 deletions(-) delete mode 100644 cache/buffered_response_writer.go diff --git a/cache/buffered_response_writer.go b/cache/buffered_response_writer.go deleted file mode 100644 index bcda1672..00000000 --- a/cache/buffered_response_writer.go +++ /dev/null @@ -1,94 +0,0 @@ -package cache - -import ( - "bytes" - "io" - "net/http" - "strconv" -) - -// BufferedResponseWriter caches Clickhouse response. -// -// Collect response to the buffer, capture headers and status. -type BufferedResponseWriter struct { - http.ResponseWriter // the original response writer - - contentLength int64 - contentType string - contentEncoding string - headersCaptured bool - statusCode int - buffer *bytes.Buffer // buffer of clickhouse raw response -} - -func NewBufferedResponseWriter(rw http.ResponseWriter) *BufferedResponseWriter { - return &BufferedResponseWriter{ - ResponseWriter: rw, - buffer: &bytes.Buffer{}, - } -} - -func (rw *BufferedResponseWriter) Reader() io.Reader { - return rw.buffer -} - -func (rw *BufferedResponseWriter) GetCapturedContentType() string { - return rw.contentType -} - -func (rw *BufferedResponseWriter) GetCapturedContentLength() int64 { - if rw.contentLength == 0 { - rw.contentLength = int64(rw.buffer.Len()) - } - return rw.contentLength -} - -func (rw *BufferedResponseWriter) GetCapturedContentEncoding() string { - return rw.contentEncoding -} - -func (rw *BufferedResponseWriter) captureHeaders() error { - if rw.headersCaptured { - return nil - } - - rw.headersCaptured = true - h := rw.Header() - ct := h.Get("Content-Type") - cl, err := strconv.Atoi(h.Get("Content-Length")) - if err != nil { - cl = 0 - } - ce := h.Get("Content-Encoding") - rw.contentEncoding = ce - rw.contentType = ct - rw.contentLength = int64(cl) - - return nil -} - -// CloseNotify implements http.CloseNotifier -func (rw *BufferedResponseWriter) CloseNotify() <-chan bool { - return rw.ResponseWriter.(http.CloseNotifier).CloseNotify() -} - -// WriteHeader captures response status code. -func (rw *BufferedResponseWriter) WriteHeader(statusCode int) { - rw.statusCode = statusCode -} - -// StatusCode returns captured status code from WriteHeader. -func (rw *BufferedResponseWriter) StatusCode() int { - if rw.statusCode == 0 { - return http.StatusOK - } - return rw.statusCode -} - -// Write writes b into rw. -func (rw *BufferedResponseWriter) Write(b []byte) (int, error) { - if err := rw.captureHeaders(); err != nil { - return 0, err - } - return rw.buffer.Write(b) -} diff --git a/cache/filesystem_cache_test.go b/cache/filesystem_cache_test.go index 1735cee6..c6c9d82c 100644 --- a/cache/filesystem_cache_test.go +++ b/cache/filesystem_cache_test.go @@ -50,35 +50,24 @@ func TestCacheClose(t *testing.T) { func TestFilesystemCacheAddGet(t *testing.T) { c := newTestCache(t) defer c.Close() - c1 := newTestCache(t) - defer c1.Close() - cacheAddGetHelper(t, c, c1) + cacheAddGetHelper(t, c) } // metatest used for both filesystem and redis Cache -func cacheAddGetHelper(t *testing.T, c Cache, c1 Cache) { +func cacheAddGetHelper(t *testing.T, c Cache) { for i := 0; i < 10; i++ { key := &Key{ Query: []byte(fmt.Sprintf("SELECT %d", i)), } trw := &testResponseWriter{} - crw := NewBufferedResponseWriter(trw) ct := fmt.Sprintf("text/html; %d", i) - crw.Header().Set("Content-Type", ct) ce := fmt.Sprintf("gzip; %d", i) - crw.Header().Set("Content-Encoding", ce) - value := fmt.Sprintf("value %d", i) - bs := bytes.NewBufferString(value) - if _, err := io.Copy(crw, bs); err != nil { - t.Fatalf("cannot send response to cache: %s", err) - } - - buffer := crw.Reader() length := int64(len(value)) + buffer := strings.NewReader(value) if _, err := c.Put(buffer, ContentMetadata{Encoding: ce, Type: ct, Length: length}, key); err != nil { t.Fatalf("failed to put it to cache: %s", err) } @@ -187,7 +176,10 @@ func TestCacheClean(t *testing.T) { Query: []byte(fmt.Sprintf("SELECT %d cache clean", i)), } trw := &testResponseWriter{} - crw := NewBufferedResponseWriter(trw) + crw, err := NewTmpFileResponseWriter(trw, testTmpWriterDir) + if err != nil { + t.Fatalf("create tmp cache: %s", err) + } value := fmt.Sprintf("very big value %d", i) bs := bytes.NewBufferString(value) @@ -195,7 +187,10 @@ func TestCacheClean(t *testing.T) { t.Fatalf("cannot send response to cache: %s", err) } - reader := crw.Reader() + reader, err := crw.Reader() + if err != nil { + t.Fatalf("failed to put it to cache: %s", err) + } if _, err := c.Put(reader, ContentMetadata{}, key); err != nil { t.Fatalf("failed to put it to cache: %s", err) } diff --git a/cache/redis_cache_test.go b/cache/redis_cache_test.go index 3b5c902a..3d92fc7d 100644 --- a/cache/redis_cache_test.go +++ b/cache/redis_cache_test.go @@ -1,6 +1,7 @@ package cache import ( + "strings" "testing" "time" @@ -32,9 +33,7 @@ func TestCacheSize(t *testing.T) { t.Fatalf("the cache should be empty") } - trw := &testResponseWriter{} - crw := NewBufferedResponseWriter(trw) - buffer := crw.Reader() + buffer := strings.NewReader("an object") if _, err := redisCache.Put(buffer, ContentMetadata{}, &Key{Query: []byte("SELECT 1")}); err != nil { t.Fatalf("failed to put it to cache: %s", err) @@ -65,12 +64,10 @@ func generateRedisClientAndServer(t *testing.T) *redisCache { func TestRedisCacheAddGet(t *testing.T) { c := generateRedisClientAndServer(t) - c1 := generateRedisClientAndServer(t) defer func() { - c1.Close() c.Close() }() - cacheAddGetHelper(t, c, c1) + cacheAddGetHelper(t, c) } func TestRedisCacheMiss(t *testing.T) { diff --git a/proxy.go b/proxy.go index 09acae5e..bd011ad6 100644 --- a/proxy.go +++ b/proxy.go @@ -1,11 +1,9 @@ package main import ( - "bytes" "context" "errors" "fmt" - "io" "net/http" "net/http/httputil" "net/url" @@ -20,6 +18,9 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// tmpDir temporary path to store ongoing queries results +const tmpDir = "/tmp" + type reverseProxy struct { rp *httputil.ReverseProxy @@ -186,7 +187,7 @@ func (rp *reverseProxy) proxyRequest(s *scope, rw http.ResponseWriter, srw *stat // cache.FSResponseWriter pushes status code to srw on Finalize/Fail actions // but they didn't happen yet, so manually propagate the status code from crw to srw. - if crw, ok := rw.(*cache.BufferedResponseWriter); ok { + if crw, ok := rw.(*cache.TmpFileResponseWriter); ok { srw.statusCode = crw.StatusCode() } @@ -304,7 +305,13 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h // The response wasn't found in the cache. // Request it from clickhouse. - bufferedRespWriter := cache.NewBufferedResponseWriter(srw) + tmpFileRespWriter, err := cache.NewTmpFileResponseWriter(srw, tmpDir) + if err != nil { + err = fmt.Errorf("%s: %s; query: %q", s, err, q) + respondWith(srw, err, http.StatusInternalServerError) + return + } + defer tmpFileRespWriter.Close() // Initialise transaction err = userCache.Create(key) @@ -312,20 +319,30 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h log.Errorf("%s: %s; query: %q - failed to register transaction", s, err, q) } - // proxy request and capture response along with headers to [[BufferedResponseWriter]] - rp.proxyRequest(s, bufferedRespWriter, srw, req) + // proxy request and capture response along with headers to [[TmpFileResponseWriter]] + rp.proxyRequest(s, tmpFileRespWriter, srw, req) - contentEncoding := bufferedRespWriter.GetCapturedContentEncoding() - contentType := bufferedRespWriter.GetCapturedContentType() - contentLength := bufferedRespWriter.GetCapturedContentLength() - reader := bufferedRespWriter.Reader() + contentEncoding := tmpFileRespWriter.GetCapturedContentEncoding() + contentType := tmpFileRespWriter.GetCapturedContentType() + contentLength, err := tmpFileRespWriter.GetCapturedContentLength() + if err != nil { + log.Errorf("%s: %s; query: %q - failed to get contentLength of query", s, err, q) + respondWith(srw, err, http.StatusInternalServerError) + return + } + reader, err := tmpFileRespWriter.Reader() + if err != nil { + log.Errorf("%s: %s; query: %q - failed to get Reader from tmp file", s, err, q) + respondWith(srw, err, http.StatusInternalServerError) + return + } contentMetadata := cache.ContentMetadata{Length: contentLength, Encoding: contentEncoding, Type: contentType} - if bufferedRespWriter.StatusCode() != http.StatusOK || s.canceled { + if tmpFileRespWriter.StatusCode() != http.StatusOK || s.canceled { // Do not cache non-200 or cancelled responses. // Restore the original status code by proxyRequest if it was set. if srw.statusCode != 0 { - bufferedRespWriter.WriteHeader(srw.statusCode) + tmpFileRespWriter.WriteHeader(srw.statusCode) } // mark transaction as failed @@ -335,7 +352,7 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h log.Errorf("%s: %s; query: %q", s, err, q) } - err = RespondWithData(srw, reader, contentMetadata, 0*time.Second, bufferedRespWriter.StatusCode()) + err = RespondWithData(srw, reader, contentMetadata, 0*time.Second, tmpFileRespWriter.StatusCode()) if err != nil { err = fmt.Errorf("%s: %w; query: %q", s, err, q) respondWith(srw, err, http.StatusInternalServerError) @@ -344,21 +361,24 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h } else { cacheMiss.With(labels).Inc() log.Debugf("%s: cache miss", s) - - // we create this buffer to be able to stream data both to cache as well as to an end user - var buf bytes.Buffer - tee := io.TeeReader(reader, &buf) - expiration, err := userCache.Put(tee, contentMetadata, key) + //var expiration = 10 * time.Second + expiration, err := userCache.Put(reader, contentMetadata, key) if err != nil { log.Errorf("%s: %s; query: %q - failed to put response in the cache", s, err, q) } - // mark transaction as completed if err = userCache.Complete(key); err != nil { log.Errorf("%s: %s; query: %q", s, err, q) } - - err = RespondWithData(srw, &buf, contentMetadata, expiration, bufferedRespWriter.StatusCode()) + //we need to reset the offset since the reader of tmpFileRespWriter was already + // consumed in RespondWithData(...) + err = tmpFileRespWriter.ResetFileOffset() + if err != nil { + err = fmt.Errorf("%s: %w; query: %q", s, err, q) + respondWith(srw, err, http.StatusInternalServerError) + return + } + err = RespondWithData(srw, reader, contentMetadata, expiration, tmpFileRespWriter.StatusCode()) if err != nil { err = fmt.Errorf("%s: %w; query: %q", s, err, q) respondWith(srw, err, http.StatusInternalServerError) From 10b6cfad7ec533f7f58093bd34f38a6b66beed2c Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Wed, 24 Aug 2022 23:26:26 +0200 Subject: [PATCH 03/12] avoid memory overhead while fecting from filesystem_cache --- cache/cache.go | 2 +- cache/filesystem_cache.go | 16 +++++++--------- cache/redis_cache.go | 13 ++++++++++++- proxy.go | 2 ++ 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index 2a1c4f18..898af479 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -24,7 +24,7 @@ type ContentMetadata struct { type CachedData struct { ContentMetadata - Data io.Reader + Data io.ReadCloser Ttl time.Duration } diff --git a/cache/filesystem_cache.go b/cache/filesystem_cache.go index 26c337fe..03abddf9 100644 --- a/cache/filesystem_cache.go +++ b/cache/filesystem_cache.go @@ -1,7 +1,6 @@ package cache import ( - "bytes" "fmt" "io" "math/rand" @@ -101,7 +100,8 @@ func (f *fileSystemCache) Get(key *Key) (*CachedData, error) { return nil, ErrMissing } - defer file.Close() + // the file will be closed once it's read as an io.ReaderCloser + // This ReaderCloser is stored in the returned CachedData fi, err := file.Stat() if err != nil { return nil, fmt.Errorf("cache %q: cannot stat %q: %w", f.Name(), fp, err) @@ -111,28 +111,26 @@ func (f *fileSystemCache) Get(key *Key) (*CachedData, error) { if age > f.expire { // check if file exceeded expiration time + grace time if age > f.expire+f.grace { + file.Close() return nil, ErrMissing } // Serve expired file in the hope it will be substituted // with the fresh file during deadline. } - b, err := io.ReadAll(file) - if err != nil { + file.Close() return nil, fmt.Errorf("failed to read file content from %q: %w", f.Name(), err) } - reader := bytes.NewReader(b) - - metadata, err := decodeHeader(reader) + metadata, err := decodeHeader(file) if err != nil { return nil, err } value := &CachedData{ ContentMetadata: *metadata, - Data: reader, + Data: file, Ttl: f.expire - age, } @@ -141,7 +139,7 @@ func (f *fileSystemCache) Get(key *Key) (*CachedData, error) { // decodeHeader decodes header from raw byte stream. Data is encoded as follows: // length(contentType)|contentType|length(contentEncoding)|contentEncoding|length(contentLength)|contentLength|cachedData -func decodeHeader(reader *bytes.Reader) (*ContentMetadata, error) { +func decodeHeader(reader io.Reader) (*ContentMetadata, error) { contentType, err := readHeader(reader) if err != nil { return nil, fmt.Errorf("cannot read Content-Type from provided reader: %w", err) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index 6ddb792f..e8d900d8 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -126,19 +126,30 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { log.Errorf("failed to decode payload: %s , due to: %v ", payload.Payload, err) return nil, ErrMissing } + reader := &io_reader_decorator{Reader: bytes.NewReader(decoded)} value := &CachedData{ ContentMetadata: ContentMetadata{ Length: payload.Length, Type: payload.Type, Encoding: payload.Encoding, }, - Data: bytes.NewReader(decoded), + Data: reader, Ttl: ttl, } return value, nil } +// this struct is here because CachedData requires an io.ReadCloser +// but logic in the the Get function generates only an io.Reader +type io_reader_decorator struct { + io.Reader +} + +func (m io_reader_decorator) Close() error { + return nil +} + func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { data, err := toBytes(reader) if err != nil { diff --git a/proxy.go b/proxy.go index bd011ad6..7cad32dc 100644 --- a/proxy.go +++ b/proxy.go @@ -271,6 +271,7 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h cachedData, err := userCache.Get(key) if err == nil { // The response has been successfully served from cache. + defer cachedData.Data.Close() cacheHit.With(labels).Inc() since := time.Since(startTime).Seconds() cachedResponseDuration.With(labels).Observe(since) @@ -287,6 +288,7 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h } else { if transactionState.IsCompleted() { cachedData, err := userCache.Get(key) + defer cachedData.Data.Close() if err == nil { _ = RespondWithData(srw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, http.StatusOK) cacheHitFromConcurrentQueries.With(labels).Inc() From 0431469cd04ef046114eec3b3a37cb9d97b5d152 Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Thu, 25 Aug 2022 10:17:30 +0200 Subject: [PATCH 04/12] from costly (memory & cpu) json serialisation to binary one --- cache/filesystem_cache.go | 4 -- cache/key.go | 4 ++ cache/redis_cache.go | 109 ++++++++++++++++++++++---------------- cache/redis_cache_test.go | 44 +++++++++++++++ main_test.go | 54 +++++-------------- 5 files changed, 124 insertions(+), 91 deletions(-) diff --git a/cache/filesystem_cache.go b/cache/filesystem_cache.go index 03abddf9..ef32a974 100644 --- a/cache/filesystem_cache.go +++ b/cache/filesystem_cache.go @@ -16,10 +16,6 @@ import ( "github.com/contentsquare/chproxy/log" ) -// Version must be increased with each backward-incompatible change -// in the cache storage. -const Version = 3 - var cachefileRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`) // fileSystemCache represents a file cache. diff --git a/cache/key.go b/cache/key.go index 7190a209..b2a1cd01 100644 --- a/cache/key.go +++ b/cache/key.go @@ -8,6 +8,10 @@ import ( "path/filepath" ) +// Version must be increased with each backward-incompatible change +// in the cache storage. +const Version = 4 + // Key is the key for use in the cache. type Key struct { // Query must contain full request query. diff --git a/cache/redis_cache.go b/cache/redis_cache.go index e8d900d8..dd8455b9 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -3,9 +3,8 @@ package cache import ( "bytes" "context" - "encoding/base64" - "encoding/json" "errors" + "fmt" "io" "regexp" "strconv" @@ -26,13 +25,6 @@ const getTimeout = 1 * time.Second const putTimeout = 2 * time.Second const statsTimeout = 500 * time.Millisecond -type redisCachePayload struct { - Length int64 `json:"l"` - Type string `json:"t"` - Encoding string `json:"enc"` - Payload string `json:"payload"` -} - func newRedisCache(client redis.UniversalClient, cfg config.Cache) *redisCache { redisCache := &redisCache{ name: cfg.Name, @@ -95,7 +87,6 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { ctx, cancelFunc := context.WithTimeout(context.Background(), getTimeout) defer cancelFunc() val, err := r.client.Get(ctx, key.String()).Result() - // if key not found in cache if errors.Is(err, redis.Nil) { return nil, ErrMissing @@ -106,35 +97,17 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { log.Errorf("failed to get key %s with error: %s", key.String(), err) return nil, ErrMissing } - - var payload redisCachePayload - err = json.Unmarshal([]byte(val), &payload) - - if err != nil { - log.Errorf("corrupted payload for key %s with error: %s", key.String(), err) - return nil, ErrMissing - } - ttl, err := r.client.TTL(ctx, key.String()).Result() - if err != nil { - log.Errorf("Not able to fetch TTL for: %s ", key) + return nil, fmt.Errorf("failed to ttl of key %s with error: %s", key.String(), err) } + content, reader := r.fromByte([]byte(val)) - decoded, err := base64.StdEncoding.DecodeString(payload.Payload) - if err != nil { - log.Errorf("failed to decode payload: %s , due to: %v ", payload.Payload, err) - return nil, ErrMissing - } - reader := &io_reader_decorator{Reader: bytes.NewReader(decoded)} + reader2 := &io_reader_decorator{Reader: reader} value := &CachedData{ - ContentMetadata: ContentMetadata{ - Length: payload.Length, - Type: payload.Type, - Encoding: payload.Encoding, - }, - Data: reader, - Ttl: ttl, + ContentMetadata: *content, + Data: reader2, + Ttl: ttl, } return value, nil @@ -149,27 +122,63 @@ type io_reader_decorator struct { func (m io_reader_decorator) Close() error { return nil } +func (r *redisCache) stringToBytes(s string) []byte { + n := uint32(len(s)) -func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { + b := make([]byte, 0, n+4) + b = append(b, byte(n>>24), byte(n>>16), byte(n>>8), byte(n)) + b = append(b, s...) + return b +} + +func (r *redisCache) stringFromBytes(bytes []byte) (string, int) { + b := bytes[:4] + n := uint32(b[3]) | (uint32(b[2]) << 8) | (uint32(b[1]) << 16) | (uint32(b[0]) << 24) + s := bytes[4 : 4+n] + return string(s), int(4 + n) +} + +func (r *redisCache) toByte(contentMetadata *ContentMetadata, reader io.Reader) ([]byte, error) { data, err := toBytes(reader) if err != nil { - return 0, err + return nil, err } + cLength := contentMetadata.Length + cType := r.stringToBytes(contentMetadata.Type) + cEncoding := r.stringToBytes(contentMetadata.Encoding) + b := make([]byte, 0, len(data)+len(cEncoding)+len(cType)+8) + b = append(b, byte(cLength>>56), byte(cLength>>48), byte(cLength>>40), byte(cLength>>32), byte(cLength>>24), byte(cLength>>16), byte(cLength>>8), byte(cLength)) + b = append(b, cType...) + b = append(b, cEncoding...) + b = append(b, data...) + return b, nil +} - encoded := base64.StdEncoding.EncodeToString(data) - payload := &redisCachePayload{ - Length: contentMetadata.Length, Type: contentMetadata.Type, Encoding: contentMetadata.Encoding, Payload: encoded, - } +func (r *redisCache) fromByte(b []byte) (*ContentMetadata, io.Reader) { + cLength := uint64(b[7]) | (uint64(b[6]) << 8) | (uint64(b[5]) << 16) | (uint64(b[4]) << 24) | uint64(b[3])<<32 | (uint64(b[2]) << 40) | (uint64(b[1]) << 48) | (uint64(b[0]) << 56) + offset := 8 + cType, sizeCType := r.stringFromBytes(b[offset:]) + offset += sizeCType + cEncoding, sizeCEncoding := r.stringFromBytes(b[offset:]) + offset += sizeCEncoding + payload := b[offset:] + metadata := &ContentMetadata{ + Length: int64(cLength), + Type: cType, + Encoding: cEncoding, + } + return metadata, bytes.NewReader(payload) +} + +func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { - marshalled, err := json.Marshal(payload) + payload, err := r.toByte(&contentMetadata, reader) if err != nil { - return 0, nil + return 0, err } - ctx, cancelFunc := context.WithTimeout(context.Background(), putTimeout) defer cancelFunc() - err = r.client.Set(ctx, key.String(), marshalled, r.expire).Err() - + err = r.client.Set(ctx, key.String(), payload, r.expire).Err() if err != nil { return 0, err } @@ -182,11 +191,17 @@ func (r *redisCache) Name() string { } func toBytes(stream io.Reader) ([]byte, error) { - buf := new(bytes.Buffer) + b, err := io.ReadAll(stream) + if err != nil { + return nil, err + } + return b, nil + + /*buf := new(bytes.Buffer) _, err := buf.ReadFrom(stream) if err != nil { return nil, err } - return buf.Bytes(), nil + return buf.Bytes(), nil*/ } diff --git a/cache/redis_cache_test.go b/cache/redis_cache_test.go index 3d92fc7d..093d13e2 100644 --- a/cache/redis_cache_test.go +++ b/cache/redis_cache_test.go @@ -1,6 +1,7 @@ package cache import ( + "io" "strings" "testing" "time" @@ -74,3 +75,46 @@ func TestRedisCacheMiss(t *testing.T) { c := generateRedisClientAndServer(t) cacheMissHelper(t, c) } +func TestStringFromToByte(t *testing.T) { + c := generateRedisClientAndServer(t) + b := c.stringToBytes("test") + s, size := c.stringFromBytes(b) + if s != "test" { + t.Fatalf("got: %s, expected %s", s, "test") + } + if size != 8 { + t.Fatalf("got: %d, expected %d", size, 8) + } +} +func TestPayloadFromToByte(t *testing.T) { + c := generateRedisClientAndServer(t) + + expectedMetadata := &ContentMetadata{ + Length: 12, + Type: "json", + Encoding: "gzip", + } + expectedContent := "abcdef" + r := strings.NewReader(expectedContent) + + b, err := c.toByte(expectedMetadata, r) + + if err != nil { + t.Fatalf("error during serialization %s", err) + } + metadata, reader := c.fromByte(b) + if metadata.Encoding != expectedMetadata.Encoding { + t.Fatalf("got: %s, expected %s", metadata.Encoding, expectedMetadata.Encoding) + } + if metadata.Type != expectedMetadata.Type { + t.Fatalf("got: %s, expected %s", metadata.Type, expectedMetadata.Type) + } + if metadata.Length != expectedMetadata.Length { + t.Fatalf("got: %d, expected %d", metadata.Length, expectedMetadata.Length) + } + content, _ := io.ReadAll(reader) + if string(content) != expectedContent { + t.Fatalf("got: %s, expected %s", string(content), expectedContent) + } + +} diff --git a/main_test.go b/main_test.go index 6e549286..26d53971 100644 --- a/main_test.go +++ b/main_test.go @@ -5,8 +5,6 @@ import ( "compress/gzip" "context" "crypto/tls" - "encoding/base64" - "encoding/json" "fmt" "io" "net" @@ -349,13 +347,18 @@ func TestServe(t *testing.T) { q := "SELECT redis_cache_mate" req, err := http.NewRequest("GET", "http://127.0.0.1:9090?query="+url.QueryEscape(q), nil) checkErr(t, err) + keys := redisClient.Keys() + // redis should be empty before the test + if len(keys) != 0 { + t.Fatalf("unexpected amount of keys in redis: %v", len(keys)) + } resp := httpRequest(t, req, http.StatusOK) checkResponse(t, resp.Body, expectedOkResp) resp2 := httpRequest(t, req, http.StatusOK) checkResponse(t, resp2.Body, expectedOkResp) - keys := redisClient.Keys() - if len(keys) != 2 { // 2 because there is a record stored for transaction and a cache item + keys = redisClient.Keys() + if len(keys) != 2 { // expected 2 because there is a record stored for transaction and a cache item t.Fatalf("unexpected amount of keys in redis: %v", len(keys)) } @@ -365,12 +368,6 @@ func TestServe(t *testing.T) { AcceptEncoding: "gzip", Version: cache.Version, } - str, err := redisClient.Get(key.String()) - checkErr(t, err) - - if !strings.Contains(str, base64.StdEncoding.EncodeToString([]byte("Ok."))) || !strings.Contains(str, "text/plain") || !strings.Contains(str, "charset=utf-8") { - t.Fatalf("result from cache query is wrong: %s", str) - } duration := redisClient.TTL(key.String()) if duration > 1*time.Minute || duration < 30*time.Second { @@ -385,6 +382,12 @@ func TestServe(t *testing.T) { func(t *testing.T) { redisClient.FlushAll() q := "SELECT 1 FORMAT TabSeparatedWithNamesAndTypes" + keys := redisClient.Keys() + // redis should be empty before the test + if len(keys) != 0 { + t.Fatalf("unexpected amount of keys in redis: %v", len(keys)) + } + req, err := http.NewRequest("GET", "http://127.0.0.1:9090?query="+url.QueryEscape(q), nil) checkErr(t, err) @@ -393,39 +396,10 @@ func TestServe(t *testing.T) { resp2 := httpRequest(t, req, http.StatusOK) // if we do not use base64 to encode/decode the cached payload, EOF error will be thrown here. checkResponse(t, resp2.Body, string(bytesWithInvalidUTFPairs)) - keys := redisClient.Keys() + keys = redisClient.Keys() if len(keys) != 2 { // 2 because there is a record stored for transaction, and a cache item t.Fatalf("unexpected amount of keys in redis: %v", len(keys)) } - - // check cached response - key := &cache.Key{ - Query: []byte(q), - AcceptEncoding: "gzip", - Version: cache.Version, - } - str, err := redisClient.Get(key.String()) - checkErr(t, err) - - type redisCachePayload struct { - Length int64 `json:"l"` - Type string `json:"t"` - Encoding string `json:"enc"` - Payload string `json:"payload"` - } - - var unMarshaledPayload redisCachePayload - err = json.Unmarshal([]byte(str), &unMarshaledPayload) - checkErr(t, err) - if unMarshaledPayload.Payload != base64.StdEncoding.EncodeToString(bytesWithInvalidUTFPairs) { - t.Fatalf("result from cache query is wrong: %s", str) - } - decoded, err := base64.StdEncoding.DecodeString(unMarshaledPayload.Payload) - checkErr(t, err) - - if unMarshaledPayload.Length != int64(len(decoded)) { - t.Fatalf("the declared length %d and actual length %d is not same", unMarshaledPayload.Length, len(decoded)) - } }, startHTTP, }, From 7a74fbd32b5cd888b8d98c61547f573cce9e21fe Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Thu, 25 Aug 2022 13:05:14 +0200 Subject: [PATCH 05/12] make insertion in redis memory & cpu efficient --- cache/redis_cache.go | 65 ++++++++++++++++++++++++--------------- cache/redis_cache_test.go | 17 +++------- 2 files changed, 45 insertions(+), 37 deletions(-) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index dd8455b9..f3461f0b 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -22,7 +22,7 @@ type redisCache struct { } const getTimeout = 1 * time.Second -const putTimeout = 2 * time.Second +const putTimeout = 5 * time.Second //the put is long engouh for very large cached result (+200MB) because it's also linked to the spead of the reader const statsTimeout = 500 * time.Millisecond func newRedisCache(client redis.UniversalClient, cfg config.Cache) *redisCache { @@ -138,50 +138,73 @@ func (r *redisCache) stringFromBytes(bytes []byte) (string, int) { return string(s), int(4 + n) } -func (r *redisCache) toByte(contentMetadata *ContentMetadata, reader io.Reader) ([]byte, error) { - data, err := toBytes(reader) - if err != nil { - return nil, err - } +func (r *redisCache) metadataToByte(contentMetadata *ContentMetadata) []byte { + cLength := contentMetadata.Length cType := r.stringToBytes(contentMetadata.Type) cEncoding := r.stringToBytes(contentMetadata.Encoding) - b := make([]byte, 0, len(data)+len(cEncoding)+len(cType)+8) + b := make([]byte, 0, len(cEncoding)+len(cType)+8) b = append(b, byte(cLength>>56), byte(cLength>>48), byte(cLength>>40), byte(cLength>>32), byte(cLength>>24), byte(cLength>>16), byte(cLength>>8), byte(cLength)) b = append(b, cType...) b = append(b, cEncoding...) - b = append(b, data...) - return b, nil + return b } - -func (r *redisCache) fromByte(b []byte) (*ContentMetadata, io.Reader) { +func (r *redisCache) metadataFromByte(b []byte) (*ContentMetadata, int) { cLength := uint64(b[7]) | (uint64(b[6]) << 8) | (uint64(b[5]) << 16) | (uint64(b[4]) << 24) | uint64(b[3])<<32 | (uint64(b[2]) << 40) | (uint64(b[1]) << 48) | (uint64(b[0]) << 56) offset := 8 cType, sizeCType := r.stringFromBytes(b[offset:]) offset += sizeCType cEncoding, sizeCEncoding := r.stringFromBytes(b[offset:]) offset += sizeCEncoding - payload := b[offset:] metadata := &ContentMetadata{ Length: int64(cLength), Type: cType, Encoding: cEncoding, } + return metadata, offset +} + +func (r *redisCache) fromByte(b []byte) (*ContentMetadata, io.Reader) { + metadata, offset := r.metadataFromByte(b) + payload := b[offset:] return metadata, bytes.NewReader(payload) } func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { - payload, err := r.toByte(&contentMetadata, reader) - if err != nil { - return 0, err - } + medatadata := r.metadataToByte(&contentMetadata) + ctx, cancelFunc := context.WithTimeout(context.Background(), putTimeout) defer cancelFunc() - err = r.client.Set(ctx, key.String(), payload, r.expire).Err() + stringKey := key.String() + err := r.client.Set(ctx, stringKey, medatadata, r.expire).Err() if err != nil { return 0, err } + //we don't read all the reader content then send it in one call to redis to avoid memory issue + //if the content is big (which is the case when chproxy users are fetching a lot of data) + buffer := make([]byte, 2*1024*1024) + for { + n, err := reader.Read(buffer) + // the reader should return an err = io.EOF once it has nothing to read or at the last read call with content. + // But this is not the case with this reader so we check the condition n == 0 to exit the read loop. + // We kept the err == io.EOF in the loop in case the behavior of the reader changes + + if n == 0 { + break + } + if err != nil && err != io.EOF { + return 0, err + } + err = r.client.Append(ctx, stringKey, string(buffer[:n])).Err() + if err != nil { + return 0, err + } + if err == io.EOF { + break + } + + } return r.expire, nil } @@ -196,12 +219,4 @@ func toBytes(stream io.Reader) ([]byte, error) { return nil, err } return b, nil - - /*buf := new(bytes.Buffer) - - _, err := buf.ReadFrom(stream) - if err != nil { - return nil, err - } - return buf.Bytes(), nil*/ } diff --git a/cache/redis_cache_test.go b/cache/redis_cache_test.go index 093d13e2..2f27827a 100644 --- a/cache/redis_cache_test.go +++ b/cache/redis_cache_test.go @@ -1,7 +1,6 @@ package cache import ( - "io" "strings" "testing" "time" @@ -86,7 +85,7 @@ func TestStringFromToByte(t *testing.T) { t.Fatalf("got: %d, expected %d", size, 8) } } -func TestPayloadFromToByte(t *testing.T) { +func TestMetadataFromToByte(t *testing.T) { c := generateRedisClientAndServer(t) expectedMetadata := &ContentMetadata{ @@ -94,15 +93,10 @@ func TestPayloadFromToByte(t *testing.T) { Type: "json", Encoding: "gzip", } - expectedContent := "abcdef" - r := strings.NewReader(expectedContent) - b, err := c.toByte(expectedMetadata, r) + b := c.metadataToByte(expectedMetadata) - if err != nil { - t.Fatalf("error during serialization %s", err) - } - metadata, reader := c.fromByte(b) + metadata, size := c.metadataFromByte(b) if metadata.Encoding != expectedMetadata.Encoding { t.Fatalf("got: %s, expected %s", metadata.Encoding, expectedMetadata.Encoding) } @@ -112,9 +106,8 @@ func TestPayloadFromToByte(t *testing.T) { if metadata.Length != expectedMetadata.Length { t.Fatalf("got: %d, expected %d", metadata.Length, expectedMetadata.Length) } - content, _ := io.ReadAll(reader) - if string(content) != expectedContent { - t.Fatalf("got: %s, expected %s", string(content), expectedContent) + if size != 24 { + t.Fatalf("got: %d, expected %d", size, 24) } } From f9631d9a4dfb8b5f53eae28a5441a2202115ec51 Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Thu, 25 Aug 2022 14:31:05 +0200 Subject: [PATCH 06/12] fix linter issues --- cache/redis_cache.go | 26 +++++++------------------- cache/tmp_file_response_writer.go | 15 +++++++-------- proxy.go | 5 ++--- 3 files changed, 16 insertions(+), 30 deletions(-) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index f3461f0b..70af4de3 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -22,7 +22,7 @@ type redisCache struct { } const getTimeout = 1 * time.Second -const putTimeout = 5 * time.Second //the put is long engouh for very large cached result (+200MB) because it's also linked to the spead of the reader +const putTimeout = 5 * time.Second // the put is long engouh for very large cached result (+200MB) because it's also linked to the spead of the reader const statsTimeout = 500 * time.Millisecond func newRedisCache(client redis.UniversalClient, cfg config.Cache) *redisCache { @@ -99,7 +99,7 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { } ttl, err := r.client.TTL(ctx, key.String()).Result() if err != nil { - return nil, fmt.Errorf("failed to ttl of key %s with error: %s", key.String(), err) + return nil, fmt.Errorf("failed to ttl of key %s with error: %w", key.String(), err) } content, reader := r.fromByte([]byte(val)) @@ -139,7 +139,6 @@ func (r *redisCache) stringFromBytes(bytes []byte) (string, int) { } func (r *redisCache) metadataToByte(contentMetadata *ContentMetadata) []byte { - cLength := contentMetadata.Length cType := r.stringToBytes(contentMetadata.Type) cEncoding := r.stringToBytes(contentMetadata.Encoding) @@ -149,6 +148,7 @@ func (r *redisCache) metadataToByte(contentMetadata *ContentMetadata) []byte { b = append(b, cEncoding...) return b } + func (r *redisCache) metadataFromByte(b []byte) (*ContentMetadata, int) { cLength := uint64(b[7]) | (uint64(b[6]) << 8) | (uint64(b[5]) << 16) | (uint64(b[4]) << 24) | uint64(b[3])<<32 | (uint64(b[2]) << 40) | (uint64(b[1]) << 48) | (uint64(b[0]) << 56) offset := 8 @@ -171,9 +171,7 @@ func (r *redisCache) fromByte(b []byte) (*ContentMetadata, io.Reader) { } func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { - medatadata := r.metadataToByte(&contentMetadata) - ctx, cancelFunc := context.WithTimeout(context.Background(), putTimeout) defer cancelFunc() stringKey := key.String() @@ -181,8 +179,8 @@ func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key if err != nil { return 0, err } - //we don't read all the reader content then send it in one call to redis to avoid memory issue - //if the content is big (which is the case when chproxy users are fetching a lot of data) + // we don't read all the reader content then send it in one call to redis to avoid memory issue + // if the content is big (which is the case when chproxy users are fetching a lot of data) buffer := make([]byte, 2*1024*1024) for { n, err := reader.Read(buffer) @@ -193,30 +191,20 @@ func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key if n == 0 { break } - if err != nil && err != io.EOF { + if err != nil && !errors.Is(err, io.EOF) { return 0, err } err = r.client.Append(ctx, stringKey, string(buffer[:n])).Err() if err != nil { return 0, err } - if err == io.EOF { + if errors.Is(err, io.EOF) { break } - } - return r.expire, nil } func (r *redisCache) Name() string { return r.name } - -func toBytes(stream io.Reader) ([]byte, error) { - b, err := io.ReadAll(stream) - if err != nil { - return nil, err - } - return b, nil -} diff --git a/cache/tmp_file_response_writer.go b/cache/tmp_file_response_writer.go index 828c975f..a515f8f6 100644 --- a/cache/tmp_file_response_writer.go +++ b/cache/tmp_file_response_writer.go @@ -27,7 +27,7 @@ type TmpFileResponseWriter struct { func NewTmpFileResponseWriter(rw http.ResponseWriter, dir string) (*TmpFileResponseWriter, error) { f, err := ioutil.TempFile(dir, "tmp") if err != nil { - return nil, fmt.Errorf("cannot create temporary file in %q: %s", dir, err) + return nil, fmt.Errorf("cannot create temporary file in %q: %w", dir, err) } return &TmpFileResponseWriter{ ResponseWriter: rw, @@ -47,7 +47,7 @@ func (rw *TmpFileResponseWriter) GetFile() (*os.File, error) { fn := rw.tmpFile.Name() err = rw.tmpFile.Close() err = os.Remove(fn) - return nil, fmt.Errorf("cannot flush data into %q: %s", fn, err) + return nil, fmt.Errorf("cannot flush data into %q: %w", fn, err) } return rw.tmpFile, nil @@ -56,7 +56,7 @@ func (rw *TmpFileResponseWriter) GetFile() (*os.File, error) { func (rw *TmpFileResponseWriter) Reader() (io.Reader, error) { f, err := rw.GetFile() if err != nil { - return nil, fmt.Errorf("cannot open tmp file: %s", err) + return nil, fmt.Errorf("cannot open tmp file: %w", err) } return f, nil } @@ -67,7 +67,7 @@ func (rw *TmpFileResponseWriter) ResetFileOffset() error { return err } if _, err := data.Seek(0, io.SeekStart); err != nil { - return fmt.Errorf("cannot reset offset in: %s", err) + return fmt.Errorf("cannot reset offset in: %w", err) } return nil } @@ -86,7 +86,7 @@ func (rw *TmpFileResponseWriter) captureHeaders() error { rw.contentEncoding = ce rw.contentType = ct - //nb the Content-Lenght http header is not set by CH so we can't get it + // nb: the Content-Length http header is not set by CH so we can't get it return nil } @@ -99,13 +99,12 @@ func (rw *TmpFileResponseWriter) GetCapturedContentLength() (int64, error) { // Determine Content-Length looking at the file data, err := rw.GetFile() if err != nil { - return 0, fmt.Errorf("GetCapturedContentLength: cannot open tmp file: %s", err) + return 0, fmt.Errorf("GetCapturedContentLength: cannot open tmp file: %w", err) } end, err := data.Seek(0, io.SeekEnd) if err != nil { - return 0, fmt.Errorf("GetCapturedContentLength: cannot determine the last position in: %s", err) - + return 0, fmt.Errorf("GetCapturedContentLength: cannot determine the last position in: %w", err) } if err := rw.ResetFileOffset(); err != nil { return 0, err diff --git a/proxy.go b/proxy.go index 7cad32dc..6a0f27c8 100644 --- a/proxy.go +++ b/proxy.go @@ -309,7 +309,7 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h // Request it from clickhouse. tmpFileRespWriter, err := cache.NewTmpFileResponseWriter(srw, tmpDir) if err != nil { - err = fmt.Errorf("%s: %s; query: %q", s, err, q) + err = fmt.Errorf("%s: %w; query: %q", s, err, q) respondWith(srw, err, http.StatusInternalServerError) return } @@ -363,7 +363,6 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h } else { cacheMiss.With(labels).Inc() log.Debugf("%s: cache miss", s) - //var expiration = 10 * time.Second expiration, err := userCache.Put(reader, contentMetadata, key) if err != nil { log.Errorf("%s: %s; query: %q - failed to put response in the cache", s, err, q) @@ -372,7 +371,7 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h if err = userCache.Complete(key); err != nil { log.Errorf("%s: %s; query: %q", s, err, q) } - //we need to reset the offset since the reader of tmpFileRespWriter was already + // we need to reset the offset since the reader of tmpFileRespWriter was already // consumed in RespondWithData(...) err = tmpFileRespWriter.ResetFileOffset() if err != nil { From 9ac36d3c10ad1e797c100bff97831f5ac4b08049 Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Fri, 26 Aug 2022 01:42:29 +0200 Subject: [PATCH 07/12] make fetching from redis memory & cpu efficient --- cache/filesystem_cache_test.go | 28 ++++++-- cache/redis_cache.go | 123 +++++++++++++++++++++++++++++++-- 2 files changed, 137 insertions(+), 14 deletions(-) diff --git a/cache/filesystem_cache_test.go b/cache/filesystem_cache_test.go index c6c9d82c..26bc200d 100644 --- a/cache/filesystem_cache_test.go +++ b/cache/filesystem_cache_test.go @@ -65,6 +65,11 @@ func cacheAddGetHelper(t *testing.T, c Cache) { ct := fmt.Sprintf("text/html; %d", i) ce := fmt.Sprintf("gzip; %d", i) value := fmt.Sprintf("value %d", i) + //we want to test what happen we the cache handle a big value + if i == 0 { + // 4MB string + value = strings.Repeat("a", 4*1024*1024) + } length := int64(len(value)) buffer := strings.NewReader(value) @@ -79,14 +84,14 @@ func cacheAddGetHelper(t *testing.T, c Cache) { // Verify trw contains valid headers. if cachedData.Type != ct { - t.Fatalf("unexpected Content-Type: %q; expecting %q", cachedData.Type, ct) + t.Fatalf("unexpected Content-Type: %s; expecting %s", cachedData.Type, ct) } if cachedData.Encoding != ce { - t.Fatalf("unexpected Content-Encoding: %q; expecting %q", cachedData.Encoding, ce) + t.Fatalf("unexpected Content-Encoding: %s; expecting %s", cachedData.Encoding, ce) } cl := length if cachedData.Length != cl { - t.Fatalf("unexpected Content-Length: %q; expecting %q", cachedData.Length, cl) + t.Fatalf("unexpected Content-Length: %d; expecting %d", cachedData.Length, cl) } buf := new(strings.Builder) _, err = io.Copy(buf, cachedData.Data) @@ -95,7 +100,11 @@ func cacheAddGetHelper(t *testing.T, c Cache) { } // Verify trw contains the response. if buf.String() != value { - t.Fatalf("unexpected response sent to client: %q; expecting %q", trw.b, value) + conditionalStr := "" + if len(value) > 30 { + conditionalStr = "..." + } + t.Fatalf("unexpected response sent to client: %q; expecting %q%s", trw.b, value[:30], conditionalStr) } } @@ -109,18 +118,23 @@ func cacheAddGetHelper(t *testing.T, c Cache) { t.Fatalf("failed to get data from filesystem cache: %s", err) } value := fmt.Sprintf("value %d", i) + //we want to test what happen we the cache handle a big value + if i == 0 { + // 4MB string + value = strings.Repeat("a", 4*1024*1024) + } ct := fmt.Sprintf("text/html; %d", i) ce := fmt.Sprintf("gzip; %d", i) // Verify trw contains valid headers. if cachedData.Type != ct { - t.Fatalf("unexpected Content-Type: %q; expecting %q", cachedData.Type, ct) + t.Fatalf("unexpected Content-Type: %s; expecting %s", cachedData.Type, ct) } if cachedData.Encoding != ce { - t.Fatalf("unexpected Content-Encoding: %q; expecting %q", cachedData.Encoding, ce) + t.Fatalf("unexpected Content-Encoding: %s; expecting %s", cachedData.Encoding, ce) } cl := int64(len(value)) if cachedData.Length != cl { - t.Fatalf("unexpected Content-Length: %q; expecting %q", cachedData.Length, cl) + t.Fatalf("unexpected Content-Length: %d; expecting %d", cachedData.Length, cl) } buf := new(strings.Builder) _, err = io.Copy(buf, cachedData.Data) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index 70af4de3..00eaa953 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -22,7 +22,7 @@ type redisCache struct { } const getTimeout = 1 * time.Second -const putTimeout = 5 * time.Second // the put is long engouh for very large cached result (+200MB) because it's also linked to the spead of the reader +const putTimeout = 10 * time.Second // the put is long enough for very large cached result (+200MB) because it's also linked to the speed of the reader const statsTimeout = 500 * time.Millisecond func newRedisCache(client redis.UniversalClient, cfg config.Cache) *redisCache { @@ -83,6 +83,7 @@ func (r *redisCache) nbOfBytes() uint64 { return uint64(cacheSize) } +/* func (r *redisCache) Get(key *Key) (*CachedData, error) { ctx, cancelFunc := context.WithTimeout(context.Background(), getTimeout) defer cancelFunc() @@ -112,6 +113,56 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { return value, nil } +*/ + +func (r *redisCache) Get(key *Key) (*CachedData, error) { + ctx, cancelFunc := context.WithTimeout(context.Background(), getTimeout) + defer cancelFunc() + nbBytesToFetch := int64(100 * 1024) + // fetching 100kBytes from redis to be sure to have the full metadata and, + // for most of the queries that fetch a few data, the cached results + val, err := r.client.GetRange(ctx, key.String(), 0, nbBytesToFetch).Result() + // if key not found in cache + if errors.Is(err, redis.Nil) || len(val) == 0 { + return nil, ErrMissing + } + + // others errors, such as timeouts + if err != nil { + log.Errorf("failed to get key %s with error: %s", key.String(), err) + return nil, ErrMissing + } + stringKey := key.String() + ttl, err := r.client.TTL(ctx, stringKey).Result() + if err != nil { + return nil, fmt.Errorf("failed to ttl of key %s with error: %w", key.String(), err) + } + b := []byte(val) + metadata, offset := r.metadataFromByte(b) + if (int64(offset) + metadata.Length) < nbBytesToFetch { + // the condition is true ony if the bytes fetched contain the metadata + the cached results + // so we extract from the remaining bytes the cached results + payload := b[offset:] + reader := &io_reader_decorator{Reader: bytes.NewReader(payload)} + value := &CachedData{ + ContentMetadata: *metadata, + Data: reader, + Ttl: ttl, + } + + return value, nil + } + // since the cached results in redis are too big, we can't fetch all of them because of the memory overhead. + // we will create an io.reader that will fetch redis bulk by bulk to reduce the memory usage + reader := NewRedisStreamReader(uint64(offset), r.client, stringKey) + value := &CachedData{ + ContentMetadata: *metadata, + Data: &io_reader_decorator{Reader: reader}, + Ttl: ttl, + } + + return value, nil +} // this struct is here because CachedData requires an io.ReadCloser // but logic in the the Get function generates only an io.Reader @@ -164,12 +215,6 @@ func (r *redisCache) metadataFromByte(b []byte) (*ContentMetadata, int) { return metadata, offset } -func (r *redisCache) fromByte(b []byte) (*ContentMetadata, io.Reader) { - metadata, offset := r.metadataFromByte(b) - payload := b[offset:] - return metadata, bytes.NewReader(payload) -} - func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { medatadata := r.metadataToByte(&contentMetadata) ctx, cancelFunc := context.WithTimeout(context.Background(), putTimeout) @@ -208,3 +253,67 @@ func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key func (r *redisCache) Name() string { return r.name } + +type RedisStreamReader struct { + isRedisEOF bool + redisOffset uint64 // the redisOffset that gives the beginning of the next bulk to fetch + key string // the key of the value we want to stream from redis + buffer []byte // internal buffer to store the bulks fetched from redis + bufferOffset int // the offset of the buffer that keep were the read() need to start copying data + client redis.UniversalClient // the redis client +} + +func NewRedisStreamReader(offset uint64, client redis.UniversalClient, key string) *RedisStreamReader { + bufferSize := uint64(2 * 1024 * 1024) + return &RedisStreamReader{ + isRedisEOF: false, + redisOffset: offset, + key: key, + bufferOffset: int(bufferSize), + buffer: make([]byte, bufferSize), + client: client, + } +} + +func (r *RedisStreamReader) Read(destBuf []byte) (n int, err error) { + // the logic is simple: + // 1) if the buffer still has data to write, it writes it into destBuf with overflowing destBuf + // 2) if the buffer only has already written data, the StreamRedis refresh the buffer with new data from redis + // 3) if the buffer only has already written data & redis has no more data to read then StreamRedis sends an EOF err + bufSize := len(r.buffer) + bytesWritten := 0 + // case 3) both the buffer & redis were fully consumed, we can tell the reader to stop reading + if r.bufferOffset >= bufSize && r.isRedisEOF { + return 0, io.EOF + } + + // case 2) the buffer only has already written data, we need to refresh it with redis datas + if r.bufferOffset >= bufSize { + ctx, cancelFunc := context.WithTimeout(context.Background(), getTimeout) + defer cancelFunc() + newBuf, err := r.client.GetRange(ctx, r.key, int64(r.redisOffset), int64(r.redisOffset+uint64(bufSize))).Result() + r.redisOffset += uint64(len(newBuf)) + if errors.Is(err, redis.Nil) || len(newBuf) == 0 { + r.isRedisEOF = true + } + // if redis gave less data than asked it means that it reached the end of the value + if len(newBuf) < bufSize { + r.isRedisEOF = true + } + + // others errors, such as timeouts + if err != nil && !errors.Is(err, redis.Nil) { + log.Errorf("failed to get key %s with error: %s", r.key, err) + return bytesWritten, err + } + r.bufferOffset = 0 + r.buffer = []byte(newBuf) + } + + // case 1) the buffer contains data to write into destBuf + if r.bufferOffset < bufSize { + bytesWritten = copy(destBuf, r.buffer[r.bufferOffset:]) + r.bufferOffset += bytesWritten + } + return bytesWritten, nil +} From 5f15b818618ea4c42ac1f3bdda17bdcf0a185784 Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Fri, 26 Aug 2022 19:20:45 +0200 Subject: [PATCH 08/12] fix from PR's comments --- cache/cache.go | 2 +- cache/filesystem_cache_test.go | 10 +++-- cache/redis_cache.go | 76 ++++++++++------------------------ cache/redis_cache_test.go | 8 ++-- 4 files changed, 33 insertions(+), 63 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index 898af479..c7c3b55b 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -24,7 +24,7 @@ type ContentMetadata struct { type CachedData struct { ContentMetadata - Data io.ReadCloser + Data io.ReadCloser // we need a ReadCloser because the reader is used oustide the scope where it was created and need to be closed by the function using it. Ttl time.Duration } diff --git a/cache/filesystem_cache_test.go b/cache/filesystem_cache_test.go index 26bc200d..f097959a 100644 --- a/cache/filesystem_cache_test.go +++ b/cache/filesystem_cache_test.go @@ -53,6 +53,8 @@ func TestFilesystemCacheAddGet(t *testing.T) { cacheAddGetHelper(t, c) } +const maxStringSizeToLog = 30 + // metatest used for both filesystem and redis Cache func cacheAddGetHelper(t *testing.T, c Cache) { @@ -100,11 +102,11 @@ func cacheAddGetHelper(t *testing.T, c Cache) { } // Verify trw contains the response. if buf.String() != value { - conditionalStr := "" - if len(value) > 30 { - conditionalStr = "..." + logSuffx := "" + if len(value) > maxStringSizeToLog { + logSuffx = "..." } - t.Fatalf("unexpected response sent to client: %q; expecting %q%s", trw.b, value[:30], conditionalStr) + t.Fatalf("unexpected response sent to client: %q; expecting %q%s", trw.b, value[:maxStringSizeToLog], logSuffx) } } diff --git a/cache/redis_cache.go b/cache/redis_cache.go index 00eaa953..f3d2367f 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -83,45 +83,14 @@ func (r *redisCache) nbOfBytes() uint64 { return uint64(cacheSize) } -/* -func (r *redisCache) Get(key *Key) (*CachedData, error) { - ctx, cancelFunc := context.WithTimeout(context.Background(), getTimeout) - defer cancelFunc() - val, err := r.client.Get(ctx, key.String()).Result() - // if key not found in cache - if errors.Is(err, redis.Nil) { - return nil, ErrMissing - } - - // others errors, such as timeouts - if err != nil { - log.Errorf("failed to get key %s with error: %s", key.String(), err) - return nil, ErrMissing - } - ttl, err := r.client.TTL(ctx, key.String()).Result() - if err != nil { - return nil, fmt.Errorf("failed to ttl of key %s with error: %w", key.String(), err) - } - content, reader := r.fromByte([]byte(val)) - - reader2 := &io_reader_decorator{Reader: reader} - value := &CachedData{ - ContentMetadata: *content, - Data: reader2, - Ttl: ttl, - } - - return value, nil -} -*/ - func (r *redisCache) Get(key *Key) (*CachedData, error) { ctx, cancelFunc := context.WithTimeout(context.Background(), getTimeout) defer cancelFunc() nbBytesToFetch := int64(100 * 1024) + stringKey := key.String() // fetching 100kBytes from redis to be sure to have the full metadata and, // for most of the queries that fetch a few data, the cached results - val, err := r.client.GetRange(ctx, key.String(), 0, nbBytesToFetch).Result() + val, err := r.client.GetRange(ctx, stringKey, 0, nbBytesToFetch).Result() // if key not found in cache if errors.Is(err, redis.Nil) || len(val) == 0 { return nil, ErrMissing @@ -129,21 +98,20 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { // others errors, such as timeouts if err != nil { - log.Errorf("failed to get key %s with error: %s", key.String(), err) + log.Errorf("failed to get key %s with error: %s", stringKey, err) return nil, ErrMissing } - stringKey := key.String() ttl, err := r.client.TTL(ctx, stringKey).Result() if err != nil { - return nil, fmt.Errorf("failed to ttl of key %s with error: %w", key.String(), err) + return nil, fmt.Errorf("failed to ttl of key %s with error: %w", stringKey, err) } b := []byte(val) - metadata, offset := r.metadataFromByte(b) + metadata, offset := r.decodeMetadata(b) if (int64(offset) + metadata.Length) < nbBytesToFetch { // the condition is true ony if the bytes fetched contain the metadata + the cached results // so we extract from the remaining bytes the cached results payload := b[offset:] - reader := &io_reader_decorator{Reader: bytes.NewReader(payload)} + reader := &ioReaderDecorator{Reader: bytes.NewReader(payload)} value := &CachedData{ ContentMetadata: *metadata, Data: reader, @@ -157,7 +125,7 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { reader := NewRedisStreamReader(uint64(offset), r.client, stringKey) value := &CachedData{ ContentMetadata: *metadata, - Data: &io_reader_decorator{Reader: reader}, + Data: &ioReaderDecorator{Reader: reader}, Ttl: ttl, } @@ -166,14 +134,14 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { // this struct is here because CachedData requires an io.ReadCloser // but logic in the the Get function generates only an io.Reader -type io_reader_decorator struct { +type ioReaderDecorator struct { io.Reader } -func (m io_reader_decorator) Close() error { +func (m ioReaderDecorator) Close() error { return nil } -func (r *redisCache) stringToBytes(s string) []byte { +func (r *redisCache) encodeString(s string) []byte { n := uint32(len(s)) b := make([]byte, 0, n+4) @@ -182,17 +150,17 @@ func (r *redisCache) stringToBytes(s string) []byte { return b } -func (r *redisCache) stringFromBytes(bytes []byte) (string, int) { +func (r *redisCache) decodeString(bytes []byte) (string, int) { b := bytes[:4] n := uint32(b[3]) | (uint32(b[2]) << 8) | (uint32(b[1]) << 16) | (uint32(b[0]) << 24) s := bytes[4 : 4+n] return string(s), int(4 + n) } -func (r *redisCache) metadataToByte(contentMetadata *ContentMetadata) []byte { +func (r *redisCache) encodeMetadata(contentMetadata *ContentMetadata) []byte { cLength := contentMetadata.Length - cType := r.stringToBytes(contentMetadata.Type) - cEncoding := r.stringToBytes(contentMetadata.Encoding) + cType := r.encodeString(contentMetadata.Type) + cEncoding := r.encodeString(contentMetadata.Encoding) b := make([]byte, 0, len(cEncoding)+len(cType)+8) b = append(b, byte(cLength>>56), byte(cLength>>48), byte(cLength>>40), byte(cLength>>32), byte(cLength>>24), byte(cLength>>16), byte(cLength>>8), byte(cLength)) b = append(b, cType...) @@ -200,12 +168,12 @@ func (r *redisCache) metadataToByte(contentMetadata *ContentMetadata) []byte { return b } -func (r *redisCache) metadataFromByte(b []byte) (*ContentMetadata, int) { +func (r *redisCache) decodeMetadata(b []byte) (*ContentMetadata, int) { cLength := uint64(b[7]) | (uint64(b[6]) << 8) | (uint64(b[5]) << 16) | (uint64(b[4]) << 24) | uint64(b[3])<<32 | (uint64(b[2]) << 40) | (uint64(b[1]) << 48) | (uint64(b[0]) << 56) offset := 8 - cType, sizeCType := r.stringFromBytes(b[offset:]) + cType, sizeCType := r.decodeString(b[offset:]) offset += sizeCType - cEncoding, sizeCEncoding := r.stringFromBytes(b[offset:]) + cEncoding, sizeCEncoding := r.decodeString(b[offset:]) offset += sizeCEncoding metadata := &ContentMetadata{ Length: int64(cLength), @@ -216,7 +184,7 @@ func (r *redisCache) metadataFromByte(b []byte) (*ContentMetadata, int) { } func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { - medatadata := r.metadataToByte(&contentMetadata) + medatadata := r.encodeMetadata(&contentMetadata) ctx, cancelFunc := context.WithTimeout(context.Background(), putTimeout) defer cancelFunc() stringKey := key.String() @@ -254,7 +222,7 @@ func (r *redisCache) Name() string { return r.name } -type RedisStreamReader struct { +type redisStreamReader struct { isRedisEOF bool redisOffset uint64 // the redisOffset that gives the beginning of the next bulk to fetch key string // the key of the value we want to stream from redis @@ -263,9 +231,9 @@ type RedisStreamReader struct { client redis.UniversalClient // the redis client } -func NewRedisStreamReader(offset uint64, client redis.UniversalClient, key string) *RedisStreamReader { +func NewRedisStreamReader(offset uint64, client redis.UniversalClient, key string) *redisStreamReader { bufferSize := uint64(2 * 1024 * 1024) - return &RedisStreamReader{ + return &redisStreamReader{ isRedisEOF: false, redisOffset: offset, key: key, @@ -275,7 +243,7 @@ func NewRedisStreamReader(offset uint64, client redis.UniversalClient, key strin } } -func (r *RedisStreamReader) Read(destBuf []byte) (n int, err error) { +func (r *redisStreamReader) Read(destBuf []byte) (n int, err error) { // the logic is simple: // 1) if the buffer still has data to write, it writes it into destBuf with overflowing destBuf // 2) if the buffer only has already written data, the StreamRedis refresh the buffer with new data from redis diff --git a/cache/redis_cache_test.go b/cache/redis_cache_test.go index 2f27827a..d0627a23 100644 --- a/cache/redis_cache_test.go +++ b/cache/redis_cache_test.go @@ -76,8 +76,8 @@ func TestRedisCacheMiss(t *testing.T) { } func TestStringFromToByte(t *testing.T) { c := generateRedisClientAndServer(t) - b := c.stringToBytes("test") - s, size := c.stringFromBytes(b) + b := c.encodeString("test") + s, size := c.decodeString(b) if s != "test" { t.Fatalf("got: %s, expected %s", s, "test") } @@ -94,9 +94,9 @@ func TestMetadataFromToByte(t *testing.T) { Encoding: "gzip", } - b := c.metadataToByte(expectedMetadata) + b := c.encodeMetadata(expectedMetadata) - metadata, size := c.metadataFromByte(b) + metadata, size := c.decodeMetadata(b) if metadata.Encoding != expectedMetadata.Encoding { t.Fatalf("got: %s, expected %s", metadata.Encoding, expectedMetadata.Encoding) } From 500a0bb331ac25598b2acf80057d5b7a8749ee9a Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Fri, 26 Aug 2022 23:02:49 +0200 Subject: [PATCH 09/12] handle edge case due to streaming --- cache/redis_cache.go | 55 ++++++++++++++++++++++-------- cache/redis_cache_test.go | 72 +++++++++++++++++++++++++++++++++++---- 2 files changed, 106 insertions(+), 21 deletions(-) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index f3d2367f..fadaf33c 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -121,8 +121,15 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { return value, nil } // since the cached results in redis are too big, we can't fetch all of them because of the memory overhead. - // we will create an io.reader that will fetch redis bulk by bulk to reduce the memory usage - reader := NewRedisStreamReader(uint64(offset), r.client, stringKey) + // We will create an io.reader that will fetch redis bulk by bulk to reduce the memory usage. + // But before that, since the usage of the reader could take time and the object in redis could disappear btw 2 fetchs + // we need to make sure the TTL will be long enough to avoid nasty side effects + // nb: it would be better to retry the flow if such a failure happened but this require a huge refactoring of proxy.go + if ttl <= 5*time.Second { + return nil, ErrMissing + } + + reader := NewRedisStreamReader(uint64(offset), r.client, stringKey, metadata.Length) value := &CachedData{ ContentMetadata: *metadata, Data: &ioReaderDecorator{Reader: reader}, @@ -223,23 +230,26 @@ func (r *redisCache) Name() string { } type redisStreamReader struct { - isRedisEOF bool - redisOffset uint64 // the redisOffset that gives the beginning of the next bulk to fetch - key string // the key of the value we want to stream from redis - buffer []byte // internal buffer to store the bulks fetched from redis - bufferOffset int // the offset of the buffer that keep were the read() need to start copying data - client redis.UniversalClient // the redis client + isRedisEOF bool + redisOffset uint64 // the redisOffset that gives the beginning of the next bulk to fetch + key string // the key of the value we want to stream from redis + buffer []byte // internal buffer to store the bulks fetched from redis + bufferOffset int // the offset of the buffer that keep were the read() need to start copying data + client redis.UniversalClient // the redis client + expectedPayloadSize int // the size of the object the streamer is supposed to read. + readPayloadSize int // the size of the object currently written by the reader } -func NewRedisStreamReader(offset uint64, client redis.UniversalClient, key string) *redisStreamReader { +func NewRedisStreamReader(offset uint64, client redis.UniversalClient, key string, payloadSize int64) *redisStreamReader { bufferSize := uint64(2 * 1024 * 1024) return &redisStreamReader{ - isRedisEOF: false, - redisOffset: offset, - key: key, - bufferOffset: int(bufferSize), - buffer: make([]byte, bufferSize), - client: client, + isRedisEOF: false, + redisOffset: offset, + key: key, + bufferOffset: int(bufferSize), + buffer: make([]byte, bufferSize), + client: client, + expectedPayloadSize: int(payloadSize), } } @@ -252,6 +262,11 @@ func (r *redisStreamReader) Read(destBuf []byte) (n int, err error) { bytesWritten := 0 // case 3) both the buffer & redis were fully consumed, we can tell the reader to stop reading if r.bufferOffset >= bufSize && r.isRedisEOF { + // Because of the way we fetch from redis, we need to do an extra check because we have no way + // to know if redis is really EOF or if the value was expired from cache while reading it + if r.readPayloadSize != r.expectedPayloadSize { + return 0, &RedisCacheError{readPayloadSize: r.readPayloadSize, expectedPayloadSize: r.expectedPayloadSize} + } return 0, io.EOF } @@ -282,6 +297,16 @@ func (r *redisStreamReader) Read(destBuf []byte) (n int, err error) { if r.bufferOffset < bufSize { bytesWritten = copy(destBuf, r.buffer[r.bufferOffset:]) r.bufferOffset += bytesWritten + r.readPayloadSize += bytesWritten } return bytesWritten, nil } + +type RedisCacheError struct { + readPayloadSize int + expectedPayloadSize int +} + +func (e *RedisCacheError) Error() string { + return fmt.Sprintf("error while reading cached result in redis, only %d bytes of %d were fetched", e.readPayloadSize, e.expectedPayloadSize) +} diff --git a/cache/redis_cache_test.go b/cache/redis_cache_test.go index d0627a23..b3f044b9 100644 --- a/cache/redis_cache_test.go +++ b/cache/redis_cache_test.go @@ -1,6 +1,9 @@ package cache import ( + "errors" + "fmt" + "io" "strings" "testing" "time" @@ -22,7 +25,7 @@ var redisConf = config.Cache{ } func TestCacheSize(t *testing.T) { - redisCache := generateRedisClientAndServer(t) + redisCache := getRedisCache(t) nbKeys := redisCache.nbOfKeys() if nbKeys > 0 { t.Fatalf("the cache should be empty") @@ -53,17 +56,21 @@ func TestCacheSize(t *testing.T) { // we can't check stats.Size because miniredis doesn't handle the memory usage of redis } -func generateRedisClientAndServer(t *testing.T) *redisCache { +func getRedisCacheAndServer(t *testing.T) (*redisCache, *miniredis.Miniredis) { s := miniredis.RunT(t) redisClient := redis.NewUniversalClient(&redis.UniversalOptions{ Addrs: []string{s.Addr()}, }) redisCache := newRedisCache(redisClient, redisConf) + return redisCache, s +} +func getRedisCache(t *testing.T) *redisCache { + redisCache, _ := getRedisCacheAndServer(t) return redisCache } func TestRedisCacheAddGet(t *testing.T) { - c := generateRedisClientAndServer(t) + c := getRedisCache(t) defer func() { c.Close() }() @@ -71,11 +78,11 @@ func TestRedisCacheAddGet(t *testing.T) { } func TestRedisCacheMiss(t *testing.T) { - c := generateRedisClientAndServer(t) + c := getRedisCache(t) cacheMissHelper(t, c) } func TestStringFromToByte(t *testing.T) { - c := generateRedisClientAndServer(t) + c := getRedisCache(t) b := c.encodeString("test") s, size := c.decodeString(b) if s != "test" { @@ -86,7 +93,7 @@ func TestStringFromToByte(t *testing.T) { } } func TestMetadataFromToByte(t *testing.T) { - c := generateRedisClientAndServer(t) + c := getRedisCache(t) expectedMetadata := &ContentMetadata{ Length: 12, @@ -111,3 +118,56 @@ func TestMetadataFromToByte(t *testing.T) { } } + +func TestKeyExpirationWhileFetchtingRedis(t *testing.T) { + cache, redis := getRedisCacheAndServer(t) + defer cache.Close() + + key := &Key{ + Query: []byte(fmt.Sprintf("SELECT test")), + } + payloadSize := 4 * 1024 * 1024 + exepctedValue := strings.Repeat("a", payloadSize) + reader := strings.NewReader(exepctedValue) + + if _, err := cache.Put(reader, ContentMetadata{Encoding: "ce", Type: "ct", Length: int64(payloadSize)}, key); err != nil { + t.Fatalf("failed to put it to cache: %s", err) + } + cachedData, err := cache.Get(key) + if err != nil { + t.Fatalf("failed to get data from redis cache: %s", err) + } + + //simulating a cache expiration + if !redis.Del(key.String()) { + t.Fatalf("could not delete key") + } + _, err = io.ReadAll(cachedData.Data) + _, isRedisCacheError := err.(*RedisCacheError) + if err == nil || !isRedisCacheError { + t.Fatalf("expecting an error of type RedisCacheError, err=%s", err) + } +} +func TestSmallTTLOnBigPayload(t *testing.T) { + cache, redis := getRedisCacheAndServer(t) + defer cache.Close() + + key := &Key{ + Query: []byte(fmt.Sprintf("SELECT test")), + } + payloadSize := 4 * 1024 * 1024 + exepctedValue := strings.Repeat("a", payloadSize) + reader := strings.NewReader(exepctedValue) + + if _, err := cache.Put(reader, ContentMetadata{Encoding: "ce", Type: "ct", Length: int64(payloadSize)}, key); err != nil { + t.Fatalf("failed to put it to cache: %s", err) + } + + //simulate a value almost expired + redis.SetTTL(key.String(), 2*time.Second) + + _, err := cache.Get(key) + if err == nil || !errors.Is(err, ErrMissing) { + t.Fatalf("expecting an error of type ErrMissing, err=%s", err) + } +} From 0eba7cf1683e051cd887d649dbfc183cc2a9faab Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Sun, 28 Aug 2022 10:09:43 +0200 Subject: [PATCH 10/12] avoid panic while decoding redis payloads --- cache/redis_cache.go | 46 ++++++++++++++++++++++++++++++--------- cache/redis_cache_test.go | 25 +++++++++++++++++++-- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index fadaf33c..7ec5160d 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -24,6 +24,7 @@ type redisCache struct { const getTimeout = 1 * time.Second const putTimeout = 10 * time.Second // the put is long enough for very large cached result (+200MB) because it's also linked to the speed of the reader const statsTimeout = 500 * time.Millisecond +const minTTLForStreamingReader = 5 * time.Second func newRedisCache(client redis.UniversalClient, cfg config.Cache) *redisCache { redisCache := &redisCache{ @@ -92,12 +93,12 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { // for most of the queries that fetch a few data, the cached results val, err := r.client.GetRange(ctx, stringKey, 0, nbBytesToFetch).Result() // if key not found in cache - if errors.Is(err, redis.Nil) || len(val) == 0 { + if errors.Is(err, redis.Nil) { return nil, ErrMissing } // others errors, such as timeouts - if err != nil { + if err != nil || len(val) == 0 { log.Errorf("failed to get key %s with error: %s", stringKey, err) return nil, ErrMissing } @@ -106,7 +107,10 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { return nil, fmt.Errorf("failed to ttl of key %s with error: %w", stringKey, err) } b := []byte(val) - metadata, offset := r.decodeMetadata(b) + metadata, offset, err := r.decodeMetadata(b) + if err != nil { + return nil, err + } if (int64(offset) + metadata.Length) < nbBytesToFetch { // the condition is true ony if the bytes fetched contain the metadata + the cached results // so we extract from the remaining bytes the cached results @@ -125,7 +129,7 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { // But before that, since the usage of the reader could take time and the object in redis could disappear btw 2 fetchs // we need to make sure the TTL will be long enough to avoid nasty side effects // nb: it would be better to retry the flow if such a failure happened but this require a huge refactoring of proxy.go - if ttl <= 5*time.Second { + if ttl <= minTTLForStreamingReader { return nil, ErrMissing } @@ -157,11 +161,17 @@ func (r *redisCache) encodeString(s string) []byte { return b } -func (r *redisCache) decodeString(bytes []byte) (string, int) { +func (r *redisCache) decodeString(bytes []byte) (string, int, error) { + if len(bytes) < 4 { + return "", 0, &RedisCacheCorruptionError{} + } b := bytes[:4] n := uint32(b[3]) | (uint32(b[2]) << 8) | (uint32(b[1]) << 16) | (uint32(b[0]) << 24) + if len(bytes) < int(4+n) { + return "", 0, &RedisCacheCorruptionError{} + } s := bytes[4 : 4+n] - return string(s), int(4 + n) + return string(s), int(4 + n), nil } func (r *redisCache) encodeMetadata(contentMetadata *ContentMetadata) []byte { @@ -175,19 +185,28 @@ func (r *redisCache) encodeMetadata(contentMetadata *ContentMetadata) []byte { return b } -func (r *redisCache) decodeMetadata(b []byte) (*ContentMetadata, int) { +func (r *redisCache) decodeMetadata(b []byte) (*ContentMetadata, int, error) { + if len(b) < 8 { + return nil, 0, &RedisCacheCorruptionError{} + } cLength := uint64(b[7]) | (uint64(b[6]) << 8) | (uint64(b[5]) << 16) | (uint64(b[4]) << 24) | uint64(b[3])<<32 | (uint64(b[2]) << 40) | (uint64(b[1]) << 48) | (uint64(b[0]) << 56) offset := 8 - cType, sizeCType := r.decodeString(b[offset:]) + cType, sizeCType, err := r.decodeString(b[offset:]) + if err != nil { + return nil, 0, err + } offset += sizeCType - cEncoding, sizeCEncoding := r.decodeString(b[offset:]) + cEncoding, sizeCEncoding, err := r.decodeString(b[offset:]) + if err != nil { + return nil, 0, err + } offset += sizeCEncoding metadata := &ContentMetadata{ Length: int64(cLength), Type: cType, Encoding: cEncoding, } - return metadata, offset + return metadata, offset, nil } func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { @@ -310,3 +329,10 @@ type RedisCacheError struct { func (e *RedisCacheError) Error() string { return fmt.Sprintf("error while reading cached result in redis, only %d bytes of %d were fetched", e.readPayloadSize, e.expectedPayloadSize) } + +type RedisCacheCorruptionError struct { +} + +func (e *RedisCacheCorruptionError) Error() string { + return "chproxy can't decode the cached result from redis, it seems to have been corrupted" +} diff --git a/cache/redis_cache_test.go b/cache/redis_cache_test.go index b3f044b9..5932fee6 100644 --- a/cache/redis_cache_test.go +++ b/cache/redis_cache_test.go @@ -84,7 +84,7 @@ func TestRedisCacheMiss(t *testing.T) { func TestStringFromToByte(t *testing.T) { c := getRedisCache(t) b := c.encodeString("test") - s, size := c.decodeString(b) + s, size, _ := c.decodeString(b) if s != "test" { t.Fatalf("got: %s, expected %s", s, "test") } @@ -103,7 +103,7 @@ func TestMetadataFromToByte(t *testing.T) { b := c.encodeMetadata(expectedMetadata) - metadata, size := c.decodeMetadata(b) + metadata, size, _ := c.decodeMetadata(b) if metadata.Encoding != expectedMetadata.Encoding { t.Fatalf("got: %s, expected %s", metadata.Encoding, expectedMetadata.Encoding) } @@ -117,6 +117,27 @@ func TestMetadataFromToByte(t *testing.T) { t.Fatalf("got: %d, expected %d", size, 24) } +} +func TestDecodingCorruptedMetadata(t *testing.T) { + c := getRedisCache(t) + + // this test will make fetching the length of a payload fail + _, _, err := c.decodeMetadata([]byte{}) + if (err == nil || !errors.Is(err, &RedisCacheCorruptionError{})) { + t.Fatalf("expected a corruption error, err=%s", err) + } + + // this test will make fetching a string in the metadata fail because it can't read the length of a metadata string + _, _, err = c.decodeMetadata([]byte{0, 0, 0, 0, 1, 1, 1, 1}) + if (err == nil || !errors.Is(err, &RedisCacheCorruptionError{})) { + t.Fatalf("expected a corruption error, err=%s", err) + } + // this test will make fetching a string in the metadata fail because the length of a metadata string doesn't match it's size + _, _, err = c.decodeMetadata([]byte{0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}) + if (err == nil || !errors.Is(err, &RedisCacheCorruptionError{})) { + t.Fatalf("expected a corruption error, err=%s", err) + } + } func TestKeyExpirationWhileFetchtingRedis(t *testing.T) { From baa61484ae3d14f5da8b7ada2a6ea978c0147d41 Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Sun, 28 Aug 2022 16:04:57 +0200 Subject: [PATCH 11/12] improve edge case handling --- cache/redis_cache.go | 88 +++++++++++++++++++++++++++++++++------ cache/redis_cache_test.go | 56 +++++++++++++++++++++---- 2 files changed, 125 insertions(+), 19 deletions(-) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index 7ec5160d..e3be4686 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" "io" + "io/ioutil" + "os" "regexp" "strconv" "time" @@ -24,7 +26,11 @@ type redisCache struct { const getTimeout = 1 * time.Second const putTimeout = 10 * time.Second // the put is long enough for very large cached result (+200MB) because it's also linked to the speed of the reader const statsTimeout = 500 * time.Millisecond -const minTTLForStreamingReader = 5 * time.Second +const minTTLForRedisStreamingReader = 5 * time.Second + +// tmpDir temporary path to store ongoing queries results +const tmpDir = "/tmp" +const redisTmpFilePrefix = "chproxyRedisTmp" func newRedisCache(client redis.UniversalClient, cfg config.Cache) *redisCache { redisCache := &redisCache{ @@ -92,16 +98,17 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { // fetching 100kBytes from redis to be sure to have the full metadata and, // for most of the queries that fetch a few data, the cached results val, err := r.client.GetRange(ctx, stringKey, 0, nbBytesToFetch).Result() - // if key not found in cache - if errors.Is(err, redis.Nil) { - return nil, ErrMissing - } - // others errors, such as timeouts - if err != nil || len(val) == 0 { + // errors, such as timeouts + if err != nil { log.Errorf("failed to get key %s with error: %s", stringKey, err) return nil, ErrMissing } + // if key not found in cache + if len(val) == 0 { + return nil, ErrMissing + } + ttl, err := r.client.TTL(ctx, stringKey).Result() if err != nil { return nil, fmt.Errorf("failed to ttl of key %s with error: %w", stringKey, err) @@ -126,17 +133,37 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { } // since the cached results in redis are too big, we can't fetch all of them because of the memory overhead. // We will create an io.reader that will fetch redis bulk by bulk to reduce the memory usage. + redisStreamreader := newRedisStreamReader(uint64(offset), r.client, stringKey, metadata.Length) + // But before that, since the usage of the reader could take time and the object in redis could disappear btw 2 fetchs // we need to make sure the TTL will be long enough to avoid nasty side effects + // if the TTL is too short we will put all the data into a file and use it as a streamer // nb: it would be better to retry the flow if such a failure happened but this require a huge refactoring of proxy.go - if ttl <= minTTLForStreamingReader { - return nil, ErrMissing + + if ttl <= minTTLForRedisStreamingReader { + fileStream, err := newFileWriterReader(tmpDir) + if err != nil { + return nil, err + } + _, err = io.Copy(fileStream, redisStreamreader) + if err != nil { + return nil, err + } + err = fileStream.reseOffset() + if err != nil { + return nil, err + } + value := &CachedData{ + ContentMetadata: *metadata, + Data: fileStream, + Ttl: ttl, + } + return value, nil } - reader := NewRedisStreamReader(uint64(offset), r.client, stringKey, metadata.Length) value := &CachedData{ ContentMetadata: *metadata, - Data: &ioReaderDecorator{Reader: reader}, + Data: &ioReaderDecorator{Reader: redisStreamreader}, Ttl: ttl, } @@ -259,7 +286,7 @@ type redisStreamReader struct { readPayloadSize int // the size of the object currently written by the reader } -func NewRedisStreamReader(offset uint64, client redis.UniversalClient, key string, payloadSize int64) *redisStreamReader { +func newRedisStreamReader(offset uint64, client redis.UniversalClient, key string, payloadSize int64) *redisStreamReader { bufferSize := uint64(2 * 1024 * 1024) return &redisStreamReader{ isRedisEOF: false, @@ -321,6 +348,43 @@ func (r *redisStreamReader) Read(destBuf []byte) (n int, err error) { return bytesWritten, nil } +type fileWriterReader struct { + f *os.File +} + +func newFileWriterReader(dir string) (*fileWriterReader, error) { + f, err := ioutil.TempFile(dir, redisTmpFilePrefix) + if err != nil { + return nil, fmt.Errorf("cannot create temporary file in %q: %w", dir, err) + } + return &fileWriterReader{ + f: f, + }, nil +} + +func (f *fileWriterReader) Close() error { + err := f.f.Close() + if err != nil { + return err + } + return os.Remove(f.f.Name()) +} + +func (r *fileWriterReader) Read(destBuf []byte) (n int, err error) { + return r.f.Read(destBuf) +} + +func (w *fileWriterReader) Write(p []byte) (n int, err error) { + return w.f.Write(p) +} + +func (f *fileWriterReader) reseOffset() error { + if _, err := f.f.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("cannot reset offset in: %w", err) + } + return nil +} + type RedisCacheError struct { readPayloadSize int expectedPayloadSize int diff --git a/cache/redis_cache_test.go b/cache/redis_cache_test.go index 5932fee6..bc482d14 100644 --- a/cache/redis_cache_test.go +++ b/cache/redis_cache_test.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "os" "strings" "testing" "time" @@ -169,16 +170,15 @@ func TestKeyExpirationWhileFetchtingRedis(t *testing.T) { t.Fatalf("expecting an error of type RedisCacheError, err=%s", err) } } -func TestSmallTTLOnBigPayload(t *testing.T) { +func TestSmallTTLOnBigPayloadAreCacheWithFile(t *testing.T) { cache, redis := getRedisCacheAndServer(t) defer cache.Close() - key := &Key{ Query: []byte(fmt.Sprintf("SELECT test")), } payloadSize := 4 * 1024 * 1024 - exepctedValue := strings.Repeat("a", payloadSize) - reader := strings.NewReader(exepctedValue) + expectedValue := strings.Repeat("a", payloadSize) + reader := strings.NewReader(expectedValue) if _, err := cache.Put(reader, ContentMetadata{Encoding: "ce", Type: "ct", Length: int64(payloadSize)}, key); err != nil { t.Fatalf("failed to put it to cache: %s", err) @@ -186,9 +186,51 @@ func TestSmallTTLOnBigPayload(t *testing.T) { //simulate a value almost expired redis.SetTTL(key.String(), 2*time.Second) + nbFileCacheBeforeGet, err := countFilesWithPrefix(tmpDir, redisTmpFilePrefix) + if err != nil { + t.Fatalf("could not read directory %s", err) + } + + cachedData, err := cache.Get(key) + if err != nil { + t.Fatalf("expected cached to have the value") + } + nbFileCacheAfterGet, err := countFilesWithPrefix(tmpDir, redisTmpFilePrefix) + if err != nil { + t.Fatalf("could not read directory %s", err) + } + if nbFileCacheBeforeGet+1 != nbFileCacheAfterGet { + t.Fatalf("expected a file to be stored by redisFileCache ") + } - _, err := cache.Get(key) - if err == nil || !errors.Is(err, ErrMissing) { - t.Fatalf("expecting an error of type ErrMissing, err=%s", err) + cachedValue, err := io.ReadAll(cachedData.Data) + if err != nil { + t.Fatalf("could not read data from redisFileCache, err=%s", err) + } + if string(cachedValue) != expectedValue { + t.Fatalf("got a value different than the expected one len(value)=%d vs len(expectedValue)=%d", len(string(cachedValue)), len(expectedValue)) + } + cachedData.Data.Close() + nbFileCacheAfterClose, err := countFilesWithPrefix(tmpDir, redisTmpFilePrefix) + if err != nil { + t.Fatalf("could not read directory %s", err) + } + + if nbFileCacheBeforeGet != nbFileCacheAfterClose { + t.Fatalf("expected the file stored by redisFileCache to be removed: nbFileCacheBeforeGet=%d | nbFileCacheAfterClose=%d", nbFileCacheBeforeGet, nbFileCacheAfterClose) + } +} + +func countFilesWithPrefix(dir, prefix string) (int, error) { + count := 0 + files, err := os.ReadDir(dir) + if err != nil { + return 0, err + } + for _, file := range files { + if strings.HasPrefix(file.Name(), prefix) { + count++ + } } + return count, nil } From d2b0bde8ba960657208405e19c886610bbbc3810 Mon Sep 17 00:00:00 2001 From: Christophe Kalenzaga Date: Mon, 29 Aug 2022 10:04:16 +0200 Subject: [PATCH 12/12] log redis key in case of corrupted data --- cache/redis_cache.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cache/redis_cache.go b/cache/redis_cache.go index e3be4686..0e56c00f 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -116,6 +116,9 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { b := []byte(val) metadata, offset, err := r.decodeMetadata(b) if err != nil { + if (errors.Is(err, &RedisCacheCorruptionError{})) { + log.Errorf("an error happened while handling redis key =%s, err=%s", stringKey, err) + } return nil, err } if (int64(offset) + metadata.Length) < nbBytesToFetch {