Skip to content

Commit

Permalink
Support chunked queries in the Go InfluxDB client
Browse files Browse the repository at this point in the history
Modify the CLI to always use chunked queries.
  • Loading branch information
jsternberg committed Mar 31, 2016
1 parent d9b32ea commit 8752d1b
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- [#5372](https://github.com/influxdata/influxdb/pull/5372): Faster shard loading
- [#6148](https://github.com/influxdata/influxdb/pull/6148): Build script is now compatible with Python 3. Added ability to create detached signatures for packages. Build script now uses Python logging facility for messages.
- [#6115](https://github.com/influxdata/influxdb/issues/6115): Support chunking query results mid-series. Limit non-chunked output.
- [#6166](https://github.com/influxdata/influxdb/pull/6166): Teach influxdb client how to use chunked queries and use in the CLI.

### Bugfixes

Expand Down
87 changes: 75 additions & 12 deletions client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand All @@ -32,6 +33,18 @@ const (
type Query struct {
Command string
Database string

// Chunked tells the server to send back chunked responses. This places
// less load on the server by sending back chunks of the response rather
// than waiting for the entire response all at once.
Chunked bool

// ChunkSize sets the maximum number of rows that will be returned per
// chunk. Chunks are either divided based on their series or if they hit
// the chunk size limit.
//
// Chunked must be set to true for this option to be used.
ChunkSize int
}

// ParseConnectionString will parse a string to create a valid connection URL
Expand Down Expand Up @@ -157,6 +170,12 @@ func (c *Client) Query(q Query) (*Response, error) {
values := u.Query()
values.Set("q", q.Command)
values.Set("db", q.Database)
if q.Chunked {
values.Set("chunked", "true")
if q.ChunkSize > 0 {
values.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
}
if c.precision != "" {
values.Set("epoch", c.precision)
}
Expand All @@ -178,19 +197,38 @@ func (c *Client) Query(q Query) (*Response, error) {
defer resp.Body.Close()

var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
// If we got an error while decoding the response, send that back.
return nil, err
}

// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, decErr
if r == nil {
break
}

response.Results = append(response.Results, r.Results...)
if r.Err != nil {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
if err := dec.Decode(&response); err != nil {
// Ignore EOF errors if we got an invalid status code.
if !(err == io.EOF && resp.StatusCode != http.StatusOK) {
return nil, err
}
}
}
// If we don't have an error in our json response, and didn't get StatusOK, then send back an error

// If we don't have an error in our json response, and didn't get StatusOK,
// then send back an error.
if resp.StatusCode != http.StatusOK && response.Error() == nil {
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
}
Expand Down Expand Up @@ -437,7 +475,7 @@ func (r *Response) UnmarshalJSON(b []byte) error {

// Error returns the first error from any statement.
// Returns nil if no errors occurred on any statements.
func (r Response) Error() error {
func (r *Response) Error() error {
if r.Err != nil {
return r.Err
}
Expand All @@ -449,6 +487,31 @@ func (r Response) Error() error {
return nil
}

// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
}

// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
dec := json.NewDecoder(r)
dec.UseNumber()
return &ChunkedResponse{dec: dec}
}

// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response
if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
}
return nil, err
}
return &response, nil
}

// Point defines the fields that will be written to the database
// Measurement, Time, and Fields are required
// Precision can be specified if the time is in epoch format (integer).
Expand Down
49 changes: 49 additions & 0 deletions client/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,30 @@ func TestClient_Query(t *testing.T) {
}
}

func TestClient_ChunkedQuery(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data client.Response
w.WriteHeader(http.StatusOK)
enc := json.NewEncoder(w)
_ = enc.Encode(data)
_ = enc.Encode(data)
}))
defer ts.Close()

u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}

query := client.Query{Chunked: true}
_, err = c.Query(query)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
}

func TestClient_BasicAuth(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
u, p, ok := r.BasicAuth()
Expand Down Expand Up @@ -741,3 +765,28 @@ war3JNM1mGB3o2iAtuOJlFIKLpI1x+1e8pI=
}
}
}

func TestChunkedResponse(t *testing.T) {
s := `{"results":[{},{}]}{"results":[{}]}`
r := client.NewChunkedResponse(strings.NewReader(s))
resp, err := r.NextResponse()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
} else if actual := len(resp.Results); actual != 2 {
t.Fatalf("unexpected number of results. expected %v, actual %v", 2, actual)
}

resp, err = r.NextResponse()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
} else if actual := len(resp.Results); actual != 1 {
t.Fatalf("unexpected number of results. expected %v, actual %v", 1, actual)
}

resp, err = r.NextResponse()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
} else if resp != nil {
t.Fatalf("unexpected response. expected %v, actual %v", nil, resp)
}
}
14 changes: 12 additions & 2 deletions cmd/influx/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type CommandLine struct {
PPS int // Controls how many points per second the import will allow via throttling
Path string
Compressed bool
Chunked bool
Quit chan struct{}
IgnoreSignals bool // Ignore signals normally caught by this process (used primarily for testing)
osSignals chan os.Signal
Expand Down Expand Up @@ -518,9 +519,18 @@ func (c *CommandLine) Insert(stmt string) error {
return nil
}

// query creates a query struct to be used with the client.
func (c *CommandLine) query(query string, database string) client.Query {
return client.Query{
Command: query,
Database: database,
Chunked: true,
}
}

// ExecuteQuery runs any query statement
func (c *CommandLine) ExecuteQuery(query string) error {
response, err := c.Client.Query(client.Query{Command: query, Database: c.Database})
response, err := c.Client.Query(c.query(query, c.Database))
if err != nil {
fmt.Printf("ERR: %s\n", err)
return err
Expand All @@ -539,7 +549,7 @@ func (c *CommandLine) ExecuteQuery(query string) error {

// DatabaseToken retrieves database token
func (c *CommandLine) DatabaseToken() (string, error) {
response, err := c.Client.Query(client.Query{Command: "SHOW DIAGNOSTICS for 'registration'"})
response, err := c.Client.Query(c.query("SHOW DIAGNOSTICS for 'registration'", ""))
if err != nil {
return "", err
}
Expand Down

0 comments on commit 8752d1b

Please sign in to comment.