Skip to content

Commit

Permalink
Add optimized ReadAll function (elastic#229)
Browse files Browse the repository at this point in the history
* Add optimized ReadAll function

This adds an optimized io.ReadAll that uses a bytes.Buffer + io.Copy
internally, improving the buffer growth ratio over the runtime append
and taking advange of io.WriterTo when available.

The only scenario where this optimizes version is slower than io.ReadAll is
when reading < 512 bytes of a reader that does not implement io.WriterTo.

For now I only updated tests to use the new function as updating the code
requires more careful consideration.

While at it, migrate from the deprecated ioutil.ReadAll to io.ReadAll.

goos: linux
goarch: amd64
pkg: github.com/elastic/elastic-agent-libs/iobuf
cpu: Intel(R) Xeon(R) CPU E5-2697A v4 @ 2.60GHz
BenchmarkReadAll/size_32B/io.ReadAll/WriterTo-32         	 7479658	       162.4 ns/op	     512 B/op	       1 allocs/op
BenchmarkReadAll/size_32B/io.ReadAll/Reader-32           	 7000012	       169.3 ns/op	     512 B/op	       1 allocs/op
BenchmarkReadAll/size_32B/ReadAll/WriterTo-32            	10630838	       112.3 ns/op	     112 B/op	       2 allocs/op
BenchmarkReadAll/size_32B/ReadAll/Reader-32              	 2111246	       567.3 ns/op	    1584 B/op	       3 allocs/op
BenchmarkReadAll/size_64B/io.ReadAll/WriterTo-32         	 7154652	       163.2 ns/op	     512 B/op	       1 allocs/op
BenchmarkReadAll/size_64B/io.ReadAll/Reader-32           	 7223264	       166.1 ns/op	     512 B/op	       1 allocs/op
BenchmarkReadAll/size_64B/ReadAll/WriterTo-32            	10400562	       113.5 ns/op	     112 B/op	       2 allocs/op
BenchmarkReadAll/size_64B/ReadAll/Reader-32              	 2129949	       558.7 ns/op	    1584 B/op	       3 allocs/op
BenchmarkReadAll/size_512B/io.ReadAll/WriterTo-32        	 2843871	       419.1 ns/op	    1408 B/op	       2 allocs/op
BenchmarkReadAll/size_512B/io.ReadAll/Reader-32          	 2871580	       413.1 ns/op	    1408 B/op	       2 allocs/op
BenchmarkReadAll/size_512B/ReadAll/WriterTo-32           	 4976233	       241.5 ns/op	     560 B/op	       2 allocs/op
BenchmarkReadAll/size_512B/ReadAll/Reader-32             	 2183186	       552.9 ns/op	    1584 B/op	       3 allocs/op
BenchmarkReadAll/size_10KB/io.ReadAll/WriterTo-32        	  142633	      8235 ns/op	   46080 B/op	      10 allocs/op
BenchmarkReadAll/size_10KB/io.ReadAll/Reader-32          	  148326	      8229 ns/op	   46080 B/op	      10 allocs/op
BenchmarkReadAll/size_10KB/ReadAll/WriterTo-32           	  574903	      2210 ns/op	   10288 B/op	       2 allocs/op
BenchmarkReadAll/size_10KB/ReadAll/Reader-32             	  147210	      7995 ns/op	   32304 B/op	       7 allocs/op
BenchmarkReadAll/size_100KB/io.ReadAll/WriterTo-32       	   13171	     90853 ns/op	  514304 B/op	      18 allocs/op
BenchmarkReadAll/size_100KB/io.ReadAll/Reader-32         	   12892	     91787 ns/op	  514304 B/op	      18 allocs/op
BenchmarkReadAll/size_100KB/ReadAll/WriterTo-32          	   51472	     22420 ns/op	  106544 B/op	       2 allocs/op
BenchmarkReadAll/size_100KB/ReadAll/Reader-32            	   21568	     55070 ns/op	  261680 B/op	      10 allocs/op
BenchmarkReadAll/size_1MB/io.ReadAll/WriterTo-32         	    1220	    983276 ns/op	 5241098 B/op	      27 allocs/op
BenchmarkReadAll/size_1MB/io.ReadAll/Reader-32           	    1089	    990818 ns/op	 5241100 B/op	      27 allocs/op
BenchmarkReadAll/size_1MB/ReadAll/WriterTo-32            	    4153	    294507 ns/op	 1048627 B/op	       2 allocs/op
BenchmarkReadAll/size_1MB/ReadAll/Reader-32              	    1195	    944781 ns/op	 4193852 B/op	      14 allocs/op

* check error returned by Seek

* update ReadAll doc

* fix linter error in doc

* nolint for http.Serve

* fix license headers

* fix code duplication

* use t.TempDir instead of os.MkdirTemp

* remove os.CreateTemp

---------

Co-authored-by: Denis <denis@rdner.de>
  • Loading branch information
mauri870 and rdner authored Oct 2, 2024
1 parent eb7e16b commit f1a8d99
Show file tree
Hide file tree
Showing 16 changed files with 232 additions and 117 deletions.
9 changes: 5 additions & 4 deletions api/npipe/listener_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ package npipe

import (
"fmt"
"io/ioutil"
"net/http"
"testing"

"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -43,7 +43,8 @@ func TestHTTPOverNamedPipe(t *testing.T) {
})

