Skip to content

Commit

Permalink
api: create different responses for different requests
Browse files Browse the repository at this point in the history
Different implementations of the `Response` interface created.
Special types of responses are used with special requests.

Thus `Response` interface was simplified: some special methods
were moved to the corresponding implementations.
This means that if a user wants to access this methods, the
response should be casted to its actual type.

`SelectResponse`, `ExecuteResponse`, `PrepareResponse`,
`PushResponse` are part of a public API. `Pos()`, `MetaData()`,
`SQLInfo()` methods created for them to get specific info.

`Future` constructors now accept `Request` as their argument.
`Future` methods `AppendPush` and `SetResponse` accepts
response `Header` and data as their arguments.

`IsPush()` method is used to return the information if the current
response is a `PushResponse`. `PushCode` constant is removed. To get
information, if the current response is a push response, `IsPush()`
method could be used instead.

Part of #237
  • Loading branch information
DerekBum committed Jan 11, 2024
1 parent bac0680 commit 9d6df42
Show file tree
Hide file tree
Showing 24 changed files with 1,405 additions and 741 deletions.
9 changes: 2 additions & 7 deletions box_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,7 @@ func TestErrorTypeEval(t *testing.T) {

for name, testcase := range tupleCases {
t.Run(name, func(t *testing.T) {
resp, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
require.Nil(t, err)
data, err := resp.Decode()
data, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
require.Nil(t, err)
require.NotNil(t, data)
require.Equal(t, len(data), 1)
Expand Down Expand Up @@ -438,14 +436,11 @@ func TestErrorTypeSelect(t *testing.T) {
_, err := conn.Eval(insertEval, []interface{}{})
require.Nilf(t, err, "Tuple has been successfully inserted")

var resp Response
var offset uint32 = 0
var limit uint32 = 1
resp, err = conn.Select(space, index, offset, limit, IterEq,
data, err := conn.Select(space, index, offset, limit, IterEq,
[]interface{}{testcase.tuple.pk})
require.Nil(t, err)
data, err := resp.Decode()
require.Nil(t, err)
require.NotNil(t, data)
require.Equalf(t, len(data), 1, "Exactly one tuple had been found")
tpl, ok := data[0].([]interface{})
Expand Down
36 changes: 20 additions & 16 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
conn.Addr(), err)
case LogUnexpectedResultId:
respHeader := v[0].(header)
header := v[0].(Header)
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
conn.Addr(), respHeader.requestId)
conn.Addr(), header.RequestId)
case LogWatchEventReadFailed:
err := v[0].(error)
log.Printf("tarantool: unable to parse watch event: %s", err)
Expand Down Expand Up @@ -808,7 +808,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
return
}
buf := smallBuf{b: respBytes}
respHeader, err := decodeHeader(conn.dec, &buf)
header, err := decodeHeader(conn.dec, &buf)
if err != nil {
err = ClientError{
ErrProtocolError,
Expand All @@ -818,10 +818,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
return
}

resp := &ConnResponse{header: respHeader, buf: buf}
var fut *Future = nil
if iproto.Type(respHeader.code) == iproto.IPROTO_EVENT {
if event, err := readWatchEvent(&resp.buf); err == nil {
if iproto.Type(header.Code) == iproto.IPROTO_EVENT {
if event, err := readWatchEvent(&buf); err == nil {
events <- event
} else {
err = ClientError{
Expand All @@ -831,19 +830,19 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
}
continue
} else if respHeader.code == PushCode {
if fut = conn.peekFuture(respHeader.requestId); fut != nil {
fut.AppendPush(resp)
} else if header.Code == PushCode {
if fut = conn.peekFuture(header.RequestId); fut != nil {
fut.AppendPush(header, &buf)
}
} else {
if fut = conn.fetchFuture(respHeader.requestId); fut != nil {
fut.SetResponse(resp)
if fut = conn.fetchFuture(header.RequestId); fut != nil {
fut.SetResponse(header, &buf)
conn.markDone(fut)
}
}

if fut == nil {
conn.opts.Logger.Report(LogUnexpectedResultId, conn, respHeader)
conn.opts.Logger.Report(LogUnexpectedResultId, conn, header)
}
}
}
Expand Down Expand Up @@ -873,8 +872,10 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) {
}
}

func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
func (conn *Connection) newFuture(req Request) (fut *Future) {
ctx := req.Ctx()
fut = NewFuture()
fut.SetRequest(req)
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
select {
case conn.rlimit <- struct{}{}:
Expand Down Expand Up @@ -984,7 +985,7 @@ func (conn *Connection) decrementRequestCnt() {
func (conn *Connection) send(req Request, streamId uint64) *Future {
conn.incrementRequestCnt()

fut := conn.newFuture(req.Ctx())
fut := conn.newFuture(req)
if fut.ready == nil {
conn.decrementRequestCnt()
return fut
Expand Down Expand Up @@ -1053,8 +1054,11 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {

if req.Async() {
if fut = conn.fetchFuture(reqid); fut != nil {
resp := &ConnResponse{}
fut.SetResponse(resp)
header := Header{
RequestId: reqid,
Code: OkCode,
}
fut.SetResponse(header, nil)
conn.markDone(fut)
}
}
Expand Down
24 changes: 12 additions & 12 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,41 @@ type Connector interface {

// Deprecated: the method will be removed in the next major version,
// use a PingRequest object + Do() instead.
Ping() (Response, error)
Ping() ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a SelectRequest object + Do() instead.
Select(space, index interface{}, offset, limit uint32, iterator Iter,
key interface{}) (Response, error)
key interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use an InsertRequest object + Do() instead.
Insert(space interface{}, tuple interface{}) (Response, error)
Insert(space interface{}, tuple interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a ReplicaRequest object + Do() instead.
Replace(space interface{}, tuple interface{}) (Response, error)
Replace(space interface{}, tuple interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a DeleteRequest object + Do() instead.
Delete(space, index interface{}, key interface{}) (Response, error)
Delete(space, index interface{}, key interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a UpdateRequest object + Do() instead.
Update(space, index interface{}, key interface{}, ops *Operations) (Response, error)
Update(space, index interface{}, key interface{}, ops *Operations) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a UpsertRequest object + Do() instead.
Upsert(space interface{}, tuple interface{}, ops *Operations) (Response, error)
Upsert(space interface{}, tuple interface{}, ops *Operations) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a CallRequest object + Do() instead.
Call(functionName string, args interface{}) (Response, error)
Call(functionName string, args interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a Call16Request object + Do() instead.
Call16(functionName string, args interface{}) (Response, error)
Call16(functionName string, args interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a Call17Request object + Do() instead.
Call17(functionName string, args interface{}) (Response, error)
Call17(functionName string, args interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use an EvalRequest object + Do() instead.
Eval(expr string, args interface{}) (Response, error)
Eval(expr string, args interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use an ExecuteRequest object + Do() instead.
Execute(expr string, args interface{}) (Response, error)
Execute(expr string, args interface{}) ([]interface{}, error)

// Deprecated: the method will be removed in the next major version,
// use a SelectRequest object + Do() instead.
Expand Down
7 changes: 7 additions & 0 deletions crud/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ package crud

import (
"context"
"io"

"github.com/tarantool/go-iproto"

Expand Down Expand Up @@ -84,6 +85,12 @@ func (req baseRequest) Async() bool {
return req.impl.Async()
}

// Response creates a response for the baseRequest.
func (req baseRequest) Response(header tarantool.Header,
body io.Reader) (tarantool.Response, error) {
return req.impl.Response(header, body)
}

type spaceRequest struct {
baseRequest
space string
Expand Down
7 changes: 7 additions & 0 deletions crud/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package crud

import (
"context"
"io"

"github.com/vmihailenco/msgpack/v5"

Expand Down Expand Up @@ -133,3 +134,9 @@ func (req SelectRequest) Context(ctx context.Context) SelectRequest {

return req
}

// Response creates a response for the SelectRequest.
func (req SelectRequest) Response(header tarantool.Header,
body io.Reader) (tarantool.Response, error) {
return req.impl.Response(header, body)
}
21 changes: 8 additions & 13 deletions dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
"strings"
"time"

"github.com/vmihailenco/msgpack/v5"

"github.com/tarantool/go-iproto"
"github.com/vmihailenco/msgpack/v5"
)

const bufSize = 128 * 1024
Expand Down Expand Up @@ -405,15 +404,11 @@ func identify(w writeFlusher, r io.Reader) (ProtocolInfo, error) {
}
data, err := resp.Decode()
if err != nil {
switch err := err.(type) {
case Error:
if err.Code == iproto.ER_UNKNOWN_REQUEST_TYPE {
return info, nil
}
return info, err
default:
return info, fmt.Errorf("decode response body error: %w", err)
if iproto.Error(resp.Header().Code) == iproto.ER_UNKNOWN_REQUEST_TYPE {
// IPROTO_ID requests are not supported by server.
return info, nil
}
return info, err
}

if len(data) == 0 {
Expand Down Expand Up @@ -511,12 +506,12 @@ func readResponse(r io.Reader) (Response, error) {

respBytes, err := read(r, lenbuf[:])
if err != nil {
return &ConnResponse{}, fmt.Errorf("read error: %w", err)
return &BaseResponse{}, fmt.Errorf("read error: %w", err)
}

buf := smallBuf{b: respBytes}
respHeader, err := decodeHeader(msgpack.NewDecoder(&smallBuf{}), &buf)
resp := &ConnResponse{header: respHeader, buf: buf}
header, err := decodeHeader(msgpack.NewDecoder(&smallBuf{}), &buf)
resp := &BaseResponse{header: header, buf: buf}
if err != nil {
return resp, fmt.Errorf("decode response header error: %w", err)
}
Expand Down
Loading

0 comments on commit 9d6df42

Please sign in to comment.