Skip to content

Commit

Permalink
feat(compress): add snappy (#208)
Browse files Browse the repository at this point in the history
### Add snappy compression

###Validation 

![image](https://github.com/user-attachments/assets/6fb32179-a6c6-4213-85d6-734404a354b2)

Signed-off-by: kaixuan xu <triumph_9431@qq.com>
  • Loading branch information
xkx9431 authored Dec 16, 2024
1 parent ebbbf52 commit f19dc1d
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 32 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/openGemini/opengemini-client-go
go 1.22

require (
github.com/golang/snappy v0.0.4
github.com/klauspost/compress v1.17.11
github.com/libgox/gocollections v0.1.1
github.com/libgox/unicodex v0.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
Expand Down
14 changes: 4 additions & 10 deletions opengemini/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,10 @@ const (
)

const (
CompressMethodZstd CompressMethod = "ZSTD"
CompressMethodGzip CompressMethod = "GZIP"
CompressMethodNone CompressMethod = "NONE"
)

// Define constants for different encode/decode config

const (
CodecMsgPack Codec = "MsgPack"
CodecZstd Codec = "ZSTD"
CompressMethodZstd CompressMethod = "ZSTD"
CompressMethodGzip CompressMethod = "GZIP"
CompressMethodSnappy CompressMethod = "SNAPPY"
CompressMethodNone CompressMethod = "NONE"
)

// Client represents a openGemini client.
Expand Down
68 changes: 46 additions & 22 deletions opengemini/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"time"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/vmihailenco/msgpack/v5"
)
Expand All @@ -33,6 +34,7 @@ const (
HttpContentTypeJSON = "application/json"
HttpEncodingGzip = "gzip"
HttpEncodingZstd = "zstd"
HttpEncodingSnappy = "snappy"
)

type Query struct {
Expand Down Expand Up @@ -124,6 +126,8 @@ func applyCodec(req *requestDetails, config *Config) {
req.header.Set("Accept-Encoding", HttpEncodingGzip)
case CompressMethodZstd:
req.header.Set("Accept-Encoding", HttpEncodingZstd)
case CompressMethodSnappy:
req.header.Set("Accept-Encoding", HttpEncodingSnappy)
}

}
Expand All @@ -141,32 +145,32 @@ func retrieveQueryResFromResp(resp *http.Response) (*QueryResult, error) {
contentType := resp.Header.Get("Content-Type")
contentEncoding := resp.Header.Get("Content-Encoding")
var qr = new(QueryResult)
var decompressedBody []byte

// First, handle decompression
switch contentEncoding {
case HttpEncodingZstd:
decompressedBody, err = decodeZstdBody(body)
if err != nil {
return qr, err
}
case HttpEncodingGzip:
decompressedBody, err = decodeGzipBody(body)
if err != nil {
return qr, err
}
default:
decompressedBody = body
// handle decompression first
decompressedBody, err := decompressBody(contentEncoding, body)
if err != nil {
return qr, err
}

// Then, handle deserialization based on content type
switch contentType {
case HttpContentTypeMsgpack:
return qr, unmarshalMsgpack(decompressedBody, qr)
case HttpContentTypeJSON:
return qr, unmarshalJson(decompressedBody, qr)
// then handle deserialization based on content type
err = deserializeBody(contentType, decompressedBody, qr)
if err != nil {
return qr, err
}

return qr, nil
}

func decompressBody(encoding string, body []byte) ([]byte, error) {
switch encoding {
case HttpEncodingZstd:
return decodeZstdBody(body)
case HttpEncodingGzip:
return decodeGzipBody(body)
case HttpEncodingSnappy:
return decodeSnappyBody(body)
default:
return qr, fmt.Errorf("unsupported content type: %s", contentType)
return body, nil
}
}

Expand Down Expand Up @@ -200,6 +204,26 @@ func decodeZstdBody(compressedBody []byte) ([]byte, error) {
return decompressedBody, nil
}

func decodeSnappyBody(compressedBody []byte) ([]byte, error) {
reader := snappy.NewReader(bytes.NewReader(compressedBody))
decompressedBody, err := io.ReadAll(reader)
if err != nil {
return nil, errors.New("failed to decompress snappy body: " + err.Error())
}
return decompressedBody, nil
}

func deserializeBody(contentType string, body []byte, qr *QueryResult) error {
switch contentType {
case HttpContentTypeMsgpack:
return unmarshalMsgpack(body, qr)
case HttpContentTypeJSON:
return unmarshalJson(body, qr)
default:
return fmt.Errorf("unsupported content type: %s", contentType)
}
}

func unmarshalMsgpack(body []byte, qr *QueryResult) error {
err := msgpack.Unmarshal(body, qr)
if err != nil {
Expand Down
58 changes: 58 additions & 0 deletions opengemini/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,64 @@ func TestQueryWithZSTD(t *testing.T) {
}
}

func TestQueryWithSnappy(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []Address{{
Host: "localhost",
Port: 8086,
}},
CompressMethod: CompressMethodSnappy,
})

// create a test database with rand suffix
database := randomDatabaseName()
err := c.CreateDatabase(database)
assert.Nil(t, err)

// delete test database before exit test case
defer func() {
err := c.DropDatabase(database)
assert.Nil(t, err)
}()

testMeasurement := randomMeasurement()
p := &Point{}
p.Measurement = testMeasurement
p.AddField("TestField", 123)
p.Time = time.Now()

err = c.WritePoint(database, p, func(err error) {
assert.Nil(t, err)
})
assert.Nil(t, err)

time.Sleep(time.Second * 5)

PrecisionTimestampLength := make(map[Precision]int64)
PrecisionTimestampLength[PrecisionNanosecond] = 19
PrecisionTimestampLength[PrecisionMicrosecond] = 16
PrecisionTimestampLength[PrecisionMillisecond] = 13
PrecisionTimestampLength[PrecisionSecond] = 10
PrecisionTimestampLength[PrecisionMinute] = 8
PrecisionTimestampLength[PrecisionHour] = 6

// check whether write success
for precision, length := range PrecisionTimestampLength {
q := Query{
Database: database,
Command: "select * from " + testMeasurement,
Precision: precision,
}
result, err := c.Query(q)
assert.Nil(t, err)
v, err := convertToInt64(result.Results[0].Series[0].Values[0][0])
if err != nil {
t.Fatalf("conversion error: %v", err)
}
assert.Equal(t, length, getTimestampLength(v))
}
}

func TestQueryWithZSTDAndMsgPack(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []Address{{
Expand Down

0 comments on commit f19dc1d

Please sign in to comment.