go func() {
_ = http.Serve(l, mux)
_ = http.Serve(l, mux) //nolint:gosec // Serve does not support setting timeouts, it is fine for tests.

}()

c := http.Client{
Expand All @@ -52,10 +53,10 @@ func TestHTTPOverNamedPipe(t *testing.T) {
},
}

// nolint:noctx // for testing purposes
//nolint:noctx // for testing purposes
r, err := c.Get("http://npipe/echo-hello")
require.NoError(t, err)
body, err := ioutil.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)
defer r.Body.Close()

Expand Down
20 changes: 6 additions & 14 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package api
import (
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
Expand All @@ -32,6 +30,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

const (
Expand Down Expand Up @@ -71,10 +70,7 @@ func TestSocket(t *testing.T) {
}

t.Run("socket doesn't exist before", func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

tmpDir := t.TempDir()
sockFile := tmpDir + "/test.sock"

cfg := config.MustNewConfigFrom(map[string]interface{}{
Expand Down Expand Up @@ -102,11 +98,7 @@ func TestSocket(t *testing.T) {
})

t.Run("starting beat and recover a dangling socket file", func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

sockFile := tmpDir + "/test.sock"
sockFile := t.TempDir() + "/test.sock"

// Create the socket before the server.
f, err := os.Create(sockFile)
Expand Down Expand Up @@ -161,7 +153,7 @@ func getResponse(t *testing.T, sockFile, url string) string {
require.NoError(t, err)
defer r.Body.Close()

body, err := ioutil.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)
return string(body)
}
Expand All @@ -188,7 +180,7 @@ func TestHTTP(t *testing.T) {
require.NoError(t, err)
}()

body, err := ioutil.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)

assert.Equal(t, "ehlo!", string(body))
Expand Down Expand Up @@ -229,7 +221,7 @@ func TestAttachHandler(t *testing.T) {
require.NoError(t, err)
}()

body, err := io.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)

assert.Equal(t, "test!", string(body))
Expand Down
6 changes: 3 additions & 3 deletions api/server_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package api

import (
"io/ioutil"
"net/http"
"testing"

Expand All @@ -29,6 +28,7 @@ import (

"github.com/elastic/elastic-agent-libs/api/npipe"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

func TestNamedPipe(t *testing.T) {
Expand All @@ -52,12 +52,12 @@ func TestNamedPipe(t *testing.T) {
},
}

// nolint:noctx // for testing purposes
//nolint:noctx // for testing purposes
r, err := c.Get("http://npipe/echo-hello")
require.NoError(t, err)
defer r.Body.Close()

body, err := ioutil.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)

assert.Equal(t, "ehlo!", string(body))
Expand Down
8 changes: 4 additions & 4 deletions file/fileinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
package file_test

import (
"io/ioutil"
"os"
"path/filepath"
"testing"
Expand All @@ -35,13 +34,14 @@ import (
)

func TestStat(t *testing.T) {
f, err := ioutil.TempFile("", "teststat")
tmpDir := t.TempDir()
f, err := os.Create(filepath.Join(tmpDir, "teststat"))
if err != nil {
t.Fatal(err)
}
defer os.Remove(f.Name())
defer f.Close()

link := filepath.Join(os.TempDir(), "teststat-link")
link := filepath.Join(tmpDir, "teststat-link")
if err := os.Symlink(f.Name(), link); err != nil {
t.Fatal(err)
}
Expand Down
56 changes: 20 additions & 36 deletions file/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package file

import (
"io/ioutil"
"fmt"
"os"
"path/filepath"
"testing"
Expand All @@ -29,19 +29,15 @@ import (
)

func TestSafeFileRotateExistingFile(t *testing.T) {
tempdir, err := ioutil.TempDir("", "")
assert.NoError(t, err)
defer func() {
assert.NoError(t, os.RemoveAll(tempdir))
}()
tempdir := t.TempDir()

// create an existing registry file
err = ioutil.WriteFile(filepath.Join(tempdir, "registry"),
err := os.WriteFile(filepath.Join(tempdir, "registry"),
[]byte("existing filebeat"), 0x777)
assert.NoError(t, err)

// create a new registry.new file
err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"),
err = os.WriteFile(filepath.Join(tempdir, "registry.new"),
[]byte("new filebeat"), 0x777)
assert.NoError(t, err)

Expand All @@ -50,35 +46,23 @@ func TestSafeFileRotateExistingFile(t *testing.T) {
filepath.Join(tempdir, "registry.new"))
assert.NoError(t, err)

contents, err := ioutil.ReadFile(filepath.Join(tempdir, "registry"))
contents, err := os.ReadFile(filepath.Join(tempdir, "registry"))
assert.NoError(t, err)
assert.Equal(t, []byte("new filebeat"), contents)

// do it again to make sure we deal with deleting the old file

err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"),
[]byte("new filebeat 1"), 0x777)
assert.NoError(t, err)

err = SafeFileRotate(filepath.Join(tempdir, "registry"),
filepath.Join(tempdir, "registry.new"))
assert.NoError(t, err)

contents, err = ioutil.ReadFile(filepath.Join(tempdir, "registry"))
assert.NoError(t, err)
assert.Equal(t, []byte("new filebeat 1"), contents)

// and again for good measure

err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"),
[]byte("new filebeat 2"), 0x777)
assert.NoError(t, err)

err = SafeFileRotate(filepath.Join(tempdir, "registry"),
filepath.Join(tempdir, "registry.new"))
assert.NoError(t, err)

contents, err = ioutil.ReadFile(filepath.Join(tempdir, "registry"))
assert.NoError(t, err)
assert.Equal(t, []byte("new filebeat 2"), contents)
// do it twice to make sure we deal with deleting the old file
for i := 0; i < 2; i++ {
expectedContents := []byte(fmt.Sprintf("new filebeat %d", i))
err = os.WriteFile(filepath.Join(tempdir, "registry.new"),
expectedContents, 0x777)
assert.NoError(t, err)

err = SafeFileRotate(filepath.Join(tempdir, "registry"),
filepath.Join(tempdir, "registry.new"))
assert.NoError(t, err)

contents, err = os.ReadFile(filepath.Join(tempdir, "registry"))
assert.NoError(t, err)
assert.Equal(t, expectedContents, contents)
}
}
36 changes: 36 additions & 0 deletions iobuf/iobuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package iobuf

import (
"bytes"
"io"
)

// ReadAll reads all data from r and returns it as a byte slice.
// A successful call returns err == nil, not err == EOF. It does not
// treat an EOF as an error to be reported.
//
// This function is similar to io.ReadAll, but uses a bytes.Buffer to
// accumulate the data, which has a more efficient growing algorithm and
// uses io.WriterTo if r implements it.
func ReadAll(r io.Reader) ([]byte, error) {
var buf bytes.Buffer
_, err := io.Copy(&buf, r)
return buf.Bytes(), err
}
Loading

0 comments on commit f1a8d99

Please sign in to comment.