-
Notifications
You must be signed in to change notification settings - Fork 265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix memory issue & improve processing speed #212
Changes from 1 commit
10b9aa3
a22ece0
10b6cfa
0431469
7a74fbd
f9631d9
9ac36d3
5f15b81
500a0bb
0eba7cf
baa6148
d2b0bde
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,9 +3,8 @@ package cache | |
import ( | ||
"bytes" | ||
"context" | ||
"encoding/base64" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"regexp" | ||
"strconv" | ||
|
@@ -26,13 +25,6 @@ const getTimeout = 1 * time.Second | |
const putTimeout = 2 * time.Second | ||
const statsTimeout = 500 * time.Millisecond | ||
|
||
type redisCachePayload struct { | ||
Length int64 `json:"l"` | ||
Type string `json:"t"` | ||
Encoding string `json:"enc"` | ||
Payload string `json:"payload"` | ||
} | ||
|
||
func newRedisCache(client redis.UniversalClient, cfg config.Cache) *redisCache { | ||
redisCache := &redisCache{ | ||
name: cfg.Name, | ||
|
@@ -95,7 +87,6 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { | |
ctx, cancelFunc := context.WithTimeout(context.Background(), getTimeout) | ||
defer cancelFunc() | ||
val, err := r.client.Get(ctx, key.String()).Result() | ||
|
||
// if key not found in cache | ||
if errors.Is(err, redis.Nil) { | ||
return nil, ErrMissing | ||
|
@@ -106,35 +97,17 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) { | |
log.Errorf("failed to get key %s with error: %s", key.String(), err) | ||
return nil, ErrMissing | ||
} | ||
|
||
var payload redisCachePayload | ||
err = json.Unmarshal([]byte(val), &payload) | ||
|
||
if err != nil { | ||
log.Errorf("corrupted payload for key %s with error: %s", key.String(), err) | ||
return nil, ErrMissing | ||
} | ||
|
||
ttl, err := r.client.TTL(ctx, key.String()).Result() | ||
|
||
if err != nil { | ||
log.Errorf("Not able to fetch TTL for: %s ", key) | ||
return nil, fmt.Errorf("failed to ttl of key %s with error: %s", key.String(), err) | ||
} | ||
content, reader := r.fromByte([]byte(val)) | ||
|
||
decoded, err := base64.StdEncoding.DecodeString(payload.Payload) | ||
if err != nil { | ||
log.Errorf("failed to decode payload: %s , due to: %v ", payload.Payload, err) | ||
return nil, ErrMissing | ||
} | ||
reader := &io_reader_decorator{Reader: bytes.NewReader(decoded)} | ||
reader2 := &io_reader_decorator{Reader: reader} | ||
value := &CachedData{ | ||
ContentMetadata: ContentMetadata{ | ||
Length: payload.Length, | ||
Type: payload.Type, | ||
Encoding: payload.Encoding, | ||
}, | ||
Data: reader, | ||
Ttl: ttl, | ||
ContentMetadata: *content, | ||
Data: reader2, | ||
Ttl: ttl, | ||
} | ||
|
||
return value, nil | ||
|
@@ -149,27 +122,63 @@ type io_reader_decorator struct { | |
func (m io_reader_decorator) Close() error { | ||
return nil | ||
} | ||
func (r *redisCache) stringToBytes(s string) []byte { | ||
n := uint32(len(s)) | ||
|
||
func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { | ||
b := make([]byte, 0, n+4) | ||
b = append(b, byte(n>>24), byte(n>>16), byte(n>>8), byte(n)) | ||
b = append(b, s...) | ||
return b | ||
} | ||
|
||
func (r *redisCache) stringFromBytes(bytes []byte) (string, int) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And then we could call that one |
||
b := bytes[:4] | ||
n := uint32(b[3]) | (uint32(b[2]) << 8) | (uint32(b[1]) << 16) | (uint32(b[0]) << 24) | ||
s := bytes[4 : 4+n] | ||
return string(s), int(4 + n) | ||
} | ||
|
||
func (r *redisCache) toByte(contentMetadata *ContentMetadata, reader io.Reader) ([]byte, error) { | ||
data, err := toBytes(reader) | ||
if err != nil { | ||
return 0, err | ||
return nil, err | ||
} | ||
cLength := contentMetadata.Length | ||
cType := r.stringToBytes(contentMetadata.Type) | ||
cEncoding := r.stringToBytes(contentMetadata.Encoding) | ||
b := make([]byte, 0, len(data)+len(cEncoding)+len(cType)+8) | ||
b = append(b, byte(cLength>>56), byte(cLength>>48), byte(cLength>>40), byte(cLength>>32), byte(cLength>>24), byte(cLength>>16), byte(cLength>>8), byte(cLength)) | ||
b = append(b, cType...) | ||
b = append(b, cEncoding...) | ||
b = append(b, data...) | ||
return b, nil | ||
} | ||
|
||
encoded := base64.StdEncoding.EncodeToString(data) | ||
payload := &redisCachePayload{ | ||
Length: contentMetadata.Length, Type: contentMetadata.Type, Encoding: contentMetadata.Encoding, Payload: encoded, | ||
} | ||
func (r *redisCache) fromByte(b []byte) (*ContentMetadata, io.Reader) { | ||
cLength := uint64(b[7]) | (uint64(b[6]) << 8) | (uint64(b[5]) << 16) | (uint64(b[4]) << 24) | uint64(b[3])<<32 | (uint64(b[2]) << 40) | (uint64(b[1]) << 48) | (uint64(b[0]) << 56) | ||
offset := 8 | ||
cType, sizeCType := r.stringFromBytes(b[offset:]) | ||
offset += sizeCType | ||
cEncoding, sizeCEncoding := r.stringFromBytes(b[offset:]) | ||
offset += sizeCEncoding | ||
payload := b[offset:] | ||
metadata := &ContentMetadata{ | ||
Length: int64(cLength), | ||
Type: cType, | ||
Encoding: cEncoding, | ||
} | ||
return metadata, bytes.NewReader(payload) | ||
} | ||
|
||
func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key *Key) (time.Duration, error) { | ||
|
||
marshalled, err := json.Marshal(payload) | ||
payload, err := r.toByte(&contentMetadata, reader) | ||
if err != nil { | ||
return 0, nil | ||
return 0, err | ||
} | ||
|
||
ctx, cancelFunc := context.WithTimeout(context.Background(), putTimeout) | ||
defer cancelFunc() | ||
err = r.client.Set(ctx, key.String(), marshalled, r.expire).Err() | ||
|
||
err = r.client.Set(ctx, key.String(), payload, r.expire).Err() | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
@@ -182,11 +191,17 @@ func (r *redisCache) Name() string { | |
} | ||
|
||
func toBytes(stream io.Reader) ([]byte, error) { | ||
buf := new(bytes.Buffer) | ||
b, err := io.ReadAll(stream) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return b, nil | ||
|
||
/*buf := new(bytes.Buffer) | ||
|
||
_, err := buf.ReadFrom(stream) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return buf.Bytes(), nil | ||
return buf.Bytes(), nil*/ | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package cache | ||
|
||
import ( | ||
"io" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
@@ -74,3 +75,46 @@ func TestRedisCacheMiss(t *testing.T) { | |
c := generateRedisClientAndServer(t) | ||
cacheMissHelper(t, c) | ||
} | ||
func TestStringFromToByte(t *testing.T) { | ||
c := generateRedisClientAndServer(t) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not needed to test encode and decode functions (once they're decoupled from redis cache) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I asked myself this question before writing this test, but I thought having a dedicated test that focuses only on this part would make debugging easier if this part contains a problem (it was at least very helpful while coding this part). |
||
b := c.stringToBytes("test") | ||
s, size := c.stringFromBytes(b) | ||
if s != "test" { | ||
t.Fatalf("got: %s, expected %s", s, "test") | ||
} | ||
if size != 8 { | ||
t.Fatalf("got: %d, expected %d", size, 8) | ||
} | ||
} | ||
func TestPayloadFromToByte(t *testing.T) { | ||
c := generateRedisClientAndServer(t) | ||
|
||
expectedMetadata := &ContentMetadata{ | ||
Length: 12, | ||
Type: "json", | ||
Encoding: "gzip", | ||
} | ||
expectedContent := "abcdef" | ||
r := strings.NewReader(expectedContent) | ||
|
||
b, err := c.toByte(expectedMetadata, r) | ||
|
||
if err != nil { | ||
t.Fatalf("error during serialization %s", err) | ||
} | ||
metadata, reader := c.fromByte(b) | ||
if metadata.Encoding != expectedMetadata.Encoding { | ||
t.Fatalf("got: %s, expected %s", metadata.Encoding, expectedMetadata.Encoding) | ||
} | ||
if metadata.Type != expectedMetadata.Type { | ||
t.Fatalf("got: %s, expected %s", metadata.Type, expectedMetadata.Type) | ||
} | ||
if metadata.Length != expectedMetadata.Length { | ||
t.Fatalf("got: %d, expected %d", metadata.Length, expectedMetadata.Length) | ||
} | ||
content, _ := io.ReadAll(reader) | ||
if string(content) != expectedContent { | ||
t.Fatalf("got: %s, expected %s", string(content), expectedContent) | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's more than
stringToBytes
. The way I understand it, this function:Can we call it
encodeString
and leave the comment? Reading this function takes a moment.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I just realised that it's the same code as in the filesystem cache, isn't it? (Both encode and decode — there they're called writeHeader and readHeader; maybe those are better names. Except that there we write to a writer and read from a reader.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It really is transforming a string to a byte[] for serialization purposes; I'll rename it to encodeString (but we will lose the information that the string becomes a byte array). Yes, the logic is close to writeHeader/readHeader, except that it's more granular, since we're only dealing with a single string and not the full headers.