Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unblock relaxed write consistency level #3901

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions cluster/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,11 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
}

// response channel for each shard writer go routine
ch := make(chan error, len(shard.Owners))
type AsyncWriteResult struct {
Owner meta.ShardOwner
Err error
}
ch := make(chan *AsyncWriteResult, len(shard.Owners))

for _, owner := range shard.Owners {
go func(shardID uint64, owner meta.ShardOwner, points []tsdb.Point) {
Expand All @@ -244,12 +248,12 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
if err == tsdb.ErrShardNotFound {
err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID)
if err != nil {
ch <- err
ch <- &AsyncWriteResult{owner, err}
return
}
err = w.TSDBStore.WriteToShard(shardID, points)
}
ch <- err
ch <- &AsyncWriteResult{owner, err}
return
}

Expand All @@ -262,44 +266,44 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
// be considered a successful write so send nil to the response channel
// otherwise, let the original error propogate to the response channel
if hherr == nil && consistency == ConsistencyLevelAny {
ch <- nil
ch <- &AsyncWriteResult{owner, nil}
return
}
}
ch <- err
ch <- &AsyncWriteResult{owner, err}

}(shard.ID, owner, points)
}

var wrote int
timeout := time.After(w.WriteTimeout)
var writeError error
for _, owner := range shard.Owners {
for range shard.Owners {
select {
case <-w.closing:
return ErrWriteFailed
case <-timeout:
// return timeout error to caller
return ErrTimeout
case err := <-ch:
case result := <-ch:
// If the write returned an error, continue to the next response
if err != nil {
w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, owner.NodeID, err)
if result.Err != nil {
w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, result.Owner.NodeID, result.Err)

// Keep track of the first error we see to return back to the client
if writeError == nil {
writeError = err
writeError = result.Err
}
continue
}

wrote += 1
}
}

// We wrote the required consistency level
if wrote >= required {
return nil
// We wrote the required consistency level
if wrote >= required {
return nil
}
}
}

if wrote > 0 {
Expand Down
61 changes: 42 additions & 19 deletions cmd/influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"text/tabwriter"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/importer/v8"
"github.com/peterh/liner"
)
Expand All @@ -36,24 +37,25 @@ const (
)

type CommandLine struct {
Client *client.Client
Line *liner.State
Host string
Port int
Username string
Password string
Database string
Ssl bool
RetentionPolicy string
Version string
Pretty bool // controls pretty print for json
Format string // controls the output format. Valid values are json, csv, or column
Execute string
ShowVersion bool
Import bool
PPS int // Controls how many points per second the import will allow via throttling
Path string
Compressed bool
Client *client.Client
Line *liner.State
Host string
Port int
Username string
Password string
Database string
Ssl bool
RetentionPolicy string
Version string
Pretty bool // controls pretty print for json
Format string // controls the output format. Valid values are json, csv, or column
WriteConsistency string
Execute string
ShowVersion bool
Import bool
PPS int // Controls how many points per second the import will allow via throttling
Path string
Compressed bool
}

func main() {
Expand All @@ -67,6 +69,7 @@ func main() {
fs.StringVar(&c.Database, "database", c.Database, "Database to connect to the server.")
fs.BoolVar(&c.Ssl, "ssl", false, "Use https for connecting to cluster.")
fs.StringVar(&c.Format, "format", defaultFormat, "Format specifies the format of the server responses: json, csv, or column.")
fs.StringVar(&c.WriteConsistency, "consistency", "any", "Set write consistency level: any, one, quorum, or all.")
fs.BoolVar(&c.Pretty, "pretty", false, "Turns on pretty print for the json format.")
fs.StringVar(&c.Execute, "execute", c.Execute, "Execute command and quit.")
fs.BoolVar(&c.ShowVersion, "version", false, "Displays the InfluxDB version.")
Expand Down Expand Up @@ -96,6 +99,8 @@ func main() {
Execute command and quit.
-format 'json|csv|column'
Format specifies the format of the server responses: json, csv, or column.
-consistency 'any|one|quorum|all'
Set write consistency level: any, one, quorum, or all
-pretty
Turns on pretty print for the json format.
-import
Expand Down Expand Up @@ -244,6 +249,8 @@ func (c *CommandLine) ParseCommand(cmd string) bool {
c.help()
case strings.HasPrefix(lcmd, "format"):
c.SetFormat(cmd)
case strings.HasPrefix(lcmd, "consistency"):
c.SetWriteConsistency(cmd)
case strings.HasPrefix(lcmd, "settings"):
c.Settings()
case strings.HasPrefix(lcmd, "pretty"):
Expand Down Expand Up @@ -358,6 +365,20 @@ func (c *CommandLine) SetFormat(cmd string) {
}
}

func (c *CommandLine) SetWriteConsistency(cmd string) {
// Remove the "consistency" keyword if it exists
cmd = strings.TrimSpace(strings.Replace(cmd, "consistency", "", -1))
// normalize cmd
cmd = strings.ToLower(cmd)

_, err := cluster.ParseConsistencyLevel(cmd)
if err != nil {
fmt.Printf("Unknown consistency level %q. Please use any, one, quorum, or all.\n", cmd)
return
}
c.WriteConsistency = cmd
}

// isWhitespace returns true if the rune is a space, tab, or newline.
func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' }

Expand Down Expand Up @@ -444,7 +465,7 @@ func (c *CommandLine) Insert(stmt string) error {
Database: c.Database,
RetentionPolicy: c.RetentionPolicy,
Precision: "n",
WriteConsistency: client.ConsistencyAny,
WriteConsistency: c.WriteConsistency,
})
if err != nil {
fmt.Printf("ERR: %s\n", err)
Expand Down Expand Up @@ -641,6 +662,7 @@ func (c *CommandLine) Settings() {
fmt.Fprintf(w, "Database\t%s\n", c.Database)
fmt.Fprintf(w, "Pretty\t%v\n", c.Pretty)
fmt.Fprintf(w, "Format\t%s\n", c.Format)
fmt.Fprintf(w, "Write Consistency\t%s\n", c.WriteConsistency)
fmt.Fprintln(w)
w.Flush()
}
Expand All @@ -652,6 +674,7 @@ func (c *CommandLine) help() {
pretty toggle pretty print
use <db_name> set current databases
format <format> set the output format: json, csv, or column
consistency <level> set write consistency level: any, one, quorum, or all
settings output the current settings for the shell
exit quit the influx shell

Expand Down