From 9d6df421993ca84f95a30bc8e2992ce867324991 Mon Sep 17 00:00:00 2001 From: DerekBum Date: Thu, 14 Dec 2023 18:35:00 +0300 Subject: [PATCH] api: create different responses for different requests Different implementations of the `Response` interface created. Special types of responses are used with special requests. Thus `Response` interface was simplified: some special methods were moved to the corresponding implementations. This means that if a user wants to access this methods, the response should be casted to its actual type. `SelectResponse`, `ExecuteResponse`, `PrepareResponse`, `PushResponse` are part of a public API. `Pos()`, `MetaData()`, `SQLInfo()` methods created for them to get specific info. `Future` constructors now accept `Request` as their argument. `Future` methods `AppendPush` and `SetResponse` accepts response `Header` and data as their arguments. `IsPush()` method is used to return the information if the current response is a `PushResponse`. `PushCode` constant is removed. To get information, if the current response is a push response, `IsPush()` method could be used instead. Part of #237 --- box_error_test.go | 9 +- connection.go | 36 ++- connector.go | 24 +- crud/common.go | 7 + crud/select.go | 7 + dial.go | 21 +- example_test.go | 157 +++++++-- future.go | 32 +- future_test.go | 101 +++--- pool/connection_pool.go | 24 +- pool/connection_pool_test.go | 189 +++++------ pool/connector.go | 24 +- pool/connector_test.go | 62 ++-- pool/pooler.go | 24 +- prepared.go | 41 +++ request.go | 141 +++++++-- response.go | 594 ++++++++++++++++++++++++++--------- settings/example_test.go | 28 +- settings/request.go | 13 + settings/tarantool_test.go | 114 +++++-- tarantool_test.go | 465 +++++++++++++-------------- test_helpers/main.go | 6 +- test_helpers/request_mock.go | 8 + watch.go | 19 ++ 24 files changed, 1405 insertions(+), 741 deletions(-) diff --git a/box_error_test.go b/box_error_test.go index 6839a1477..c48303bba 100644 --- a/box_error_test.go +++ b/box_error_test.go @@ -304,9 +304,7 @@ func TestErrorTypeEval(t *testing.T) { for name, testcase := range tupleCases { t.Run(name, func(t *testing.T) { - resp, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val}) - require.Nil(t, err) - data, err := resp.Decode() + data, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val}) require.Nil(t, err) require.NotNil(t, data) require.Equal(t, len(data), 1) @@ -438,14 +436,11 @@ func TestErrorTypeSelect(t *testing.T) { _, err := conn.Eval(insertEval, []interface{}{}) require.Nilf(t, err, "Tuple has been successfully inserted") - var resp Response var offset uint32 = 0 var limit uint32 = 1 - resp, err = conn.Select(space, index, offset, limit, IterEq, + data, err := conn.Select(space, index, offset, limit, IterEq, []interface{}{testcase.tuple.pk}) require.Nil(t, err) - data, err := resp.Decode() - require.Nil(t, err) require.NotNil(t, data) require.Equalf(t, len(data), 1, "Exactly one tuple had been found") tpl, ok := data[0].([]interface{}) diff --git a/connection.go b/connection.go index 04f66660f..7b58f7871 100644 --- a/connection.go +++ b/connection.go @@ -97,9 +97,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac log.Printf("tarantool: last reconnect to %s failed: %s, giving it up", conn.Addr(), err) case LogUnexpectedResultId: - respHeader := v[0].(header) + header := v[0].(Header) log.Printf("tarantool: connection %s got unexpected resultId (%d) in response", - conn.Addr(), respHeader.requestId) + conn.Addr(), header.RequestId) case LogWatchEventReadFailed: err := v[0].(error) log.Printf("tarantool: unable to parse watch event: %s", err) @@ -808,7 +808,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) { return } buf := smallBuf{b: respBytes} - respHeader, err := decodeHeader(conn.dec, &buf) + header, err := decodeHeader(conn.dec, &buf) if err != nil { err = ClientError{ ErrProtocolError, @@ -818,10 +818,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) { return } - resp := &ConnResponse{header: respHeader, buf: buf} var fut *Future = nil - if iproto.Type(respHeader.code) == iproto.IPROTO_EVENT { - if event, err := readWatchEvent(&resp.buf); err == nil { + if iproto.Type(header.Code) == iproto.IPROTO_EVENT { + if event, err := readWatchEvent(&buf); err == nil { events <- event } else { err = ClientError{ @@ -831,19 +830,19 @@ func (conn *Connection) reader(r io.Reader, c Conn) { conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err) } continue - } else if respHeader.code == PushCode { - if fut = conn.peekFuture(respHeader.requestId); fut != nil { - fut.AppendPush(resp) + } else if header.Code == PushCode { + if fut = conn.peekFuture(header.RequestId); fut != nil { + fut.AppendPush(header, &buf) } } else { - if fut = conn.fetchFuture(respHeader.requestId); fut != nil { - fut.SetResponse(resp) + if fut = conn.fetchFuture(header.RequestId); fut != nil { + fut.SetResponse(header, &buf) conn.markDone(fut) } } if fut == nil { - conn.opts.Logger.Report(LogUnexpectedResultId, conn, respHeader) + conn.opts.Logger.Report(LogUnexpectedResultId, conn, header) } } } @@ -873,8 +872,10 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) { } } -func (conn *Connection) newFuture(ctx context.Context) (fut *Future) { +func (conn *Connection) newFuture(req Request) (fut *Future) { + ctx := req.Ctx() fut = NewFuture() + fut.SetRequest(req) if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop { select { case conn.rlimit <- struct{}{}: @@ -984,7 +985,7 @@ func (conn *Connection) decrementRequestCnt() { func (conn *Connection) send(req Request, streamId uint64) *Future { conn.incrementRequestCnt() - fut := conn.newFuture(req.Ctx()) + fut := conn.newFuture(req) if fut.ready == nil { conn.decrementRequestCnt() return fut @@ -1053,8 +1054,11 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { if req.Async() { if fut = conn.fetchFuture(reqid); fut != nil { - resp := &ConnResponse{} - fut.SetResponse(resp) + header := Header{ + RequestId: reqid, + Code: OkCode, + } + fut.SetResponse(header, nil) conn.markDone(fut) } } diff --git a/connector.go b/connector.go index b7f5affed..72b5d19a8 100644 --- a/connector.go +++ b/connector.go @@ -13,41 +13,41 @@ type Connector interface { // Deprecated: the method will be removed in the next major version, // use a PingRequest object + Do() instead. - Ping() (Response, error) + Ping() ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a SelectRequest object + Do() instead. Select(space, index interface{}, offset, limit uint32, iterator Iter, - key interface{}) (Response, error) + key interface{}) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use an InsertRequest object + Do() instead. - Insert(space interface{}, tuple interface{}) (Response, error) + Insert(space interface{}, tuple interface{}) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a ReplicaRequest object + Do() instead. - Replace(space interface{}, tuple interface{}) (Response, error) + Replace(space interface{}, tuple interface{}) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a DeleteRequest object + Do() instead. - Delete(space, index interface{}, key interface{}) (Response, error) + Delete(space, index interface{}, key interface{}) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a UpdateRequest object + Do() instead. - Update(space, index interface{}, key interface{}, ops *Operations) (Response, error) + Update(space, index interface{}, key interface{}, ops *Operations) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a UpsertRequest object + Do() instead. - Upsert(space interface{}, tuple interface{}, ops *Operations) (Response, error) + Upsert(space interface{}, tuple interface{}, ops *Operations) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a CallRequest object + Do() instead. - Call(functionName string, args interface{}) (Response, error) + Call(functionName string, args interface{}) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a Call16Request object + Do() instead. - Call16(functionName string, args interface{}) (Response, error) + Call16(functionName string, args interface{}) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a Call17Request object + Do() instead. - Call17(functionName string, args interface{}) (Response, error) + Call17(functionName string, args interface{}) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use an EvalRequest object + Do() instead. - Eval(expr string, args interface{}) (Response, error) + Eval(expr string, args interface{}) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use an ExecuteRequest object + Do() instead. - Execute(expr string, args interface{}) (Response, error) + Execute(expr string, args interface{}) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a SelectRequest object + Do() instead. diff --git a/crud/common.go b/crud/common.go index 7a6f9e03b..061818487 100644 --- a/crud/common.go +++ b/crud/common.go @@ -55,6 +55,7 @@ package crud import ( "context" + "io" "github.com/tarantool/go-iproto" @@ -84,6 +85,12 @@ func (req baseRequest) Async() bool { return req.impl.Async() } +// Response creates a response for the baseRequest. +func (req baseRequest) Response(header tarantool.Header, + body io.Reader) (tarantool.Response, error) { + return req.impl.Response(header, body) +} + type spaceRequest struct { baseRequest space string diff --git a/crud/select.go b/crud/select.go index 24dbd0cb0..b922f2889 100644 --- a/crud/select.go +++ b/crud/select.go @@ -2,6 +2,7 @@ package crud import ( "context" + "io" "github.com/vmihailenco/msgpack/v5" @@ -133,3 +134,9 @@ func (req SelectRequest) Context(ctx context.Context) SelectRequest { return req } + +// Response creates a response for the SelectRequest. +func (req SelectRequest) Response(header tarantool.Header, + body io.Reader) (tarantool.Response, error) { + return req.impl.Response(header, body) +} diff --git a/dial.go b/dial.go index d674342bc..378687925 100644 --- a/dial.go +++ b/dial.go @@ -12,9 +12,8 @@ import ( "strings" "time" - "github.com/vmihailenco/msgpack/v5" - "github.com/tarantool/go-iproto" + "github.com/vmihailenco/msgpack/v5" ) const bufSize = 128 * 1024 @@ -405,15 +404,11 @@ func identify(w writeFlusher, r io.Reader) (ProtocolInfo, error) { } data, err := resp.Decode() if err != nil { - switch err := err.(type) { - case Error: - if err.Code == iproto.ER_UNKNOWN_REQUEST_TYPE { - return info, nil - } - return info, err - default: - return info, fmt.Errorf("decode response body error: %w", err) + if iproto.Error(resp.Header().Code) == iproto.ER_UNKNOWN_REQUEST_TYPE { + // IPROTO_ID requests are not supported by server. + return info, nil } + return info, err } if len(data) == 0 { @@ -511,12 +506,12 @@ func readResponse(r io.Reader) (Response, error) { respBytes, err := read(r, lenbuf[:]) if err != nil { - return &ConnResponse{}, fmt.Errorf("read error: %w", err) + return &BaseResponse{}, fmt.Errorf("read error: %w", err) } buf := smallBuf{b: respBytes} - respHeader, err := decodeHeader(msgpack.NewDecoder(&smallBuf{}), &buf) - resp := &ConnResponse{header: respHeader, buf: buf} + header, err := decodeHeader(msgpack.NewDecoder(&smallBuf{}), &buf) + resp := &BaseResponse{header: header, buf: buf} if err != nil { return resp, fmt.Errorf("decode response header error: %w", err) } diff --git a/example_test.go b/example_test.go index e002fbc94..0d0e92ab2 100644 --- a/example_test.go +++ b/example_test.go @@ -570,8 +570,17 @@ func ExampleExecuteRequest() { data, err := resp.Decode() fmt.Println("Error", err) fmt.Println("Data", data) - fmt.Println("MetaData", resp.MetaData()) - fmt.Println("SQL Info", resp.SQLInfo()) + exResp, ok := resp.(*tarantool.ExecuteResponse) + if !ok { + fmt.Printf("wrong response type") + return + } + metaData, err := exResp.MetaData() + fmt.Println("MetaData", metaData) + fmt.Println("Error", err) + sqlInfo, err := exResp.SQLInfo() + fmt.Println("SQL Info", sqlInfo) + fmt.Println("Error", err) // There are 4 options to pass named parameters to an SQL query: // 1) The simple map; @@ -608,8 +617,17 @@ func ExampleExecuteRequest() { data, err = resp.Decode() fmt.Println("Error", err) fmt.Println("Data", data) - fmt.Println("MetaData", resp.MetaData()) - fmt.Println("SQL Info", resp.SQLInfo()) + exResp, ok = resp.(*tarantool.ExecuteResponse) + if !ok { + fmt.Printf("wrong response type") + return + } + metaData, err = exResp.MetaData() + fmt.Println("MetaData", metaData) + fmt.Println("Error", err) + sqlInfo, err = exResp.SQLInfo() + fmt.Println("SQL Info", sqlInfo) + fmt.Println("Error", err) // 2) req = req.Args(sqlBind2) @@ -619,8 +637,17 @@ func ExampleExecuteRequest() { data, err = resp.Decode() fmt.Println("Error", err) fmt.Println("Data", data) - fmt.Println("MetaData", resp.MetaData()) - fmt.Println("SQL Info", resp.SQLInfo()) + exResp, ok = resp.(*tarantool.ExecuteResponse) + if !ok { + fmt.Printf("wrong response type") + return + } + metaData, err = exResp.MetaData() + fmt.Println("MetaData", metaData) + fmt.Println("Error", err) + sqlInfo, err = exResp.SQLInfo() + fmt.Println("SQL Info", sqlInfo) + fmt.Println("Error", err) // 3) req = req.Args(sqlBind3) @@ -630,8 +657,17 @@ func ExampleExecuteRequest() { data, err = resp.Decode() fmt.Println("Error", err) fmt.Println("Data", data) - fmt.Println("MetaData", resp.MetaData()) - fmt.Println("SQL Info", resp.SQLInfo()) + exResp, ok = resp.(*tarantool.ExecuteResponse) + if !ok { + fmt.Printf("wrong response type") + return + } + metaData, err = exResp.MetaData() + fmt.Println("MetaData", metaData) + fmt.Println("Error", err) + sqlInfo, err = exResp.SQLInfo() + fmt.Println("SQL Info", sqlInfo) + fmt.Println("Error", err) // 4) req = req.Args(sqlBind4) @@ -641,8 +677,17 @@ func ExampleExecuteRequest() { data, err = resp.Decode() fmt.Println("Error", err) fmt.Println("Data", data) - fmt.Println("MetaData", resp.MetaData()) - fmt.Println("SQL Info", resp.SQLInfo()) + exResp, ok = resp.(*tarantool.ExecuteResponse) + if !ok { + fmt.Printf("wrong response type") + return + } + metaData, err = exResp.MetaData() + fmt.Println("MetaData", metaData) + fmt.Println("Error", err) + sqlInfo, err = exResp.SQLInfo() + fmt.Println("SQL Info", sqlInfo) + fmt.Println("Error", err) // The way to pass positional arguments to an SQL query. req = tarantool.NewExecuteRequest( @@ -654,8 +699,17 @@ func ExampleExecuteRequest() { data, err = resp.Decode() fmt.Println("Error", err) fmt.Println("Data", data) - fmt.Println("MetaData", resp.MetaData()) - fmt.Println("SQL Info", resp.SQLInfo()) + exResp, ok = resp.(*tarantool.ExecuteResponse) + if !ok { + fmt.Printf("wrong response type") + return + } + metaData, err = exResp.MetaData() + fmt.Println("MetaData", metaData) + fmt.Println("Error", err) + sqlInfo, err = exResp.SQLInfo() + fmt.Println("SQL Info", sqlInfo) + fmt.Println("Error", err) // The way to pass SQL expression with using custom packing/unpacking for // a type. @@ -680,8 +734,17 @@ func ExampleExecuteRequest() { data, err = resp.Decode() fmt.Println("Error", err) fmt.Println("Data", data) - fmt.Println("MetaData", resp.MetaData()) - fmt.Println("SQL Info", resp.SQLInfo()) + exResp, ok = resp.(*tarantool.ExecuteResponse) + if !ok { + fmt.Printf("wrong response type") + return + } + metaData, err = exResp.MetaData() + fmt.Println("MetaData", metaData) + fmt.Println("Error", err) + sqlInfo, err = exResp.SQLInfo() + fmt.Println("SQL Info", sqlInfo) + fmt.Println("Error", err) } func getTestTxnDialer() tarantool.Dialer { @@ -942,17 +1005,25 @@ func ExampleFuture_GetIterator() { for it = fut.GetIterator().WithTimeout(timeout); it.Next(); { resp := it.Value() data, _ := resp.Decode() - fmt.Printf("response: %v\n", data[0]) + if it.IsPush() { + // It is a push message. + fmt.Printf("push message: %v\n", data[0]) + } else if resp.Header().Code == tarantool.OkCode { + // It is a regular response. + fmt.Printf("response: %v", data[0]) + } else { + fmt.Printf("an unexpected response code %d", resp.Header().Code) + } } if err := it.Err(); err != nil { fmt.Printf("error in call of push_func is %v", err) return } // Output: - // response: 1 - // response: 2 - // response: 3 - // response: 4 + // push message: 1 + // push message: 2 + // push message: 3 + // push message: 4 // response: 4 } @@ -1134,6 +1205,54 @@ func ExampleConnection_Do() { // [[1111 foo bar]] } +func ExampleConnection_Do_failure() { + conn := exampleConnect(dialer, opts) + defer conn.Close() + + // It could be any request. + req := tarantool.NewCallRequest("not_exist") + + // We got a future, the request actually not performed yet. + future := conn.Do(req) + + // When the future receives the response, the result of the Future is set + // and becomes available. We could wait for that moment with Future.Get(), + // Future.GetResponse() or Future.GetTyped() methods. + resp, err := future.GetResponse() + if err != nil { + fmt.Printf("Error in the future: %s\n", err) + } + + data, err := future.Get() + fmt.Printf("Data: %v\n", data) + + if err != nil { + // We don't print the error here to keep the example reproducible. + // fmt.Printf("Failed to execute the request: %s\n", err) + if resp == nil { + // Something happens in a client process (timeout, IO error etc). + fmt.Printf("Resp == nil, ClientErr = %s\n", err.(tarantool.ClientError)) + } else { + // Response exist. So it could be a Tarantool error or a decode + // error. We need to check the error code. + fmt.Printf("Error code from the response: %d\n", resp.Header().Code) + if resp.Header().Code == tarantool.OkCode { + fmt.Printf("Decode error: %s\n", err) + } else { + code := err.(tarantool.Error).Code + fmt.Printf("Error code from the error: %d\n", code) + fmt.Printf("Error short from the error: %s\n", code) + } + } + } + + // Output: + // Data: [] + // Error code from the response: 33 + // Error code from the error: 33 + // Error short from the error: ER_NO_SUCH_PROC +} + // To use prepared statements to query a tarantool instance, call NewPrepared. func ExampleConnection_NewPrepared() { // Tarantool supports SQL since version 2.0.0 diff --git a/future.go b/future.go index 81ba1c5af..bcaf03263 100644 --- a/future.go +++ b/future.go @@ -1,6 +1,7 @@ package tarantool import ( + "io" "sync" "time" ) @@ -8,6 +9,7 @@ import ( // Future is a handle for asynchronous request. type Future struct { requestId uint32 + req Request next *Future timeout time.Duration mutex sync.Mutex @@ -135,30 +137,46 @@ func NewFuture() (fut *Future) { // // Deprecated: the method will be removed in the next major version, // use Connector.NewWatcher() instead of box.session.push(). -func (fut *Future) AppendPush(resp Response) { +func (fut *Future) AppendPush(header Header, body io.Reader) error { fut.mutex.Lock() defer fut.mutex.Unlock() if fut.isDone() { - return + return nil + } + resp, err := createPushResponse(header, body) + if err != nil { + return err } fut.pushes = append(fut.pushes, resp) fut.ready <- struct{}{} + return nil +} + +// SetRequest sets a request, for which the future was created. +func (fut *Future) SetRequest(req Request) { + fut.req = req } // SetResponse sets a response for the future and finishes the future. -func (fut *Future) SetResponse(resp Response) { +func (fut *Future) SetResponse(header Header, body io.Reader) error { fut.mutex.Lock() defer fut.mutex.Unlock() if fut.isDone() { - return + return nil + } + + resp, err := fut.req.Response(header, body) + if err != nil { + return err } fut.resp = resp close(fut.ready) close(fut.done) + return nil } // SetError sets an error for the future and finishes the future. @@ -183,11 +201,7 @@ func (fut *Future) SetError(err error) { // or ClientError, if something bad happens in a client process. func (fut *Future) GetResponse() (Response, error) { fut.wait() - if fut.err != nil { - return fut.resp, fut.err - } - _, err := fut.resp.Decode() - return fut.resp, err + return fut.resp, fut.err } // Get waits for Future to be filled and returns the data of the Response and error. diff --git a/future_test.go b/future_test.go index 131cdafdd..3a5f5e6b9 100644 --- a/future_test.go +++ b/future_test.go @@ -6,10 +6,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" . "github.com/tarantool/go-tarantool/v2" ) -func assertResponseIteratorValue(t testing.TB, it ResponseIterator, resp Response) { +func assertResponseIteratorValue(t testing.TB, it ResponseIterator, + isPush bool, resp Response) { t.Helper() if it.Err() != nil { @@ -18,11 +20,15 @@ func assertResponseIteratorValue(t testing.TB, it ResponseIterator, resp Respons if it.Value() == nil { t.Errorf("An unexpected nil value") + } else if it.IsPush() != isPush { + if isPush { + t.Errorf("An unexpected response type, expected to be push") + } else { + t.Errorf("An unexpected response type, expected not to be push") + } } - if it.Value() != resp { - t.Errorf("An unexpected response %v, expected %v", it.Value(), resp) - } + assert.Equalf(t, it.Value(), resp, "An unexpected response %v, expected %v", it.Value(), resp) } func assertResponseIteratorFinished(t testing.TB, it ResponseIterator) { @@ -48,12 +54,15 @@ func TestFutureGetIteratorNoItems(t *testing.T) { } func TestFutureGetIteratorNoResponse(t *testing.T) { - push := &ConnResponse{} + pushHeader := Header{} + push := &PushResponse{} fut := NewFuture() - fut.AppendPush(push) + fut.AppendPush(pushHeader, nil) + + push.Decode() if it := fut.GetIterator(); it.Next() { - assertResponseIteratorValue(t, it, push) + assertResponseIteratorValue(t, it, true, push) if it.Next() == true { t.Errorf("An unexpected next value.") } @@ -64,12 +73,15 @@ func TestFutureGetIteratorNoResponse(t *testing.T) { } func TestFutureGetIteratorNoResponseTimeout(t *testing.T) { - push := &ConnResponse{} + pushHeader := Header{} + push := &PushResponse{} fut := NewFuture() - fut.AppendPush(push) + fut.AppendPush(pushHeader, nil) + + push.Decode() if it := fut.GetIterator().WithTimeout(1 * time.Nanosecond); it.Next() { - assertResponseIteratorValue(t, it, push) + assertResponseIteratorValue(t, it, true, push) if it.Next() == true { t.Errorf("An unexpected next value.") } @@ -80,10 +92,15 @@ func TestFutureGetIteratorNoResponseTimeout(t *testing.T) { } func TestFutureGetIteratorResponseOnTimeout(t *testing.T) { - push := &ConnResponse{} - resp := &ConnResponse{} + pushHeader := Header{} + respHeader := Header{} + push := &PushResponse{} + resp := &BaseResponse{} fut := NewFuture() - fut.AppendPush(push) + fut.AppendPush(pushHeader, nil) + + push.Decode() + resp.Decode() var done sync.WaitGroup var wait sync.WaitGroup @@ -96,11 +113,14 @@ func TestFutureGetIteratorResponseOnTimeout(t *testing.T) { var it ResponseIterator var cnt = 0 for it = fut.GetIterator().WithTimeout(5 * time.Second); it.Next(); { - r := push + var r Response + isPush := true + r = push if cnt == 1 { + isPush = false r = resp } - assertResponseIteratorValue(t, it, r) + assertResponseIteratorValue(t, it, isPush, r) cnt += 1 if cnt == 1 { wait.Done() @@ -114,19 +134,23 @@ func TestFutureGetIteratorResponseOnTimeout(t *testing.T) { }() wait.Wait() - fut.SetResponse(resp) + + fut.SetRequest(&InsertRequest{}) + fut.SetResponse(respHeader, nil) done.Wait() } func TestFutureGetIteratorFirstResponse(t *testing.T) { - resp1 := &ConnResponse{} - resp2 := &ConnResponse{} + resp := &BaseResponse{} fut := NewFuture() - fut.SetResponse(resp1) - fut.SetResponse(resp2) + fut.SetRequest(&InsertRequest{}) + fut.SetResponse(Header{}, nil) + fut.SetResponse(Header{}, nil) + + resp.Decode() if it := fut.GetIterator(); it.Next() { - assertResponseIteratorValue(t, it, resp1) + assertResponseIteratorValue(t, it, false, resp) if it.Next() == true { t.Errorf("An unexpected next value.") } @@ -155,17 +179,20 @@ func TestFutureGetIteratorFirstError(t *testing.T) { } func TestFutureGetIteratorResponse(t *testing.T) { - responses := []*ConnResponse{ - {}, - {}, - {}, + responses := []Response{ + &PushResponse{}, + &PushResponse{}, + &BaseResponse{}, } + header := Header{} fut := NewFuture() + fut.SetRequest(&InsertRequest{}) for i, resp := range responses { + resp.Decode() if i == len(responses)-1 { - fut.SetResponse(resp) + fut.SetResponse(header, nil) } else { - fut.AppendPush(resp) + fut.AppendPush(header, nil) } } @@ -176,7 +203,11 @@ func TestFutureGetIteratorResponse(t *testing.T) { for _, it := range its { var cnt = 0 for it.Next() { - assertResponseIteratorValue(t, it, responses[cnt]) + isPush := true + if cnt == len(responses)-1 { + isPush = false + } + assertResponseIteratorValue(t, it, isPush, responses[cnt]) cnt += 1 } assertResponseIteratorFinished(t, it) @@ -189,14 +220,15 @@ func TestFutureGetIteratorResponse(t *testing.T) { func TestFutureGetIteratorError(t *testing.T) { const errMsg = "error message" - responses := []*ConnResponse{ + responses := []*PushResponse{ {}, {}, } err := errors.New(errMsg) fut := NewFuture() for _, resp := range responses { - fut.AppendPush(resp) + fut.AppendPush(Header{}, nil) + resp.Decode() } fut.SetError(err) @@ -207,7 +239,7 @@ func TestFutureGetIteratorError(t *testing.T) { for _, it := range its { var cnt = 0 for it.Next() { - assertResponseIteratorValue(t, it, responses[cnt]) + assertResponseIteratorValue(t, it, true, responses[cnt]) cnt += 1 } if err = it.Err(); err != nil { @@ -226,19 +258,18 @@ func TestFutureGetIteratorError(t *testing.T) { func TestFutureSetStateRaceCondition(t *testing.T) { err := errors.New("any error") - resp := &ConnResponse{} for i := 0; i < 1000; i++ { fut := NewFuture() + fut.SetRequest(&InsertRequest{}) for j := 0; j < 9; j++ { go func(opt int) { if opt%3 == 0 { - respAppend := &ConnResponse{} - fut.AppendPush(respAppend) + fut.AppendPush(Header{}, nil) } else if opt%3 == 1 { fut.SetError(err) } else { - fut.SetResponse(resp) + fut.SetResponse(Header{}, nil) } }(j) } diff --git a/pool/connection_pool.go b/pool/connection_pool.go index 747de45f4..3734c4c0a 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -373,7 +373,7 @@ func (p *ConnectionPool) GetInfo() map[string]ConnectionInfo { // // Deprecated: the method will be removed in the next major version, // use a PingRequest object + Do() instead. -func (p *ConnectionPool) Ping(userMode Mode) (tarantool.Response, error) { +func (p *ConnectionPool) Ping(userMode Mode) ([]interface{}, error) { conn, err := p.getNextConnection(userMode) if err != nil { return nil, err @@ -388,7 +388,7 @@ func (p *ConnectionPool) Ping(userMode Mode) (tarantool.Response, error) { // use a SelectRequest object + Do() instead. func (p *ConnectionPool) Select(space, index interface{}, offset, limit uint32, - iterator tarantool.Iter, key interface{}, userMode ...Mode) (tarantool.Response, error) { + iterator tarantool.Iter, key interface{}, userMode ...Mode) ([]interface{}, error) { conn, err := p.getConnByMode(ANY, userMode) if err != nil { return nil, err @@ -403,7 +403,7 @@ func (p *ConnectionPool) Select(space, index interface{}, // Deprecated: the method will be removed in the next major version, // use an InsertRequest object + Do() instead. func (p *ConnectionPool) Insert(space interface{}, tuple interface{}, - userMode ...Mode) (tarantool.Response, error) { + userMode ...Mode) ([]interface{}, error) { conn, err := p.getConnByMode(RW, userMode) if err != nil { return nil, err @@ -418,7 +418,7 @@ func (p *ConnectionPool) Insert(space interface{}, tuple interface{}, // Deprecated: the method will be removed in the next major version, // use a ReplaceRequest object + Do() instead. func (p *ConnectionPool) Replace(space interface{}, tuple interface{}, - userMode ...Mode) (tarantool.Response, error) { + userMode ...Mode) ([]interface{}, error) { conn, err := p.getConnByMode(RW, userMode) if err != nil { return nil, err @@ -433,7 +433,7 @@ func (p *ConnectionPool) Replace(space interface{}, tuple interface{}, // Deprecated: the method will be removed in the next major version, // use a DeleteRequest object + Do() instead. func (p *ConnectionPool) Delete(space, index interface{}, key interface{}, - userMode ...Mode) (tarantool.Response, error) { + userMode ...Mode) ([]interface{}, error) { conn, err := p.getConnByMode(RW, userMode) if err != nil { return nil, err @@ -448,7 +448,7 @@ func (p *ConnectionPool) Delete(space, index interface{}, key interface{}, // Deprecated: the method will be removed in the next major version, // use a UpdateRequest object + Do() instead. func (p *ConnectionPool) Update(space, index interface{}, key interface{}, - ops *tarantool.Operations, userMode ...Mode) (tarantool.Response, error) { + ops *tarantool.Operations, userMode ...Mode) ([]interface{}, error) { conn, err := p.getConnByMode(RW, userMode) if err != nil { return nil, err @@ -463,7 +463,7 @@ func (p *ConnectionPool) Update(space, index interface{}, key interface{}, // Deprecated: the method will be removed in the next major version, // use a UpsertRequest object + Do() instead. func (p *ConnectionPool) Upsert(space interface{}, tuple interface{}, - ops *tarantool.Operations, userMode ...Mode) (tarantool.Response, error) { + ops *tarantool.Operations, userMode ...Mode) ([]interface{}, error) { conn, err := p.getConnByMode(RW, userMode) if err != nil { return nil, err @@ -478,7 +478,7 @@ func (p *ConnectionPool) Upsert(space interface{}, tuple interface{}, // Deprecated: the method will be removed in the next major version, // use a CallRequest object + Do() instead. func (p *ConnectionPool) Call(functionName string, args interface{}, - userMode Mode) (tarantool.Response, error) { + userMode Mode) ([]interface{}, error) { conn, err := p.getNextConnection(userMode) if err != nil { return nil, err @@ -494,7 +494,7 @@ func (p *ConnectionPool) Call(functionName string, args interface{}, // Deprecated: the method will be removed in the next major version, // use a Call16Request object + Do() instead. func (p *ConnectionPool) Call16(functionName string, args interface{}, - userMode Mode) (tarantool.Response, error) { + userMode Mode) ([]interface{}, error) { conn, err := p.getNextConnection(userMode) if err != nil { return nil, err @@ -509,7 +509,7 @@ func (p *ConnectionPool) Call16(functionName string, args interface{}, // Deprecated: the method will be removed in the next major version, // use a Call17Request object + Do() instead. func (p *ConnectionPool) Call17(functionName string, args interface{}, - userMode Mode) (tarantool.Response, error) { + userMode Mode) ([]interface{}, error) { conn, err := p.getNextConnection(userMode) if err != nil { return nil, err @@ -523,7 +523,7 @@ func (p *ConnectionPool) Call17(functionName string, args interface{}, // Deprecated: the method will be removed in the next major version, // use an EvalRequest object + Do() instead. func (p *ConnectionPool) Eval(expr string, args interface{}, - userMode Mode) (tarantool.Response, error) { + userMode Mode) ([]interface{}, error) { conn, err := p.getNextConnection(userMode) if err != nil { return nil, err @@ -537,7 +537,7 @@ func (p *ConnectionPool) Eval(expr string, args interface{}, // Deprecated: the method will be removed in the next major version, // use an ExecuteRequest object + Do() instead. func (p *ConnectionPool) Execute(expr string, args interface{}, - userMode Mode) (tarantool.Response, error) { + userMode Mode) ([]interface{}, error) { conn, err := p.getNextConnection(userMode) if err != nil { return nil, err diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index 50096cc4c..ae1acd223 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -1140,11 +1140,9 @@ func TestCall(t *testing.T) { defer connPool.Close() // PreferRO - resp, err := connPool.Call("box.info", []interface{}{}, pool.PreferRO) + data, err := connPool.Call("box.info", []interface{}{}, pool.PreferRO) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err := resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val := data[0].(map[interface{}]interface{})["ro"] @@ -1153,11 +1151,9 @@ func TestCall(t *testing.T) { require.Truef(t, ro, "expected `true` with mode `PreferRO`") // PreferRW - resp, err = connPool.Call("box.info", []interface{}{}, pool.PreferRW) + data, err = connPool.Call("box.info", []interface{}{}, pool.PreferRW) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val = data[0].(map[interface{}]interface{})["ro"] @@ -1166,11 +1162,9 @@ func TestCall(t *testing.T) { require.Falsef(t, ro, "expected `false` with mode `PreferRW`") // RO - resp, err = connPool.Call("box.info", []interface{}{}, pool.RO) + data, err = connPool.Call("box.info", []interface{}{}, pool.RO) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val = data[0].(map[interface{}]interface{})["ro"] @@ -1179,11 +1173,9 @@ func TestCall(t *testing.T) { require.Truef(t, ro, "expected `true` with mode `RO`") // RW - resp, err = connPool.Call("box.info", []interface{}{}, pool.RW) + data, err = connPool.Call("box.info", []interface{}{}, pool.RW) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val = data[0].(map[interface{}]interface{})["ro"] @@ -1207,11 +1199,9 @@ func TestCall16(t *testing.T) { defer connPool.Close() // PreferRO - resp, err := connPool.Call16("box.info", []interface{}{}, pool.PreferRO) + data, err := connPool.Call16("box.info", []interface{}{}, pool.PreferRO) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err := resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val := data[0].([]interface{})[0].(map[interface{}]interface{})["ro"] @@ -1220,11 +1210,9 @@ func TestCall16(t *testing.T) { require.Truef(t, ro, "expected `true` with mode `PreferRO`") // PreferRW - resp, err = connPool.Call16("box.info", []interface{}{}, pool.PreferRW) + data, err = connPool.Call16("box.info", []interface{}{}, pool.PreferRW) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val = data[0].([]interface{})[0].(map[interface{}]interface{})["ro"] @@ -1233,11 +1221,9 @@ func TestCall16(t *testing.T) { require.Falsef(t, ro, "expected `false` with mode `PreferRW`") // RO - resp, err = connPool.Call16("box.info", []interface{}{}, pool.RO) + data, err = connPool.Call16("box.info", []interface{}{}, pool.RO) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val = data[0].([]interface{})[0].(map[interface{}]interface{})["ro"] @@ -1246,11 +1232,9 @@ func TestCall16(t *testing.T) { require.Truef(t, ro, "expected `true` with mode `RO`") // RW - resp, err = connPool.Call16("box.info", []interface{}{}, pool.RW) + data, err = connPool.Call16("box.info", []interface{}{}, pool.RW) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val = data[0].([]interface{})[0].(map[interface{}]interface{})["ro"] @@ -1274,11 +1258,9 @@ func TestCall17(t *testing.T) { defer connPool.Close() // PreferRO - resp, err := connPool.Call17("box.info", []interface{}{}, pool.PreferRO) + data, err := connPool.Call17("box.info", []interface{}{}, pool.PreferRO) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err := resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val := data[0].(map[interface{}]interface{})["ro"] @@ -1287,11 +1269,9 @@ func TestCall17(t *testing.T) { require.Truef(t, ro, "expected `true` with mode `PreferRO`") // PreferRW - resp, err = connPool.Call17("box.info", []interface{}{}, pool.PreferRW) + data, err = connPool.Call17("box.info", []interface{}{}, pool.PreferRW) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val = data[0].(map[interface{}]interface{})["ro"] @@ -1300,11 +1280,9 @@ func TestCall17(t *testing.T) { require.Falsef(t, ro, "expected `false` with mode `PreferRW`") // RO - resp, err = connPool.Call17("box.info", []interface{}{}, pool.RO) + data, err = connPool.Call17("box.info", []interface{}{}, pool.RO) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val = data[0].(map[interface{}]interface{})["ro"] @@ -1313,11 +1291,9 @@ func TestCall17(t *testing.T) { require.Truef(t, ro, "expected `true` with mode `RO`") // RW - resp, err = connPool.Call17("box.info", []interface{}{}, pool.RW) + data, err = connPool.Call17("box.info", []interface{}{}, pool.RW) require.Nilf(t, err, "failed to Call") - require.NotNilf(t, resp, "response is nil after Call") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Call") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Call") val = data[0].(map[interface{}]interface{})["ro"] @@ -1341,11 +1317,9 @@ func TestEval(t *testing.T) { defer connPool.Close() // PreferRO - resp, err := connPool.Eval("return box.info().ro", []interface{}{}, pool.PreferRO) + data, err := connPool.Eval("return box.info().ro", []interface{}{}, pool.PreferRO) require.Nilf(t, err, "failed to Eval") - require.NotNilf(t, resp, "response is nil after Eval") - data, err := resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Eval") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Eval") val, ok := data[0].(bool) @@ -1353,11 +1327,9 @@ func TestEval(t *testing.T) { require.Truef(t, val, "expected `true` with mode `PreferRO`") // PreferRW - resp, err = connPool.Eval("return box.info().ro", []interface{}{}, pool.PreferRW) + data, err = connPool.Eval("return box.info().ro", []interface{}{}, pool.PreferRW) require.Nilf(t, err, "failed to Eval") - require.NotNilf(t, resp, "response is nil after Eval") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Eval") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Eval") val, ok = data[0].(bool) @@ -1365,11 +1337,9 @@ func TestEval(t *testing.T) { require.Falsef(t, val, "expected `false` with mode `PreferRW`") // RO - resp, err = connPool.Eval("return box.info().ro", []interface{}{}, pool.RO) + data, err = connPool.Eval("return box.info().ro", []interface{}{}, pool.RO) require.Nilf(t, err, "failed to Eval") - require.NotNilf(t, resp, "response is nil after Eval") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Eval") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Eval") val, ok = data[0].(bool) @@ -1377,11 +1347,9 @@ func TestEval(t *testing.T) { require.Truef(t, val, "expected `true` with mode `RO`") // RW - resp, err = connPool.Eval("return box.info().ro", []interface{}{}, pool.RW) + data, err = connPool.Eval("return box.info().ro", []interface{}{}, pool.RW) require.Nilf(t, err, "failed to Eval") - require.NotNilf(t, resp, "response is nil after Eval") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Eval") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Eval") val, ok = data[0].(bool) @@ -1430,11 +1398,9 @@ func TestExecute(t *testing.T) { request := "SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0 == 1;" // Execute - resp, err := connPool.Execute(request, []interface{}{}, pool.ANY) + data, err := connPool.Execute(request, []interface{}{}, pool.ANY) require.Nilf(t, err, "failed to Execute") - require.NotNilf(t, resp, "response is nil after Execute") - data, err := resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Execute") require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Execute") require.Equalf(t, len(data[0].([]interface{})), 2, "unexpected response") @@ -1872,11 +1838,9 @@ func TestInsert(t *testing.T) { defer connPool.Close() // Mode is `RW` by default, we have only one RW instance (servers[2]) - resp, err := connPool.Insert(spaceName, []interface{}{"rw_insert_key", "rw_insert_value"}) + data, err := connPool.Insert(spaceName, []interface{}{"rw_insert_key", "rw_insert_value"}) require.Nilf(t, err, "failed to Insert") - require.NotNilf(t, resp, "response is nil after Insert") - data, err := resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Insert") require.Equalf(t, len(data), 1, "response Body len != 1 after Insert") tpl, ok := data[0].([]interface{}) @@ -1918,12 +1882,10 @@ func TestInsert(t *testing.T) { require.Equalf(t, "rw_insert_value", value, "unexpected body of Select (1)") // PreferRW - resp, err = connPool.Insert(spaceName, + data, err = connPool.Insert(spaceName, []interface{}{"preferRW_insert_key", "preferRW_insert_value"}) require.Nilf(t, err, "failed to Insert") - require.NotNilf(t, resp, "response is nil after Insert") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Insert") require.Equalf(t, len(data), 1, "response Body len != 1 after Insert") tpl, ok = data[0].([]interface{}) @@ -1997,11 +1959,9 @@ func TestDelete(t *testing.T) { require.Equalf(t, "delete_value", value, "unexpected body of Insert (1)") // Mode is `RW` by default, we have only one RW instance (servers[2]) - resp, err := connPool.Delete(spaceName, indexNo, []interface{}{"delete_key"}) + data, err = connPool.Delete(spaceName, indexNo, []interface{}{"delete_key"}) require.Nilf(t, err, "failed to Delete") - require.NotNilf(t, resp, "response is nil after Delete") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Delete") require.Equalf(t, len(data), 1, "response Body len != 1 after Delete") tpl, ok = data[0].([]interface{}) @@ -2046,18 +2006,18 @@ func TestUpsert(t *testing.T) { defer conn.Close() // Mode is `RW` by default, we have only one RW instance (servers[2]) - resp, err := connPool.Upsert(spaceName, + data, err := connPool.Upsert(spaceName, []interface{}{"upsert_key", "upsert_value"}, tarantool.NewOperations().Assign(1, "new_value")) require.Nilf(t, err, "failed to Upsert") - require.NotNilf(t, resp, "response is nil after Upsert") + require.NotNilf(t, data, "response is nil after Upsert") sel := tarantool.NewSelectRequest(spaceNo). Index(indexNo). Limit(1). Iterator(tarantool.IterEq). Key([]interface{}{"upsert_key"}) - data, err := conn.Do(sel).Get() + data, err = conn.Do(sel).Get() require.Nilf(t, err, "failed to Select") require.Equalf(t, len(data), 1, "response Body len != 1 after Select") @@ -2074,12 +2034,12 @@ func TestUpsert(t *testing.T) { require.Equalf(t, "upsert_value", value, "unexpected body of Select (1)") // PreferRW - resp, err = connPool.Upsert( + data, err = connPool.Upsert( spaceName, []interface{}{"upsert_key", "upsert_value"}, tarantool.NewOperations().Assign(1, "new_value"), pool.PreferRW) require.Nilf(t, err, "failed to Upsert") - require.NotNilf(t, resp, "response is nil after Upsert") + require.NotNilf(t, data, "response is nil after Upsert") data, err = conn.Do(sel).Get() require.Nilf(t, err, "failed to Select") @@ -2308,11 +2268,9 @@ func TestSelect(t *testing.T) { require.Nil(t, err) //default: ANY - resp, err := connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, anyKey) + data, err := connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, anyKey) require.Nilf(t, err, "failed to Select") - require.NotNilf(t, resp, "response is nil after Select") - data, err := resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Select") require.Equalf(t, len(data), 1, "response Body len != 1 after Select") tpl, ok := data[0].([]interface{}) @@ -2328,11 +2286,9 @@ func TestSelect(t *testing.T) { require.Equalf(t, "any_select_value", value, "unexpected body of Select (1)") // PreferRO - resp, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, roKey, pool.PreferRO) + data, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, roKey, pool.PreferRO) require.Nilf(t, err, "failed to Select") - require.NotNilf(t, resp, "response is nil after Select") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Select") require.Equalf(t, len(data), 1, "response Body len != 1 after Select") tpl, ok = data[0].([]interface{}) @@ -2344,11 +2300,9 @@ func TestSelect(t *testing.T) { require.Equalf(t, "ro_select_key", key, "unexpected body of Select (0)") // PreferRW - resp, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, rwKey, pool.PreferRW) + data, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, rwKey, pool.PreferRW) require.Nilf(t, err, "failed to Select") - require.NotNilf(t, resp, "response is nil after Select") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Select") require.Equalf(t, len(data), 1, "response Body len != 1 after Select") tpl, ok = data[0].([]interface{}) @@ -2364,11 +2318,9 @@ func TestSelect(t *testing.T) { require.Equalf(t, "rw_select_value", value, "unexpected body of Select (1)") // RO - resp, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, roKey, pool.RO) + data, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, roKey, pool.RO) require.Nilf(t, err, "failed to Select") - require.NotNilf(t, resp, "response is nil after Select") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Select") require.Equalf(t, len(data), 1, "response Body len != 1 after Select") tpl, ok = data[0].([]interface{}) @@ -2384,11 +2336,9 @@ func TestSelect(t *testing.T) { require.Equalf(t, "ro_select_value", value, "unexpected body of Select (1)") // RW - resp, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, rwKey, pool.RW) + data, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, rwKey, pool.RW) require.Nilf(t, err, "failed to Select") - require.NotNilf(t, resp, "response is nil after Select") - data, err = resp.Decode() - require.Nilf(t, err, "failed to Decode") + require.NotNilf(t, data, "response is nil after Select") require.Equalf(t, len(data), 1, "response Body len != 1 after Select") tpl, ok = data[0].([]interface{}) @@ -2419,29 +2369,29 @@ func TestPing(t *testing.T) { defer connPool.Close() // ANY - resp, err := connPool.Ping(pool.ANY) + data, err := connPool.Ping(pool.ANY) require.Nilf(t, err, "failed to Ping") - require.NotNilf(t, resp, "response is nil after Ping") + require.Nilf(t, data, "response data is not nil after Ping") // RW - resp, err = connPool.Ping(pool.RW) + data, err = connPool.Ping(pool.RW) require.Nilf(t, err, "failed to Ping") - require.NotNilf(t, resp, "response is nil after Ping") + require.Nilf(t, data, "response data is not nil after Ping") // RO - resp, err = connPool.Ping(pool.RO) + data, err = connPool.Ping(pool.RO) require.Nilf(t, err, "failed to Ping") - require.NotNilf(t, resp, "response is nil after Ping") + require.Nilf(t, data, "response data is not nil after Ping") // PreferRW - resp, err = connPool.Ping(pool.PreferRW) + data, err = connPool.Ping(pool.PreferRW) require.Nilf(t, err, "failed to Ping") - require.NotNilf(t, resp, "response is nil after Ping") + require.Nilf(t, data, "response data is not nil after Ping") // PreferRO - resp, err = connPool.Ping(pool.PreferRO) + data, err = connPool.Ping(pool.PreferRO) require.Nilf(t, err, "failed to Ping") - require.NotNilf(t, resp, "response is nil after Ping") + require.Nilf(t, data, "response data is not nil after Ping") } func TestDo(t *testing.T) { @@ -2548,7 +2498,14 @@ func TestNewPrepared(t *testing.T) { if reflect.DeepEqual(data[0], []interface{}{1, "test"}) { t.Error("Select with named arguments failed") } - metaData := resp.MetaData() + prepResp, ok := resp.(*tarantool.ExecuteResponse) + if !ok { + t.Fatalf("Not a Prepare response") + } + metaData, err := prepResp.MetaData() + if err != nil { + t.Errorf("Error while getting MetaData: %s", err.Error()) + } if metaData[0].FieldType != "unsigned" || metaData[0].FieldName != "NAME0" || metaData[1].FieldType != "string" || diff --git a/pool/connector.go b/pool/connector.go index 74a60bd74..23cc7275d 100644 --- a/pool/connector.go +++ b/pool/connector.go @@ -60,7 +60,7 @@ func (c *ConnectorAdapter) ConfiguredTimeout() time.Duration { // // Deprecated: the method will be removed in the next major version, // use a PingRequest object + Do() instead. -func (c *ConnectorAdapter) Ping() (tarantool.Response, error) { +func (c *ConnectorAdapter) Ping() ([]interface{}, error) { return c.pool.Ping(c.mode) } @@ -70,7 +70,7 @@ func (c *ConnectorAdapter) Ping() (tarantool.Response, error) { // use a SelectRequest object + Do() instead. func (c *ConnectorAdapter) Select(space, index interface{}, offset, limit uint32, iterator tarantool.Iter, - key interface{}) (tarantool.Response, error) { + key interface{}) ([]interface{}, error) { return c.pool.Select(space, index, offset, limit, iterator, key, c.mode) } @@ -79,7 +79,7 @@ func (c *ConnectorAdapter) Select(space, index interface{}, // Deprecated: the method will be removed in the next major version, // use an InsertRequest object + Do() instead. func (c *ConnectorAdapter) Insert(space interface{}, - tuple interface{}) (tarantool.Response, error) { + tuple interface{}) ([]interface{}, error) { return c.pool.Insert(space, tuple, c.mode) } @@ -88,7 +88,7 @@ func (c *ConnectorAdapter) Insert(space interface{}, // Deprecated: the method will be removed in the next major version, // use a ReplaceRequest object + Do() instead. func (c *ConnectorAdapter) Replace(space interface{}, - tuple interface{}) (tarantool.Response, error) { + tuple interface{}) ([]interface{}, error) { return c.pool.Replace(space, tuple, c.mode) } @@ -97,7 +97,7 @@ func (c *ConnectorAdapter) Replace(space interface{}, // Deprecated: the method will be removed in the next major version, // use a DeleteRequest object + Do() instead. func (c *ConnectorAdapter) Delete(space, index interface{}, - key interface{}) (tarantool.Response, error) { + key interface{}) ([]interface{}, error) { return c.pool.Delete(space, index, key, c.mode) } @@ -106,7 +106,7 @@ func (c *ConnectorAdapter) Delete(space, index interface{}, // Deprecated: the method will be removed in the next major version, // use a UpdateRequest object + Do() instead. func (c *ConnectorAdapter) Update(space, index interface{}, - key interface{}, ops *tarantool.Operations) (tarantool.Response, error) { + key interface{}, ops *tarantool.Operations) ([]interface{}, error) { return c.pool.Update(space, index, key, ops, c.mode) } @@ -115,7 +115,7 @@ func (c *ConnectorAdapter) Update(space, index interface{}, // Deprecated: the method will be removed in the next major version, // use a UpsertRequest object + Do() instead. func (c *ConnectorAdapter) Upsert(space, tuple interface{}, - ops *tarantool.Operations) (tarantool.Response, error) { + ops *tarantool.Operations) ([]interface{}, error) { return c.pool.Upsert(space, tuple, ops, c.mode) } @@ -125,7 +125,7 @@ func (c *ConnectorAdapter) Upsert(space, tuple interface{}, // Deprecated: the method will be removed in the next major version, // use a CallRequest object + Do() instead. func (c *ConnectorAdapter) Call(functionName string, - args interface{}) (tarantool.Response, error) { + args interface{}) ([]interface{}, error) { return c.pool.Call(functionName, args, c.mode) } @@ -136,7 +136,7 @@ func (c *ConnectorAdapter) Call(functionName string, // Deprecated: the method will be removed in the next major version, // use a Call16Request object + Do() instead. func (c *ConnectorAdapter) Call16(functionName string, - args interface{}) (tarantool.Response, error) { + args interface{}) ([]interface{}, error) { return c.pool.Call16(functionName, args, c.mode) } @@ -146,7 +146,7 @@ func (c *ConnectorAdapter) Call16(functionName string, // Deprecated: the method will be removed in the next major version, // use a Call17Request object + Do() instead. func (c *ConnectorAdapter) Call17(functionName string, - args interface{}) (tarantool.Response, error) { + args interface{}) ([]interface{}, error) { return c.pool.Call17(functionName, args, c.mode) } @@ -155,7 +155,7 @@ func (c *ConnectorAdapter) Call17(functionName string, // Deprecated: the method will be removed in the next major version, // use an EvalRequest object + Do() instead. func (c *ConnectorAdapter) Eval(expr string, - args interface{}) (tarantool.Response, error) { + args interface{}) ([]interface{}, error) { return c.pool.Eval(expr, args, c.mode) } @@ -164,7 +164,7 @@ func (c *ConnectorAdapter) Eval(expr string, // Deprecated: the method will be removed in the next major version, // use an ExecuteRequest object + Do() instead. func (c *ConnectorAdapter) Execute(expr string, - args interface{}) (tarantool.Response, error) { + args interface{}) ([]interface{}, error) { return c.pool.Execute(expr, args, c.mode) } diff --git a/pool/connector_test.go b/pool/connector_test.go index 190d2f9cc..87bebbd53 100644 --- a/pool/connector_test.go +++ b/pool/connector_test.go @@ -135,7 +135,7 @@ type baseRequestMock struct { mode Mode } -var reqResp tarantool.Response = &tarantool.ConnResponse{} +var reqData []interface{} var errReq error = errors.New("response error") var reqFuture *tarantool.Future = &tarantool.Future{} @@ -190,7 +190,7 @@ type selectMock struct { func (m *selectMock) Select(space, index interface{}, offset, limit uint32, iterator tarantool.Iter, key interface{}, - mode ...Mode) (tarantool.Response, error) { + mode ...Mode) ([]interface{}, error) { m.called++ m.space = space m.index = index @@ -199,7 +199,7 @@ func (m *selectMock) Select(space, index interface{}, m.iterator = iterator m.key = key m.mode = mode[0] - return reqResp, errReq + return reqData, errReq } func TestConnectorSelect(t *testing.T) { @@ -208,7 +208,7 @@ func TestConnectorSelect(t *testing.T) { resp, err := c.Select(reqSpace, reqIndex, reqOffset, reqLimit, reqIterator, reqKey) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqSpace, m.space, "unexpected space was passed") @@ -299,12 +299,12 @@ type insertMock struct { } func (m *insertMock) Insert(space, tuple interface{}, - mode ...Mode) (tarantool.Response, error) { + mode ...Mode) ([]interface{}, error) { m.called++ m.space = space m.tuple = tuple m.mode = mode[0] - return reqResp, errReq + return reqData, errReq } func TestConnectorInsert(t *testing.T) { @@ -313,7 +313,7 @@ func TestConnectorInsert(t *testing.T) { resp, err := c.Insert(reqSpace, reqTuple) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqSpace, m.space, "unexpected space was passed") @@ -380,12 +380,12 @@ type replaceMock struct { } func (m *replaceMock) Replace(space, tuple interface{}, - mode ...Mode) (tarantool.Response, error) { + mode ...Mode) ([]interface{}, error) { m.called++ m.space = space m.tuple = tuple m.mode = mode[0] - return reqResp, errReq + return reqData, errReq } func TestConnectorReplace(t *testing.T) { @@ -394,7 +394,7 @@ func TestConnectorReplace(t *testing.T) { resp, err := c.Replace(reqSpace, reqTuple) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqSpace, m.space, "unexpected space was passed") @@ -461,13 +461,13 @@ type deleteMock struct { } func (m *deleteMock) Delete(space, index, key interface{}, - mode ...Mode) (tarantool.Response, error) { + mode ...Mode) ([]interface{}, error) { m.called++ m.space = space m.index = index m.key = key m.mode = mode[0] - return reqResp, errReq + return reqData, errReq } func TestConnectorDelete(t *testing.T) { @@ -476,7 +476,7 @@ func TestConnectorDelete(t *testing.T) { resp, err := c.Delete(reqSpace, reqIndex, reqKey) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqSpace, m.space, "unexpected space was passed") @@ -548,14 +548,14 @@ type updateMock struct { } func (m *updateMock) Update(space, index, key interface{}, - ops *tarantool.Operations, mode ...Mode) (tarantool.Response, error) { + ops *tarantool.Operations, mode ...Mode) ([]interface{}, error) { m.called++ m.space = space m.index = index m.key = key m.ops = ops m.mode = mode[0] - return reqResp, errReq + return reqData, errReq } func TestConnectorUpdate(t *testing.T) { @@ -564,7 +564,7 @@ func TestConnectorUpdate(t *testing.T) { resp, err := c.Update(reqSpace, reqIndex, reqKey, reqOps) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqSpace, m.space, "unexpected space was passed") @@ -641,13 +641,13 @@ type upsertMock struct { } func (m *upsertMock) Upsert(space, tuple interface{}, ops *tarantool.Operations, - mode ...Mode) (tarantool.Response, error) { + mode ...Mode) ([]interface{}, error) { m.called++ m.space = space m.tuple = tuple m.ops = ops m.mode = mode[0] - return reqResp, errReq + return reqData, errReq } func TestConnectorUpsert(t *testing.T) { @@ -656,7 +656,7 @@ func TestConnectorUpsert(t *testing.T) { resp, err := c.Upsert(reqSpace, reqTuple, reqOps) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqSpace, m.space, "unexpected space was passed") @@ -698,12 +698,12 @@ type baseCallMock struct { } func (m *baseCallMock) call(functionName string, args interface{}, - mode Mode) (tarantool.Response, error) { + mode Mode) ([]interface{}, error) { m.called++ m.functionName = functionName m.args = args m.mode = mode - return reqResp, errReq + return reqData, errReq } func (m *baseCallMock) callTyped(functionName string, args interface{}, @@ -730,7 +730,7 @@ type callMock struct { } func (m *callMock) Call(functionName string, args interface{}, - mode Mode) (tarantool.Response, error) { + mode Mode) ([]interface{}, error) { return m.call(functionName, args, mode) } @@ -740,7 +740,7 @@ func TestConnectorCall(t *testing.T) { resp, err := c.Call(reqFunctionName, reqArgs) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqFunctionName, m.functionName, @@ -801,7 +801,7 @@ type call16Mock struct { } func (m *call16Mock) Call16(functionName string, args interface{}, - mode Mode) (tarantool.Response, error) { + mode Mode) ([]interface{}, error) { return m.call(functionName, args, mode) } @@ -811,7 +811,7 @@ func TestConnectorCall16(t *testing.T) { resp, err := c.Call16(reqFunctionName, reqArgs) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqFunctionName, m.functionName, @@ -872,7 +872,7 @@ type call17Mock struct { } func (m *call17Mock) Call17(functionName string, args interface{}, - mode Mode) (tarantool.Response, error) { + mode Mode) ([]interface{}, error) { return m.call(functionName, args, mode) } @@ -882,7 +882,7 @@ func TestConnectorCall17(t *testing.T) { resp, err := c.Call17(reqFunctionName, reqArgs) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqFunctionName, m.functionName, @@ -943,7 +943,7 @@ type evalMock struct { } func (m *evalMock) Eval(functionName string, args interface{}, - mode Mode) (tarantool.Response, error) { + mode Mode) ([]interface{}, error) { return m.call(functionName, args, mode) } @@ -953,7 +953,7 @@ func TestConnectorEval(t *testing.T) { resp, err := c.Eval(reqFunctionName, reqArgs) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqFunctionName, m.functionName, @@ -1014,7 +1014,7 @@ type executeMock struct { } func (m *executeMock) Execute(functionName string, args interface{}, - mode Mode) (tarantool.Response, error) { + mode Mode) ([]interface{}, error) { return m.call(functionName, args, mode) } @@ -1024,7 +1024,7 @@ func TestConnectorExecute(t *testing.T) { resp, err := c.Execute(reqFunctionName, reqArgs) - require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqData, resp, "unexpected response") require.Equalf(t, errReq, err, "unexpected error") require.Equalf(t, 1, m.called, "should be called only once") require.Equalf(t, reqFunctionName, m.functionName, diff --git a/pool/pooler.go b/pool/pooler.go index 6256c2d24..975162d37 100644 --- a/pool/pooler.go +++ b/pool/pooler.go @@ -19,51 +19,51 @@ type Pooler interface { // Deprecated: the method will be removed in the next major version, // use a PingRequest object + Do() instead. - Ping(mode Mode) (tarantool.Response, error) + Ping(mode Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a SelectRequest object + Do() instead. Select(space, index interface{}, offset, limit uint32, iterator tarantool.Iter, - key interface{}, mode ...Mode) (tarantool.Response, error) + key interface{}, mode ...Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use an InsertRequest object + Do() instead. Insert(space interface{}, tuple interface{}, - mode ...Mode) (tarantool.Response, error) + mode ...Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a ReplaceRequest object + Do() instead. Replace(space interface{}, tuple interface{}, - mode ...Mode) (tarantool.Response, error) + mode ...Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a DeleteRequest object + Do() instead. Delete(space, index interface{}, key interface{}, - mode ...Mode) (tarantool.Response, error) + mode ...Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a UpdateRequest object + Do() instead. Update(space, index interface{}, key interface{}, ops *tarantool.Operations, - mode ...Mode) (tarantool.Response, error) + mode ...Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a UpsertRequest object + Do() instead. Upsert(space interface{}, tuple interface{}, ops *tarantool.Operations, - mode ...Mode) (tarantool.Response, error) + mode ...Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a CallRequest object + Do() instead. Call(functionName string, args interface{}, - mode Mode) (tarantool.Response, error) + mode Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a Call16Request object + Do() instead. Call16(functionName string, args interface{}, - mode Mode) (tarantool.Response, error) + mode Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a Call17Request object + Do() instead. Call17(functionName string, args interface{}, - mode Mode) (tarantool.Response, error) + mode Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use an EvalRequest object + Do() instead. Eval(expr string, args interface{}, - mode Mode) (tarantool.Response, error) + mode Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use an ExecuteRequest object + Do() instead. Execute(expr string, args interface{}, - mode Mode) (tarantool.Response, error) + mode Mode) ([]interface{}, error) // Deprecated: the method will be removed in the next major version, // use a SelectRequest object + Do() instead. diff --git a/prepared.go b/prepared.go index f4fc1cdf1..69fbf2ecb 100644 --- a/prepared.go +++ b/prepared.go @@ -3,6 +3,7 @@ package tarantool import ( "context" "fmt" + "io" "github.com/tarantool/go-iproto" "github.com/vmihailenco/msgpack/v5" @@ -95,6 +96,26 @@ func (req *PrepareRequest) Context(ctx context.Context) *PrepareRequest { return req } +// Response creates a response for the PrepareRequest. +func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, error) { + if body == nil { + baseResp := BaseResponse{header: header} + return &PrepareResponse{BaseResponse: baseResp}, nil + } + if buf, ok := body.(*smallBuf); ok { + baseResp := BaseResponse{header: header, buf: *buf} + return &PrepareResponse{BaseResponse: baseResp}, nil + } + data, err := io.ReadAll(body) + if err != nil { + return nil, err + } + baseResp := BaseResponse{ + header: header, buf: smallBuf{b: data}, + } + return &PrepareResponse{BaseResponse: baseResp}, nil +} + // UnprepareRequest helps you to create an unprepare request object for // execution by a Connection. type UnprepareRequest struct { @@ -175,3 +196,23 @@ func (req *ExecutePreparedRequest) Context(ctx context.Context) *ExecutePrepared req.ctx = ctx return req } + +// Response creates a response for the ExecutePreparedRequest. +func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Response, error) { + if body == nil { + baseResp := BaseResponse{header: header} + return &ExecuteResponse{BaseResponse: baseResp}, nil + } + if buf, ok := body.(*smallBuf); ok { + baseResp := BaseResponse{header: header, buf: *buf} + return &ExecuteResponse{BaseResponse: baseResp}, nil + } + data, err := io.ReadAll(body) + if err != nil { + return nil, err + } + baseResp := BaseResponse{ + header: header, buf: smallBuf{b: data}, + } + return &ExecuteResponse{BaseResponse: baseResp}, nil +} diff --git a/request.go b/request.go index bdf1c50ea..6ede97380 100644 --- a/request.go +++ b/request.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "reflect" "strings" "sync" @@ -260,8 +261,8 @@ func fillWatchOnce(enc *msgpack.Encoder, key string) error { // // Deprecated: the method will be removed in the next major version, // use a PingRequest object + Do() instead. -func (conn *Connection) Ping() (Response, error) { - return conn.Do(NewPingRequest()).GetResponse() +func (conn *Connection) Ping() ([]interface{}, error) { + return conn.Do(NewPingRequest()).Get() } // Select performs select to box space. @@ -271,8 +272,8 @@ func (conn *Connection) Ping() (Response, error) { // Deprecated: the method will be removed in the next major version, // use a SelectRequest object + Do() instead. func (conn *Connection) Select(space, index interface{}, offset, limit uint32, iterator Iter, - key interface{}) (Response, error) { - return conn.SelectAsync(space, index, offset, limit, iterator, key).GetResponse() + key interface{}) ([]interface{}, error) { + return conn.SelectAsync(space, index, offset, limit, iterator, key).Get() } // Insert performs insertion to box space. @@ -282,8 +283,8 @@ func (conn *Connection) Select(space, index interface{}, offset, limit uint32, i // // Deprecated: the method will be removed in the next major version, // use an InsertRequest object + Do() instead. -func (conn *Connection) Insert(space interface{}, tuple interface{}) (Response, error) { - return conn.InsertAsync(space, tuple).GetResponse() +func (conn *Connection) Insert(space interface{}, tuple interface{}) ([]interface{}, error) { + return conn.InsertAsync(space, tuple).Get() } // Replace performs "insert or replace" action to box space. @@ -293,8 +294,8 @@ func (conn *Connection) Insert(space interface{}, tuple interface{}) (Response, // // Deprecated: the method will be removed in the next major version, // use a ReplaceRequest object + Do() instead. -func (conn *Connection) Replace(space interface{}, tuple interface{}) (Response, error) { - return conn.ReplaceAsync(space, tuple).GetResponse() +func (conn *Connection) Replace(space interface{}, tuple interface{}) ([]interface{}, error) { + return conn.ReplaceAsync(space, tuple).Get() } // Delete performs deletion of a tuple by key. @@ -304,8 +305,8 @@ func (conn *Connection) Replace(space interface{}, tuple interface{}) (Response, // // Deprecated: the method will be removed in the next major version, // use a DeleteRequest object + Do() instead. -func (conn *Connection) Delete(space, index interface{}, key interface{}) (Response, error) { - return conn.DeleteAsync(space, index, key).GetResponse() +func (conn *Connection) Delete(space, index interface{}, key interface{}) ([]interface{}, error) { + return conn.DeleteAsync(space, index, key).Get() } // Update performs update of a tuple by key. @@ -315,8 +316,9 @@ func (conn *Connection) Delete(space, index interface{}, key interface{}) (Respo // // Deprecated: the method will be removed in the next major version, // use a UpdateRequest object + Do() instead. -func (conn *Connection) Update(space, index, key interface{}, ops *Operations) (Response, error) { - return conn.UpdateAsync(space, index, key, ops).GetResponse() +func (conn *Connection) Update(space, index, key interface{}, + ops *Operations) ([]interface{}, error) { + return conn.UpdateAsync(space, index, key, ops).Get() } // Upsert performs "update or insert" action of a tuple by key. @@ -326,8 +328,8 @@ func (conn *Connection) Update(space, index, key interface{}, ops *Operations) ( // // Deprecated: the method will be removed in the next major version, // use a UpsertRequest object + Do() instead. -func (conn *Connection) Upsert(space, tuple interface{}, ops *Operations) (Response, error) { - return conn.UpsertAsync(space, tuple, ops).GetResponse() +func (conn *Connection) Upsert(space, tuple interface{}, ops *Operations) ([]interface{}, error) { + return conn.UpsertAsync(space, tuple, ops).Get() } // Call calls registered Tarantool function. @@ -337,8 +339,8 @@ func (conn *Connection) Upsert(space, tuple interface{}, ops *Operations) (Respo // // Deprecated: the method will be removed in the next major version, // use a CallRequest object + Do() instead. -func (conn *Connection) Call(functionName string, args interface{}) (Response, error) { - return conn.CallAsync(functionName, args).GetResponse() +func (conn *Connection) Call(functionName string, args interface{}) ([]interface{}, error) { + return conn.CallAsync(functionName, args).Get() } // Call16 calls registered Tarantool function. @@ -349,8 +351,8 @@ func (conn *Connection) Call(functionName string, args interface{}) (Response, e // // Deprecated: the method will be removed in the next major version, // use a Call16Request object + Do() instead. -func (conn *Connection) Call16(functionName string, args interface{}) (Response, error) { - return conn.Call16Async(functionName, args).GetResponse() +func (conn *Connection) Call16(functionName string, args interface{}) ([]interface{}, error) { + return conn.Call16Async(functionName, args).Get() } // Call17 calls registered Tarantool function. @@ -360,8 +362,8 @@ func (conn *Connection) Call16(functionName string, args interface{}) (Response, // // Deprecated: the method will be removed in the next major version, // use a Call17Request object + Do() instead. -func (conn *Connection) Call17(functionName string, args interface{}) (Response, error) { - return conn.Call17Async(functionName, args).GetResponse() +func (conn *Connection) Call17(functionName string, args interface{}) ([]interface{}, error) { + return conn.Call17Async(functionName, args).Get() } // Eval passes Lua expression for evaluation. @@ -370,8 +372,8 @@ func (conn *Connection) Call17(functionName string, args interface{}) (Response, // // Deprecated: the method will be removed in the next major version, // use an EvalRequest object + Do() instead. -func (conn *Connection) Eval(expr string, args interface{}) (Response, error) { - return conn.EvalAsync(expr, args).GetResponse() +func (conn *Connection) Eval(expr string, args interface{}) ([]interface{}, error) { + return conn.EvalAsync(expr, args).Get() } // Execute passes sql expression to Tarantool for execution. @@ -381,8 +383,8 @@ func (conn *Connection) Eval(expr string, args interface{}) (Response, error) { // // Deprecated: the method will be removed in the next major version, // use an ExecuteRequest object + Do() instead. -func (conn *Connection) Execute(expr string, args interface{}) (Response, error) { - return conn.ExecuteAsync(expr, args).GetResponse() +func (conn *Connection) Execute(expr string, args interface{}) ([]interface{}, error) { + return conn.ExecuteAsync(expr, args).Get() } // single used for conn.GetTyped for decode one tuple. @@ -532,9 +534,20 @@ func (conn *Connection) EvalTyped(expr string, args interface{}, result interfac // use an ExecuteRequest object + Do() instead. func (conn *Connection) ExecuteTyped(expr string, args interface{}, result interface{}) (SQLInfo, []ColumnMetaData, error) { + var ( + sqlInfo SQLInfo + metaData []ColumnMetaData + ) + fut := conn.ExecuteAsync(expr, args) err := fut.GetTyped(&result) - return fut.resp.SQLInfo(), fut.resp.MetaData(), err + if resp, ok := fut.resp.(*ExecuteResponse); ok { + sqlInfo = resp.sqlInfo + metaData = resp.metaData + } else if err == nil { + err = fmt.Errorf("unexpected response type %T, want: *ExecuteResponse", fut.resp) + } + return sqlInfo, metaData, err } // SelectAsync sends select request to Tarantool and returns Future. @@ -807,6 +820,8 @@ type Request interface { Ctx() context.Context // Async returns true if the request does not expect response. Async() bool + // Response creates a response for current request type. + Response(header Header, body io.Reader) (Response, error) } // ConnectedRequest is an interface that provides the info about a Connection @@ -838,6 +853,24 @@ func (req *baseRequest) Ctx() context.Context { return req.ctx } +// Response creates a response for the baseRequest. +func (req *baseRequest) Response(header Header, body io.Reader) (Response, error) { + if body == nil { + return &BaseResponse{header: header}, nil + } + if buf, ok := body.(*smallBuf); ok { + return &BaseResponse{header: header, buf: *buf}, nil + } + data, err := io.ReadAll(body) + if err != nil { + return nil, err + } + resp := BaseResponse{ + header: header, buf: smallBuf{b: data}, + } + return &resp, nil +} + type spaceRequest struct { baseRequest space interface{} @@ -928,6 +961,24 @@ func (req authRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { return nil } +// Response creates a response for the authRequest. +func (req authRequest) Response(header Header, body io.Reader) (Response, error) { + if body == nil { + return &BaseResponse{header: header}, nil + } + if buf, ok := body.(*smallBuf); ok { + return &BaseResponse{header: header, buf: *buf}, nil + } + data, err := io.ReadAll(body) + if err != nil { + return nil, err + } + resp := BaseResponse{ + header: header, buf: smallBuf{b: data}, + } + return &resp, nil +} + // PingRequest helps you to create an execute request object for execution // by a Connection. type PingRequest struct { @@ -1070,6 +1121,26 @@ func (req *SelectRequest) Context(ctx context.Context) *SelectRequest { return req } +// Response creates a response for the SelectRequest. +func (req *SelectRequest) Response(header Header, body io.Reader) (Response, error) { + if body == nil { + baseResp := BaseResponse{header: header} + return &SelectResponse{BaseResponse: baseResp}, nil + } + if buf, ok := body.(*smallBuf); ok { + baseResp := BaseResponse{header: header, buf: *buf} + return &SelectResponse{BaseResponse: baseResp}, nil + } + data, err := io.ReadAll(body) + if err != nil { + return nil, err + } + baseResp := BaseResponse{ + header: header, buf: smallBuf{b: data}, + } + return &SelectResponse{BaseResponse: baseResp}, nil +} + // InsertRequest helps you to create an insert request object for execution // by a Connection. type InsertRequest struct { @@ -1469,6 +1540,26 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest { return req } +// Response creates a response for the ExecuteRequest. +func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, error) { + if body == nil { + baseResp := BaseResponse{header: header} + return &ExecuteResponse{BaseResponse: baseResp}, nil + } + if buf, ok := body.(*smallBuf); ok { + baseResp := BaseResponse{header: header, buf: *buf} + return &ExecuteResponse{BaseResponse: baseResp}, nil + } + data, err := io.ReadAll(body) + if err != nil { + return nil, err + } + baseResp := BaseResponse{ + header: header, buf: smallBuf{b: data}, + } + return &ExecuteResponse{BaseResponse: baseResp}, nil +} + // WatchOnceRequest synchronously fetches the value currently associated with a // specified notification key without subscribing to changes. type WatchOnceRequest struct { diff --git a/response.go b/response.go index f2a55c692..5acde7b2b 100644 --- a/response.go +++ b/response.go @@ -2,29 +2,64 @@ package tarantool import ( "fmt" + "io" "github.com/tarantool/go-iproto" "github.com/vmihailenco/msgpack/v5" ) type Response interface { + Header() Header Decode() ([]interface{}, error) DecodeTyped(res interface{}) error - - Pos() []byte - MetaData() []ColumnMetaData - SQLInfo() SQLInfo } -type ConnResponse struct { - header header +type BaseResponse struct { + header Header // data contains deserialized data for untyped requests. - data []interface{} + data []interface{} + buf smallBuf + decoded bool + decodedTyped bool +} + +func (resp *BaseResponse) SetHeader(header Header) { + resp.header = header +} + +type PushResponse struct { + BaseResponse +} + +func createPushResponse(header Header, body io.Reader) (Response, error) { + if body == nil { + resp := BaseResponse{header: header} + return &PushResponse{resp}, nil + } + if buf, ok := body.(*smallBuf); ok { + resp := BaseResponse{header: header, buf: *buf} + return &PushResponse{resp}, nil + } + data, err := io.ReadAll(body) + if err != nil { + return nil, err + } + resp := BaseResponse{header: header, buf: smallBuf{b: data}} + return &PushResponse{resp}, nil +} + +type SelectResponse struct { + BaseResponse // pos contains a position descriptor of last selected tuple. - pos []byte + pos []byte +} + +type PrepareResponse ExecuteResponse + +type ExecuteResponse struct { + BaseResponse metaData []ColumnMetaData sqlInfo SQLInfo - buf smallBuf } type ColumnMetaData struct { @@ -41,9 +76,9 @@ type SQLInfo struct { InfoAutoincrementIds []uint64 } -type header struct { - requestId uint32 - code uint32 +type Header struct { + RequestId uint32 + Code uint32 } func (meta *ColumnMetaData) DecodeMsgpack(d *msgpack.Decoder) error { @@ -126,54 +161,153 @@ func smallInt(d *msgpack.Decoder, buf *smallBuf) (i int, err error) { return d.DecodeInt() } -func decodeHeader(d *msgpack.Decoder, buf *smallBuf) (header, error) { +func decodeHeader(d *msgpack.Decoder, buf *smallBuf) (Header, error) { var l int var err error d.Reset(buf) if l, err = d.DecodeMapLen(); err != nil { - return header{}, err + return Header{}, err } - decodedHeader := header{} + decodedHeader := Header{} for ; l > 0; l-- { var cd int if cd, err = smallInt(d, buf); err != nil { - return header{}, err + return Header{}, err } switch iproto.Key(cd) { case iproto.IPROTO_SYNC: var rid uint64 if rid, err = d.DecodeUint64(); err != nil { - return header{}, err + return Header{}, err } - decodedHeader.requestId = uint32(rid) + decodedHeader.RequestId = uint32(rid) case iproto.IPROTO_REQUEST_TYPE: var rcode uint64 if rcode, err = d.DecodeUint64(); err != nil { - return header{}, err + return Header{}, err } - decodedHeader.code = uint32(rcode) + decodedHeader.Code = uint32(rcode) default: if err = d.Skip(); err != nil { - return header{}, err + return Header{}, err } } } return decodedHeader, nil } -func (resp *ConnResponse) Decode() ([]interface{}, error) { +type decodeInfo struct { + stmtID uint64 + bindCount uint64 + serverProtocolInfo ProtocolInfo + errorExtendedInfo *BoxError + + decodedError string +} + +func (info *decodeInfo) parseData(resp *BaseResponse) error { + if info.stmtID != 0 { + stmt := &Prepared{ + StatementID: PreparedID(info.stmtID), + ParamCount: info.bindCount, + } + resp.data = []interface{}{stmt} + return nil + } + + // Tarantool may send only version >= 1 + if info.serverProtocolInfo.Version != ProtocolVersion(0) || + info.serverProtocolInfo.Features != nil { + if info.serverProtocolInfo.Version == ProtocolVersion(0) { + return fmt.Errorf("no protocol version provided in Id response") + } + if info.serverProtocolInfo.Features == nil { + return fmt.Errorf("no features provided in Id response") + } + resp.data = []interface{}{info.serverProtocolInfo} + return nil + } + return nil +} + +func decodeCommonField(d *msgpack.Decoder, cd int, data *[]interface{}, + info *decodeInfo) (bool, error) { + var feature iproto.Feature var err error - if resp.buf.Len() > 2 { - var decodedError string + switch iproto.Key(cd) { + case iproto.IPROTO_DATA: + var res interface{} + var ok bool + if res, err = d.DecodeInterface(); err != nil { + return false, err + } + if *data, ok = res.([]interface{}); !ok { + return false, fmt.Errorf("result is not array: %v", res) + } + case iproto.IPROTO_ERROR: + if info.errorExtendedInfo, err = decodeBoxError(d); err != nil { + return false, err + } + case iproto.IPROTO_ERROR_24: + if info.decodedError, err = d.DecodeString(); err != nil { + return false, err + } + case iproto.IPROTO_STMT_ID: + if info.stmtID, err = d.DecodeUint64(); err != nil { + return false, err + } + case iproto.IPROTO_BIND_COUNT: + if info.bindCount, err = d.DecodeUint64(); err != nil { + return false, err + } + case iproto.IPROTO_VERSION: + if err = d.Decode(&info.serverProtocolInfo.Version); err != nil { + return false, err + } + case iproto.IPROTO_FEATURES: + var larr int + if larr, err = d.DecodeArrayLen(); err != nil { + return false, err + } + + info.serverProtocolInfo.Features = make([]iproto.Feature, larr) + for i := 0; i < larr; i++ { + if err = d.Decode(&feature); err != nil { + return false, err + } + info.serverProtocolInfo.Features[i] = feature + } + case iproto.IPROTO_AUTH_TYPE: + var auth string + if auth, err = d.DecodeString(); err != nil { + return false, err + } + found := false + for _, a := range [...]Auth{ChapSha1Auth, PapSha256Auth} { + if auth == a.String() { + info.serverProtocolInfo.Auth = a + found = true + } + } + if !found { + return false, fmt.Errorf("unknown auth type %s", auth) + } + default: + return false, nil + } + return true, nil +} + +func (resp *BaseResponse) Decode() ([]interface{}, error) { + resp.decoded = true + var err error + if resp.buf.Len() > 2 { offset := resp.buf.Offset() defer resp.buf.Seek(offset) - var l, larr int - var stmtID, bindCount uint64 - var serverProtocolInfo ProtocolInfo - var feature iproto.Feature - var errorExtendedInfo *BoxError = nil + var l int + info := &decodeInfo{} d := msgpack.NewDecoder(&resp.buf) d.SetMapDecoder(func(dec *msgpack.Decoder) (interface{}, error) { @@ -188,117 +322,171 @@ func (resp *ConnResponse) Decode() ([]interface{}, error) { if cd, err = smallInt(d, &resp.buf); err != nil { return nil, err } - switch iproto.Key(cd) { - case iproto.IPROTO_DATA: - var res interface{} - var ok bool - if res, err = d.DecodeInterface(); err != nil { - return nil, err - } - if resp.data, ok = res.([]interface{}); !ok { - return nil, fmt.Errorf("result is not array: %v", res) - } - case iproto.IPROTO_ERROR: - if errorExtendedInfo, err = decodeBoxError(d); err != nil { - return nil, err - } - case iproto.IPROTO_ERROR_24: - if decodedError, err = d.DecodeString(); err != nil { - return nil, err - } - case iproto.IPROTO_SQL_INFO: - if err = d.Decode(&resp.sqlInfo); err != nil { - return nil, err - } - case iproto.IPROTO_METADATA: - if err = d.Decode(&resp.metaData); err != nil { - return nil, err - } - case iproto.IPROTO_STMT_ID: - if stmtID, err = d.DecodeUint64(); err != nil { - return nil, err - } - case iproto.IPROTO_BIND_COUNT: - if bindCount, err = d.DecodeUint64(); err != nil { - return nil, err - } - case iproto.IPROTO_VERSION: - if err = d.Decode(&serverProtocolInfo.Version); err != nil { - return nil, err - } - case iproto.IPROTO_FEATURES: - if larr, err = d.DecodeArrayLen(); err != nil { + decoded, err := decodeCommonField(d, cd, &resp.data, info) + if err != nil { + return nil, err + } + if !decoded { + if err = d.Skip(); err != nil { return nil, err } + } + } + err = info.parseData(resp) + if err != nil { + return nil, err + } - serverProtocolInfo.Features = make([]iproto.Feature, larr) - for i := 0; i < larr; i++ { - if err = d.Decode(&feature); err != nil { - return nil, err + if info.decodedError != "" { + resp.header.Code &^= uint32(iproto.IPROTO_TYPE_ERROR) + err = Error{iproto.Error(resp.header.Code), info.decodedError, info.errorExtendedInfo} + } + } + return resp.data, err +} + +func (resp *SelectResponse) Decode() ([]interface{}, error) { + resp.decoded = true + var err error + if resp.buf.Len() > 2 { + offset := resp.buf.Offset() + defer resp.buf.Seek(offset) + + var l int + info := &decodeInfo{} + + d := msgpack.NewDecoder(&resp.buf) + d.SetMapDecoder(func(dec *msgpack.Decoder) (interface{}, error) { + return dec.DecodeUntypedMap() + }) + + if l, err = d.DecodeMapLen(); err != nil { + return nil, err + } + for ; l > 0; l-- { + var cd int + if cd, err = smallInt(d, &resp.buf); err != nil { + return nil, err + } + decoded, err := decodeCommonField(d, cd, &resp.data, info) + if err != nil { + return nil, err + } + if !decoded { + switch iproto.Key(cd) { + case iproto.IPROTO_POSITION: + if resp.pos, err = d.DecodeBytes(); err != nil { + return nil, fmt.Errorf("unable to decode a position: %w", err) } - serverProtocolInfo.Features[i] = feature - } - case iproto.IPROTO_AUTH_TYPE: - var auth string - if auth, err = d.DecodeString(); err != nil { - return nil, err - } - found := false - for _, a := range [...]Auth{ChapSha1Auth, PapSha256Auth} { - if auth == a.String() { - serverProtocolInfo.Auth = a - found = true + default: + if err = d.Skip(); err != nil { + return nil, err } } - if !found { - return nil, fmt.Errorf("unknown auth type %s", auth) - } - case iproto.IPROTO_POSITION: - if resp.pos, err = d.DecodeBytes(); err != nil { - return nil, fmt.Errorf("unable to decode a position: %w", err) - } - default: - if err = d.Skip(); err != nil { - return nil, err - } } } - if stmtID != 0 { - stmt := &Prepared{ - StatementID: PreparedID(stmtID), - ParamCount: bindCount, - MetaData: resp.metaData, - } - resp.data = []interface{}{stmt} + err = info.parseData(&resp.BaseResponse) + if err != nil { + return nil, err + } + + if info.decodedError != "" { + resp.header.Code &^= uint32(iproto.IPROTO_TYPE_ERROR) + err = Error{iproto.Error(resp.header.Code), info.decodedError, info.errorExtendedInfo} } + } + return resp.data, err +} + +func (resp *ExecuteResponse) Decode() ([]interface{}, error) { + resp.decoded = true + var err error + if resp.buf.Len() > 2 { + offset := resp.buf.Offset() + defer resp.buf.Seek(offset) + + var l int + info := &decodeInfo{} + + d := msgpack.NewDecoder(&resp.buf) + d.SetMapDecoder(func(dec *msgpack.Decoder) (interface{}, error) { + return dec.DecodeUntypedMap() + }) - // Tarantool may send only version >= 1 - if serverProtocolInfo.Version != ProtocolVersion(0) || serverProtocolInfo.Features != nil { - if serverProtocolInfo.Version == ProtocolVersion(0) { - return nil, fmt.Errorf("no protocol version provided in Id response") + if l, err = d.DecodeMapLen(); err != nil { + return nil, err + } + for ; l > 0; l-- { + var cd int + if cd, err = smallInt(d, &resp.buf); err != nil { + return nil, err } - if serverProtocolInfo.Features == nil { - return nil, fmt.Errorf("no features provided in Id response") + decoded, err := decodeCommonField(d, cd, &resp.data, info) + if err != nil { + return nil, err + } + if !decoded { + switch iproto.Key(cd) { + case iproto.IPROTO_SQL_INFO: + if err = d.Decode(&resp.sqlInfo); err != nil { + return nil, err + } + case iproto.IPROTO_METADATA: + if err = d.Decode(&resp.metaData); err != nil { + return nil, err + } + default: + if err = d.Skip(); err != nil { + return nil, err + } + } } - resp.data = []interface{}{serverProtocolInfo} + } + err = info.parseData(&resp.BaseResponse) + if err != nil { + return nil, err } - if decodedError != "" { - resp.header.code &^= uint32(iproto.IPROTO_TYPE_ERROR) - err = Error{iproto.Error(resp.header.code), decodedError, errorExtendedInfo} + if info.decodedError != "" { + resp.header.Code &^= uint32(iproto.IPROTO_TYPE_ERROR) + err = Error{iproto.Error(resp.header.Code), info.decodedError, info.errorExtendedInfo} } } return resp.data, err } -func (resp *ConnResponse) DecodeTyped(res interface{}) error { +func decodeTypedCommonField(d *msgpack.Decoder, res interface{}, cd int, + info *decodeInfo) (bool, error) { + var err error + + switch iproto.Key(cd) { + case iproto.IPROTO_DATA: + if err = d.Decode(res); err != nil { + return false, err + } + case iproto.IPROTO_ERROR: + if info.errorExtendedInfo, err = decodeBoxError(d); err != nil { + return false, err + } + case iproto.IPROTO_ERROR_24: + if info.decodedError, err = d.DecodeString(); err != nil { + return false, err + } + default: + return false, nil + } + return true, nil +} + +func (resp *BaseResponse) DecodeTyped(res interface{}) error { + resp.decodedTyped = true + var err error if resp.buf.Len() > 0 { offset := resp.buf.Offset() defer resp.buf.Seek(offset) - var errorExtendedInfo *BoxError = nil - + info := &decodeInfo{} var l int d := msgpack.NewDecoder(&resp.buf) @@ -309,66 +497,162 @@ func (resp *ConnResponse) DecodeTyped(res interface{}) error { if l, err = d.DecodeMapLen(); err != nil { return err } - var decodedError string for ; l > 0; l-- { var cd int if cd, err = smallInt(d, &resp.buf); err != nil { return err } - switch iproto.Key(cd) { - case iproto.IPROTO_DATA: - if err = d.Decode(res); err != nil { - return err - } - case iproto.IPROTO_ERROR: - if errorExtendedInfo, err = decodeBoxError(d); err != nil { - return err - } - case iproto.IPROTO_ERROR_24: - if decodedError, err = d.DecodeString(); err != nil { - return err - } - case iproto.IPROTO_SQL_INFO: - if err = d.Decode(&resp.sqlInfo); err != nil { - return err - } - case iproto.IPROTO_METADATA: - if err = d.Decode(&resp.metaData); err != nil { + decoded, err := decodeTypedCommonField(d, res, cd, info) + if err != nil { + return err + } + if !decoded { + if err = d.Skip(); err != nil { return err } - case iproto.IPROTO_POSITION: - if resp.pos, err = d.DecodeBytes(); err != nil { - return fmt.Errorf("unable to decode a position: %w", err) + } + } + if info.decodedError != "" { + resp.header.Code &^= uint32(iproto.IPROTO_TYPE_ERROR) + err = Error{iproto.Error(resp.header.Code), info.decodedError, info.errorExtendedInfo} + } + } + return err +} + +func (resp *SelectResponse) DecodeTyped(res interface{}) error { + resp.decodedTyped = true + + var err error + if resp.buf.Len() > 0 { + offset := resp.buf.Offset() + defer resp.buf.Seek(offset) + + info := &decodeInfo{} + var l int + + d := msgpack.NewDecoder(&resp.buf) + d.SetMapDecoder(func(dec *msgpack.Decoder) (interface{}, error) { + return dec.DecodeUntypedMap() + }) + + if l, err = d.DecodeMapLen(); err != nil { + return err + } + for ; l > 0; l-- { + var cd int + if cd, err = smallInt(d, &resp.buf); err != nil { + return err + } + decoded, err := decodeTypedCommonField(d, res, cd, info) + if err != nil { + return err + } + if !decoded { + switch iproto.Key(cd) { + case iproto.IPROTO_POSITION: + if resp.pos, err = d.DecodeBytes(); err != nil { + return fmt.Errorf("unable to decode a position: %w", err) + } + default: + if err = d.Skip(); err != nil { + return err + } } - default: - if err = d.Skip(); err != nil { - return err + } + } + if info.decodedError != "" { + resp.header.Code &^= uint32(iproto.IPROTO_TYPE_ERROR) + err = Error{iproto.Error(resp.header.Code), info.decodedError, info.errorExtendedInfo} + } + } + return err +} + +func (resp *ExecuteResponse) DecodeTyped(res interface{}) error { + resp.decodedTyped = true + + var err error + if resp.buf.Len() > 0 { + offset := resp.buf.Offset() + defer resp.buf.Seek(offset) + + info := &decodeInfo{} + var l int + + d := msgpack.NewDecoder(&resp.buf) + d.SetMapDecoder(func(dec *msgpack.Decoder) (interface{}, error) { + return dec.DecodeUntypedMap() + }) + + if l, err = d.DecodeMapLen(); err != nil { + return err + } + for ; l > 0; l-- { + var cd int + if cd, err = smallInt(d, &resp.buf); err != nil { + return err + } + decoded, err := decodeTypedCommonField(d, res, cd, info) + if err != nil { + return err + } + if !decoded { + switch iproto.Key(cd) { + case iproto.IPROTO_SQL_INFO: + if err = d.Decode(&resp.sqlInfo); err != nil { + return err + } + case iproto.IPROTO_METADATA: + if err = d.Decode(&resp.metaData); err != nil { + return err + } + default: + if err = d.Skip(); err != nil { + return err + } } } } - if decodedError != "" { - resp.header.code &^= uint32(iproto.IPROTO_TYPE_ERROR) - err = Error{iproto.Error(resp.header.code), decodedError, errorExtendedInfo} + if info.decodedError != "" { + resp.header.Code &^= uint32(iproto.IPROTO_TYPE_ERROR) + err = Error{iproto.Error(resp.header.Code), info.decodedError, info.errorExtendedInfo} } } return err } -func (resp *ConnResponse) Pos() []byte { - return resp.pos +func (resp *BaseResponse) Header() Header { + return resp.header } -func (resp *ConnResponse) MetaData() []ColumnMetaData { - return resp.metaData +func (resp *SelectResponse) Pos() ([]byte, error) { + var err error + if !resp.decoded && !resp.decodedTyped { + _, err = resp.Decode() + } + return resp.pos, err } -func (resp *ConnResponse) SQLInfo() SQLInfo { - return resp.sqlInfo +func (resp *ExecuteResponse) MetaData() ([]ColumnMetaData, error) { + var err error + if !resp.decoded && !resp.decodedTyped { + _, err = resp.Decode() + } + return resp.metaData, err +} + +func (resp *ExecuteResponse) SQLInfo() (SQLInfo, error) { + var err error + if !resp.decoded && !resp.decodedTyped { + _, err = resp.Decode() + } + return resp.sqlInfo, err } -func (resp *ConnResponse) String() (str string) { - if resp.header.code == OkCode { - return fmt.Sprintf("<%d OK %v>", resp.header.requestId, resp.data) +func (resp *BaseResponse) String() (str string) { + if resp.header.Code == OkCode { + return fmt.Sprintf("<%d OK %v>", resp.header.RequestId, resp.data) } - return fmt.Sprintf("<%d ERR 0x%x>", resp.header.requestId, resp.header.code) + return fmt.Sprintf("<%d ERR 0x%x>", resp.header.RequestId, resp.header.Code) } diff --git a/settings/example_test.go b/settings/example_test.go index 0a1dcc9b7..e51cadef0 100644 --- a/settings/example_test.go +++ b/settings/example_test.go @@ -69,12 +69,23 @@ func Example_sqlFullColumnNames() { // Get some data with SQL query. req = tarantool.NewExecuteRequest("SELECT x FROM example WHERE id = 1;") - _, err = conn.Do(req).Get() + resp, err = conn.Do(req).GetResponse() if err != nil { fmt.Printf("error on select: %v\n", err) return } - metaData := resp.MetaData() + + exResp, ok := resp.(*tarantool.ExecuteResponse) + if !ok { + fmt.Printf("wrong response type") + return + } + + metaData, err := exResp.MetaData() + if err != nil { + fmt.Printf("error on getting MetaData: %v\n", err) + return + } // Show response metadata. fmt.Printf("full column name: %v\n", metaData[0].FieldName) @@ -86,12 +97,21 @@ func Example_sqlFullColumnNames() { } // Get some data with SQL query. - _, err = conn.Do(req).Get() + resp, err = conn.Do(req).GetResponse() if err != nil { fmt.Printf("error on select: %v\n", err) return } - metaData = resp.MetaData() + exResp, ok = resp.(*tarantool.ExecuteResponse) + if !ok { + fmt.Printf("wrong response type") + return + } + metaData, err = exResp.MetaData() + if err != nil { + fmt.Printf("error on getting MetaData: %v\n", err) + return + } // Show response metadata. fmt.Printf("short column name: %v\n", metaData[0].FieldName) } diff --git a/settings/request.go b/settings/request.go index 02252fe47..1c106dc8d 100644 --- a/settings/request.go +++ b/settings/request.go @@ -60,6 +60,7 @@ package settings import ( "context" + "io" "github.com/tarantool/go-iproto" "github.com/vmihailenco/msgpack/v5" @@ -107,6 +108,12 @@ func (req *SetRequest) Async() bool { return req.impl.Async() } +// Response creates a response for the SetRequest. +func (req *SetRequest) Response(header tarantool.Header, + body io.Reader) (tarantool.Response, error) { + return req.impl.Response(header, body) +} + // GetRequest helps to get session settings. type GetRequest struct { impl *tarantool.SelectRequest @@ -147,6 +154,12 @@ func (req *GetRequest) Async() bool { return req.impl.Async() } +// Response creates a response for the GetRequest. +func (req *GetRequest) Response(header tarantool.Header, + body io.Reader) (tarantool.Response, error) { + return req.impl.Response(header, body) +} + // NewErrorMarshalingEnabledSetRequest creates a request to // update current session ErrorMarshalingEnabled setting. func NewErrorMarshalingEnabledSetRequest(value bool) *SetRequest { diff --git a/settings/tarantool_test.go b/settings/tarantool_test.go index 6ca64455a..56cee33ce 100644 --- a/settings/tarantool_test.go +++ b/settings/tarantool_test.go @@ -114,7 +114,11 @@ func TestSQLDefaultEngineSetting(t *testing.T) { resp, err := conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok := resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err := exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Check new space engine. eval := tarantool.NewEvalRequest("return box.space['T1_VINYL'].engine") @@ -137,7 +141,11 @@ func TestSQLDefaultEngineSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*tarantool.ExecuteResponse) + sqlInfo, err = exResp.SQLInfo() + require.Nil(t, err) + require.True(t, ok, "wrong response type") + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Check new space engine. eval = tarantool.NewEvalRequest("return box.space['T2_MEMTX'].engine") @@ -161,7 +169,11 @@ func TestSQLDeferForeignKeysSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok := resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err := exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Create a space with reference to the parent space. exec = tarantool.NewExecuteRequest( @@ -169,7 +181,11 @@ func TestSQLDeferForeignKeysSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err = exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) deferEval := ` box.begin() @@ -229,14 +245,22 @@ func TestSQLFullColumnNamesSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok := resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err := exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Fill it with some data. exec = tarantool.NewExecuteRequest("INSERT INTO FKNAME VALUES (1, 1);") resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err = exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Disable displaying full column names in metadata. data, err := conn.Do(NewSQLFullColumnNamesSetRequest(false)).Get() @@ -253,7 +277,11 @@ func TestSQLFullColumnNamesSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, "X", resp.MetaData()[0].FieldName) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + metaData, err := exResp.MetaData() + require.Nil(t, err) + require.Equal(t, "X", metaData[0].FieldName) // Enable displaying full column names in metadata. data, err = conn.Do(NewSQLFullColumnNamesSetRequest(true)).Get() @@ -270,7 +298,11 @@ func TestSQLFullColumnNamesSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, "FKNAME.X", resp.MetaData()[0].FieldName) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + metaData, err = exResp.MetaData() + require.Nil(t, err) + require.Equal(t, "FKNAME.X", metaData[0].FieldName) } func TestSQLFullMetadataSetting(t *testing.T) { @@ -287,14 +319,22 @@ func TestSQLFullMetadataSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok := resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err := exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Fill it with some data. exec = tarantool.NewExecuteRequest("INSERT INTO fmt VALUES (1, 1);") resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err = exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Disable displaying additional fields in metadata. data, err := conn.Do(NewSQLFullMetadataSetRequest(false)).Get() @@ -311,7 +351,11 @@ func TestSQLFullMetadataSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, "", resp.MetaData()[0].FieldSpan) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + metaData, err := exResp.MetaData() + require.Nil(t, err) + require.Equal(t, "", metaData[0].FieldSpan) // Enable displaying full column names in metadata. data, err = conn.Do(NewSQLFullMetadataSetRequest(true)).Get() @@ -328,7 +372,11 @@ func TestSQLFullMetadataSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, "x", resp.MetaData()[0].FieldSpan) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + metaData, err = exResp.MetaData() + require.Nil(t, err) + require.Equal(t, "x", metaData[0].FieldSpan) } func TestSQLParserDebugSetting(t *testing.T) { @@ -378,14 +426,22 @@ func TestSQLRecursiveTriggersSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok := resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err := exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Fill it with some data. exec = tarantool.NewExecuteRequest("INSERT INTO rec VALUES(1, 1, 2);") resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err = exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Create a recursive trigger (with infinite depth). exec = tarantool.NewExecuteRequest(` @@ -395,7 +451,11 @@ func TestSQLRecursiveTriggersSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err = exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Enable SQL recursive triggers. data, err := conn.Do(NewSQLRecursiveTriggersSetRequest(true)).Get() @@ -429,7 +489,11 @@ func TestSQLRecursiveTriggersSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err = exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) } func TestSQLReverseUnorderedSelectsSetting(t *testing.T) { @@ -446,20 +510,32 @@ func TestSQLReverseUnorderedSelectsSetting(t *testing.T) { resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok := resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err := exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Fill it with some data. exec = tarantool.NewExecuteRequest("INSERT INTO data VALUES('1');") resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err = exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) exec = tarantool.NewExecuteRequest("INSERT INTO data VALUES('2');") resp, err = conn.Do(exec).GetResponse() require.Nil(t, err) require.NotNil(t, resp) - require.Equal(t, uint64(1), resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*tarantool.ExecuteResponse) + require.True(t, ok, "wrong response type") + sqlInfo, err = exResp.SQLInfo() + require.Nil(t, err) + require.Equal(t, uint64(1), sqlInfo.AffectedCount) // Disable reverse order in unordered selects. data, err := conn.Do(NewSQLReverseUnorderedSelectsSetRequest(false)).Get() diff --git a/tarantool_test.go b/tarantool_test.go index aa3f248f9..93bc6d2e8 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -821,39 +821,25 @@ func TestFutureMultipleGetTypedWithError(t *testing.T) { /////////////////// func TestClient(t *testing.T) { - var resp Response var err error conn := test_helpers.ConnectWithValidation(t, dialer, opts) defer conn.Close() // Ping - resp, err = conn.Ping() + data, err := conn.Ping() if err != nil { t.Fatalf("Failed to Ping: %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Ping") - } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") + if data != nil { + t.Fatalf("Response data is not nil after Ping") } // Insert - resp, err = conn.Insert(spaceNo, []interface{}{uint(1), "hello", "world"}) + data, err = conn.Insert(spaceNo, []interface{}{uint(1), "hello", "world"}) if err != nil { t.Fatalf("Failed to Insert: %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Insert") - } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err := resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if len(data) != 1 { t.Errorf("Response Body len != 1") } @@ -870,30 +856,19 @@ func TestClient(t *testing.T) { t.Errorf("Unexpected body of Insert (1)") } } - resp, err = conn.Insert(spaceNo, &Tuple{Id: 1, Msg: "hello", Name: "world"}) + data, err = conn.Insert(spaceNo, &Tuple{Id: 1, Msg: "hello", Name: "world"}) if tntErr, ok := err.(Error); !ok || tntErr.Code != iproto.ER_TUPLE_FOUND { t.Errorf("Expected %s but got: %v", iproto.ER_TUPLE_FOUND, err) } - data, _ = resp.Decode() if len(data) != 0 { t.Errorf("Response Body len != 0") } // Delete - resp, err = conn.Delete(spaceNo, indexNo, []interface{}{uint(1)}) + data, err = conn.Delete(spaceNo, indexNo, []interface{}{uint(1)}) if err != nil { t.Fatalf("Failed to Delete: %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Delete") - } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if len(data) != 1 { t.Errorf("Response Body len != 1") } @@ -910,46 +885,26 @@ func TestClient(t *testing.T) { t.Errorf("Unexpected body of Delete (1)") } } - resp, err = conn.Delete(spaceNo, indexNo, []interface{}{uint(101)}) + data, err = conn.Delete(spaceNo, indexNo, []interface{}{uint(101)}) if err != nil { t.Fatalf("Failed to Delete: %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Delete") - } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if len(data) != 0 { t.Errorf("Response Data len != 0") } // Replace - resp, err = conn.Replace(spaceNo, []interface{}{uint(2), "hello", "world"}) + data, err = conn.Replace(spaceNo, []interface{}{uint(2), "hello", "world"}) if err != nil { t.Fatalf("Failed to Replace: %s", err.Error()) } - if resp == nil { + if data == nil { t.Fatalf("Response is nil after Replace") } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - resp, err = conn.Replace(spaceNo, []interface{}{uint(2), "hi", "planet"}) + data, err = conn.Replace(spaceNo, []interface{}{uint(2), "hi", "planet"}) if err != nil { t.Fatalf("Failed to Replace (duplicate): %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Replace (duplicate)") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if len(data) != 1 { t.Errorf("Response Data len != 1") } @@ -968,21 +923,11 @@ func TestClient(t *testing.T) { } // Update - resp, err = conn.Update(spaceNo, indexNo, []interface{}{uint(2)}, + data, err = conn.Update(spaceNo, indexNo, []interface{}{uint(2)}, NewOperations().Assign(1, "bye").Delete(2, 1)) if err != nil { t.Fatalf("Failed to Update: %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Update") - } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if len(data) != 1 { t.Errorf("Response Data len != 1") } @@ -1001,56 +946,39 @@ func TestClient(t *testing.T) { } // Upsert - resp, err = conn.Upsert(spaceNo, []interface{}{uint(3), 1}, + data, err = conn.Upsert(spaceNo, []interface{}{uint(3), 1}, NewOperations().Add(1, 1)) if err != nil { t.Fatalf("Failed to Upsert (insert): %s", err.Error()) } - if resp == nil { + if data == nil { t.Fatalf("Response is nil after Upsert (insert)") } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - resp, err = conn.Upsert(spaceNo, []interface{}{uint(3), 1}, + data, err = conn.Upsert(spaceNo, []interface{}{uint(3), 1}, NewOperations().Add(1, 1)) if err != nil { t.Fatalf("Failed to Upsert (update): %s", err.Error()) } - if resp == nil { + if data == nil { t.Errorf("Response is nil after Upsert (update)") } // Select for i := 10; i < 20; i++ { - resp, err = conn.Replace(spaceNo, []interface{}{uint(i), fmt.Sprintf("val %d", i), "bla"}) + data, err = conn.Replace(spaceNo, []interface{}{uint(i), fmt.Sprintf("val %d", i), "bla"}) if err != nil { t.Fatalf("Failed to Replace: %s", err.Error()) } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - _, err := resp.Decode() - if err != nil { - t.Errorf("Failed to replace: %s", err.Error()) + if data == nil { + t.Errorf("Response is nil after Replace") } } - resp, err = conn.Select(spaceNo, indexNo, 0, 1, IterEq, []interface{}{uint(10)}) + data, err = conn.Select(spaceNo, indexNo, 0, 1, IterEq, []interface{}{uint(10)}) if err != nil { t.Fatalf("Failed to Select: %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Select") - } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if len(data) != 1 { - t.Errorf("Response Data len != 1") + t.Fatalf("Response Data len != 1") } if tpl, ok := data[0].([]interface{}); !ok { t.Errorf("Unexpected body of Select") @@ -1064,20 +992,10 @@ func TestClient(t *testing.T) { } // Select empty - resp, err = conn.Select(spaceNo, indexNo, 0, 1, IterEq, []interface{}{uint(30)}) + data, err = conn.Select(spaceNo, indexNo, 0, 1, IterEq, []interface{}{uint(30)}) if err != nil { t.Fatalf("Failed to Select: %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Select") - } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if len(data) != 0 { t.Errorf("Response Data len != 0") } @@ -1137,70 +1055,36 @@ func TestClient(t *testing.T) { } // Call16 - resp, err = conn.Call16("box.info", []interface{}{"box.schema.SPACE_ID"}) + data, err = conn.Call16("box.info", []interface{}{"box.schema.SPACE_ID"}) if err != nil { t.Fatalf("Failed to Call16: %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Call16") - } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if len(data) < 1 { t.Errorf("Response.Data is empty after Eval") } // Call16 vs Call17 - resp, err = conn.Call16("simple_concat", []interface{}{"1"}) + data, err = conn.Call16("simple_concat", []interface{}{"1"}) if err != nil { t.Errorf("Failed to use Call16") } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if val, ok := data[0].([]interface{})[0].(string); !ok || val != "11" { t.Errorf("result is not {{1}} : %v", data) } - resp, err = conn.Call17("simple_concat", []interface{}{"1"}) + data, err = conn.Call17("simple_concat", []interface{}{"1"}) if err != nil { t.Errorf("Failed to use Call") } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if val, ok := data[0].(string); !ok || val != "11" { t.Errorf("result is not {{1}} : %v", data) } // Eval - resp, err = conn.Eval("return 5 + 6", []interface{}{}) + data, err = conn.Eval("return 5 + 6", []interface{}{}) if err != nil { t.Fatalf("Failed to Eval: %s", err.Error()) } - if resp == nil { - t.Fatalf("Response is nil after Eval") - } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } - data, err = resp.Decode() - if err != nil { - t.Fatalf("Failed to Decode: %s", err.Error()) - } if len(data) < 1 { t.Errorf("Response.Data is empty after Eval") } @@ -1260,6 +1144,9 @@ func TestClientSessionPush(t *testing.T) { } for i := 0; i < len(its); i++ { + pushCnt := uint64(0) + respCnt := uint64(0) + it = its[i] for it.Next() { resp := it.Value() @@ -1267,9 +1154,6 @@ func TestClientSessionPush(t *testing.T) { t.Errorf("Response is empty after it.Next() == true") break } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } data, err := resp.Decode() if err != nil { t.Errorf("Failed to Decode: %s", err.Error()) @@ -1279,11 +1163,32 @@ func TestClientSessionPush(t *testing.T) { t.Errorf("Response.Data is empty after CallAsync") break } + if it.IsPush() { + pushCnt += 1 + val, err := test_helpers.ConvertUint64(data[0]) + if err != nil || val != pushCnt { + t.Errorf("Unexpected push data = %v", data) + } + } else { + respCnt += 1 + val, err := test_helpers.ConvertUint64(data[0]) + if err != nil || val != pushMax { + t.Errorf("Result is not %d: %v", pushMax, data) + } + } } if err = it.Err(); err != nil { t.Errorf("An unexpected iteration error: %s", err.Error()) } + + if pushCnt != pushMax { + t.Errorf("Expect %d pushes but got %d", pushMax, pushCnt) + } + + if respCnt != 1 { + t.Errorf("Expect %d responses but got %d", 1, respCnt) + } } // We can collect original responses after iterations. @@ -1472,7 +1377,8 @@ func TestSQL(t *testing.T) { defer conn.Close() for i, test := range testCases { - resp, err := conn.Execute(test.Query, test.Args) + req := NewExecuteRequest(test.Query).Args(test.Args) + resp, err := conn.Do(req).GetResponse() assert.NoError(t, err, "Failed to Execute, query: %s", test.Query) assert.NotNil(t, resp, "Response is nil after Execute\nQuery number: %d", i) data, err := resp.Decode() @@ -1480,21 +1386,18 @@ func TestSQL(t *testing.T) { for j := range data { assert.Equal(t, data[j], test.data[j], "Response data is wrong") } - assert.Equal(t, resp.SQLInfo().AffectedCount, test.sqlInfo.AffectedCount, + exResp, ok := resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + sqlInfo, err := exResp.SQLInfo() + assert.Nil(t, err, "Error while getting SQLInfo") + assert.Equal(t, sqlInfo.AffectedCount, test.sqlInfo.AffectedCount, "Affected count is wrong") errorMsg := "Response Metadata is wrong" - metaData := resp.MetaData() + metaData, err := exResp.MetaData() + assert.Nil(t, err, "Error while getting MetaData") for j := range metaData { - assert.Equal(t, metaData[j].FieldIsAutoincrement, - test.metaData[j].FieldIsAutoincrement, errorMsg) - assert.Equal(t, metaData[j].FieldIsNullable, - test.metaData[j].FieldIsNullable, errorMsg) - assert.Equal(t, metaData[j].FieldCollation, - test.metaData[j].FieldCollation, errorMsg) - assert.Equal(t, metaData[j].FieldName, test.metaData[j].FieldName, errorMsg) - assert.Equal(t, metaData[j].FieldSpan, test.metaData[j].FieldSpan, errorMsg) - assert.Equal(t, metaData[j].FieldType, test.metaData[j].FieldType, errorMsg) + assert.Equal(t, metaData[j], test.metaData[j], errorMsg) } } } @@ -1575,7 +1478,8 @@ func TestSQLBindings(t *testing.T) { } for _, bind := range namedSQLBinds { - resp, err := conn.Execute(selectNamedQuery2, bind) + req := NewExecuteRequest(selectNamedQuery2).Args(bind) + resp, err := conn.Do(req).GetResponse() if err != nil { t.Fatalf("Failed to Execute: %s", err.Error()) } @@ -1589,7 +1493,10 @@ func TestSQLBindings(t *testing.T) { if reflect.DeepEqual(data[0], []interface{}{1, testData[1]}) { t.Error("Select with named arguments failed") } - metaData := resp.MetaData() + exResp, ok := resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + metaData, err := exResp.MetaData() + assert.Nil(t, err, "Error while getting MetaData") if metaData[0].FieldType != "unsigned" || metaData[0].FieldName != "NAME0" || metaData[1].FieldType != "string" || @@ -1598,7 +1505,8 @@ func TestSQLBindings(t *testing.T) { } } - resp, err := conn.Execute(selectPosQuery2, sqlBind5) + req := NewExecuteRequest(selectPosQuery2).Args(sqlBind5) + resp, err := conn.Do(req).GetResponse() if err != nil { t.Fatalf("Failed to Execute: %s", err.Error()) } @@ -1612,7 +1520,10 @@ func TestSQLBindings(t *testing.T) { if reflect.DeepEqual(data[0], []interface{}{1, testData[1]}) { t.Error("Select with positioned arguments failed") } - metaData := resp.MetaData() + exResp, ok := resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + metaData, err := exResp.MetaData() + assert.Nil(t, err, "Error while getting MetaData") if metaData[0].FieldType != "unsigned" || metaData[0].FieldName != "NAME0" || metaData[1].FieldType != "string" || @@ -1620,7 +1531,8 @@ func TestSQLBindings(t *testing.T) { t.Error("Wrong metadata") } - resp, err = conn.Execute(mixedQuery, sqlBind6) + req = NewExecuteRequest(mixedQuery).Args(sqlBind6) + resp, err = conn.Do(req).GetResponse() if err != nil { t.Fatalf("Failed to Execute: %s", err.Error()) } @@ -1634,7 +1546,10 @@ func TestSQLBindings(t *testing.T) { if reflect.DeepEqual(data[0], []interface{}{1, testData[1]}) { t.Error("Select with positioned arguments failed") } - metaData = resp.MetaData() + exResp, ok = resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + metaData, err = exResp.MetaData() + assert.Nil(t, err, "Error while getting MetaData") if metaData[0].FieldType != "unsigned" || metaData[0].FieldName != "NAME0" || metaData[1].FieldType != "string" || @@ -1646,81 +1561,131 @@ func TestSQLBindings(t *testing.T) { func TestStressSQL(t *testing.T) { test_helpers.SkipIfSQLUnsupported(t) - var resp Response - conn := test_helpers.ConnectWithValidation(t, dialer, opts) defer conn.Close() - resp, err := conn.Execute(createTableQuery, []interface{}{}) + req := NewExecuteRequest(createTableQuery) + resp, err := conn.Do(req).GetResponse() if err != nil { - t.Fatalf("Failed to Execute: %s", err.Error()) + t.Fatalf("Failed to create an Execute: %s", err.Error()) } if resp == nil { t.Fatal("Response is nil after Execute") } - if resp.SQLInfo().AffectedCount != 1 { - t.Errorf("Incorrect count of created spaces: %d", resp.SQLInfo().AffectedCount) + exResp, ok := resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + sqlInfo, err := exResp.SQLInfo() + assert.Nil(t, err, "Error while getting SQLInfo") + if sqlInfo.AffectedCount != 1 { + t.Errorf("Incorrect count of created spaces: %d", sqlInfo.AffectedCount) } // create table with the same name - resp, err = conn.Execute(createTableQuery, []interface{}{}) - if err == nil { - t.Fatal("Unexpected lack of error") + req = NewExecuteRequest(createTableQuery) + resp, err = conn.Do(req).GetResponse() + if err != nil { + t.Fatalf("Failed to create an Execute: %s", err.Error()) } if resp == nil { t.Fatal("Response is nil after Execute") } + _, err = resp.Decode() + assert.NotNil(t, err, "Expected error while decoding") + + tntErr, ok := err.(Error) + assert.True(t, ok) + assert.Equal(t, iproto.ER_SPACE_EXISTS, tntErr.Code) + if iproto.Error(resp.Header().Code) != iproto.ER_SPACE_EXISTS { + t.Fatalf("Unexpected response code: %d", resp.Header().Code) + } - if resp.SQLInfo().AffectedCount != 0 { - t.Errorf("Incorrect count of created spaces: %d", resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + sqlInfo, err = exResp.SQLInfo() + assert.Nil(t, err, "Unexpected error") + if sqlInfo.AffectedCount != 0 { + t.Errorf("Incorrect count of created spaces: %d", sqlInfo.AffectedCount) } // execute with nil argument - resp, err = conn.Execute(createTableQuery, nil) - if err == nil { - t.Fatal("Unexpected lack of error") + req = NewExecuteRequest(createTableQuery).Args(nil) + resp, err = conn.Do(req).GetResponse() + if err != nil { + t.Fatalf("Failed to create an Execute: %s", err.Error()) } if resp == nil { t.Fatal("Response is nil after Execute") } - if resp.SQLInfo().AffectedCount != 0 { - t.Errorf("Incorrect count of created spaces: %d", resp.SQLInfo().AffectedCount) + if resp.Header().Code == OkCode { + t.Fatal("Unexpected successful Execute") + } + exResp, ok = resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + sqlInfo, err = exResp.SQLInfo() + assert.NotNil(t, err, "Expected an error") + if sqlInfo.AffectedCount != 0 { + t.Errorf("Incorrect count of created spaces: %d", sqlInfo.AffectedCount) } // execute with zero string - resp, err = conn.Execute("", []interface{}{}) - if err == nil { - t.Fatal("Unexpected lack of error") + req = NewExecuteRequest("") + resp, err = conn.Do(req).GetResponse() + if err != nil { + t.Fatalf("Failed to create an Execute: %s", err.Error()) } if resp == nil { t.Fatal("Response is nil after Execute") } - if resp.SQLInfo().AffectedCount != 0 { - t.Errorf("Incorrect count of created spaces: %d", resp.SQLInfo().AffectedCount) + if resp.Header().Code == OkCode { + t.Fatal("Unexpected successful Execute") + } + exResp, ok = resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + sqlInfo, err = exResp.SQLInfo() + assert.NotNil(t, err, "Expected an error") + if sqlInfo.AffectedCount != 0 { + t.Errorf("Incorrect count of created spaces: %d", sqlInfo.AffectedCount) } // drop table query - resp, err = conn.Execute(dropQuery2, []interface{}{}) + req = NewExecuteRequest(dropQuery2) + resp, err = conn.Do(req).GetResponse() if err != nil { t.Fatalf("Failed to Execute: %s", err.Error()) } if resp == nil { t.Fatal("Response is nil after Execute") } - if resp.SQLInfo().AffectedCount != 1 { - t.Errorf("Incorrect count of dropped spaces: %d", resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + sqlInfo, err = exResp.SQLInfo() + assert.Nil(t, err, "Error while getting SQLInfo") + if sqlInfo.AffectedCount != 1 { + t.Errorf("Incorrect count of dropped spaces: %d", sqlInfo.AffectedCount) } // drop the same table - resp, err = conn.Execute(dropQuery2, []interface{}{}) - if err == nil { - t.Fatal("Unexpected lack of error") + req = NewExecuteRequest(dropQuery2) + resp, err = conn.Do(req).GetResponse() + if err != nil { + t.Fatalf("Failed to create an Execute: %s", err.Error()) } if resp == nil { t.Fatal("Response is nil after Execute") } - if resp.SQLInfo().AffectedCount != 0 { - t.Errorf("Incorrect count of created spaces: %d", resp.SQLInfo().AffectedCount) + if resp.Header().Code == OkCode { + t.Fatal("Unexpected successful Execute") + } + _, err = resp.Decode() + if err == nil { + t.Fatal("Unexpected lack of error") + } + exResp, ok = resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + sqlInfo, err = exResp.SQLInfo() + assert.Nil(t, err, "Error while getting SQLInfo") + if sqlInfo.AffectedCount != 0 { + t.Errorf("Incorrect count of created spaces: %d", sqlInfo.AffectedCount) } } @@ -1749,7 +1714,10 @@ func TestNewPrepared(t *testing.T) { if reflect.DeepEqual(data[0], []interface{}{1, "test"}) { t.Error("Select with named arguments failed") } - metaData := resp.MetaData() + prepResp, ok := resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + metaData, err := prepResp.MetaData() + assert.Nil(t, err, "Error while getting MetaData") if metaData[0].FieldType != "unsigned" || metaData[0].FieldName != "NAME0" || metaData[1].FieldType != "string" || @@ -1786,7 +1754,7 @@ func TestNewPrepared(t *testing.T) { if len(data) == 0 { t.Errorf("failed to prepare: response Data has no elements") } - stmt, ok := data[0].(*Prepared) + stmt, ok = data[0].(*Prepared) if !ok { t.Errorf("failed to prepare: failed to cast the response Data to Prepared object") } @@ -2043,85 +2011,82 @@ func TestSchema_IsNullable(t *testing.T) { } func TestClientNamed(t *testing.T) { - var resp Response - var err error - conn := test_helpers.ConnectWithValidation(t, dialer, opts) defer conn.Close() // Insert - resp, err = conn.Insert(spaceName, []interface{}{uint(1001), "hello2", "world2"}) + data, err := conn.Insert(spaceName, []interface{}{uint(1001), "hello2", "world2"}) if err != nil { t.Fatalf("Failed to Insert: %s", err.Error()) } - if resp == nil { + if data == nil { t.Errorf("Response is nil after Insert") } // Delete - resp, err = conn.Delete(spaceName, indexName, []interface{}{uint(1001)}) + data, err = conn.Delete(spaceName, indexName, []interface{}{uint(1001)}) if err != nil { t.Fatalf("Failed to Delete: %s", err.Error()) } - if resp == nil { + if data == nil { t.Errorf("Response is nil after Delete") } // Replace - resp, err = conn.Replace(spaceName, []interface{}{uint(1002), "hello", "world"}) + data, err = conn.Replace(spaceName, []interface{}{uint(1002), "hello", "world"}) if err != nil { t.Fatalf("Failed to Replace: %s", err.Error()) } - if resp == nil { + if data == nil { t.Errorf("Response is nil after Replace") } // Update - resp, err = conn.Update(spaceName, indexName, + data, err = conn.Update(spaceName, indexName, []interface{}{ uint(1002)}, NewOperations().Assign(1, "buy").Delete(2, 1)) if err != nil { t.Fatalf("Failed to Update: %s", err.Error()) } - if resp == nil { + if data == nil { t.Errorf("Response is nil after Update") } // Upsert - resp, err = conn.Upsert(spaceName, + data, err = conn.Upsert(spaceName, []interface{}{uint(1003), 1}, NewOperations().Add(1, 1)) if err != nil { t.Fatalf("Failed to Upsert (insert): %s", err.Error()) } - if resp == nil { + if data == nil { t.Errorf("Response is nil after Upsert (insert)") } - resp, err = conn.Upsert(spaceName, + data, err = conn.Upsert(spaceName, []interface{}{uint(1003), 1}, NewOperations().Add(1, 1)) if err != nil { t.Fatalf("Failed to Upsert (update): %s", err.Error()) } - if resp == nil { + if data == nil { t.Errorf("Response is nil after Upsert (update)") } // Select for i := 1010; i < 1020; i++ { - resp, err = conn.Replace(spaceName, + data, err = conn.Replace(spaceName, []interface{}{uint(i), fmt.Sprintf("val %d", i), "bla"}) if err != nil { t.Fatalf("Failed to Replace: %s", err.Error()) } - if resp == nil { + if data == nil { t.Errorf("Response is nil after Replace") } } - resp, err = conn.Select(spaceName, indexName, 0, 1, IterEq, []interface{}{uint(1010)}) + data, err = conn.Select(spaceName, indexName, 0, 1, IterEq, []interface{}{uint(1010)}) if err != nil { t.Fatalf("Failed to Select: %s", err.Error()) } - if resp == nil { + if data == nil { t.Errorf("Response is nil after Select") } @@ -2375,9 +2340,6 @@ func TestClientRequestObjects(t *testing.T) { if resp == nil { t.Fatal("Response is nil after Execute") } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } data, err = resp.Decode() if err != nil { t.Fatalf("Failed to Decode: %s", err.Error()) @@ -2385,8 +2347,12 @@ func TestClientRequestObjects(t *testing.T) { if len(data) != 0 { t.Fatalf("Response Body len != 0") } - if resp.SQLInfo().AffectedCount != 1 { - t.Errorf("Incorrect count of created spaces: %d", resp.SQLInfo().AffectedCount) + exResp, ok := resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + sqlInfo, err := exResp.SQLInfo() + assert.Nil(t, err, "Error while getting SQLInfo") + if sqlInfo.AffectedCount != 1 { + t.Errorf("Incorrect count of created spaces: %d", sqlInfo.AffectedCount) } req = NewExecuteRequest(dropQuery2) @@ -2397,9 +2363,6 @@ func TestClientRequestObjects(t *testing.T) { if resp == nil { t.Fatal("Response is nil after Execute") } - if resp.Pos() != nil { - t.Errorf("Response should not have a position") - } data, err = resp.Decode() if err != nil { t.Fatalf("Failed to Decode: %s", err.Error()) @@ -2407,8 +2370,12 @@ func TestClientRequestObjects(t *testing.T) { if len(data) != 0 { t.Fatalf("Response Body len != 0") } - if resp.SQLInfo().AffectedCount != 1 { - t.Errorf("Incorrect count of dropped spaces: %d", resp.SQLInfo().AffectedCount) + exResp, ok = resp.(*ExecuteResponse) + assert.True(t, ok, "Got wrong response type") + sqlInfo, err = exResp.SQLInfo() + assert.Nil(t, err, "Error while getting SQLInfo") + if sqlInfo.AffectedCount != 1 { + t.Errorf("Incorrect count of dropped spaces: %d", sqlInfo.AffectedCount) } } @@ -2425,7 +2392,7 @@ func testConnectionDoSelectRequestPrepare(t *testing.T, conn Connector) { } func testConnectionDoSelectRequestCheck(t *testing.T, - resp Response, err error, pos bool, dataLen int, firstKey uint64) { + resp *SelectResponse, err error, pos bool, dataLen int, firstKey uint64) { t.Helper() if err != nil { @@ -2434,10 +2401,14 @@ func testConnectionDoSelectRequestCheck(t *testing.T, if resp == nil { t.Fatalf("Response is nil after Select") } - if !pos && resp.Pos() != nil { + respPos, err := resp.Pos() + if err != nil { + t.Errorf("Error while getting Pos: %s", err.Error()) + } + if !pos && respPos != nil { t.Errorf("Response should not have a position descriptor") } - if pos && resp.Pos() == nil { + if pos && respPos == nil { t.Fatalf("A response must have a position descriptor") } data, err := resp.Decode() @@ -2482,7 +2453,10 @@ func TestConnectionDoSelectRequest(t *testing.T) { Key([]interface{}{uint(1010)}) resp, err := conn.Do(req).GetResponse() - testConnectionDoSelectRequestCheck(t, resp, err, false, 10, 1010) + selResp, ok := resp.(*SelectResponse) + assert.True(t, ok, "Got wrong response type") + + testConnectionDoSelectRequestCheck(t, selResp, err, false, 10, 1010) } func TestConnectionDoWatchOnceRequest(t *testing.T) { @@ -2542,7 +2516,10 @@ func TestConnectionDoSelectRequest_fetch_pos(t *testing.T) { Key([]interface{}{uint(1010)}) resp, err := conn.Do(req).GetResponse() - testConnectionDoSelectRequestCheck(t, resp, err, true, 2, 1010) + selResp, ok := resp.(*SelectResponse) + assert.True(t, ok, "Got wrong response type") + + testConnectionDoSelectRequestCheck(t, selResp, err, true, 2, 1010) } func TestConnectDoSelectRequest_after_tuple(t *testing.T) { @@ -2562,7 +2539,10 @@ func TestConnectDoSelectRequest_after_tuple(t *testing.T) { After([]interface{}{uint(1012)}) resp, err := conn.Do(req).GetResponse() - testConnectionDoSelectRequestCheck(t, resp, err, true, 2, 1013) + selResp, ok := resp.(*SelectResponse) + assert.True(t, ok, "Got wrong response type") + + testConnectionDoSelectRequestCheck(t, selResp, err, true, 2, 1013) } func TestConnectionDoSelectRequest_pagination_pos(t *testing.T) { @@ -2581,28 +2561,29 @@ func TestConnectionDoSelectRequest_pagination_pos(t *testing.T) { Key([]interface{}{uint(1010)}) resp, err := conn.Do(req).GetResponse() - testConnectionDoSelectRequestCheck(t, resp, err, true, 2, 1010) + selResp, ok := resp.(*SelectResponse) + assert.True(t, ok, "Got wrong response type") + + testConnectionDoSelectRequestCheck(t, selResp, err, true, 2, 1010) - resp, err = conn.Do(req.After(resp.Pos())).GetResponse() + selPos, err := selResp.Pos() + assert.Nil(t, err, "Error while getting Pos") - testConnectionDoSelectRequestCheck(t, resp, err, true, 2, 1012) + resp, err = conn.Do(req.After(selPos)).GetResponse() + selResp, ok = resp.(*SelectResponse) + assert.True(t, ok, "Got wrong response type") + + testConnectionDoSelectRequestCheck(t, selResp, err, true, 2, 1012) } func TestConnection_Call(t *testing.T) { - var resp Response - var err error - conn := test_helpers.ConnectWithValidation(t, dialer, opts) defer conn.Close() - resp, err = conn.Call("simple_concat", []interface{}{"1"}) + data, err := conn.Call("simple_concat", []interface{}{"1"}) if err != nil { t.Errorf("Failed to use Call") } - data, err := resp.Decode() - if err != nil { - t.Errorf("Failed to Decode: %s", err.Error()) - } if val, ok := data[0].(string); !ok || val != "11" { t.Errorf("result is not {{1}} : %v", data) } @@ -2678,6 +2659,12 @@ func (req *waitCtxRequest) Async() bool { return NewPingRequest().Async() } +func (req *waitCtxRequest) Response(header Header, body io.Reader) (Response, error) { + resp := BaseResponse{} + resp.SetHeader(header) + return &resp, nil +} + func TestClientRequestObjectsWithContext(t *testing.T) { var err error conn := test_helpers.ConnectWithValidation(t, dialer, opts) diff --git a/test_helpers/main.go b/test_helpers/main.go index 0a0c7f1cd..200c3f474 100644 --- a/test_helpers/main.go +++ b/test_helpers/main.go @@ -83,7 +83,6 @@ type TarantoolInstance struct { func isReady(dialer tarantool.Dialer, opts *tarantool.Opts) error { var err error var conn *tarantool.Connection - var resp tarantool.Response ctx, cancel := GetConnectContext() defer cancel() @@ -96,13 +95,10 @@ func isReady(dialer tarantool.Dialer, opts *tarantool.Opts) error { } defer conn.Close() - resp, err = conn.Do(tarantool.NewPingRequest()).GetResponse() + _, err = conn.Do(tarantool.NewPingRequest()).Get() if err != nil { return err } - if resp == nil { - return errors.New("response is nil after ping") - } return nil } diff --git a/test_helpers/request_mock.go b/test_helpers/request_mock.go index 80ca1b541..980c35db2 100644 --- a/test_helpers/request_mock.go +++ b/test_helpers/request_mock.go @@ -2,6 +2,7 @@ package test_helpers import ( "context" + "io" "github.com/tarantool/go-iproto" "github.com/vmihailenco/msgpack/v5" @@ -35,3 +36,10 @@ func (sr *StrangerRequest) Conn() *tarantool.Connection { func (sr *StrangerRequest) Ctx() context.Context { return nil } + +func (sr *StrangerRequest) Response(header tarantool.Header, + body io.Reader) (tarantool.Response, error) { + resp := tarantool.BaseResponse{} + resp.SetHeader(header) + return &resp, nil +} diff --git a/watch.go b/watch.go index 0508899f0..f8884c22a 100644 --- a/watch.go +++ b/watch.go @@ -2,6 +2,7 @@ package tarantool import ( "context" + "io" "github.com/tarantool/go-iproto" "github.com/vmihailenco/msgpack/v5" @@ -55,6 +56,24 @@ func (req *BroadcastRequest) Async() bool { return req.call.Async() } +// Response creates a response for a BroadcastRequest. +func (req *BroadcastRequest) Response(header Header, body io.Reader) (Response, error) { + if body == nil { + return &BaseResponse{header: header}, nil + } + if buf, ok := body.(*smallBuf); ok { + return &BaseResponse{header: header, buf: *buf}, nil + } + data, err := io.ReadAll(body) + if err != nil { + return nil, err + } + resp := BaseResponse{ + header: header, buf: smallBuf{b: data}, + } + return &resp, nil +} + // watchRequest subscribes to the updates of a specified key defined on the // server. After receiving the notification, you should send a new // watchRequest to acknowledge the notification.