Skip to content

Commit

Permalink
fix(write): drop empty lines from write batches (#20671)
Browse files Browse the repository at this point in the history
  • Loading branch information
danxmoran authored Feb 3, 2021
1 parent 9097554 commit e5814a3
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 21 deletions.
4 changes: 4 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ jobs:
key: influxdb-build-{{ .Branch }}-{{ .Revision }}
paths:
- /tmp/go-cache
- store_artifacts:
path: bin/linux
- persist_to_workspace:
root: .
paths:
Expand Down Expand Up @@ -636,6 +638,8 @@ jobs:
- install_core_deps
- run_goreleaser:
publish_release: false
- store_artifacts:
path: dist
- save_cache:
name: Save GOCACHE
key: influxdb-cross-build-{{ .Branch }}-{{ .Revision }}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards
1. [20592](https://github.com/influxdata/influxdb/pull/20592): Set correct Content-Type on v1 query responses.
1. [20592](https://github.com/influxdata/influxdb/pull/20592): Update V1 API spec to document all valid Accept headers and matching Content-Types.
1. [20611](https://github.com/influxdata/influxdb/pull/20611): Respect the --skip-verify flag when running `influx query`.
1. [20671](https://github.com/influxdata/influxdb/pull/20671): Remove blank lines from payloads sent by `influx write`.

## v2.0.3 [2020-12-14]
----------------------
Expand Down
4 changes: 3 additions & 1 deletion write/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func (b *Batcher) read(ctx context.Context, r io.Reader, lines chan<- []byte, er
for scanner.Scan() {
// exit early if the context is done
select {
// NOTE: We purposefully don't use scanner.Bytes() here because it returns a slice
// pointing to an array which is reused / overwritten on every call to Scan().
case lines <- []byte(scanner.Text()):
case <-ctx.Done():
errC <- ctx.Err()
Expand Down Expand Up @@ -134,7 +136,7 @@ func (b *Batcher) write(ctx context.Context, writeFn func(batch []byte) error, l
for more {
select {
case line, more = <-lines:
if more {
if more && string(line) != "\n" {
buf = append(buf, line...)
}
// write if we exceed the max lines OR read routine has finished
Expand Down
60 changes: 40 additions & 20 deletions write/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/pkg/testing/assert"
"github.com/stretchr/testify/require"
)

func TestScanLines(t *testing.T) {
Expand Down Expand Up @@ -181,11 +182,12 @@ func TestBatcher_write(t *testing.T) {
errC chan error
}
tests := []struct {
name string
fields fields
args args
want string
wantErr bool
name string
fields fields
args args
want string
wantErr bool
wantNoCall bool
}{
{
name: "sending a single line will send a line to the service",
Expand Down Expand Up @@ -240,10 +242,11 @@ func TestBatcher_write(t *testing.T) {
org: platform.ID(1),
bucket: platform.ID(2),
line: "m1,t1=v1 f1=1",
lines: make(chan []byte),
errC: make(chan error),
lines: make(chan []byte, 1),
errC: make(chan error, 1),
},
wantErr: true,
wantErr: true,
wantNoCall: true,
},
{
name: "write service returning error stops the write",
Expand All @@ -260,6 +263,20 @@ func TestBatcher_write(t *testing.T) {
},
wantErr: true,
},
{
name: "blank line is not sent to service",
fields: fields{
MaxFlushBytes: 1,
},
args: args{
org: platform.ID(1),
bucket: platform.ID(2),
line: "\n",
lines: make(chan []byte),
errC: make(chan error),
},
wantNoCall: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -271,9 +288,11 @@ func TestBatcher_write(t *testing.T) {

// Mock the write service so that it either returns an error
// or hands back all the bytes it read from the reader.
writeCalled := false
var got string
svc := &mock.WriteService{
WriteF: func(ctx context.Context, org, bucket platform.ID, r io.Reader) error {
writeCalled = true
if tt.args.writeError {
return fmt.Errorf("error")
}
Expand All @@ -282,6 +301,7 @@ func TestBatcher_write(t *testing.T) {
return err
},
WriteToF: func(ctx context.Context, filter platform.BucketFilter, r io.Reader) error {
writeCalled = true
if tt.args.writeError {
return fmt.Errorf("error")
}
Expand All @@ -304,26 +324,26 @@ func TestBatcher_write(t *testing.T) {

if cancel != nil {
cancel()
} else {
tt.args.lines <- []byte(tt.args.line)
// if the max flush interval is not zero, we are testing to see
// if the data is flushed via the timer rather than forced by
// closing the channel.
if tt.fields.MaxFlushInterval != 0 {
time.Sleep(tt.fields.MaxFlushInterval * 100)
}
close(tt.args.lines)
time.Sleep(500 * time.Millisecond)
}

tt.args.lines <- []byte(tt.args.line)
// if the max flush interval is not zero, we are testing to see
// if the data is flushed via the timer rather than forced by
// closing the channel.
if tt.fields.MaxFlushInterval != 0 {
time.Sleep(tt.fields.MaxFlushInterval * 100)
}
close(tt.args.lines)

err := <-tt.args.errC
if (err != nil) != tt.wantErr {
t.Errorf("ScanLines.read() error = %v, wantErr %v", err, tt.wantErr)
return
}

if !cmp.Equal(got, tt.want) {
t.Errorf("%q. Batcher.write() = -got/+want %s", tt.name, cmp.Diff(got, tt.want))
}
require.Equal(t, tt.wantNoCall, !writeCalled)
require.Equal(t, tt.want, got)
})
}
}
Expand Down

0 comments on commit e5814a3

Please sign in to comment.