Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added consistency params support #348

Merged
merged 1 commit into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
## [unreleased]
### Features
- [#348](https://github.com/influxdata/influxdb-client-go/pull/348) Added `write.Options.Consitency` parameter to support InfluxDB Enterprise.

## 2.9.2 [2022-07-29]
### Bug fixes
Expand Down
30 changes: 30 additions & 0 deletions api/write/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,27 @@ type Options struct {
maxRetryTime uint
// The base for the exponential retry delay
exponentialBase uint
// InfluxDB Enterprise write consistency as explained in https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/clustering/#write-consistency
consistency Consistency
}

const (
// ConsistencyOne requires at least one data node acknowledged a write.
ConsistencyOne Consistency = "one"

// ConsistencyAll requires all data nodes to acknowledge a write.
ConsistencyAll Consistency = "all"

// ConsistencyQuorum requires a quorum of data nodes to acknowledge a write.
ConsistencyQuorum Consistency = "quorum"

// ConsistencyAny allows for hinted hand off, potentially no write happened yet.
ConsistencyAny Consistency = "any"
)

// Consistency defines enum for allows consistency values for InfluxDB Enterprise, as explained https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/clustering/#write-consistency
type Consistency string

// BatchSize returns size of batch
func (o *Options) BatchSize() uint {
return o.batchSize
Expand Down Expand Up @@ -162,6 +181,17 @@ func (o *Options) DefaultTags() map[string]string {
return o.defaultTags
}

// Consistency returns consistency for param value
func (o *Options) Consistency() Consistency {
return o.consistency
}

// SetConsistency allows setting InfluxDB Enterprise write consistency, as explained in https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/clustering/#write-consistency */
func (o *Options) SetConsistency(consistency Consistency) *Options {
o.consistency = consistency
return o
}

// DefaultOptions returns Options object with default values
func DefaultOptions() *Options {
return &Options{batchSize: 5_000, flushInterval: 1_000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 50_000, defaultTags: make(map[string]string),
Expand Down
5 changes: 4 additions & 1 deletion api/write/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestDefaultOptions(t *testing.T) {
assert.EqualValues(t, 125_000, opts.MaxRetryInterval())
assert.EqualValues(t, 180_000, opts.MaxRetryTime())
assert.EqualValues(t, 2, opts.ExponentialBase())
assert.EqualValues(t, "", opts.Consistency())
assert.Len(t, opts.DefaultTags(), 0)
}

Expand All @@ -40,7 +41,8 @@ func TestSettingsOptions(t *testing.T) {
SetExponentialBase(3).
SetMaxRetryTime(200_000).
AddDefaultTag("a", "1").
AddDefaultTag("b", "2")
AddDefaultTag("b", "2").
SetConsistency(write.ConsistencyOne)
assert.EqualValues(t, 5, opts.BatchSize())
assert.EqualValues(t, true, opts.UseGZip())
assert.EqualValues(t, 5000, opts.FlushInterval())
Expand All @@ -51,5 +53,6 @@ func TestSettingsOptions(t *testing.T) {
assert.EqualValues(t, 150_000, opts.MaxRetryInterval())
assert.EqualValues(t, 200_000, opts.MaxRetryTime())
assert.EqualValues(t, 3, opts.ExponentialBase())
assert.EqualValues(t, "one", opts.Consistency())
assert.Len(t, opts.DefaultTags(), 2)
}
3 changes: 3 additions & 0 deletions internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func NewService(org string, bucket string, httpService http2.Service, options *w
params.Set("org", org)
params.Set("bucket", bucket)
params.Set("precision", precisionToString(options.Precision()))
if options.Consistency() != "" {
params.Set("consistency", string(options.Consistency()))
}
u.RawQuery = params.Encode()
writeURL := u.String()
return &Service{
Expand Down
12 changes: 12 additions & 0 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,3 +647,15 @@ func TestFlush(t *testing.T) {
assert.Len(t, hs.Lines(), 5)
assert.Equal(t, 0, srv.retryQueue.list.Len())
}

func TestConsistencyParam(t *testing.T) {
hs := test.NewTestService(t, "http://localhost:8888")
opts := write.DefaultOptions().SetConsistency(write.ConsistencyQuorum)
srv := NewService("org", "buc", hs, opts)

require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&consistency=quorum&org=org&precision=ns", srv.WriteURL())
opts = write.DefaultOptions()
srv = NewService("org", "buc", hs, opts)

require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&org=org&precision=ns", srv.WriteURL())
}