Skip to content

Commit

Permalink
feat(perf): enable config encoding for querying large results (#191)
Browse files Browse the repository at this point in the history
### Description

This is to adapt the large response from the server side, in the inner
server side , we already support the `x-msgpack` content type.

## Related DOCS update
openGemini/openGemini.github.io#136


### Changes
1. add config property `Codec` for client creation
2. when the Codec is enabled, will apply to the http request by setting
the header accordingly

---------

Signed-off-by: kaixuan xu <triumph_9431@qq.com>
Signed-off-by: ZhangJian He <shoothzj@gmail.com>
Co-authored-by: ZhangJian He <shoothzj@gmail.com>
  • Loading branch information
xkx9431 and hezhangjian authored Nov 7, 2024
1 parent 2d4441a commit b19a843
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 32 deletions.
1 change: 1 addition & 0 deletions examples/example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func main() {
Host: "127.0.0.1",
Port: 8086,
}},
Codec: opengemini.CodecMsgPack,
}
client, err := opengemini.NewClient(config)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/libgox/unicodex v0.1.0
github.com/prometheus/client_golang v1.20.5
github.com/stretchr/testify v1.9.0
github.com/vmihailenco/msgpack/v5 v5.4.1
)

require (
Expand All @@ -21,6 +22,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/sys v0.22.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
Expand Down
10 changes: 10 additions & 0 deletions opengemini/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ const (
AuthTypeToken
)

type Codec string

// Define constants for different encode/decode config

const (
CodecMsgPack Codec = "MsgPack"
)

// Client represents a openGemini client.
type Client interface {
// Ping check that status of cluster.
Expand Down Expand Up @@ -136,6 +144,8 @@ type Config struct {
MaxIdleConnsPerHost int
// GzipEnabled determines whether to use gzip for data transmission.
GzipEnabled bool
// Codec determines the Codec mode used for data transmission.
Codec Codec
// TlsConfig configuration information for tls.
TlsConfig *tls.Config
// CustomMetricsLabels add custom labels to all the metrics reported by this client instance
Expand Down
78 changes: 54 additions & 24 deletions opengemini/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"io"
"net/http"
"time"

"github.com/vmihailenco/msgpack/v5"
)

type Query struct {
Expand All @@ -31,13 +33,12 @@ type Query struct {

// Query sends a command to the server
func (c *client) Query(q Query) (*QueryResult, error) {
req := requestDetails{
queryValues: make(map[string][]string),
}
req.queryValues.Add("db", q.Database)
req.queryValues.Add("q", q.Command)
req.queryValues.Add("rp", q.RetentionPolicy)
req.queryValues.Add("epoch", q.Precision.Epoch())
req := buildRequestDetails(c.config, func(req *requestDetails) {
req.queryValues.Add("db", q.Database)
req.queryValues.Add("q", q.Command)
req.queryValues.Add("rp", q.RetentionPolicy)
req.queryValues.Add("epoch", q.Precision.Epoch())
})

// metric
c.metrics.queryCounter.Add(1)
Expand All @@ -53,34 +54,55 @@ func (c *client) Query(q Query) (*QueryResult, error) {
if err != nil {
return nil, errors.New("query request failed, error: " + err.Error())
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
qr, err := retrieveQueryResFromResp(resp)
if err != nil {
return nil, errors.New("query resp read failed, error: " + err.Error())
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, errors.New("query error resp, code: " + resp.Status + "body: " + string(body))
return qr, nil
}

func (c *client) queryPost(q Query) (*QueryResult, error) {
req := buildRequestDetails(c.config, func(req *requestDetails) {
req.queryValues.Add("db", q.Database)
req.queryValues.Add("q", q.Command)
})

resp, err := c.executeHttpPost(UrlQuery, req)
if err != nil {
return nil, errors.New("request failed, error: " + err.Error())
}
var qr = new(QueryResult)
err = json.Unmarshal(body, qr)
qr, err := retrieveQueryResFromResp(resp)
if err != nil {
return nil, errors.New("query unmarshal resp body failed, error: " + err.Error())
return nil, err
}
return qr, nil
}

func (c *client) queryPost(q Query) (*QueryResult, error) {
func buildRequestDetails(c *Config, requestModifier func(*requestDetails)) requestDetails {
req := requestDetails{
queryValues: make(map[string][]string),
}

req.queryValues.Add("db", q.Database)
req.queryValues.Add("q", q.Command)
resp, err := c.executeHttpPost(UrlQuery, req)
if err != nil {
return nil, errors.New("request failed, error: " + err.Error())
applyCodec(&req, c)

if requestModifier != nil {
requestModifier(&req)
}

return req
}

func applyCodec(req *requestDetails, config *Config) {
if config.Codec == CodecMsgPack {
if req.header == nil {
req.header = make(http.Header)
}
req.header.Set("Accept", "application/x-msgpack")
}
}

// retrieve query result from the response
func retrieveQueryResFromResp(resp *http.Response) (*QueryResult, error) {
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
Expand All @@ -89,10 +111,18 @@ func (c *client) queryPost(q Query) (*QueryResult, error) {
if resp.StatusCode != http.StatusOK {
return nil, errors.New("error resp, code: " + resp.Status + "body: " + string(body))
}
contentType := resp.Header.Get("Content-Type")
var qr = new(QueryResult)
err = json.Unmarshal(body, qr)
if err != nil {
return nil, errors.New("unmarshal resp body failed, error: " + err.Error())
if contentType == "application/x-msgpack" {
err = msgpack.Unmarshal(body, qr)
if err != nil {
return nil, errors.New("unmarshal msgpack body failed, error: " + err.Error())
}
} else {
err = json.Unmarshal(body, qr)
if err != nil {
return nil, errors.New("unmarshal json body failed, error: " + err.Error())
}
}
return qr, nil
}
8 changes: 4 additions & 4 deletions opengemini/query_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ const RpColumnLen = 8

// SeriesResult contains the results of a series query
type SeriesResult struct {
Series []*Series `json:"series,omitempty"`
Error string `json:"error,omitempty"`
Series []*Series `json:"series,omitempty" msgpack:"series,omitempty"`
Error string `json:"error,omitempty" msgpack:"error,omitempty"`
}

// QueryResult is the top-level struct
type QueryResult struct {
Results []*SeriesResult `json:"results,omitempty"`
Error string `json:"error,omitempty"`
Results []*SeriesResult `json:"results,omitempty" msgpack:"results,omitempty"`
Error string `json:"error,omitempty" msgpack:"error,omitempty"`
}

func (result *QueryResult) hasError() error {
Expand Down
72 changes: 72 additions & 0 deletions opengemini/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package opengemini

import (
"fmt"
"testing"
"time"

"github.com/libgox/addr"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -69,6 +71,63 @@ func TestQueryWithEpoch(t *testing.T) {
assert.Equal(t, length, getTimestampLength(v))
}
}
func TestQueryWithMsgPack(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []addr.Address{{
Host: "localhost",
Port: 8086,
}},
Codec: CodecMsgPack,
})

// create a test database with rand suffix
database := randomDatabaseName()
err := c.CreateDatabase(database)
assert.Nil(t, err)

// delete test database before exit test case
defer func() {
err := c.DropDatabase(database)
assert.Nil(t, err)
}()

testMeasurement := randomMeasurement()
p := &Point{}
p.Measurement = testMeasurement
p.AddField("TestField", 123)
p.Time = time.Now()

err = c.WritePoint(database, p, func(err error) {
assert.Nil(t, err)
})
assert.Nil(t, err)

time.Sleep(time.Second * 5)

PrecisionTimestampLength := make(map[Precision]int64)
PrecisionTimestampLength[PrecisionNanosecond] = 19
PrecisionTimestampLength[PrecisionMicrosecond] = 16
PrecisionTimestampLength[PrecisionMillisecond] = 13
PrecisionTimestampLength[PrecisionSecond] = 10
PrecisionTimestampLength[PrecisionMinute] = 8
PrecisionTimestampLength[PrecisionHour] = 6

// check whether write success
for precision, length := range PrecisionTimestampLength {
q := Query{
Database: database,
Command: "select * from " + testMeasurement,
Precision: precision,
}
result, err := c.Query(q)
assert.Nil(t, err)
v, err := convertToInt64(result.Results[0].Series[0].Values[0][0])
if err != nil {
t.Fatalf("conversion error: %v", err)
}
assert.Equal(t, length, getTimestampLength(v))
}
}

func getTimestampLength(timestamp int64) int64 {
var length int64 = 0
Expand All @@ -77,3 +136,16 @@ func getTimestampLength(timestamp int64) int64 {
}
return length
}

func convertToInt64(value interface{}) (int64, error) {
switch val := value.(type) {
case float64:
return int64(val), nil
case int64:
return val, nil
case int32:
return int64(val), nil
default:
return 0, fmt.Errorf("unsupported type: %T", value)
}
}
8 changes: 4 additions & 4 deletions opengemini/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type SeriesValues []SeriesValue

// Series defines the structure for series data
type Series struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values SeriesValues `json:"values,omitempty"`
Name string `json:"name,omitempty" msgpack:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty" msgpack:"tags,omitempty"`
Columns []string `json:"columns,omitempty" msgpack:"columns,omitempty"`
Values SeriesValues `json:"values,omitempty" msgpack:"values,omitempty"`
}

0 comments on commit b19a843

Please sign in to comment.