diff --git a/.circleci/config.yml b/.circleci/config.yml index 0a7c1d4..0eaaef3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,7 +3,7 @@ jobs: build: working_directory: ~/repo docker: - - image: cimg/go:1.20.3 + - image: cimg/go:1.20.13 steps: - checkout - restore_cache: diff --git a/README.md b/README.md index 0db5a87..39a30e2 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ which allows you to execute SQL queries against InfluxDB IOx. ## Installation -Add the latest version of the client package to your project dependencies (`go.mod`): +Add the latest version of the client package to your project dependencies: ```sh go get github.com/InfluxCommunity/influxdb3-go @@ -41,19 +41,27 @@ go get github.com/InfluxCommunity/influxdb3-go ## Usage -set environment variables: +Client can be instantiated using -- `INFLUXDB_URL` region of your influxdb cloud e.g. *`https://us-east-1-1.aws.cloud2.influxdata.com/`* -- `INFLUXDB_TOKEN` read/write token generated in cloud -- `INFLUXDB_DATABASE` name of database e.g .*`my-database`* +* `influxb3.ClientConfig` +* environment variables +* connection string + +### Environment variables + +Set environment variables: + +* `INFLUX_URL` region of your influxdb cloud e.g. *`https://us-east-1-1.aws.cloud2.influxdata.com/`* +* `INFLUX_TOKEN` read/write token generated in cloud +* `INFLUX_DATABASE` name of database e.g .*`my-database`*
linux/macos ```sh -export INFLUXDB_URL="" -export INFLUXDB_DATABASE="" -export INFLUXDB_TOKEN="" +export INFLUX_URL="" +export INFLUX_TOKEN="" +export INFLUX_DATABASE="" ```
@@ -62,9 +70,9 @@ export INFLUXDB_TOKEN="" windows ```powershell -setx INFLUXDB_URL "" -setx INFLUXDB_DATABASE "" -setx INFLUXDB_TOKEN "" +setx INFLUX_URL "" +setx INFLUX_TOKEN "" +setx INFLUX_DATABASE "" ``` @@ -82,49 +90,86 @@ import ( ) ``` -Create `influxdb3.Client` with `New` function. Make sure to `Close` client after with `defer` keyword. +Create `influxdb3.Client` with `New` function. Make sure to `Close` the client at the end. ```go -url := os.Getenv("INFLUXDB_URL") -token := os.Getenv("INFLUXDB_TOKEN") -database := os.Getenv("INFLUXDB_DATABASE") - -// Create a new client using an InfluxDB server base URL and an authentication token -client, err := influxdb3.New(influxdb3.ClientConfig{ - Host: url, - Token: token, - Database: database, -}) -// Close client at the end and escalate error if present -defer func (client *influxdb3.Client) { +// Create a new client using INFLUX_* environment variables +client, err := influxdb3.New() + +// Close client at the end and escalate an error if occurs +defer func () { err := client.Close() if err != nil { panic(err) } -}(client) +}() ``` -The `client` can be now used to insert data using [line-protocol](https://docs.influxdata.com/influxdb/cloud-serverless/reference/syntax/line-protocol/). +### Write data + +The `client` can insert data using [line-protocol](https://docs.influxdata.com/influxdb/cloud-serverless/reference/syntax/line-protocol/): ```go -line := "stat,unit=temperature avg=23.5,max=45.0" +line := "stat,location=Paris temperature=23.5,humidity=45i" err = client.Write(context.Background(), []byte(line)) ``` -Fetch data using FlightSQL query and print result. +The `client` can also write points + +```go +p1 := influxdb3.Point{ + influxdb3.NewPoint("stat", + map[string]string{ + "location": "Paris", + }, + map[string]any{ + "temperature": 24.5, + "humidity": 40, + }, + time.Now(), + ), +} +points := []*influxdb3.Point{p1} +err = client.WritePoints(context.Background(), points) +``` + +and/or annotated structs + +```go +s1 := struct { + Measurement string `lp:"measurement"` + Sensor string `lp:"tag,location"` + Temp float64 `lp:"field,temperature"` + Hum int `lp:"field,humidity"` + Time time.Time `lp:"timestamp"` + Description string `lp:"-"` +}{ + "stat", + "Paris", + 23.5, + 55, + time.Now(), + "Paris weather conditions", +} +data := []any{s1} +err = client.WriteData(context.Background(), data) +``` + +### Query + +Use FlightSQL to query and print result. ```go query := ` - SELECT * - FROM "stat" - WHERE + SELECT * + FROM stat + WHERE time >= now() - interval '5 minute' AND - "unit" IN ('temperature') -`; + location IN ('Paris') +` iterator, err := client.Query(context.Background(), query) - if err != nil { panic(err) } @@ -132,14 +177,34 @@ if err != nil { for iterator.Next() { value := iterator.Value() - fmt.Printf("avg is %f\n", value["avg"]) - fmt.Printf("max is %f\n", value["max"]) + fmt.Printf("temperature in Paris is %f\n", value["temperature"]) + fmt.Printf("humidity in Paris is %d%%\n", value["humidity"]) } ``` -## Example +Queries can be parameterized: + +```go +query := ` + SELECT * + FROM stat + WHERE + time >= now() - interval '5 minute' + AND + location = $location +` +parameters := influxdb3.QueryParameters{ + "location": "Paris", +} + +iterator, err := client.QueryWithParameters(context.Background(), query, parameters) + +// process result +``` + +## Examples -Prepare environment like in [Usage](#usage) and run `go run ./example/main.go`. +Prepare environment like in [Usage](#usage) and check ['examples'](./examples/README.md) folder. ## Feedback diff --git a/example/Downsampling/downsampling.go b/example/Downsampling/downsampling.go deleted file mode 100644 index 685fa69..0000000 --- a/example/Downsampling/downsampling.go +++ /dev/null @@ -1,113 +0,0 @@ -package main - -import ( - "context" - "fmt" - "github.com/apache/arrow/go/v13/arrow" - "time" - - "github.com/InfluxCommunity/influxdb3-go/influxdb3" -) - -func main() { - url := "https://us-east-1-1.aws.cloud2.influxdata.com" - token := "my-token" - database := "my-database" - - client, err := influxdb3.New(influxdb3.ClientConfig{ - Host: url, - Token: token, - Database: database, - }) - - if err != nil { - panic(err) - } - // Close client at the end and escalate error if present - defer func(client *influxdb3.Client) { - err := client.Close() - if err != nil { - panic(err) - } - }(client) - - // - // Write data - // - err = client.WritePoints(context.Background(), influxdb3.NewPointWithMeasurement("stat"). - SetTag("unit", "temperature"). - SetDoubleField("avg", 23.2). - SetDoubleField("max", 45.0)) - if err != nil { - panic(err) - } - time.Sleep(1 * time.Second) - - err = client.WritePoints(context.Background(), influxdb3.NewPointWithMeasurement("stat"). - SetTag("unit", "temperature"). - SetDoubleField("avg", 28.0). - SetDoubleField("max", 40.3)) - if err != nil { - panic(err) - } - time.Sleep(1 * time.Second) - - err = client.WritePoints(context.Background(), influxdb3.NewPointWithMeasurement("stat"). - SetTag("unit", "temperature"). - SetDoubleField("avg", 23.2). - SetDoubleField("max", 45.0)) - if err != nil { - panic(err) - } - time.Sleep(1 * time.Second) - - // - // Query Downsampled data - // - query := ` - SELECT - date_bin('5 minutes', "time") as window_start, - AVG("avg") as avg, - MAX("max") as max - FROM "stat" - WHERE - "time" >= now() - interval '1 hour' - GROUP BY window_start - ORDER BY window_start ASC; - ` - - // - // Execute downsampling query into PointValues - // - iterator, err := client.Query(context.Background(), query) - if err != nil { - panic(err) - } - - for iterator.Next() { - row := iterator.AsPoints() - timestamp := int64(row.GetField("window_start").(arrow.Timestamp)) - - avgValue := row.GetDoubleField("avg") - maxValue := row.GetDoubleField("max") - fmt.Printf("%s: avg is %.2f, max is %.2f\n", time.Unix(0, timestamp), *avgValue, *maxValue) - - // - // write back downsampled date to 'stat_downsampled' measurement - // - downsampledPoint, err := row.AsPointWithMeasurement("stat_downsampled") - if err != nil { - panic(err) - } - - downsampledPoint = downsampledPoint. - RemoveField("window_start"). - SetTimestampWithEpoch(timestamp) - - err = client.WritePoints(context.Background(), downsampledPoint) - if err != nil { - panic(err) - } - } - -} diff --git a/examples/Downsampling/downsampling.go b/examples/Downsampling/downsampling.go new file mode 100644 index 0000000..cac41ad --- /dev/null +++ b/examples/Downsampling/downsampling.go @@ -0,0 +1,119 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/InfluxCommunity/influxdb3-go/influxdb3" + "github.com/apache/arrow/go/v14/arrow" +) + +func main() { + // Use env variables to initialize client + url := os.Getenv("INFLUX_URL") + token := os.Getenv("INFLUX_TOKEN") + database := os.Getenv("INFLUX_DATABASE") + + // Create a new client using an InfluxDB server base URL and an authentication token + client, err := influxdb3.New(influxdb3.ClientConfig{ + Host: url, + Token: token, + Database: database, + }) + if err != nil { + panic(err) + } + + // Close client at the end and escalate error if present + defer func(client *influxdb3.Client) { + err := client.Close() + if err != nil { + panic(err) + } + }(client) + + // + // Write data + // + err = client.WritePoints(context.Background(), []*influxdb3.Point{ + influxdb3.NewPointWithMeasurement("stat"). + SetTag("location", "Paris"). + SetDoubleField("temperature", 23.2), + }) + if err != nil { + panic(err) + } + time.Sleep(1 * time.Second) + + err = client.WritePoints(context.Background(), []*influxdb3.Point{ + influxdb3.NewPointWithMeasurement("stat"). + SetTag("location", "Paris"). + SetDoubleField("temperature", 24.1), + }) + if err != nil { + panic(err) + } + time.Sleep(1 * time.Second) + + err = client.WritePoints(context.Background(), []*influxdb3.Point{ + influxdb3.NewPointWithMeasurement("stat"). + SetTag("location", "Paris"). + SetDoubleField("temperature", 23.9), + }) + if err != nil { + panic(err) + } + time.Sleep(1 * time.Second) + + // + // Query Downsampled data + // + query := ` + SELECT + DATE_BIN(INTERVAL '5 minutes', time) as window_start, + location, + AVG(temperature) as avg, + MAX(temperature) as max + FROM stat + WHERE + time >= now() - interval '1 hour' + GROUP BY window_start, location + ORDER BY location, window_start + ` + + // + // Execute downsampling query into PointValues + // + iterator, err := client.Query(context.Background(), query) + if err != nil { + panic(err) + } + + for iterator.Next() { + row := iterator.AsPoints() + timestamp := (row.GetField("window_start").(arrow.Timestamp)).ToTime(arrow.Nanosecond) + location := row.GetStringField("location") + avgValue := row.GetDoubleField("avg") + maxValue := row.GetDoubleField("max") + fmt.Printf("%s %s temperature: avg %.2f, max %.2f\n", timestamp.Format(time.RFC822), *location, *avgValue, *maxValue) + + // + // write back downsampled date to 'stat_downsampled' measurement + // + downsampledPoint, err := row.AsPointWithMeasurement("stat_downsampled") + if err != nil { + panic(err) + } + + downsampledPoint = downsampledPoint. + RemoveField("window_start"). + SetTimestampWithEpoch(timestamp.UnixNano()) + + err = client.WritePoints(context.Background(), []*influxdb3.Point{downsampledPoint}) + if err != nil { + panic(err) + } + } +} diff --git a/example/IOx/iox.go b/examples/IOx/iox.go similarity index 54% rename from example/IOx/iox.go rename to examples/IOx/iox.go index 0297fe8..8be99a8 100644 --- a/example/IOx/iox.go +++ b/examples/IOx/iox.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/apache/arrow/go/v14/arrow" "os" "time" @@ -11,9 +12,9 @@ import ( func main() { // Use env variables to initialize client - url := os.Getenv("INFLUXDB_URL") - token := os.Getenv("INFLUXDB_TOKEN") - database := os.Getenv("INFLUXDB_DATABASE") + url := os.Getenv("INFLUX_URL") + token := os.Getenv("INFLUX_TOKEN") + database := os.Getenv("INFLUX_DATABASE") // Create a new client using an InfluxDB server base URL and an authentication token client, err := influxdb3.New(influxdb3.ClientConfig{ @@ -21,10 +22,10 @@ func main() { Token: token, Database: database, }) - if err != nil { panic(err) } + // Close client at the end and escalate error if present defer func(client *influxdb3.Client) { err := client.Close() @@ -35,40 +36,49 @@ func main() { // Create point using full params constructor p := influxdb3.NewPoint("stat", - map[string]string{"unit": "temperature"}, - map[string]interface{}{"avg": 24.5, "max": 45.0}, + map[string]string{"location": "Paris"}, + map[string]interface{}{ + "temperature": 24.5, + "humidity": 40, + }, time.Now()) + // write point synchronously - err = client.WritePoints(context.Background(), p) + err = client.WritePoints(context.Background(), []*influxdb3.Point{p}) if err != nil { panic(err) } + // Create point using fluent style p = influxdb3.NewPointWithMeasurement("stat"). - SetTag("unit", "temperature"). - SetField("avg", 23.2). - SetField("max", 45.0). + SetTag("location", "London"). + SetField("temperature", 17.1). + SetField("humidity", 65). SetTimestamp(time.Now()) + // write point synchronously - err = client.WritePoints(context.Background(), p) + err = client.WritePoints(context.Background(), []*influxdb3.Point{p}) if err != nil { panic(err) } + // Prepare custom type sensorData := struct { Table string `lp:"measurement"` - Unit string `lp:"tag,unit"` - Avg float64 `lp:"field,avg"` - Max float64 `lp:"field,max"` + Unit string `lp:"tag,location"` + Temp float64 `lp:"field,temperature"` + Humid int64 `lp:"field,humidity"` Time time.Time `lp:"timestamp"` - }{"stat", "temperature", 22.3, 40.3, time.Now()} + }{"stat", "Madrid", 33.8, 35, time.Now()} + // Write point - err = client.WriteData(context.Background(), sensorData) + err = client.WriteData(context.Background(), []any{sensorData}) if err != nil { panic(err) } + // Or write directly line protocol - line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0) + line := fmt.Sprintf("stat,location=Berlin temperature=%f,humidity=%di", 20.1, 55) err = client.Write(context.Background(), []byte(line)) if err != nil { panic(err) @@ -77,24 +87,23 @@ func main() { // Prepare FlightSQL query query := ` SELECT * - FROM "stat" + FROM stat WHERE - time >= now() - interval '5 minute' + time >= now() - interval '5 minute' AND - "unit" IN ('temperature') + location IN ('Paris', 'London', 'Madrid') ` + // Run the query iterator, err := client.Query(context.Background(), query) - if err != nil { panic(err) } - for iterator.Next() { value := iterator.Value() - - fmt.Printf("avg is %f\n", value["avg"]) - fmt.Printf("max is %f\n", value["max"]) + fmt.Printf("%s at %v:\n", value["location"], + (value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC822)) + fmt.Printf(" temperature: %f\n", value["temperature"]) + fmt.Printf(" humidity : %d%%\n", value["humidity"]) } - } diff --git a/example/README.md b/examples/README.md similarity index 100% rename from example/README.md rename to examples/README.md diff --git a/go.mod b/go.mod index 54d3de2..b454fb0 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,8 @@ module github.com/InfluxCommunity/influxdb3-go go 1.20 require ( - github.com/apache/arrow/go/v13 v13.0.0 + github.com/apache/arrow/go/v14 v14.0.2 + github.com/google/go-cmp v0.6.0 github.com/influxdata/line-protocol/v2 v2.2.1 github.com/stretchr/testify v1.8.4 google.golang.org/grpc v1.62.0 @@ -11,20 +12,20 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/goccy/go-json v0.10.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/flatbuffers v23.1.21+incompatible // indirect - github.com/klauspost/compress v1.15.15 // indirect - github.com/klauspost/cpuid/v2 v2.2.3 // indirect + github.com/google/flatbuffers v23.5.26+incompatible // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/kr/pretty v0.3.0 // indirect - github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - golang.org/x/mod v0.8.0 // indirect + golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.6.0 // indirect + golang.org/x/tools v0.14.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect google.golang.org/protobuf v1.32.0 // indirect diff --git a/go.sum b/go.sum index da5ebb4..d667a09 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk= -github.com/apache/arrow/go/v13 v13.0.0/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= +github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= +github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -7,16 +7,17 @@ github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk= github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= -github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= -github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/flatbuffers v23.1.21+incompatible h1:bUqzx/MXCDxuS0hRJL2EfjyZL3uQrPbMocUa8zGqsTA= -github.com/google/flatbuffers v23.1.21+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= +github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98= github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig= @@ -25,10 +26,10 @@ github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY= github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE= github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM= -github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= -github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= -github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU= -github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -38,8 +39,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= -github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= @@ -50,18 +51,31 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= +golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= +golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= diff --git a/influxdb3/client.go b/influxdb3/client.go index 72bab27..4003e84 100644 --- a/influxdb3/client.go +++ b/influxdb3/client.go @@ -35,7 +35,7 @@ import ( "strconv" "strings" - "github.com/apache/arrow/go/v13/arrow/flight" + "github.com/apache/arrow/go/v14/arrow/flight" ) // Client implements an InfluxDB client. diff --git a/influxdb3/client_e2e_test.go b/influxdb3/client_e2e_test.go index 456b2c2..625d125 100644 --- a/influxdb3/client_e2e_test.go +++ b/influxdb3/client_e2e_test.go @@ -30,7 +30,7 @@ import ( "time" "github.com/InfluxCommunity/influxdb3-go/influxdb3" - "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v14/arrow" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -66,7 +66,7 @@ func TestWriteAndQueryExample(t *testing.T) { SetField("testId", testId). SetField("text", "a1"). SetTimestamp(now) - err = client.WritePoints(context.Background(), p) + err = client.WritePoints(context.Background(), []*influxdb3.Point{p}) require.NoError(t, err) sensorData := struct { @@ -80,7 +80,7 @@ func TestWriteAndQueryExample(t *testing.T) { Text string `lp:"field,text"` Time time.Time `lp:"timestamp"` }{tableName, tagValue, 24.5, -15, uint64(150), false, testId, "b1", now.Add(1 * time.Second)} - err = client.WriteData(context.Background(), sensorData) + err = client.WriteData(context.Background(), []any{sensorData}) require.NoError(t, err) // SQL query test @@ -148,6 +148,74 @@ func TestWriteAndQueryExample(t *testing.T) { assert.True(t, newPoint != nil) } +func TestQueryWithParameters(t *testing.T) { + now := time.Now().UTC() + testId := now.UnixNano() + + url := os.Getenv("TESTING_INFLUXDB_URL") + token := os.Getenv("TESTING_INFLUXDB_TOKEN") + database := os.Getenv("TESTING_INFLUXDB_DATABASE") + + client, err := influxdb3.New(influxdb3.ClientConfig{ + Host: url, + Token: token, + Database: database, + }) + require.NoError(t, err) + defer client.Close() + + p := influxdb3.NewPointWithMeasurement("weather"). + SetTag("location", "sun-valley-1"). + SetField("temp", 15.5). + SetField("index", 80). + SetField("uindex", uint64(800)). + SetField("valid", true). + SetField("testId", testId). + SetField("text", "a1"). + SetTimestamp(now) + err = client.WritePoints(context.Background(), []*influxdb3.Point{p}) + require.NoError(t, err) + + query := ` + SELECT * + FROM weather + WHERE + time >= now() - interval '10 minute' + AND + location = $location + AND + "testId" = $testId + ORDER BY time + ` + parameters := influxdb3.QueryParameters{ + "location": "sun-valley-1", + "testId": testId, + } + + sleepTime := 5 * time.Second + time.Sleep(sleepTime) + + iterator, err := client.QueryWithParameters(context.Background(), query, parameters) + require.NoError(t, err) + require.NotNil(t, iterator) + + hasValue := iterator.Next() + assert.True(t, hasValue) + + value := iterator.Value() + assert.Equal(t, "sun-valley-1", value["location"]) + assert.Equal(t, 15.5, value["temp"]) + assert.Equal(t, int64(80), value["index"]) + assert.Equal(t, uint64(800), value["uindex"]) + assert.Equal(t, true, value["valid"]) + assert.Equal(t, "a1", value["text"]) + assert.Equal(t, now, value["time"].(arrow.Timestamp).ToTime(arrow.Nanosecond)) + + assert.False(t, iterator.Done()) + assert.False(t, iterator.Next()) + assert.True(t, iterator.Done()) +} + func TestQueryDatabaseDoesNotExist(t *testing.T) { url := os.Getenv("TESTING_INFLUXDB_URL") token := os.Getenv("TESTING_INFLUXDB_TOKEN") @@ -180,7 +248,7 @@ func TestQuerySchema(t *testing.T) { assert.NotNil(t, iterator.Raw()) } -func TestQueryWithOptions(t *testing.T) { +func TestQuerySchemaWithOptions(t *testing.T) { url := os.Getenv("TESTING_INFLUXDB_URL") token := os.Getenv("TESTING_INFLUXDB_TOKEN") database := os.Getenv("TESTING_INFLUXDB_DATABASE") @@ -190,11 +258,8 @@ func TestQueryWithOptions(t *testing.T) { Token: token, Database: "does not exist", }) - options := influxdb3.QueryOptions{ - Database: database, - } - iterator, err := client.QueryWithOptions(context.Background(), &options, "SHOW NAMESPACES") + iterator, err := client.Query(context.Background(), "SHOW NAMESPACES", influxdb3.WithDatabase(database)) require.NoError(t, err) assert.NotNil(t, iterator.Raw()) } @@ -210,10 +275,7 @@ func TestQuerySchemaInfluxQL(t *testing.T) { Database: database, }) - options := influxdb3.QueryOptions{ - QueryType: influxdb3.InfluxQL, - } - iterator, err := client.QueryWithOptions(context.Background(), &options, "SHOW MEASUREMENTS") + iterator, err := client.Query(context.Background(), "SHOW MEASUREMENTS", influxdb3.WithQueryType(influxdb3.InfluxQL)) require.NoError(t, err) assert.NotNil(t, iterator.Raw()) } diff --git a/influxdb3/options.go b/influxdb3/options.go index 1b1da21..3d38846 100644 --- a/influxdb3/options.go +++ b/influxdb3/options.go @@ -28,7 +28,7 @@ import ( // QueryOptions holds options for query type QueryOptions struct { - // Database for querying. Use in `QueryWithOptions` method to override default database in `ClientConfig`. + // Database for querying. Use to override default database in `ClientConfig`. Database string // Query type. @@ -37,11 +37,11 @@ type QueryOptions struct { // WriteOptions holds options for write type WriteOptions struct { - // Database for writing. Use in `WriteWithOptions` methods to override default database in `ClientConfig`. + // Database for writing. Use to override default database in `ClientConfig`. Database string - // Precision to use in writes for timestamp. - // Default `lineprotocol.Nanosecond` + // Precision of timestamp to use when writing data. + // Default value: lineprotocol.Nanosecond Precision lineprotocol.Precision // Tags added to each point during writing. If a point already has a tag with the same key, it is left unchanged. @@ -54,6 +54,7 @@ type WriteOptions struct { // Database: "my-database", // }) // + // Example: // options := WriteOptions{ // DefaultTags: map[string]string{ // "rack": "main", @@ -77,7 +78,6 @@ type WriteOptions struct { // DefaultTags: map[string]string{ // "rack": "main", // }, - // Precision: lineprotocol.Millisecond, // }, // }) // @@ -88,7 +88,7 @@ type WriteOptions struct { // c.WritePoints(context.Background(), p) DefaultTags map[string]string - // Write body larger than the threshold is gzipped. 0 to don't gzip at all + // Write body larger than the threshold is gzipped. 0 for no compression. GzipThreshold int } @@ -102,3 +102,86 @@ var DefaultWriteOptions = WriteOptions{ Precision: lineprotocol.Nanosecond, GzipThreshold: 1_000, } + +// Option is a functional option type that can be passed to Client.Query and Client.Write methods. +type Option func(o *options) + +// QueryOption is a functional option type that can be passed to Client.Query. +// Available options: +// - WithDatabase +// - WithQueryType +type QueryOption = Option + +// WriteOption is a functional option type that can be passed to Client.Write methods. +// Available options: +// - WithDatabase +// - WithPrecision +// - WithGzipThreshold +// - WithDefaultTags +type WriteOption = Option + +// WithDatabase is used to override default database in Client.Query and Client.Write methods. +func WithDatabase(database string) Option { + return func(o *options) { + o.QueryOptions.Database = database + o.WriteOptions.Database = database + } +} + +// WithQueryType is used to override default query type in Client.Query method. +func WithQueryType(queryType QueryType) Option { + return func(o *options) { + o.QueryType = queryType + } +} + +// WithPrecision is used to override default precision in Client.Write methods. +func WithPrecision(precision lineprotocol.Precision) Option { + return func(o *options) { + o.Precision = precision + } +} + +// WithGzipThreshold is used to override default GZIP threshold in Client.Write methods. +func WithGzipThreshold(gzipThreshold int) Option { + return func(o *options) { + o.GzipThreshold = gzipThreshold + } +} + +// WithDefaultTags is used to override default tags in Client.Write methods. +func WithDefaultTags(tags map[string]string) Option { + return func(o *options) { + o.DefaultTags = tags + } +} + +type options struct { + QueryOptions + WriteOptions +} + +func newQueryOptions(defaults *QueryOptions, opts []Option) *QueryOptions { + return &(newOptions(defaults, nil, opts).QueryOptions) +} + +func newWriteOptions(defaults *WriteOptions, opts []Option) *WriteOptions { + return &(newOptions(nil, defaults, opts).WriteOptions) +} + +func newOptions(defaultQueryOptions *QueryOptions, defaultWriteOptions *WriteOptions, opts []Option) *options { + o := &options{} + + if defaultQueryOptions != nil { + o.QueryOptions = *defaultQueryOptions + } + if defaultWriteOptions != nil { + o.WriteOptions = *defaultWriteOptions + } + + for _, opt := range opts { + opt(o) + } + + return o +} diff --git a/influxdb3/options_test.go b/influxdb3/options_test.go new file mode 100644 index 0000000..c909625 --- /dev/null +++ b/influxdb3/options_test.go @@ -0,0 +1,122 @@ +package influxdb3 + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/line-protocol/v2/lineprotocol" +) + +func TestQueryOptions(t *testing.T) { + fn := func(options ...QueryOption) *QueryOptions { + return newQueryOptions(&DefaultQueryOptions, options) + } + va := func(options ...QueryOption) []QueryOption { + return options + } + + testCases := []struct { + name string + opts []QueryOption + want *QueryOptions + }{ + { + name: "default", + opts: nil, + want: &DefaultQueryOptions, + }, + { + name: "override database", + opts: va(WithDatabase("db-x")), + want: &QueryOptions{ + Database: "db-x", + }, + }, + { + name: "override database and query type", + opts: va(WithDatabase("db-x"), WithQueryType(InfluxQL)), + want: &QueryOptions{ + Database: "db-x", + QueryType: InfluxQL, + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + options := fn(tc.opts...) + if diff := cmp.Diff(tc.want, options); diff != "" { + t.Fatal(diff) + } + }) + } +} + +func TestWriteOptions(t *testing.T) { + fn := func(options ...WriteOption) *WriteOptions { + return newWriteOptions(&DefaultWriteOptions, options) + } + va := func(options ...WriteOption) []WriteOption { + return options + } + + testCases := []struct { + name string + opts []WriteOption + want *WriteOptions + }{ + { + name: "default", + want: &DefaultWriteOptions, + }, + { + name: "default", + opts: nil, + want: &DefaultWriteOptions, + }, + { + name: "override database", + opts: va(WithDatabase("db-x")), + want: &WriteOptions{ + Database: "db-x", + Precision: DefaultWriteOptions.Precision, + GzipThreshold: DefaultWriteOptions.GzipThreshold, + }, + }, + { + name: "override database and precision", + opts: va(WithDatabase("db-x"), WithPrecision(lineprotocol.Millisecond)), + want: &WriteOptions{ + Database: "db-x", + Precision: lineprotocol.Millisecond, + GzipThreshold: DefaultWriteOptions.GzipThreshold, + }, + }, + { + name: "override database and precision and GZIP threshold", + opts: va( + WithDatabase("db-x"), + WithPrecision(lineprotocol.Millisecond), + WithGzipThreshold(4096), + ), + want: &WriteOptions{ + Database: "db-x", + Precision: lineprotocol.Millisecond, + GzipThreshold: 4096, + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + options := fn(tc.opts...) + if diff := cmp.Diff(tc.want, options); diff != "" { + t.Fatal(diff) + } + }) + } +} diff --git a/influxdb3/query.go b/influxdb3/query.go index 01ddf3a..fbb21e7 100644 --- a/influxdb3/query.go +++ b/influxdb3/query.go @@ -26,12 +26,13 @@ import ( "context" "crypto/x509" "encoding/json" + "errors" "fmt" "strings" - "github.com/apache/arrow/go/v13/arrow/flight" - "github.com/apache/arrow/go/v13/arrow/ipc" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow/flight" + "github.com/apache/arrow/go/v14/arrow/ipc" + "github.com/apache/arrow/go/v14/arrow/memory" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -66,16 +67,36 @@ func (c *Client) initializeQueryClient() error { return nil } -// Query data from InfluxDB IOx with FlightSQL. +// QueryParameters is a type for query parameters. +type QueryParameters = map[string]any + +// Query queries data from InfluxDB IOx. +// Parameters: +// - ctx: The context.Context to use for the request. +// - query: The query string to execute. +// - options: The optional query options. See QueryOption for available options. +// +// Returns: +// - A custom iterator (*QueryIterator). +// - An error, if any. +func (c *Client) Query(ctx context.Context, query string, options ...QueryOption) (*QueryIterator, error) { + return c.query(ctx, query, nil, newQueryOptions(&DefaultQueryOptions, options)) +} + +// QueryWithParameters queries data from InfluxDB IOx with parameterized query. // Parameters: // - ctx: The context.Context to use for the request. -// - query: The InfluxQL query string to execute. +// - query: The query string to execute. +// - parameters: The query parameters. +// - options: The optional query options. See QueryOption for available options. // // Returns: // - A custom iterator (*QueryIterator). // - An error, if any. -func (c *Client) Query(ctx context.Context, query string) (*QueryIterator, error) { - return c.QueryWithOptions(ctx, &DefaultQueryOptions, query) +func (c *Client) QueryWithParameters(ctx context.Context, query string, parameters QueryParameters, + options ...QueryOption) (*QueryIterator, error) { + + return c.query(ctx, query, parameters, newQueryOptions(&DefaultQueryOptions, options)) } // QueryWithOptions Query data from InfluxDB IOx with query options. @@ -87,21 +108,28 @@ func (c *Client) Query(ctx context.Context, query string) (*QueryIterator, error // Returns: // - A custom iterator (*QueryIterator) that can also be used to get raw flightsql reader. // - An error, if any. +// +// Deprecated: use Query with variadic QueryOption options. func (c *Client) QueryWithOptions(ctx context.Context, options *QueryOptions, query string) (*QueryIterator, error) { if options == nil { - return nil, fmt.Errorf("options not set") + return nil, errors.New("options not set") } + return c.query(ctx, query, nil, options) +} + +func (c *Client) query(ctx context.Context, query string, parameters QueryParameters, options *QueryOptions) (*QueryIterator, error) { var database string - var queryType QueryType if options.Database != "" { database = options.Database } else { database = c.config.Database } if database == "" { - return nil, fmt.Errorf("database not specified") + return nil, errors.New("database not specified") } + + var queryType QueryType queryType = options.QueryType ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+c.config.Token) @@ -113,6 +141,10 @@ func (c *Client) QueryWithOptions(ctx context.Context, options *QueryOptions, qu "query_type": strings.ToLower(queryType.String()), } + if len(parameters) > 0 { + ticketData["params"] = parameters + } + ticketJSON, err := json.Marshal(ticketData) if err != nil { return nil, fmt.Errorf("serialize: %s", err) diff --git a/influxdb3/query_iterator.go b/influxdb3/query_iterator.go index 0f73336..ccc3a92 100644 --- a/influxdb3/query_iterator.go +++ b/influxdb3/query_iterator.go @@ -26,9 +26,9 @@ import ( "fmt" "strings" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/flight" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/flight" ) // QueryIterator is a custom query iterator that encapsulates and simplifies the logic for diff --git a/influxdb3/query_iterator_test.go b/influxdb3/query_iterator_test.go index 58285af..bce6975 100644 --- a/influxdb3/query_iterator_test.go +++ b/influxdb3/query_iterator_test.go @@ -2,11 +2,11 @@ package influxdb3 import ( "bytes" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/flight" - "github.com/apache/arrow/go/v13/arrow/ipc" - "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/flight" + "github.com/apache/arrow/go/v14/arrow/ipc" + "github.com/apache/arrow/go/v14/arrow/memory" "github.com/stretchr/testify/assert" "testing" ) diff --git a/influxdb3/write.go b/influxdb3/write.go index 838b825..35096ad 100644 --- a/influxdb3/write.go +++ b/influxdb3/write.go @@ -25,6 +25,7 @@ package influxdb3 import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -42,11 +43,12 @@ import ( // Parameters: // - ctx: The context.Context to use for the request. // - points: The points to write. +// - options: Optional write options. See WriteOption for available options. // // Returns: // - An error, if any. -func (c *Client) WritePoints(ctx context.Context, points ...*Point) error { - return c.WritePointsWithOptions(ctx, c.config.WriteOptions, points...) +func (c *Client) WritePoints(ctx context.Context, points []*Point, options ...WriteOption) error { + return c.writePoints(ctx, points, newWriteOptions(c.config.WriteOptions, options)) } // WritePointsWithOptions writes all the given points to the server into the given database. @@ -59,7 +61,17 @@ func (c *Client) WritePoints(ctx context.Context, points ...*Point) error { // // Returns: // - An error, if any. +// +// Deprecated: use WritePoints with variadic WriteOption options. func (c *Client) WritePointsWithOptions(ctx context.Context, options *WriteOptions, points ...*Point) error { + if options == nil { + return errors.New("options not set") + } + + return c.writePoints(ctx, points, options) +} + +func (c *Client) writePoints(ctx context.Context, points []*Point, options *WriteOptions) error { var buff []byte var precision lineprotocol.Precision if options != nil { @@ -81,7 +93,8 @@ func (c *Client) WritePointsWithOptions(ctx context.Context, options *WriteOptio } buff = append(buff, bts...) } - return c.WriteWithOptions(ctx, options, buff) + + return c.write(ctx, buff, options) } // Write writes line protocol record(s) to the server into the given database. @@ -91,11 +104,12 @@ func (c *Client) WritePointsWithOptions(ctx context.Context, options *WriteOptio // Parameters: // - ctx: The context.Context to use for the request. // - buff: The line protocol record(s) to write. +// - options: Optional write options. See WriteOption for available options. // // Returns: // - An error, if any. -func (c *Client) Write(ctx context.Context, buff []byte) error { - return c.WriteWithOptions(ctx, c.config.WriteOptions, buff) +func (c *Client) Write(ctx context.Context, buff []byte, options ...WriteOption) error { + return c.write(ctx, buff, newWriteOptions(c.config.WriteOptions, options)) } // WriteWithOptions writes line protocol record(s) to the server into the given database. @@ -109,13 +123,18 @@ func (c *Client) Write(ctx context.Context, buff []byte) error { // // Returns: // - An error, if any. +// +// Deprecated: use WritePoints with WithWriteOptions option func (c *Client) WriteWithOptions(ctx context.Context, options *WriteOptions, buff []byte) error { if options == nil { - return fmt.Errorf("options not set") + return errors.New("options not set") } + + return c.write(ctx, buff, options) +} + +func (c *Client) write(ctx context.Context, buff []byte, options *WriteOptions) error { var database string - var precision lineprotocol.Precision - var gzipThreshold int if options.Database != "" { database = options.Database } else { @@ -124,7 +143,11 @@ func (c *Client) WriteWithOptions(ctx context.Context, options *WriteOptions, bu if database == "" { return fmt.Errorf("database not specified") } + + var precision lineprotocol.Precision precision = options.Precision + + var gzipThreshold int gzipThreshold = options.GzipThreshold var body io.Reader @@ -177,11 +200,12 @@ func (c *Client) WriteWithOptions(ctx context.Context, options *WriteOptions, bu // Parameters: // - ctx: The context.Context to use for the request. // - points: The custom points to encode and write. +// - options: Optional write options. See WriteOption for available options. // // Returns: // - An error, if any. -func (c *Client) WriteData(ctx context.Context, points ...interface{}) error { - return c.WriteDataWithOptions(ctx, c.config.WriteOptions, points...) +func (c *Client) WriteData(ctx context.Context, points []interface{}, options ...WriteOption) error { + return c.writeData(ctx, points, newWriteOptions(c.config.WriteOptions, options)) } // WriteDataWithOptions encodes fields of custom points into line protocol @@ -211,23 +235,34 @@ func (c *Client) WriteData(ctx context.Context, points ...interface{}) error { // // Returns: // - An error, if any. +// +// Deprecated: use Query with WithQueryOptions option func (c *Client) WriteDataWithOptions(ctx context.Context, options *WriteOptions, points ...interface{}) error { + if options == nil { + return errors.New("options not set") + } + + return c.writeData(ctx, points, options) +} + +func (c *Client) writeData(ctx context.Context, points []interface{}, options *WriteOptions) error { var buff []byte for _, p := range points { - byts, err := encode(p, options) + b, err := encode(p, options) if err != nil { return fmt.Errorf("error encoding point: %w", err) } - buff = append(buff, byts...) + buff = append(buff, b...) } - return c.WriteWithOptions(ctx, options, buff) + return c.write(ctx, buff, options) } func encode(x interface{}, options *WriteOptions) ([]byte, error) { if err := checkContainerType(x, false, "point"); err != nil { return nil, err } + t := reflect.TypeOf(x) v := reflect.ValueOf(x) if t.Kind() == reflect.Ptr { @@ -251,7 +286,7 @@ func encode(x interface{}, options *WriteOptions) ([]byte, error) { } parts := strings.Split(tag, ",") if len(parts) > 2 { - return nil, fmt.Errorf("multiple tag attributes are not supported") + return nil, errors.New("multiple tag attributes are not supported") } typ := parts[0] if len(parts) == 2 { @@ -260,7 +295,7 @@ func encode(x interface{}, options *WriteOptions) ([]byte, error) { switch typ { case "measurement": if point.GetMeasurement() != "" { - return nil, fmt.Errorf("multiple measurement fields") + return nil, errors.New("multiple measurement fields") } point.SetMeasurement(v.FieldByIndex(f.Index).String()) case "tag": @@ -278,10 +313,11 @@ func encode(x interface{}, options *WriteOptions) ([]byte, error) { } } if point.GetMeasurement() == "" { - return nil, fmt.Errorf("no struct field with tag 'measurement'") + return nil, errors.New("no struct field with tag 'measurement'") } if !point.HasFields() { - return nil, fmt.Errorf("no struct field with tag 'field'") + return nil, errors.New("no struct field with tag 'field'") } + return point.MarshalBinaryWithDefaultTags(options.Precision, options.DefaultTags) } diff --git a/influxdb3/write_test.go b/influxdb3/write_test.go index 6940bce..8e07714 100644 --- a/influxdb3/write_test.go +++ b/influxdb3/write_test.go @@ -370,7 +370,7 @@ func TestWritePointsAndBytes(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, reqs) - err = c.WritePoints(context.Background(), points...) + err = c.WritePoints(context.Background(), points) assert.NoError(t, err) assert.Equal(t, 2, reqs) @@ -381,14 +381,14 @@ func TestWritePointsAndBytes(t *testing.T) { assert.Equal(t, "invalid: error lens are not equal 911244 vs 4", err.Error()) } -func TestWritePointsWithOptions(t *testing.T) { +func TestWritePointsWithOptionsDeprecated(t *testing.T) { points := genPoints(t, 1) defaultTags := map[string]string{ "defaultTag": "default", "rack": "main", } lp := points2bytes(t, points, defaultTags) - correctPath := "/api/v2/write?bucket=x-db&org=&precision=ms" + correctPath := "/api/v2/write?bucket=db-x&org=&precision=ms" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // initialization of query client if r.Method == "PRI" { @@ -408,7 +408,7 @@ func TestWritePointsWithOptions(t *testing.T) { Database: "my-database", }) options := WriteOptions{ - Database: "x-db", + Database: "db-x", Precision: lineprotocol.Millisecond, DefaultTags: defaultTags, } @@ -417,6 +417,40 @@ func TestWritePointsWithOptions(t *testing.T) { assert.NoError(t, err) } +func TestWritePointsWithOptions(t *testing.T) { + points := genPoints(t, 1) + defaultTags := map[string]string{ + "defaultTag": "default", + "rack": "main", + } + lp := points2bytes(t, points, defaultTags) + correctPath := "/api/v2/write?bucket=db-x&org=&precision=ms" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // initialization of query client + if r.Method == "PRI" { + return + } + + assert.EqualValues(t, correctPath, r.URL.String()) + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + assert.Equal(t, string(lp), string(body)) + w.WriteHeader(204) + })) + defer ts.Close() + c, err := New(ClientConfig{ + Host: ts.URL, + Token: "my-token", + Database: "my-database", + }) + require.NoError(t, err) + err = c.WritePoints(context.Background(), points, + WithPrecision(lineprotocol.Millisecond), + WithDatabase("db-x"), + WithDefaultTags(defaultTags)) + assert.NoError(t, err) +} + func TestWriteData(t *testing.T) { now := time.Now() s := struct { @@ -454,11 +488,11 @@ func TestWriteData(t *testing.T) { Database: "my-database", }) require.NoError(t, err) - err = c.WriteData(context.Background(), s) + err = c.WriteData(context.Background(), []any{s}) assert.NoError(t, err) } -func TestWriteDataWithOptions(t *testing.T) { +func TestWriteDataWithOptionsDeprecated(t *testing.T) { now := time.Now() s := struct { Measurement string `lp:"measurement"` @@ -477,8 +511,12 @@ func TestWriteDataWithOptions(t *testing.T) { now, "Room temp", } - lp := fmt.Sprintf("air,defaultTag=default,device_id=10,sensor=SHT31 humidity=55i,temperature=23.5 %d\n", now.Unix()) - correctPath := "/api/v2/write?bucket=x-db&org=my-org&precision=s" + defaultTags := map[string]string{ + "defaultTag": "default", + "rack": "main", + } + lp := fmt.Sprintf("air,defaultTag=default,device_id=10,rack=main,sensor=SHT31 humidity=55i,temperature=23.5 %d\n", now.Unix()) + correctPath := "/api/v2/write?bucket=db-x&org=my-org&precision=s" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // initialization of query client if r.Method == "PRI" { @@ -498,17 +536,66 @@ func TestWriteDataWithOptions(t *testing.T) { Database: "my-database", }) options := WriteOptions{ - Database: "x-db", - Precision: lineprotocol.Second, - DefaultTags: map[string]string{ - "defaultTag": "default", - }, + Database: "db-x", + Precision: lineprotocol.Second, + DefaultTags: defaultTags, } require.NoError(t, err) err = c.WriteDataWithOptions(context.Background(), &options, s) assert.NoError(t, err) } +func TestWriteDataWithOptions(t *testing.T) { + now := time.Now() + s := struct { + Measurement string `lp:"measurement"` + Sensor string `lp:"tag,sensor"` + ID string `lp:"tag,device_id"` + Temp float64 `lp:"field,temperature"` + Hum int `lp:"field,humidity"` + Time time.Time `lp:"timestamp"` + Description string `lp:"-"` + }{ + "air", + "SHT31", + "10", + 23.5, + 55, + now, + "Room temp", + } + defaultTags := map[string]string{ + "defaultTag": "default", + "rack": "main", + } + lp := fmt.Sprintf("air,defaultTag=default,device_id=10,rack=main,sensor=SHT31 humidity=55i,temperature=23.5 %d\n", now.Unix()) + correctPath := "/api/v2/write?bucket=db-x&org=my-org&precision=s" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // initialization of query client + if r.Method == "PRI" { + return + } + assert.EqualValues(t, correctPath, r.URL.String()) + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + assert.Equal(t, lp, string(body)) + w.WriteHeader(204) + })) + defer ts.Close() + c, err := New(ClientConfig{ + Host: ts.URL, + Token: "my-token", + Organization: "my-org", + Database: "my-database", + }) + require.NoError(t, err) + err = c.WriteData(context.Background(), []any{s}, + WithDatabase("db-x"), + WithPrecision(lineprotocol.Second), + WithDefaultTags(defaultTags)) + assert.NoError(t, err) +} + func TestGzip(t *testing.T) { points := genPoints(t, 1) byts := points2bytes(t, points) @@ -588,7 +675,7 @@ func TestCustomHeaders(t *testing.T) { }, }) require.NoError(t, err) - err = c.WritePoints(context.Background(), p) + err = c.WritePoints(context.Background(), []*Point{p}) require.NoError(t, err) } @@ -615,7 +702,7 @@ func TestWriteErrorMarshalPoint(t *testing.T) { p.SetTimestamp(time.Now()) - err = c.WritePoints(context.Background(), p) + err = c.WritePoints(context.Background(), []*Point{p}) assert.Error(t, err) err = c.WriteData(context.Background(), []interface{}{ @@ -641,7 +728,7 @@ func TestHttpError(t *testing.T) { Database: "my-database", }) require.NoError(t, err) - err = c.WritePoints(context.Background(), p) + err = c.WritePoints(context.Background(), []*Point{p}) assert.Error(t, err) assert.ErrorContains(t, err, "error calling") } @@ -655,7 +742,7 @@ func TestWriteDatabaseNotSet(t *testing.T) { Token: "my-token", }) require.NoError(t, err) - err = c.WritePoints(context.Background(), p) + err = c.WritePoints(context.Background(), []*Point{p}) assert.Error(t, err) assert.EqualError(t, err, "database not specified") }