Skip to content

Commit

Permalink
prepare v0.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksandr Emelin committed Jan 9, 2021
1 parent 5604864 commit 18b653f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 33 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ This is an opinionated modification of [github.com/tarantool/go-tarantool](https
* Per-request timeout detached from underlying connection read and write timeouts.
* `Op` type to express different update/upsert operations.
* Some other cosmetic changes including several linter fixes.
* No default `Logger` – developer needs to provide custom implementation explicitly.

The networking core of `github.com/tarantool/go-tarantool` kept mostly unchanged at the moment so this package should behave in similar way.

Expand Down
25 changes: 25 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
v0.1.0
======

* Remove default logger - user must explicitly provide logger to get logs from a package

v0.0.1
======

This is an opinionated modification of [github.com/tarantool/go-tarantool](https://github.com/tarantool/go-tarantool) package.

Changes from the original:

* API changed, some non-obvious (mostly to me personally) API removed.
* This package uses the latest msgpack library [github.com/vmihailenco/msgpack/v5](https://github.com/vmihailenco/msgpack) instead of `v2` in original.
* Uses `enc.UseArrayEncodedStructs(true)` for `msgpack.Encoder` internally so there is no need to define `msgpack:",as_array"` struct tags.
* Supports out-of-bound pushes (see [box.session.push](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_session/#box-session-push))
* Adds optional support for `context.Context` (though performance will suffer a bit, if you want a maximum performance then use non-context methods which use per-connection timeout).
* Uses sync.Pool for `*msgpack.Decoder` to reduce allocations on decoding stage a bit. Actually this package allocates a bit more than the original one, but allocations are small and overall performance is comparable to the original (based on observations from internal benchmarks).
* No `multi` and `queue` packages.
* Only one version of `Call` which uses Tarantool 1.7 request code.
* Modified connection address behavior: refer to `Connect` function docs to see details.
* Per-request timeout detached from underlying connection read and write timeouts.
* `Op` type to express different update/upsert operations.
* Some other cosmetic changes including several linter fixes.
* No default `Logger` – developer needs to provide custom implementation explicitly.
12 changes: 11 additions & 1 deletion config.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
box.cfg{}
fiber = require 'fiber'

box.cfg{
readahead = 10 * 1024 * 1024, -- to keep up with benchmark load.
net_msg_max = 10 * 1024, -- to keep up with benchmark load.
}

box.once("init", function()
local s = box.schema.space.create('test', {
Expand Down Expand Up @@ -46,6 +51,11 @@ box.schema.user.grant('test', 'read,write', 'space', 'test')
box.schema.user.grant('test', 'read,write', 'space', 'schematest')
end)

function timeout()
fiber.sleep(1)
return 1
end

function simple_incr(a)
return a+1
end
Expand Down
46 changes: 14 additions & 32 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net"
"net/url"
"runtime"
Expand Down Expand Up @@ -76,26 +74,6 @@ type Logger interface {
Report(event ConnLogKind, conn *Connection, v ...interface{})
}

type defaultLogger struct{}

func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interface{}) {
switch event {
case LogReconnectFailed:
reconnects := v[0].(uint64)
err := v[1].(error)
log.Printf("tarantool: reconnect (%d/%d) to %s://%s failed: %s\n", reconnects, conn.opts.MaxReconnects, conn.opts.network, conn.opts.address, err.Error())
case LogLastReconnectFailed:
err := v[0].(error)
log.Printf("tarantool: last reconnect to %s://%s failed: %s, giving it up.\n", conn.opts.network, conn.opts.address, err.Error())
case LogUnexpectedResultID:
resp := v[0].(*Response)
log.Printf("tarantool: connection %s://%s got unexpected resultId (%d) in response", conn.opts.network, conn.opts.address, resp.RequestID)
default:
args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
log.Print(args...)
}
}

// Connection to Tarantool.
//
// It is created and configured with Connect function, and could not be
Expand Down Expand Up @@ -212,7 +190,7 @@ type Opts struct {
Notify chan<- ConnEvent
// Handle is user specified value, that could be retrieved with Handle() method.
Handle interface{}
// Logger is user specified logger used for error messages.
// Logger is user specified logger used for log messages.
Logger Logger

network string
Expand Down Expand Up @@ -276,10 +254,6 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
}
}

if conn.opts.Logger == nil {
conn.opts.Logger = defaultLogger{}
}

if err = conn.createConnection(false); err != nil {
ter, ok := err.(Error)
if conn.opts.ReconnectDelay <= 0 {
Expand Down Expand Up @@ -408,7 +382,7 @@ func (conn *Connection) timeouts() {
}
fut.err = ClientError{
Code: ErrTimedOut,
Msg: fmt.Sprintf("client timeout for request %d", fut.requestID),
Msg: "request timeout",
}
fut.markReady(conn)
shard.bufMu.Unlock()
Expand Down Expand Up @@ -632,12 +606,16 @@ func (conn *Connection) createConnection(reconnect bool) (err error) {
return
}
if conn.opts.MaxReconnects > 0 && reconnects > conn.opts.MaxReconnects {
conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
if conn.opts.Logger != nil {
conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
}
err = ClientError{ErrConnectionClosed, "last reconnect failed"}
// mark connection as closed to avoid reopening by another goroutine.
return
}
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
if conn.opts.Logger != nil {
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
}
conn.notify(ReconnectFailed)
reconnects++
conn.mutex.Unlock()
Expand Down Expand Up @@ -797,15 +775,19 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
if fut := conn.peekFuture(resp.RequestID); fut != nil {
fut.markPushReady(resp)
} else {
conn.opts.Logger.Report(LogUnexpectedResultID, conn, resp)
if conn.opts.Logger != nil {
conn.opts.Logger.Report(LogUnexpectedResultID, conn, resp)
}
}
continue
}
if fut := conn.fetchFuture(resp.RequestID); fut != nil {
fut.resp = resp
fut.markReady(conn)
} else {
conn.opts.Logger.Report(LogUnexpectedResultID, conn, resp)
if conn.opts.Logger != nil {
conn.opts.Logger.Report(LogUnexpectedResultID, conn, resp)
}
}
}
}
Expand Down
29 changes: 29 additions & 0 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,35 @@ func BenchmarkClientFutureParallelTyped(b *testing.B) {
})
}

func BenchmarkClientParallelTimeouts(b *testing.B) {
var err error

var options = Opts{
RequestTimeout: time.Millisecond,
User: "test",
Password: "test",
SkipSchema: true,
}

conn, err := Connect(server, options)
if err != nil {
b.Errorf("No connection available")
return
}
defer func() { _ = conn.Close() }()

b.SetParallelism(1024)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := conn.Exec(Call("timeout", [][]interface{}{}))
if err.(ClientError).Code != ErrTimedOut {
b.Fatal(err.Error())
}
}
})
}

func BenchmarkClientParallel(b *testing.B) {
conn, err := Connect(server, opts)
if err != nil {
Expand Down

0 comments on commit 18b653f

Please sign in to comment.